1package tsdb 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "errors" 7 "fmt" 8 "os" 9 "path/filepath" 10 "runtime" 11 "sort" 12 "sync" 13 14 "github.com/cespare/xxhash" 15 "github.com/influxdata/influxdb/models" 16 "github.com/influxdata/influxdb/pkg/binaryutil" 17 "github.com/influxdata/influxdb/pkg/limiter" 18 "go.uber.org/zap" 19 "golang.org/x/sync/errgroup" 20) 21 22var ( 23 ErrSeriesFileClosed = errors.New("tsdb: series file closed") 24 ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id") 25) 26 27// SeriesIDSize is the size in bytes of a series key ID. 28const SeriesIDSize = 8 29 30const ( 31 // SeriesFilePartitionN is the number of partitions a series file is split into. 32 SeriesFilePartitionN = 8 33) 34 35// SeriesFile represents the section of the index that holds series data. 36type SeriesFile struct { 37 path string 38 partitions []*SeriesPartition 39 40 maxSnapshotConcurrency int 41 42 refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use. 43 44 Logger *zap.Logger 45} 46 47// NewSeriesFile returns a new instance of SeriesFile. 48func NewSeriesFile(path string) *SeriesFile { 49 maxSnapshotConcurrency := runtime.GOMAXPROCS(0) 50 if maxSnapshotConcurrency < 1 { 51 maxSnapshotConcurrency = 1 52 } 53 54 return &SeriesFile{ 55 path: path, 56 maxSnapshotConcurrency: maxSnapshotConcurrency, 57 Logger: zap.NewNop(), 58 } 59} 60 61func (f *SeriesFile) WithMaxCompactionConcurrency(maxCompactionConcurrency int) { 62 if maxCompactionConcurrency < 1 { 63 maxCompactionConcurrency = runtime.GOMAXPROCS(0) 64 if maxCompactionConcurrency < 1 { 65 maxCompactionConcurrency = 1 66 } 67 } 68 69 f.maxSnapshotConcurrency = maxCompactionConcurrency 70} 71 72// Open memory maps the data file at the file's path. 73func (f *SeriesFile) Open() error { 74 // Wait for all references to be released and prevent new ones from being acquired. 75 f.refs.Lock() 76 defer f.refs.Unlock() 77 78 // Create path if it doesn't exist. 79 if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { 80 return err 81 } 82 83 // Limit concurrent series file compactions 84 compactionLimiter := limiter.NewFixed(f.maxSnapshotConcurrency) 85 86 // Open partitions. 87 f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN) 88 for i := 0; i < SeriesFilePartitionN; i++ { 89 p := NewSeriesPartition(i, f.SeriesPartitionPath(i), compactionLimiter) 90 p.Logger = f.Logger.With(zap.Int("partition", p.ID())) 91 if err := p.Open(); err != nil { 92 f.Logger.Error("Unable to open series file", 93 zap.String("path", f.path), 94 zap.Int("partition", p.ID()), 95 zap.Error(err)) 96 f.close() 97 return err 98 } 99 f.partitions = append(f.partitions, p) 100 } 101 102 return nil 103} 104 105func (f *SeriesFile) close() (err error) { 106 for _, p := range f.partitions { 107 if e := p.Close(); e != nil && err == nil { 108 err = e 109 } 110 } 111 112 return err 113} 114 115// Close unmaps the data file. 116func (f *SeriesFile) Close() (err error) { 117 f.refs.Lock() 118 defer f.refs.Unlock() 119 return f.close() 120} 121 122// Path returns the path to the file. 123func (f *SeriesFile) Path() string { return f.path } 124 125// SeriesPartitionPath returns the path to a given partition. 126func (f *SeriesFile) SeriesPartitionPath(i int) string { 127 return filepath.Join(f.path, fmt.Sprintf("%02x", i)) 128} 129 130// Partitions returns all partitions. 131func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions } 132 133// Retain adds a reference count to the file. It returns a release func. 134func (f *SeriesFile) Retain() func() { 135 if f != nil { 136 f.refs.RLock() 137 138 // Return the RUnlock func as the release func to be called when done. 139 return f.refs.RUnlock 140 } 141 return nop 142} 143 144// EnableCompactions allows compactions to run. 145func (f *SeriesFile) EnableCompactions() { 146 for _, p := range f.partitions { 147 p.EnableCompactions() 148 } 149} 150 151// DisableCompactions prevents new compactions from running. 152func (f *SeriesFile) DisableCompactions() { 153 for _, p := range f.partitions { 154 p.DisableCompactions() 155 } 156} 157 158// Wait waits for all Retains to be released. 159func (f *SeriesFile) Wait() { 160 f.refs.Lock() 161 defer f.refs.Unlock() 162} 163 164// FileSize returns the size of all partitions, in bytes. 165func (f *SeriesFile) FileSize() (n int64, err error) { 166 for _, p := range f.partitions { 167 v, err := p.FileSize() 168 n += v 169 if err != nil { 170 return n, err 171 } 172 } 173 return n, err 174} 175 176// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. 177// The returned ids slice returns IDs for every name+tags, creating new series IDs as needed. 178func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags) ([]uint64, error) { 179 keys := GenerateSeriesKeys(names, tagsSlice) 180 keyPartitionIDs := f.SeriesKeysPartitionIDs(keys) 181 ids := make([]uint64, len(keys)) 182 183 var g errgroup.Group 184 for i := range f.partitions { 185 p := f.partitions[i] 186 g.Go(func() error { 187 return p.CreateSeriesListIfNotExists(keys, keyPartitionIDs, ids) 188 }) 189 } 190 if err := g.Wait(); err != nil { 191 return nil, err 192 } 193 return ids, nil 194} 195 196// DeleteSeriesID flags a series as permanently deleted. 197// If the series is reintroduced later then it must create a new id. 198func (f *SeriesFile) DeleteSeriesID(id uint64) error { 199 p := f.SeriesIDPartition(id) 200 if p == nil { 201 return ErrInvalidSeriesPartitionID 202 } 203 return p.DeleteSeriesID(id) 204} 205 206// IsDeleted returns true if the ID has been deleted before. 207func (f *SeriesFile) IsDeleted(id uint64) bool { 208 p := f.SeriesIDPartition(id) 209 if p == nil { 210 return false 211 } 212 return p.IsDeleted(id) 213} 214 215// SeriesKey returns the series key for a given id. 216func (f *SeriesFile) SeriesKey(id uint64) []byte { 217 if id == 0 { 218 return nil 219 } 220 p := f.SeriesIDPartition(id) 221 if p == nil { 222 return nil 223 } 224 return p.SeriesKey(id) 225} 226 227// SeriesKeys returns a list of series keys from a list of ids. 228func (f *SeriesFile) SeriesKeys(ids []uint64) [][]byte { 229 keys := make([][]byte, len(ids)) 230 for i := range ids { 231 keys[i] = f.SeriesKey(ids[i]) 232 } 233 return keys 234} 235 236// Series returns the parsed series name and tags for an offset. 237func (f *SeriesFile) Series(id uint64) ([]byte, models.Tags) { 238 key := f.SeriesKey(id) 239 if key == nil { 240 return nil, nil 241 } 242 return ParseSeriesKey(key) 243} 244 245// SeriesID return the series id for the series. 246func (f *SeriesFile) SeriesID(name []byte, tags models.Tags, buf []byte) uint64 { 247 key := AppendSeriesKey(buf[:0], name, tags) 248 keyPartition := f.SeriesKeyPartition(key) 249 if keyPartition == nil { 250 return 0 251 } 252 return keyPartition.FindIDBySeriesKey(key) 253} 254 255// HasSeries return true if the series exists. 256func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool { 257 return f.SeriesID(name, tags, buf) > 0 258} 259 260// SeriesCount returns the number of series. 261func (f *SeriesFile) SeriesCount() uint64 { 262 var n uint64 263 for _, p := range f.partitions { 264 n += p.SeriesCount() 265 } 266 return n 267} 268 269// SeriesIterator returns an iterator over all the series. 270func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator { 271 var ids []uint64 272 for _, p := range f.partitions { 273 ids = p.AppendSeriesIDs(ids) 274 } 275 sort.Sort(uint64Slice(ids)) 276 return NewSeriesIDSliceIterator(ids) 277} 278 279func (f *SeriesFile) SeriesIDPartitionID(id uint64) int { 280 return int((id - 1) % SeriesFilePartitionN) 281} 282 283func (f *SeriesFile) SeriesIDPartition(id uint64) *SeriesPartition { 284 partitionID := f.SeriesIDPartitionID(id) 285 if partitionID >= len(f.partitions) { 286 return nil 287 } 288 return f.partitions[partitionID] 289} 290 291func (f *SeriesFile) SeriesKeysPartitionIDs(keys [][]byte) []int { 292 partitionIDs := make([]int, len(keys)) 293 for i := range keys { 294 partitionIDs[i] = f.SeriesKeyPartitionID(keys[i]) 295 } 296 return partitionIDs 297} 298 299func (f *SeriesFile) SeriesKeyPartitionID(key []byte) int { 300 return int(xxhash.Sum64(key) % SeriesFilePartitionN) 301} 302 303func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition { 304 partitionID := f.SeriesKeyPartitionID(key) 305 if partitionID >= len(f.partitions) { 306 return nil 307 } 308 return f.partitions[partitionID] 309} 310 311// AppendSeriesKey serializes name and tags to a byte slice. 312// The total length is prepended as a uvarint. 313func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { 314 buf := make([]byte, binary.MaxVarintLen64) 315 origLen := len(dst) 316 317 // The tag count is variable encoded, so we need to know ahead of time what 318 // the size of the tag count value will be. 319 tcBuf := make([]byte, binary.MaxVarintLen64) 320 tcSz := binary.PutUvarint(tcBuf, uint64(len(tags))) 321 322 // Size of name/tags. Does not include total length. 323 size := 0 + // 324 2 + // size of measurement 325 len(name) + // measurement 326 tcSz + // size of number of tags 327 (4 * len(tags)) + // length of each tag key and value 328 tags.Size() // size of tag keys/values 329 330 // Variable encode length. 331 totalSz := binary.PutUvarint(buf, uint64(size)) 332 333 // If caller doesn't provide a buffer then pre-allocate an exact one. 334 if dst == nil { 335 dst = make([]byte, 0, size+totalSz) 336 } 337 338 // Append total length. 339 dst = append(dst, buf[:totalSz]...) 340 341 // Append name. 342 binary.BigEndian.PutUint16(buf, uint16(len(name))) 343 dst = append(dst, buf[:2]...) 344 dst = append(dst, name...) 345 346 // Append tag count. 347 dst = append(dst, tcBuf[:tcSz]...) 348 349 // Append tags. 350 for _, tag := range tags { 351 binary.BigEndian.PutUint16(buf, uint16(len(tag.Key))) 352 dst = append(dst, buf[:2]...) 353 dst = append(dst, tag.Key...) 354 355 binary.BigEndian.PutUint16(buf, uint16(len(tag.Value))) 356 dst = append(dst, buf[:2]...) 357 dst = append(dst, tag.Value...) 358 } 359 360 // Verify that the total length equals the encoded byte count. 361 if got, exp := len(dst)-origLen, size+totalSz; got != exp { 362 panic(fmt.Sprintf("series key encoding does not match calculated total length: actual=%d, exp=%d, key=%x", got, exp, dst)) 363 } 364 365 return dst 366} 367 368// ReadSeriesKey returns the series key from the beginning of the buffer. 369func ReadSeriesKey(data []byte) (key, remainder []byte) { 370 sz, n := binary.Uvarint(data) 371 return data[:int(sz)+n], data[int(sz)+n:] 372} 373 374func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) { 375 sz64, i := binary.Uvarint(data) 376 return int(sz64), data[i:] 377} 378 379func ReadSeriesKeyMeasurement(data []byte) (name, remainder []byte) { 380 n, data := binary.BigEndian.Uint16(data), data[2:] 381 return data[:n], data[n:] 382} 383 384func ReadSeriesKeyTagN(data []byte) (n int, remainder []byte) { 385 n64, i := binary.Uvarint(data) 386 return int(n64), data[i:] 387} 388 389func ReadSeriesKeyTag(data []byte) (key, value, remainder []byte) { 390 n, data := binary.BigEndian.Uint16(data), data[2:] 391 key, data = data[:n], data[n:] 392 393 n, data = binary.BigEndian.Uint16(data), data[2:] 394 value, data = data[:n], data[n:] 395 return key, value, data 396} 397 398// ParseSeriesKey extracts the name & tags from a series key. 399func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) { 400 return parseSeriesKey(data, nil) 401} 402 403// ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into 404// dstTags, which is then returened. 405// 406// The returned dstTags may have a different length and capacity. 407func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) { 408 return parseSeriesKey(data, dstTags) 409} 410 411// parseSeriesKey extracts the name and tags from data, attempting to re-use the 412// provided tags value rather than allocating. The returned tags may have a 413// different length and capacity to those provided. 414func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) { 415 var name []byte 416 _, data = ReadSeriesKeyLen(data) 417 name, data = ReadSeriesKeyMeasurement(data) 418 tagN, data := ReadSeriesKeyTagN(data) 419 420 dst = dst[:cap(dst)] // Grow dst to use full capacity 421 if got, want := len(dst), tagN; got < want { 422 dst = append(dst, make(models.Tags, want-got)...) 423 } else if got > want { 424 dst = dst[:want] 425 } 426 dst = dst[:tagN] 427 428 for i := 0; i < tagN; i++ { 429 var key, value []byte 430 key, value, data = ReadSeriesKeyTag(data) 431 dst[i].Key, dst[i].Value = key, value 432 } 433 434 return name, dst 435} 436 437func CompareSeriesKeys(a, b []byte) int { 438 // Handle 'nil' keys. 439 if len(a) == 0 && len(b) == 0 { 440 return 0 441 } else if len(a) == 0 { 442 return -1 443 } else if len(b) == 0 { 444 return 1 445 } 446 447 // Read total size. 448 _, a = ReadSeriesKeyLen(a) 449 _, b = ReadSeriesKeyLen(b) 450 451 // Read names. 452 name0, a := ReadSeriesKeyMeasurement(a) 453 name1, b := ReadSeriesKeyMeasurement(b) 454 455 // Compare names, return if not equal. 456 if cmp := bytes.Compare(name0, name1); cmp != 0 { 457 return cmp 458 } 459 460 // Read tag counts. 461 tagN0, a := ReadSeriesKeyTagN(a) 462 tagN1, b := ReadSeriesKeyTagN(b) 463 464 // Compare each tag in order. 465 for i := 0; ; i++ { 466 // Check for EOF. 467 if i == tagN0 && i == tagN1 { 468 return 0 469 } else if i == tagN0 { 470 return -1 471 } else if i == tagN1 { 472 return 1 473 } 474 475 // Read keys. 476 var key0, key1, value0, value1 []byte 477 key0, value0, a = ReadSeriesKeyTag(a) 478 key1, value1, b = ReadSeriesKeyTag(b) 479 480 // Compare keys & values. 481 if cmp := bytes.Compare(key0, key1); cmp != 0 { 482 return cmp 483 } else if cmp := bytes.Compare(value0, value1); cmp != 0 { 484 return cmp 485 } 486 } 487} 488 489// GenerateSeriesKeys generates series keys for a list of names & tags using 490// a single large memory block. 491func GenerateSeriesKeys(names [][]byte, tagsSlice []models.Tags) [][]byte { 492 buf := make([]byte, 0, SeriesKeysSize(names, tagsSlice)) 493 keys := make([][]byte, len(names)) 494 for i := range names { 495 offset := len(buf) 496 buf = AppendSeriesKey(buf, names[i], tagsSlice[i]) 497 keys[i] = buf[offset:] 498 } 499 return keys 500} 501 502// SeriesKeysSize returns the number of bytes required to encode a list of name/tags. 503func SeriesKeysSize(names [][]byte, tagsSlice []models.Tags) int { 504 var n int 505 for i := range names { 506 n += SeriesKeySize(names[i], tagsSlice[i]) 507 } 508 return n 509} 510 511// SeriesKeySize returns the number of bytes required to encode a series key. 512func SeriesKeySize(name []byte, tags models.Tags) int { 513 var n int 514 n += 2 + len(name) 515 n += binaryutil.UvarintSize(uint64(len(tags))) 516 for _, tag := range tags { 517 n += 2 + len(tag.Key) 518 n += 2 + len(tag.Value) 519 } 520 n += binaryutil.UvarintSize(uint64(n)) 521 return n 522} 523 524type seriesKeys [][]byte 525 526func (a seriesKeys) Len() int { return len(a) } 527func (a seriesKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 528func (a seriesKeys) Less(i, j int) bool { 529 return CompareSeriesKeys(a[i], a[j]) == -1 530} 531 532type uint64Slice []uint64 533 534func (a uint64Slice) Len() int { return len(a) } 535func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 536func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } 537 538func nop() {} 539