package tsi1

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"sort"
	"sync"
	"time"
	"unsafe"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/bloom"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/pkg/estimator/hll"
	"github.com/influxdata/influxdb/pkg/mmap"
	"github.com/influxdata/influxdb/tsdb"
)

// Log errors.
var (
	// ErrLogEntryChecksumMismatch is returned when a log entry's stored CRC32
	// does not match the checksum recomputed over its contents. During replay
	// it marks the point at which the tail of the log is truncated.
	ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
)

// Log entry flag constants.
const (
	LogEntrySeriesTombstoneFlag      = 0x01
	LogEntryMeasurementTombstoneFlag = 0x02
	LogEntryTagKeyTombstoneFlag     = 0x04
	LogEntryTagValueTombstoneFlag   = 0x08
)

// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered
// writer uses. If the LogFile does not have an explicit buffer size set then
// this is the size of the buffer; it is equal to the default buffer size used
// by a bufio.Writer.
const defaultLogFileBufferSize = 4096

// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 1 << 17 // 128K

// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
	mu         sync.RWMutex
	wg         sync.WaitGroup // ref count
	id         int            // file sequence identifier
	data       []byte         // mmap of existing entries, used only during replay
	file       *os.File       // writer
	w          *bufio.Writer  // buffered writer
	bufferSize int            // The size of the buffer used by the buffered writer
	nosync     bool           // Disables buffer flushing and file syncing. Useful for offline tooling.
	buf        []byte         // marshaling buffer, reused by appendEntry
	keyBuf     []byte         // series key scratch buffer, used by execSeriesEntry

	sfile   *tsdb.SeriesFile // series lookup
	size    int64            // tracks current file size
	modTime time.Time        // tracks last time write occurred

	// In-memory series existence/tombstone sets.
	seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

	// In-memory index.
	mms logMeasurements

	// Filepath to the log file.
	path string
}

// NewLogFile returns a new instance of LogFile.
func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
	return &LogFile{
		sfile: sfile,
		path:  path,
		mms:   make(logMeasurements),

		seriesIDSet:          tsdb.NewSeriesIDSet(),
		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
	}
}

// bytes estimates the memory footprint of this LogFile, in bytes.
func (f *LogFile) bytes() int {
	var b int
	b += 24 // mu RWMutex is 24 bytes
	b += 16 // wg WaitGroup is 16 bytes
	b += int(unsafe.Sizeof(f.id))
	// Do not include f.data because it is mmap'd
	// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
	//b += int(unsafe.Sizeof(f.w)) + f.w.Size()
	b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
	b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(f.size))
	b += int(unsafe.Sizeof(f.modTime))
	b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes()
	b += int(unsafe.Sizeof(f.path)) + len(f.path)
	return b
}

// Open reads the log from a file and validates all the checksums.
// On any error the file is closed so a partially-initialized LogFile is
// never left holding open resources.
func (f *LogFile) Open() error {
	if err := f.open(); err != nil {
		f.Close()
		return err
	}
	return nil
}

// open performs the actual work of Open: it opens (or creates) the file for
// appending, memory-maps any existing data, and replays every valid log entry
// into the in-memory index. A trailing partial or corrupt entry is truncated
// (the file offset is positioned just past the last valid entry).
func (f *LogFile) open() error {
	f.id, _ = ParseFilename(f.path)

	// Open file for appending.
	file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file

	if f.bufferSize == 0 {
		f.bufferSize = defaultLogFileBufferSize
	}
	f.w = bufio.NewWriterSize(f.file, f.bufferSize)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path(), 0)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	var n int64
	for buf := f.data; len(buf) > 0; {
		// Read next entry. Truncate partial writes: a short buffer or a
		// checksum mismatch means the remainder of the file is a torn write.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
			break
		} else if err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward.
		n += int64(e.Size)
		buf = buf[e.Size:]
	}

	// Move to the end of the file. Note f.size is reset to the replayed byte
	// count, which may be less than fi.Size() when the tail was truncated.
	f.size = n
	_, err = file.Seek(n, io.SeekStart)
	return err
}

