1package tsdb 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 "os" 9 "path/filepath" 10 "sort" 11 "sync" 12 13 "github.com/cespare/xxhash" 14 "github.com/influxdata/influxdb/models" 15 "github.com/influxdata/influxdb/pkg/binaryutil" 16 "go.uber.org/zap" 17 "golang.org/x/sync/errgroup" 18) 19 20var ( 21 ErrSeriesFileClosed = errors.New("tsdb: series file closed") 22 ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id") 23) 24 25// SeriesIDSize is the size in bytes of a series key ID. 26const SeriesIDSize = 8 27 28const ( 29 // SeriesFilePartitionN is the number of partitions a series file is split into. 30 SeriesFilePartitionN = 8 31) 32 33// SeriesFile represents the section of the index that holds series data. 34type SeriesFile struct { 35 path string 36 partitions []*SeriesPartition 37 38 refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use. 39 40 Logger *zap.Logger 41} 42 43// NewSeriesFile returns a new instance of SeriesFile. 44func NewSeriesFile(path string) *SeriesFile { 45 return &SeriesFile{ 46 path: path, 47 Logger: zap.NewNop(), 48 } 49} 50 51// Open memory maps the data file at the file's path. 52func (f *SeriesFile) Open() error { 53 // Wait for all references to be released and prevent new ones from being acquired. 54 f.refs.Lock() 55 defer f.refs.Unlock() 56 57 // Create path if it doesn't exist. 58 if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { 59 return err 60 } 61 62 // Open partitions. 63 f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN) 64 for i := 0; i < SeriesFilePartitionN; i++ { 65 p := NewSeriesPartition(i, f.SeriesPartitionPath(i)) 66 p.Logger = f.Logger.With(zap.Int("partition", p.ID())) 67 if err := p.Open(); err != nil { 68 f.Logger.Error("Unable to open series file", 69 zap.String("path", f.path), 70 zap.Int("partition", p.ID()), 71 zap.Error(err)) 72 f.close() 73 return err 74 } 75 f.partitions = append(f.partitions, p) 76 } 77 78 return nil 79} 80 81func (f *SeriesFile) close() (err error) { 82 for _, p := range f.partitions { 83 if e := p.Close(); e != nil && err == nil { 84 err = e 85 } 86 } 87 88 return err 89} 90 91// Close unmaps the data file. 92func (f *SeriesFile) Close() (err error) { 93 f.refs.Lock() 94 defer f.refs.Unlock() 95 return f.close() 96} 97 98// Path returns the path to the file. 99func (f *SeriesFile) Path() string { return f.path } 100 101// SeriesPartitionPath returns the path to a given partition. 102func (f *SeriesFile) SeriesPartitionPath(i int) string { 103 return filepath.Join(f.path, fmt.Sprintf("%02x", i)) 104} 105 106// Partitions returns all partitions. 107func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions } 108 109// Retain adds a reference count to the file. It returns a release func. 110func (f *SeriesFile) Retain() func() { 111 if f != nil { 112 f.refs.RLock() 113 114 // Return the RUnlock func as the release func to be called when done. 115 return f.refs.RUnlock 116 } 117 return nop 118} 119 120// EnableCompactions allows compactions to run. 121func (f *SeriesFile) EnableCompactions() { 122 for _, p := range f.partitions { 123 p.EnableCompactions() 124 } 125} 126 127// DisableCompactions prevents new compactions from running. 128func (f *SeriesFile) DisableCompactions() { 129 for _, p := range f.partitions { 130 p.DisableCompactions() 131 } 132} 133 134// Wait waits for all Retains to be released. 135func (f *SeriesFile) Wait() { 136 f.refs.Lock() 137 defer f.refs.Unlock() 138} 139 140// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. 141// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed. 142func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) { 143 keys := GenerateSeriesKeys(names, tagsSlice) 144 keyPartitionIDs := f.SeriesKeysPartitionIDs(keys) 145 ids := make([]uint64, len(keys)) 146 147 var g errgroup.Group 148 for i := range f.partitions { 149 p := f.partitions[i] 150 g.Go(func() error { 151 return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids) 152 }) 153 } 154 if err := g.Wait(); err != nil { 155 return nil, err 156 } 157 return ids, nil 158} 159 160// DeleteSeriesID flags a series as permanently deleted. 161// If the series is reintroduced later then it must create a new id. 162func (f *SeriesFile) DeleteSeriesID(id uint64) error { 163 p := f.SeriesIDPartition(id) 164 if p == nil { 165 return ErrInvalidSeriesPartitionID 166 } 167 return p.DeleteSeriesID(id) 168} 169 170// IsDeleted returns true if the ID has been deleted before. 171func (f *SeriesFile) IsDeleted(id uint64) bool { 172 p := f.SeriesIDPartition(id) 173 if p == nil { 174 return false 175 } 176 return p.IsDeleted(id) 177} 178 179// SeriesKey returns the series key for a given id. 180func (f *SeriesFile) SeriesKey(id uint64) []byte { 181 if id == 0 { 182 return nil 183 } 184 p := f.SeriesIDPartition(id) 185 if p == nil { 186 return nil 187 } 188 return p.SeriesKey(id) 189} 190 191// SeriesKeys returns a list of series keys from a list of ids. 192func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte { 193 keys := make([][]byte, len(ids)) 194 for i := range ids { 195 keys[i] = f.SeriesKey(ids[i]) 196 } 197 return keys 198} 199 200// Series returns the parsed series name and tags for an offset. 201func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) { 202 key := f.SeriesKey(id) 203 if key == nil { 204 return nil, nil 205 } 206 return ParseSeriesKey(key) 207} 208 209// SeriesID return the series id for the series. 210func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 { 211 key := AppendSeriesKey(buf[:0], name, tags) 212 keyPartition := f.SeriesKeyPartition(key) 213 if keyPartition == nil { 214 return 0 215 } 216 return keyPartition.FindIDBySeriesKey(key) 217} 218 219// HasSeries return true if the series exists. 220func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool { 221 return f.SeriesID(name, tags, buf) > 0 222} 223 224// SeriesCount returns the number of series. 225func (f *SeriesFile) SeriesCount() uint64 { 226 var n uint64 227 for _, p := range f.partitions { 228 n += p.SeriesCount() 229 } 230 return n 231} 232 233// SeriesIterator returns an iterator over all the series. 234func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator { 235 var ids []uint64 236 for _, p := range f.partitions { 237 ids = p.AppendSeriesIDs(ids) 238 } 239 sort.Sort(uint64Slice(ids)) 240 return NewSeriesIDSliceIterator(ids) 241} 242 243func (f *SeriesFile) SeriesIDPartitionID(id uint64) int { 244 return int((id - 1) % SeriesFilePartitionN) 245} 246 247func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition { 248 partitionID := f.SeriesIDPartitionID(id) 249 if partitionID >= len(f.partitions) { 250 return nil 251 } 252 return f.partitions[partitionID] 253} 254 255func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int { 256 partitionIDs := make([]int, len(keys)) 257 for i := range keys { 258 partitionIDs[i] = f.SeriesKeyPartitionID(keys[i]) 259 } 260 return partitionIDs 261} 262 263func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int { 264 return int(xxhash.Sum64(key) % SeriesFilePartitionN) 265} 266 267func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition { 268 partitionID := f.SeriesKeyPartitionID(key) 269 if partitionID >= len(f.partitions) { 270 return nil 271 } 272 return f.partitions[partitionID] 273} 274 275// AppendSeriesKey serializes name and tags to a byte slice. 276// The total length is prepended as a uvarint. 277func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { 278 buf := make([]byte, binary.MaxVarintLen64) 279 origLen := len(dst) 280 281 // The tag count is variable encoded, so we need to know ahead of time what 282 // the size of the tag count value will be. 283 tcBuf := make([]byte, binary.MaxVarintLen64) 284 tcSz := binary.PutUvarint(tcBuf, uint64(len(tags))) 285 286 // Size of name/tags. Does not include total length. 287 size := 0 + // 288 2 + // size of measurement 289 len(name) + // measurement 290 tcSz + // size of number of tags 291 (4 * len(tags)) + // length of each tag key and value 292 tags.Size() // size of tag keys/values 293 294 // Variable encode length. 295 totalSz := binary.PutUvarint(buf, uint64(size)) 296 297 // If caller doesn't provide a buffer then pre-allocate an exact one. 298 if dst == nil { 299 dst = make([]byte, 0, size+totalSz) 300 } 301 302 // Append total length. 303 dst = append(dst, buf[:totalSz]...) 304 305 // Append name. 306 binary.BigEndian.PutUint16(buf, uint16(len(name))) 307 dst = append(dst, buf[:2]...) 308 dst = append(dst, name...) 309 310 // Append tag count. 311 dst = append(dst, tcBuf[:tcSz]...) 312 313 // Append tags. 314 for _, tag := range tags { 315 binary.BigEndian.PutUint16(buf, uint16(len(tag.Key))) 316 dst = append(dst, buf[:2]...) 317 dst = append(dst, tag.Key...) 318 319 binary.BigEndian.PutUint16(buf, uint16(len(tag.Value))) 320 dst = append(dst, buf[:2]...) 321 dst = append(dst, tag.Value...) 322 } 323 324 // Verify that the total length equals the encoded byte count. 325 if got, exp := len(dst)-origLen, size+totalSz; got != exp { 326 panic(fmt.Sprintf("series key encoding does not match calculated total length: actual=%d, exp=%d, key=%x", got, exp, dst)) 327 } 328 329 return dst 330} 331 332// ReadSeriesKey returns the series key from the beginning of the buffer. 333func ReadSeriesKey(data []byte) (key, remainder []byte) { 334 sz, n := binary.Uvarint(data) 335 return data[:int(sz)+n], data[int(sz)+n:] 336} 337 338func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) { 339 sz64, i := binary.Uvarint(data) 340 return int(sz64), data[i:] 341} 342 343func ReadSeriesKeyMeasurement(data []byte) (name, remainder []byte) { 344 n, data := binary.BigEndian.Uint16(data), data[2:] 345 return data[:n], data[n:] 346} 347 348func ReadSeriesKeyTagN(data []byte) (n int, remainder []byte) { 349 n64, i := binary.Uvarint(data) 350 return int(n64), data[i:] 351} 352 353func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) { 354 n, data := binary.BigEndian.Uint16(data), data[2:] 355 key, data = data[:n], data[n:] 356 357 n, data = binary.BigEndian.Uint16(data), data[2:] 358 value, data = data[:n], data[n:] 359 return key, value, data 360} 361 362// ParseSeriesKey extracts the name & tags from a series key. 363func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) { 364 return parseSeriesKey(data, nil) 365} 366 367// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into 368// dstTags, which is then returened. 369// 370// The returned dstTags may have a different length and capacity. 371func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) { 372 return parseSeriesKey(data, dstTags) 373} 374 375// parseSeriesKey extracts the name and tags from data, attempting to re-use the 376// provided tags value rather than allocating. The returned tags may have a 377// different length and capacity to those provided. 378func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) { 379 var name []byte 380 _, data = ReadSeriesKeyLen(data) 381 name, data = ReadSeriesKeyMeasurement(data) 382 tagN, data := ReadSeriesKeyTagN(data) 383 384 dst = dst[:cap(dst)] // Grow dst to use full capacity 385 if got, want := len(dst), tagN; got < want { 386 dst = append(dst, make(models.Tags, want-got)...) 387 } else if got > want { 388 dst = dst[:want] 389 } 390 dst = dst[:tagN] 391 392 for i := 0; i < tagN; i++ { 393 var key, value []byte 394 key, value, data = ReadSeriesKeyTag(data) 395 dst[i].Key, dst[i].Value = key, value 396 } 397 398 return name, dst 399} 400 401func CompareSeriesKeys(a, b []byte) int { 402 // Handle 'nil' keys. 403 if len(a) == 0 && len(b) == 0 { 404 return 0 405 } else if len(a) == 0 { 406 return -1 407 } else if len(b) == 0 { 408 return 1 409 } 410 411 // Read total size. 412 _, a = ReadSeriesKeyLen(a) 413 _, b = ReadSeriesKeyLen(b) 414 415 // Read names. 416 name0, a := ReadSeriesKeyMeasurement(a) 417 name1, b := ReadSeriesKeyMeasurement(b) 418 419 // Compare names, return if not equal. 420 if cmp := bytes.Compare(name0, name1); cmp != 0 { 421 return cmp 422 } 423 424 // Read tag counts. 425 tagN0, a := ReadSeriesKeyTagN(a) 426 tagN1, b := ReadSeriesKeyTagN(b) 427 428 // Compare each tag in order. 429 for i := 0; ; i++ { 430 // Check for EOF. 431 if i == tagN0 && i == tagN1 { 432 return 0 433 } else if i == tagN0 { 434 return -1 435 } else if i == tagN1 { 436 return 1 437 } 438 439 // Read keys. 440 var key0, key1, value0, value1 []byte 441 key0, value0, a = ReadSeriesKeyTag(a) 442 key1, value1, b = ReadSeriesKeyTag(b) 443 444 // Compare keys & values. 445 if cmp := bytes.Compare(key0, key1); cmp != 0 { 446 return cmp 447 } else if cmp := bytes.Compare(value0, value1); cmp != 0 { 448 return cmp 449 } 450 } 451} 452 453// GenerateSeriesKeys generates series keys for a list of names & tags using 454// a single large memory block. 455func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte { 456 buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice)) 457 keys := make([][]byte, len(names)) 458 for i := range names { 459 offset := len(buf) 460 buf = AppendSeriesKey(buf, names[i], tagsSlice[i]) 461 keys[i] = buf[offset:] 462 } 463 return keys 464} 465 466// SeriesKeysSize returns the number of bytes required to encode a list of name/tags. 467func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int { 468 var n int 469 for i := range names { 470 n += SeriesKeySize(names[i], tagsSlice[i]) 471 } 472 return n 473} 474 475// SeriesKeySize returns the number of bytes required to encode a series key. 476func SeriesKeySize(name []byte, tags models.Tags) int { 477 var n int 478 n += 2 + len(name) 479 n += binaryutil.UvarintSize(uint64(len(tags))) 480 for _, tag := range tags { 481 n += 2 + len(tag.Key) 482 n += 2 + len(tag.Value) 483 } 484 n += binaryutil.UvarintSize(uint64(n)) 485 return n 486} 487 488type seriesKeys [][]byte 489 490func (a seriesKeys) Len() int { return len(a) } 491func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 492func (a seriesKeys) Less(i, j int) bool { 493 return CompareSeriesKeys(a[i], a[j]) == -1 494} 495 496type uint64Slice []uint64 497 498func (a uint64Slice) Len() int { return len(a) } 499func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 500func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } 501 502func nop() {} 503