// Close shuts down the file handle and mmap.
// NOTE(review): f.data is unmapped but not reset to nil here, and flush/close
// errors are ignored — confirm whether callers rely on that.
func (f *LogFile) Close() error {
	// Wait until the file has no more references.
	f.wg.Wait()

	if f.w != nil {
		f.w.Flush()
		f.w = nil
	}

	if f.file != nil {
		f.file.Close()
		f.file = nil
	}

	if f.data != nil {
		mmap.Unmap(f.data)
	}

	f.mms = make(logMeasurements)
	return nil
}

// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file.
// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op.
func (f *LogFile) FlushAndSync() error {
	if f.nosync {
		return nil
	}

	if f.w != nil {
		if err := f.w.Flush(); err != nil {
			return err
		}
	}

	if f.file == nil {
		return nil
	}
	return f.file.Sync()
}

// ID returns the file sequence identifier.
func (f *LogFile) ID() int { return f.id }

// Path returns the file path.
func (f *LogFile) Path() string { return f.path }

// SetPath sets the log file's path.
223func (f *LogFile) SetPath(path string) { f.path = path } 224 225// Level returns the log level of the file. 226func (f *LogFile) Level() int { return 0 } 227 228// Filter returns the bloom filter for the file. 229func (f *LogFile) Filter() *bloom.Filter { return nil } 230 231// Retain adds a reference count to the file. 232func (f *LogFile) Retain() { f.wg.Add(1) } 233 234// Release removes a reference count from the file. 235func (f *LogFile) Release() { f.wg.Done() } 236 237// Stat returns size and last modification time of the file. 238func (f *LogFile) Stat() (int64, time.Time) { 239 f.mu.RLock() 240 size, modTime := f.size, f.modTime 241 f.mu.RUnlock() 242 return size, modTime 243} 244 245// SeriesIDSet returns the series existence set. 246func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) { 247 return f.seriesIDSet, nil 248} 249 250// TombstoneSeriesIDSet returns the series tombstone set. 251func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) { 252 return f.tombstoneSeriesIDSet, nil 253} 254 255// Size returns the size of the file, in bytes. 256func (f *LogFile) Size() int64 { 257 f.mu.RLock() 258 v := f.size 259 f.mu.RUnlock() 260 return v 261} 262 263// Measurement returns a measurement element. 264func (f *LogFile) Measurement(name []byte) MeasurementElem { 265 f.mu.RLock() 266 defer f.mu.RUnlock() 267 268 mm, ok := f.mms[string(name)] 269 if !ok { 270 return nil 271 } 272 273 return mm 274} 275 276func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool { 277 f.mu.RLock() 278 defer f.mu.RUnlock() 279 280 mm, ok := f.mms[string(name)] 281 if !ok { 282 return false 283 } 284 285 // TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection. 286 for _, id := range mm.seriesIDs() { 287 if ss.Contains(id) { 288 return true 289 } 290 } 291 return false 292} 293 294// MeasurementNames returns an ordered list of measurement names. 
func (f *LogFile) MeasurementNames() []string {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementNames()
}

// measurementNames returns a sorted list of measurement names.
// Callers must hold f.mu.
func (f *LogFile) measurementNames() []string {
	a := make([]string, 0, len(f.mms))
	for name := range f.mms {
		a = append(a, name)
	}
	sort.Strings(a)
	return a
}

// DeleteMeasurement adds a tombstone for a measurement to the log file.
func (f *LogFile) DeleteMeasurement(name []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Append the tombstone entry to the WAL, then apply it to the in-memory index.
	e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// TagKeySeriesIDIterator returns a series iterator for a tag key.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Combine iterators across all tag values of this key.
	itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		if tv.cardinality() == 0 {
			continue
		}
		itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()))
	}

	return tsdb.MergeSeriesIDIterators(itrs...)
}

// TagKeyIterator returns a tag key iterator for a measurement.
func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	a := make([]logTagKey, 0, len(mm.tagSet))
	for _, k := range mm.tagSet {
		a = append(a, k)
	}
	return newLogTagKeyIterator(a)
}

// TagKey returns a tag key element.
func (f *LogFile) TagKey(name, key []byte) TagKeyElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// tk is a copy of the map value, so the returned element is a snapshot
	// taken at call time; mutating it does not affect the index.
	return &tk
}

// TagValue returns a tag value element.
func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil
	}

	// tv is a copy of the map value; see the note on TagKey above.
	return &tv
}

// TagValueIterator returns a value iterator for a tag key.
func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}
	return tk.TagValueIterator()
}

// DeleteTagKey adds a tombstone for a tag key to the log file.
func (f *LogFile) DeleteTagKey(name, key []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Append the tombstone entry to the WAL, then apply it to the in-memory index.
	e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// TagValueSeriesIDIterator returns a series iterator for a tag value.
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil
	} else if tv.cardinality() == 0 {
		// Value exists but currently references no series.
		return nil
	}

	return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())
}

// MeasurementN returns the total number of measurements.
func (f *LogFile) MeasurementN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return uint64(len(f.mms))
}

// TagKeyN returns the total number of keys across all measurements.
func (f *LogFile) TagKeyN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		n += uint64(len(mm.tagSet))
	}
	return n
}

// TagValueN returns the total number of values across all tag keys.
func (f *LogFile) TagValueN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		for _, k := range mm.tagSet {
			n += uint64(len(k.tagValues))
		}
	}
	return n
}

// DeleteTagValue adds a tombstone for a tag value to the log file.
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Append the tombstone entry to the WAL, then apply it to the in-memory index.
	e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// AddSeriesList adds a list of series to the log file in bulk.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) error {
	// Ensure every series exists in the series file and obtain its ID.
	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
	if err != nil {
		return err
	}

	// First pass, under the set's read lock only: collect entries for series
	// not already present so the write path can be skipped entirely when
	// everything already exists.
	var writeRequired bool
	entries := make([]LogEntry, 0, len(names))
	seriesSet.RLock()
	for i := range names {
		if seriesSet.ContainsNoLock(seriesIDs[i]) {
			// We don't need to allocate anything for this series.
			continue
		}
		writeRequired = true
		// cached=true carries the parsed name/tags so execEntry can build the
		// series key without a series-file lookup.
		entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true})
	}
	seriesSet.RUnlock()

	// Exit if all series already exist.
	if !writeRequired {
		return nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	seriesSet.Lock()
	defer seriesSet.Unlock()

	// Second pass, under the write locks: re-check each entry since another
	// writer may have added the series between the two passes.
	for i := range entries {
		entry := &entries[i]
		if seriesSet.ContainsNoLock(entry.SeriesID) {
			// Added concurrently between the two passes; skip.
			continue
		}
		if err := f.appendEntry(entry); err != nil {
			return err
		}
		f.execEntry(entry)
		seriesSet.AddNoLock(entry.SeriesID)
	}

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// DeleteSeriesID adds a tombstone for a series id.
func (f *LogFile) DeleteSeriesID(id uint64) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Append the tombstone entry to the WAL, then apply it to the in-memory index.
	e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	for _, mm := range f.mms {
		n += uint64(mm.cardinality())
	}
	return n
}

// appendEntry adds a log entry to the end of the file.
func (f *LogFile) appendEntry(e *LogEntry) error {
	// Marshal entry to the local buffer (reused across calls).
	f.buf = appendLogEntry(f.buf[:0], e)

	// Save the size of the record.
	e.Size = len(f.buf)

	// Write record to file.
	n, err := f.w.Write(f.buf)
	if err != nil {
		// Move position backwards over partial entry.
		// Log should be reopened if seeking cannot be completed.
		if n > 0 {
			f.w.Reset(f.file)
			if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil {
				f.Close()
			}
		}
		return err
	}

	// Update in-memory file size & modification time.
	f.size += int64(n)
	f.modTime = time.Now()

	return nil
}

// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		// Series insert or series tombstone (LogEntrySeriesTombstoneFlag).
		f.execSeriesEntry(e)
	}
}

// execDeleteMeasurementEntry tombstones a measurement and clears all of its
// in-memory tag and series data.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[uint64]struct{})
	mm.seriesSet = nil
}

// execDeleteTagKeyEntry tombstones a tag key within a measurement.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	// createTagSetIfNotExists returns a copy, so store it back into the map.
	mm.tagSet[string(e.Key)] = ts
}

// execDeleteTagValueEntry tombstones a tag value within a tag key.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	// The create helpers return copies, so store both back into their maps.
	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}

// execSeriesEntry adds or removes a series (and its measurement/tag
// references) in the in-memory index and updates the existence/tombstone
// series ID sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		// Entry carries pre-parsed name/tags; build the series key locally.
		// NOTE(review): the len() check below is always true (keyBuf's length
		// stays zero), so a fresh buffer is allocated on every call. That may
		// be deliberate: slices of seriesKey (measurement name, tag keys and
		// values) appear to be retained by the index below, so reusing the
		// buffer across calls could alias stored data — confirm before
		// changing this to a cap() check.
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		// Store the copies back into their maps.
		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
	}
}

// SeriesIDIterator returns an iterator over all series in the log file.
726func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator { 727 f.mu.RLock() 728 defer f.mu.RUnlock() 729 730 ss := tsdb.NewSeriesIDSet() 731 allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms)) 732 733 for _, mm := range f.mms { 734 if mm.seriesSet != nil { 735 allSeriesSets = append(allSeriesSets, mm.seriesSet) 736 continue 737 } 738 739 // measurement is not using seriesSet to store series IDs. 740 mm.forEach(func(seriesID uint64) { 741 ss.AddNoLock(seriesID) 742 }) 743 } 744 745 // Fast merge all seriesSets. 746 if len(allSeriesSets) > 0 { 747 ss.Merge(allSeriesSets...) 748 } 749 750 return tsdb.NewSeriesIDSetIterator(ss) 751} 752 753// createMeasurementIfNotExists returns a measurement by name. 754func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement { 755 mm := f.mms[string(name)] 756 if mm == nil { 757 mm = &logMeasurement{ 758 name: name, 759 tagSet: make(map[string]logTagKey), 760 series: make(map[uint64]struct{}), 761 } 762 f.mms[string(name)] = mm 763 } 764 return mm 765} 766 767// MeasurementIterator returns an iterator over all the measurements in the file. 768func (f *LogFile) MeasurementIterator() MeasurementIterator { 769 f.mu.RLock() 770 defer f.mu.RUnlock() 771 772 var itr logMeasurementIterator 773 for _, mm := range f.mms { 774 itr.mms = append(itr.mms, *mm) 775 } 776 sort.Sort(logMeasurementSlice(itr.mms)) 777 return &itr 778} 779 780// MeasurementSeriesIDIterator returns an iterator over all series for a measurement. 781func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator { 782 f.mu.RLock() 783 defer f.mu.RUnlock() 784 785 mm := f.mms[string(name)] 786 if mm == nil || mm.cardinality() == 0 { 787 return nil 788 } 789 return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet()) 790} 791 792// CompactTo compacts the log file and writes it to w. 
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// NOTE(review): parameters m and k are not referenced in this body —
	// confirm whether they are still required by the File interface.

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap in buffered writer.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush buffer before the tagset blocks are written.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}

// writeTagsetsTo writes the tagset block of every named measurement to w.
func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	for _, name := range names {
		if err := f.writeTagsetTo(w, name, info, n); err != nil {
			return err
		}
	}
	return nil
}

// writeTagsetTo writes a single tagset to w and saves the tagset offset.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Remember where this tagset begins in the output stream.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Save tagset offset and size to the measurement's compaction info.
	size := *n - offset

	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}

// writeMeasurementBlockTo writes the measurement block, referencing the
// tagset offsets recorded in info by writeTagsetTo.
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]
		mmInfo := info.mms[name]
		assert(mmInfo != nil, "measurement info not found")
		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
	}

	// Flush data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	return err
}

// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
	cancel <-chan struct{}
	mms    map[string]*logFileMeasurementCompactInfo
}

// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
func newLogFileCompactInfo() *logFileCompactInfo {
	return &logFileCompactInfo{
		mms: make(map[string]*logFileMeasurementCompactInfo),
	}
}

// logFileMeasurementCompactInfo records where a measurement's tagset block
// landed in the compacted output.
type logFileMeasurementCompactInfo struct {
	offset int64
	size   int64
}

// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
1012func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1013 f.mu.RLock() 1014 defer f.mu.RUnlock() 1015 return f.measurementsSketches() 1016} 1017 1018func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1019 sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() 1020 for _, mm := range f.mms { 1021 if mm.deleted { 1022 tSketch.Add(mm.name) 1023 } else { 1024 sketch.Add(mm.name) 1025 } 1026 } 1027 return sketch, tSketch, nil 1028} 1029 1030// SeriesSketches returns sketches for existing and tombstoned series. 1031func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1032 f.mu.RLock() 1033 defer f.mu.RUnlock() 1034 return f.seriesSketches() 1035} 1036 1037func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1038 sketch = hll.NewDefaultPlus() 1039 f.seriesIDSet.ForEach(func(id uint64) { 1040 name, keys := f.sfile.Series(id) 1041 sketch.Add(models.MakeKey(name, keys)) 1042 }) 1043 1044 tSketch = hll.NewDefaultPlus() 1045 f.tombstoneSeriesIDSet.ForEach(func(id uint64) { 1046 name, keys := f.sfile.Series(id) 1047 sketch.Add(models.MakeKey(name, keys)) 1048 }) 1049 return sketch, tSketch, nil 1050} 1051 1052// LogEntry represents a single log entry in the write-ahead log. 1053type LogEntry struct { 1054 Flag byte // flag 1055 SeriesID uint64 // series id 1056 Name []byte // measurement name 1057 Key []byte // tag key 1058 Value []byte // tag value 1059 Checksum uint32 // checksum of flag/name/tags. 1060 Size int // total size of record, in bytes. 1061 1062 cached bool // Hint to LogFile that series data is already parsed 1063 name []byte // series naem, this is a cached copy of the parsed measurement name 1064 tags models.Tags // series tags, this is a cached copied of the parsed tags 1065} 1066 1067// UnmarshalBinary unmarshals data into e. 
func (e *LogEntry) UnmarshalBinary(data []byte) error {
	var sz uint64
	var n int
	var seriesID uint64
	var err error

	// orig/start are kept so the checksum can be recomputed over exactly the
	// consumed bytes and so the total entry size can be recorded at the end.
	orig := data
	start := len(data)

	// Parse flag data.
	if len(data) < 1 {
		return io.ErrShortBuffer
	}
	e.Flag, data = data[0], data[1:]

	// Parse series id.
	if seriesID, n, err = uvarint(data); err != nil {
		return err
	}
	e.SeriesID, data = seriesID, data[n:]

	// Parse name length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read name data (n covers the varint length prefix itself).
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Name, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse key length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read key data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Key, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse value length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read value data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Compute checksum over everything consumed so far (flag through value).
	chk := crc32.ChecksumIEEE(orig[:start-len(data)])

	// Parse checksum.
	if len(data) < 4 {
		return io.ErrShortBuffer
	}
	e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]

	// Verify checksum.
	if chk != e.Checksum {
		return ErrLogEntryChecksumMismatch
	}

	// Save length of elem (including the trailing checksum).
	e.Size = start - len(data)

	return nil
}

// appendLogEntry appends to dst and returns the new buffer.
// This updates the checksum on the entry.
func appendLogEntry(dst []byte, e *LogEntry) []byte {
	var buf [binary.MaxVarintLen64]byte
	start := len(dst)

	// Append flag.
	dst = append(dst, e.Flag)

	// Append series id.
	n := binary.PutUvarint(buf[:], uint64(e.SeriesID))
	dst = append(dst, buf[:n]...)

	// Append name (varint length prefix, then bytes).
	n = binary.PutUvarint(buf[:], uint64(len(e.Name)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Name...)

	// Append key.
	n = binary.PutUvarint(buf[:], uint64(len(e.Key)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Key...)

	// Append value.
	n = binary.PutUvarint(buf[:], uint64(len(e.Value)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Value...)

	// Calculate checksum over everything appended for this entry.
	e.Checksum = crc32.ChecksumIEEE(dst[start:])

	// Append checksum.
	binary.BigEndian.PutUint32(buf[:4], e.Checksum)
	dst = append(dst, buf[:4]...)

	return dst
}

// logMeasurements represents a map of measurement names to measurements.
type logMeasurements map[string]*logMeasurement

// bytes estimates the memory footprint of this logMeasurements, in bytes.
func (mms *logMeasurements) bytes() int {
	var b int
	for k, v := range *mms {
		b += len(k)
		b += v.bytes()
	}
	b += int(unsafe.Sizeof(*mms))
	return b
}

// logMeasurement is the in-memory index entry for a single measurement.
type logMeasurement struct {
	name      []byte               // measurement name
	tagSet    map[string]logTagKey // tag key -> tag values/series
	deleted   bool                 // measurement tombstoned
	series    map[uint64]struct{}  // series IDs, map-backed while small
	seriesSet *tsdb.SeriesIDSet    // series IDs, set-backed once promoted (see addSeriesID)
}

// bytes estimates the memory footprint of this logMeasurement, in bytes.
1203func (m *logMeasurement) bytes() int { 1204 var b int 1205 b += len(m.name) 1206 for k, v := range m.tagSet { 1207 b += len(k) 1208 b += v.bytes() 1209 } 1210 b += (int(m.cardinality()) * 8) 1211 b += int(unsafe.Sizeof(*m)) 1212 return b 1213} 1214 1215func (m *logMeasurement) addSeriesID(x uint64) { 1216 if m.seriesSet != nil { 1217 m.seriesSet.AddNoLock(x) 1218 return 1219 } 1220 1221 m.series[x] = struct{}{} 1222 1223 // If the map is getting too big it can be converted into a roaring seriesSet. 1224 if len(m.series) > 25 { 1225 m.seriesSet = tsdb.NewSeriesIDSet() 1226 for id := range m.series { 1227 m.seriesSet.AddNoLock(id) 1228 } 1229 m.series = nil 1230 } 1231} 1232 1233func (m *logMeasurement) removeSeriesID(x uint64) { 1234 if m.seriesSet != nil { 1235 m.seriesSet.RemoveNoLock(x) 1236 return 1237 } 1238 delete(m.series, x) 1239} 1240 1241func (m *logMeasurement) cardinality() int64 { 1242 if m.seriesSet != nil { 1243 return int64(m.seriesSet.Cardinality()) 1244 } 1245 return int64(len(m.series)) 1246} 1247 1248// forEach applies fn to every series ID in the logMeasurement. 1249func (m *logMeasurement) forEach(fn func(uint64)) { 1250 if m.seriesSet != nil { 1251 m.seriesSet.ForEachNoLock(fn) 1252 return 1253 } 1254 1255 for seriesID := range m.series { 1256 fn(seriesID) 1257 } 1258} 1259 1260// seriesIDs returns a sorted set of seriesIDs. 1261func (m *logMeasurement) seriesIDs() []uint64 { 1262 a := make([]uint64, 0, m.cardinality()) 1263 if m.seriesSet != nil { 1264 m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) }) 1265 return a // IDs are already sorted. 
1266 } 1267 1268 for seriesID := range m.series { 1269 a = append(a, seriesID) 1270 } 1271 sort.Sort(uint64Slice(a)) 1272 return a 1273} 1274 1275// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1276// one 1277func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet { 1278 if m.seriesSet != nil { 1279 return m.seriesSet.CloneNoLock() 1280 } 1281 1282 ss := tsdb.NewSeriesIDSet() 1283 for seriesID := range m.series { 1284 ss.AddNoLock(seriesID) 1285 } 1286 return ss 1287} 1288 1289func (m *logMeasurement) Name() []byte { return m.name } 1290func (m *logMeasurement) Deleted() bool { return m.deleted } 1291 1292func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { 1293 ts, ok := m.tagSet[string(key)] 1294 if !ok { 1295 ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)} 1296 } 1297 return ts 1298} 1299 1300// keys returns a sorted list of tag keys. 1301func (m *logMeasurement) keys() []string { 1302 a := make([]string, 0, len(m.tagSet)) 1303 for k := range m.tagSet { 1304 a = append(a, k) 1305 } 1306 sort.Strings(a) 1307 return a 1308} 1309 1310// logMeasurementSlice is a sortable list of log measurements. 1311type logMeasurementSlice []logMeasurement 1312 1313func (a logMeasurementSlice) Len() int { return len(a) } 1314func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1315func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1316 1317// logMeasurementIterator represents an iterator over a slice of measurements. 1318type logMeasurementIterator struct { 1319 mms []logMeasurement 1320} 1321 1322// Next returns the next element in the iterator. 
1323func (itr *logMeasurementIterator) Next() (e MeasurementElem) { 1324 if len(itr.mms) == 0 { 1325 return nil 1326 } 1327 e, itr.mms = &itr.mms[0], itr.mms[1:] 1328 return e 1329} 1330 1331type logTagKey struct { 1332 name []byte 1333 deleted bool 1334 tagValues map[string]logTagValue 1335} 1336 1337// bytes estimates the memory footprint of this logTagKey, in bytes. 1338func (tk *logTagKey) bytes() int { 1339 var b int 1340 b += len(tk.name) 1341 for k, v := range tk.tagValues { 1342 b += len(k) 1343 b += v.bytes() 1344 } 1345 b += int(unsafe.Sizeof(*tk)) 1346 return b 1347} 1348 1349func (tk *logTagKey) Key() []byte { return tk.name } 1350func (tk *logTagKey) Deleted() bool { return tk.deleted } 1351 1352func (tk *logTagKey) TagValueIterator() TagValueIterator { 1353 a := make([]logTagValue, 0, len(tk.tagValues)) 1354 for _, v := range tk.tagValues { 1355 a = append(a, v) 1356 } 1357 return newLogTagValueIterator(a) 1358} 1359 1360func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue { 1361 tv, ok := tk.tagValues[string(value)] 1362 if !ok { 1363 tv = logTagValue{name: value, series: make(map[uint64]struct{})} 1364 } 1365 return tv 1366} 1367 1368// logTagKey is a sortable list of log tag keys. 1369type logTagKeySlice []logTagKey 1370 1371func (a logTagKeySlice) Len() int { return len(a) } 1372func (a logTagKeySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1373func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1374 1375type logTagValue struct { 1376 name []byte 1377 deleted bool 1378 series map[uint64]struct{} 1379 seriesSet *tsdb.SeriesIDSet 1380} 1381 1382// bytes estimates the memory footprint of this logTagValue, in bytes. 
1383func (tv *logTagValue) bytes() int { 1384 var b int 1385 b += len(tv.name) 1386 b += int(unsafe.Sizeof(*tv)) 1387 b += (int(tv.cardinality()) * 8) 1388 return b 1389} 1390 1391func (tv *logTagValue) addSeriesID(x uint64) { 1392 if tv.seriesSet != nil { 1393 tv.seriesSet.AddNoLock(x) 1394 return 1395 } 1396 1397 tv.series[x] = struct{}{} 1398 1399 // If the map is getting too big it can be converted into a roaring seriesSet. 1400 if len(tv.series) > 25 { 1401 tv.seriesSet = tsdb.NewSeriesIDSet() 1402 for id := range tv.series { 1403 tv.seriesSet.AddNoLock(id) 1404 } 1405 tv.series = nil 1406 } 1407} 1408 1409func (tv *logTagValue) removeSeriesID(x uint64) { 1410 if tv.seriesSet != nil { 1411 tv.seriesSet.RemoveNoLock(x) 1412 return 1413 } 1414 delete(tv.series, x) 1415} 1416 1417func (tv *logTagValue) cardinality() int64 { 1418 if tv.seriesSet != nil { 1419 return int64(tv.seriesSet.Cardinality()) 1420 } 1421 return int64(len(tv.series)) 1422} 1423 1424// seriesIDs returns a sorted set of seriesIDs. 1425func (tv *logTagValue) seriesIDs() []uint64 { 1426 a := make([]uint64, 0, tv.cardinality()) 1427 if tv.seriesSet != nil { 1428 tv.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) }) 1429 return a // IDs are already sorted. 1430 } 1431 1432 for seriesID := range tv.series { 1433 a = append(a, seriesID) 1434 } 1435 sort.Sort(uint64Slice(a)) 1436 return a 1437} 1438 1439// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1440// one 1441func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet { 1442 if tv.seriesSet != nil { 1443 return tv.seriesSet.CloneNoLock() 1444 } 1445 1446 ss := tsdb.NewSeriesIDSet() 1447 for seriesID := range tv.series { 1448 ss.AddNoLock(seriesID) 1449 } 1450 return ss 1451} 1452 1453func (tv *logTagValue) Value() []byte { return tv.name } 1454func (tv *logTagValue) Deleted() bool { return tv.deleted } 1455 1456// logTagValue is a sortable list of log tag values. 
1457type logTagValueSlice []logTagValue 1458 1459func (a logTagValueSlice) Len() int { return len(a) } 1460func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1461func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1462 1463// logTagKeyIterator represents an iterator over a slice of tag keys. 1464type logTagKeyIterator struct { 1465 a []logTagKey 1466} 1467 1468// newLogTagKeyIterator returns a new instance of logTagKeyIterator. 1469func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator { 1470 sort.Sort(logTagKeySlice(a)) 1471 return &logTagKeyIterator{a: a} 1472} 1473 1474// Next returns the next element in the iterator. 1475func (itr *logTagKeyIterator) Next() (e TagKeyElem) { 1476 if len(itr.a) == 0 { 1477 return nil 1478 } 1479 e, itr.a = &itr.a[0], itr.a[1:] 1480 return e 1481} 1482 1483// logTagValueIterator represents an iterator over a slice of tag values. 1484type logTagValueIterator struct { 1485 a []logTagValue 1486} 1487 1488// newLogTagValueIterator returns a new instance of logTagValueIterator. 1489func newLogTagValueIterator(a []logTagValue) *logTagValueIterator { 1490 sort.Sort(logTagValueSlice(a)) 1491 return &logTagValueIterator{a: a} 1492} 1493 1494// Next returns the next element in the iterator. 1495func (itr *logTagValueIterator) Next() (e TagValueElem) { 1496 if len(itr.a) == 0 { 1497 return nil 1498 } 1499 e, itr.a = &itr.a[0], itr.a[1:] 1500 return e 1501} 1502 1503// FormatLogFileName generates a log filename for the given index. 1504func FormatLogFileName(id int) string { 1505 return fmt.Sprintf("L0-%08d%s", id, LogFileExt) 1506} 1507