1package tsi1 2 3import ( 4 "bufio" 5 "bytes" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "hash/crc32" 10 "io" 11 "os" 12 "sort" 13 "sync" 14 "time" 15 "unsafe" 16 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/pkg/bloom" 19 "github.com/influxdata/influxdb/pkg/estimator" 20 "github.com/influxdata/influxdb/pkg/estimator/hll" 21 "github.com/influxdata/influxdb/pkg/mmap" 22 "github.com/influxdata/influxdb/tsdb" 23) 24 25// Log errors. 26var ( 27 ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch") 28) 29 30// Log entry flag constants. 31const ( 32 LogEntrySeriesTombstoneFlag = 0x01 33 LogEntryMeasurementTombstoneFlag = 0x02 34 LogEntryTagKeyTombstoneFlag = 0x04 35 LogEntryTagValueTombstoneFlag = 0x08 36) 37 38// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered 39// writer uses. If the LogFile does not have an explicit buffer size set then 40// this is the size of the buffer; it is equal to the default buffer size used 41// by a bufio.Writer. 42const defaultLogFileBufferSize = 4096 43 44// indexFileBufferSize is the buffer size used when compacting the LogFile down 45// into a .tsi file. 46const indexFileBufferSize = 1 << 17 // 128K 47 48// LogFile represents an on-disk write-ahead log file. 49type LogFile struct { 50 mu sync.RWMutex 51 wg sync.WaitGroup // ref count 52 id int // file sequence identifier 53 data []byte // mmap 54 file *os.File // writer 55 w *bufio.Writer // buffered writer 56 bufferSize int // The size of the buffer used by the buffered writer 57 nosync bool // Disables buffer flushing and file syncing. Useful for offline tooling. 58 buf []byte // marshaling buffer 59 keyBuf []byte 60 61 sfile *tsdb.SeriesFile // series lookup 62 size int64 // tracks current file size 63 modTime time.Time // tracks last time write occurred 64 65 // In-memory series existence/tombstone sets. 66 seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet 67 68 // In-memory index. 
69 mms logMeasurements 70 71 // Filepath to the log file. 72 path string 73} 74 75// NewLogFile returns a new instance of LogFile. 76func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile { 77 return &LogFile{ 78 sfile: sfile, 79 path: path, 80 mms: make(logMeasurements), 81 82 seriesIDSet: tsdb.NewSeriesIDSet(), 83 tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(), 84 } 85} 86 87// bytes estimates the memory footprint of this LogFile, in bytes. 88func (f *LogFile) bytes() int { 89 var b int 90 b += 24 // mu RWMutex is 24 bytes 91 b += 16 // wg WaitGroup is 16 bytes 92 b += int(unsafe.Sizeof(f.id)) 93 // Do not include f.data because it is mmap'd 94 // TODO(jacobmarble): Uncomment when we are using go >= 1.10.0 95 //b += int(unsafe.Sizeof(f.w)) + f.w.Size() 96 b += int(unsafe.Sizeof(f.buf)) + len(f.buf) 97 b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf) 98 // Do not count SeriesFile because it belongs to the code that constructed this Index. 99 b += int(unsafe.Sizeof(f.size)) 100 b += int(unsafe.Sizeof(f.modTime)) 101 b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes() 102 b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes() 103 b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes() 104 b += int(unsafe.Sizeof(f.path)) + len(f.path) 105 return b 106} 107 108// Open reads the log from a file and validates all the checksums. 109func (f *LogFile) Open() error { 110 if err := f.open(); err != nil { 111 f.Close() 112 return err 113 } 114 return nil 115} 116 117func (f *LogFile) open() error { 118 f.id, _ = ParseFilename(f.path) 119 120 // Open file for appending. 121 file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666) 122 if err != nil { 123 return err 124 } 125 f.file = file 126 127 if f.bufferSize == 0 { 128 f.bufferSize = defaultLogFileBufferSize 129 } 130 f.w = bufio.NewWriterSize(f.file, f.bufferSize) 131 132 // Finish opening if file is empty. 
133 fi, err := file.Stat() 134 if err != nil { 135 return err 136 } else if fi.Size() == 0 { 137 return nil 138 } 139 f.size = fi.Size() 140 f.modTime = fi.ModTime() 141 142 // Open a read-only memory map of the existing data. 143 data, err := mmap.Map(f.Path(), 0) 144 if err != nil { 145 return err 146 } 147 f.data = data 148 149 // Read log entries from mmap. 150 var n int64 151 for buf := f.data; len(buf) > 0; { 152 // Read next entry. Truncate partial writes. 153 var e LogEntry 154 if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch { 155 break 156 } else if err != nil { 157 return err 158 } 159 160 // Execute entry against in-memory index. 161 f.execEntry(&e) 162 163 // Move buffer forward. 164 n += int64(e.Size) 165 buf = buf[e.Size:] 166 } 167 168 // Move to the end of the file. 169 f.size = n 170 _, err = file.Seek(n, io.SeekStart) 171 return err 172} 173 174// Close shuts down the file handle and mmap. 175func (f *LogFile) Close() error { 176 // Wait until the file has no more references. 177 f.wg.Wait() 178 179 if f.w != nil { 180 f.w.Flush() 181 f.w = nil 182 } 183 184 if f.file != nil { 185 f.file.Close() 186 f.file = nil 187 } 188 189 if f.data != nil { 190 mmap.Unmap(f.data) 191 } 192 193 f.mms = make(logMeasurements) 194 return nil 195} 196 197// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file. 198// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op. 199func (f *LogFile) FlushAndSync() error { 200 if f.nosync { 201 return nil 202 } 203 204 if f.w != nil { 205 if err := f.w.Flush(); err != nil { 206 return err 207 } 208 } 209 210 if f.file == nil { 211 return nil 212 } 213 return f.file.Sync() 214} 215 216// ID returns the file sequence identifier. 217func (f *LogFile) ID() int { return f.id } 218 219// Path returns the file path. 220func (f *LogFile) Path() string { return f.path } 221 222// SetPath sets the log file's path. 
223func (f *LogFile) SetPath(path string) { f.path = path } 224 225// Level returns the log level of the file. 226func (f *LogFile) Level() int { return 0 } 227 228// Filter returns the bloom filter for the file. 229func (f *LogFile) Filter() *bloom.Filter { return nil } 230 231// Retain adds a reference count to the file. 232func (f *LogFile) Retain() { f.wg.Add(1) } 233 234// Release removes a reference count from the file. 235func (f *LogFile) Release() { f.wg.Done() } 236 237// Stat returns size and last modification time of the file. 238func (f *LogFile) Stat() (int64, time.Time) { 239 f.mu.RLock() 240 size, modTime := f.size, f.modTime 241 f.mu.RUnlock() 242 return size, modTime 243} 244 245// SeriesIDSet returns the series existence set. 246func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) { 247 return f.seriesIDSet, nil 248} 249 250// TombstoneSeriesIDSet returns the series tombstone set. 251func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) { 252 return f.tombstoneSeriesIDSet, nil 253} 254 255// Size returns the size of the file, in bytes. 256func (f *LogFile) Size() int64 { 257 f.mu.RLock() 258 v := f.size 259 f.mu.RUnlock() 260 return v 261} 262 263// Measurement returns a measurement element. 264func (f *LogFile) Measurement(name []byte) MeasurementElem { 265 f.mu.RLock() 266 defer f.mu.RUnlock() 267 268 mm, ok := f.mms[string(name)] 269 if !ok { 270 return nil 271 } 272 273 return mm 274} 275 276func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool { 277 f.mu.RLock() 278 defer f.mu.RUnlock() 279 280 mm, ok := f.mms[string(name)] 281 if !ok { 282 return false 283 } 284 285 // TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection. 286 for _, id := range mm.seriesIDs() { 287 if ss.Contains(id) { 288 return true 289 } 290 } 291 return false 292} 293 294// MeasurementNames returns an ordered list of measurement names. 
295func (f *LogFile) MeasurementNames() []string { 296 f.mu.RLock() 297 defer f.mu.RUnlock() 298 return f.measurementNames() 299} 300 301func (f *LogFile) measurementNames() []string { 302 a := make([]string, 0, len(f.mms)) 303 for name := range f.mms { 304 a = append(a, name) 305 } 306 sort.Strings(a) 307 return a 308} 309 310// DeleteMeasurement adds a tombstone for a measurement to the log file. 311func (f *LogFile) DeleteMeasurement(name []byte) error { 312 f.mu.Lock() 313 defer f.mu.Unlock() 314 315 e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name} 316 if err := f.appendEntry(&e); err != nil { 317 return err 318 } 319 f.execEntry(&e) 320 321 // Flush buffer and sync to disk. 322 return f.FlushAndSync() 323} 324 325// TagKeySeriesIDIterator returns a series iterator for a tag key. 326func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator { 327 f.mu.RLock() 328 defer f.mu.RUnlock() 329 330 mm, ok := f.mms[string(name)] 331 if !ok { 332 return nil 333 } 334 335 tk, ok := mm.tagSet[string(key)] 336 if !ok { 337 return nil 338 } 339 340 // Combine iterators across all tag keys. 341 itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues)) 342 for _, tv := range tk.tagValues { 343 if tv.cardinality() == 0 { 344 continue 345 } 346 if itr := tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()); itr != nil { 347 itrs = append(itrs, itr) 348 } 349 } 350 351 return tsdb.MergeSeriesIDIterators(itrs...) 352} 353 354// TagKeyIterator returns a value iterator for a measurement. 355func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator { 356 f.mu.RLock() 357 defer f.mu.RUnlock() 358 359 mm, ok := f.mms[string(name)] 360 if !ok { 361 return nil 362 } 363 364 a := make([]logTagKey, 0, len(mm.tagSet)) 365 for _, k := range mm.tagSet { 366 a = append(a, k) 367 } 368 return newLogTagKeyIterator(a) 369} 370 371// TagKey returns a tag key element. 
372func (f *LogFile) TagKey(name, key []byte) TagKeyElem { 373 f.mu.RLock() 374 defer f.mu.RUnlock() 375 376 mm, ok := f.mms[string(name)] 377 if !ok { 378 return nil 379 } 380 381 tk, ok := mm.tagSet[string(key)] 382 if !ok { 383 return nil 384 } 385 386 return &tk 387} 388 389// TagValue returns a tag value element. 390func (f *LogFile) TagValue(name, key, value []byte) TagValueElem { 391 f.mu.RLock() 392 defer f.mu.RUnlock() 393 394 mm, ok := f.mms[string(name)] 395 if !ok { 396 return nil 397 } 398 399 tk, ok := mm.tagSet[string(key)] 400 if !ok { 401 return nil 402 } 403 404 tv, ok := tk.tagValues[string(value)] 405 if !ok { 406 return nil 407 } 408 409 return &tv 410} 411 412// TagValueIterator returns a value iterator for a tag key. 413func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator { 414 f.mu.RLock() 415 defer f.mu.RUnlock() 416 417 mm, ok := f.mms[string(name)] 418 if !ok { 419 return nil 420 } 421 422 tk, ok := mm.tagSet[string(key)] 423 if !ok { 424 return nil 425 } 426 return tk.TagValueIterator() 427} 428 429// DeleteTagKey adds a tombstone for a tag key to the log file. 430func (f *LogFile) DeleteTagKey(name, key []byte) error { 431 f.mu.Lock() 432 defer f.mu.Unlock() 433 434 e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key} 435 if err := f.appendEntry(&e); err != nil { 436 return err 437 } 438 f.execEntry(&e) 439 440 // Flush buffer and sync to disk. 441 return f.FlushAndSync() 442} 443 444// TagValueSeriesIDSet returns a series iterator for a tag value. 
445func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) { 446 f.mu.RLock() 447 defer f.mu.RUnlock() 448 449 mm, ok := f.mms[string(name)] 450 if !ok { 451 return nil, nil 452 } 453 454 tk, ok := mm.tagSet[string(key)] 455 if !ok { 456 return nil, nil 457 } 458 459 tv, ok := tk.tagValues[string(value)] 460 if !ok { 461 return nil, nil 462 } else if tv.cardinality() == 0 { 463 return nil, nil 464 } 465 466 return tv.seriesIDSet(), nil 467} 468 469// MeasurementN returns the total number of measurements. 470func (f *LogFile) MeasurementN() (n uint64) { 471 f.mu.RLock() 472 defer f.mu.RUnlock() 473 return uint64(len(f.mms)) 474} 475 476// TagKeyN returns the total number of keys. 477func (f *LogFile) TagKeyN() (n uint64) { 478 f.mu.RLock() 479 defer f.mu.RUnlock() 480 for _, mm := range f.mms { 481 n += uint64(len(mm.tagSet)) 482 } 483 return n 484} 485 486// TagValueN returns the total number of values. 487func (f *LogFile) TagValueN() (n uint64) { 488 f.mu.RLock() 489 defer f.mu.RUnlock() 490 for _, mm := range f.mms { 491 for _, k := range mm.tagSet { 492 n += uint64(len(k.tagValues)) 493 } 494 } 495 return n 496} 497 498// DeleteTagValue adds a tombstone for a tag value to the log file. 499func (f *LogFile) DeleteTagValue(name, key, value []byte) error { 500 f.mu.Lock() 501 defer f.mu.Unlock() 502 503 e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value} 504 if err := f.appendEntry(&e); err != nil { 505 return err 506 } 507 f.execEntry(&e) 508 509 // Flush buffer and sync to disk. 510 return f.FlushAndSync() 511} 512 513// AddSeriesList adds a list of series to the log file in bulk. 
514func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) { 515 seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice) 516 if err != nil { 517 return nil, err 518 } 519 520 var writeRequired bool 521 entries := make([]LogEntry, 0, len(names)) 522 seriesSet.RLock() 523 for i := range names { 524 if seriesSet.ContainsNoLock(seriesIDs[i]) { 525 // We don't need to allocate anything for this series. 526 seriesIDs[i] = 0 527 continue 528 } 529 writeRequired = true 530 entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i}) 531 } 532 seriesSet.RUnlock() 533 534 // Exit if all series already exist. 535 if !writeRequired { 536 return seriesIDs, nil 537 } 538 539 f.mu.Lock() 540 defer f.mu.Unlock() 541 542 seriesSet.Lock() 543 defer seriesSet.Unlock() 544 545 for i := range entries { // NB - this doesn't evaluate all series ids returned from series file. 546 entry := &entries[i] 547 if seriesSet.ContainsNoLock(entry.SeriesID) { 548 // We don't need to allocate anything for this series. 549 seriesIDs[entry.batchidx] = 0 550 continue 551 } 552 if err := f.appendEntry(entry); err != nil { 553 return nil, err 554 } 555 f.execEntry(entry) 556 seriesSet.AddNoLock(entry.SeriesID) 557 } 558 559 // Flush buffer and sync to disk. 560 if err := f.FlushAndSync(); err != nil { 561 return nil, err 562 } 563 return seriesIDs, nil 564} 565 566// DeleteSeriesID adds a tombstone for a series id. 567func (f *LogFile) DeleteSeriesID(id uint64) error { 568 f.mu.Lock() 569 defer f.mu.Unlock() 570 571 e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id} 572 if err := f.appendEntry(&e); err != nil { 573 return err 574 } 575 f.execEntry(&e) 576 577 // Flush buffer and sync to disk. 578 return f.FlushAndSync() 579} 580 581// SeriesN returns the total number of series in the file. 
582func (f *LogFile) SeriesN() (n uint64) { 583 f.mu.RLock() 584 defer f.mu.RUnlock() 585 586 for _, mm := range f.mms { 587 n += uint64(mm.cardinality()) 588 } 589 return n 590} 591 592// appendEntry adds a log entry to the end of the file. 593func (f *LogFile) appendEntry(e *LogEntry) error { 594 // Marshal entry to the local buffer. 595 f.buf = appendLogEntry(f.buf[:0], e) 596 597 // Save the size of the record. 598 e.Size = len(f.buf) 599 600 // Write record to file. 601 n, err := f.w.Write(f.buf) 602 if err != nil { 603 // Move position backwards over partial entry. 604 // Log should be reopened if seeking cannot be completed. 605 if n > 0 { 606 f.w.Reset(f.file) 607 if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil { 608 f.Close() 609 } 610 } 611 return err 612 } 613 614 // Update in-memory file size & modification time. 615 f.size += int64(n) 616 f.modTime = time.Now() 617 618 return nil 619} 620 621// execEntry executes a log entry against the in-memory index. 622// This is done after appending and on replay of the log. 
//
// Dispatch is on e.Flag: the three measurement/tag-key/tag-value tombstone
// flags have dedicated handlers; everything else (a zero flag for a series
// insert, or LogEntrySeriesTombstoneFlag) goes to execSeriesEntry.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}

// execDeleteMeasurementEntry marks the measurement deleted and drops all of
// its tag keys and series membership. The measurement is created first if it
// is not yet known, so the tombstone itself is recorded.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[uint64]struct{})
	mm.seriesSet = nil
}

// execDeleteTagKeyEntry marks tag key e.Key on measurement e.Name as deleted.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	// logTagKey is stored by value in mm.tagSet, so the modified copy must be
	// written back.
	mm.tagSet[string(e.Key)] = ts
}

// execDeleteTagValueEntry marks tag value e.Value under key e.Key on
// measurement e.Name as deleted.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	// Tag keys and (presumably) tag values are held by value in their maps, so
	// write both modified copies back.
	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}

// execSeriesEntry applies a series insert or series tombstone: it updates the
// measurement's series membership, each tag value's series membership, and
// the file-level existence/tombstone sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		// NOTE(review): f.keyBuf is never actually reused. make(..., 0, sz)
		// has length 0, so len(f.keyBuf) < sz holds on every call (for sz > 0)
		// and a fresh buffer is allocated each time. That appears to be
		// load-bearing: createMeasurementIfNotExists keeps the name slice and
		// createTagSetIfNotExists keeps the key slice, both of which are
		// (presumably) sub-slices of seriesKey. Reusing the buffer (e.g.
		// switching len to cap) would overwrite index data unless those
		// slices are copied first.
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	// Any series entry (insert or tombstone) clears a measurement tombstone.
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		// Write the by-value copies back into their maps.
		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
	}
}

// SeriesIDIterator returns an iterator over all series in the log file.
// The iterator is built over a freshly populated set. Measurements already
// backed by a SeriesIDSet are merged in bulk, and map-backed measurements are
// added one ID at a time.
func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	ss := tsdb.NewSeriesIDSet()
	allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))

	for _, mm := range f.mms {
		if mm.seriesSet != nil {
			allSeriesSets = append(allSeriesSets, mm.seriesSet)
			continue
		}

		// measurement is not using seriesSet to store series IDs.
		mm.forEach(func(seriesID uint64) {
			ss.AddNoLock(seriesID)
		})
	}

	// Fast merge all seriesSets.
	if len(allSeriesSets) > 0 {
		ss.Merge(allSeriesSets...)
	}

	return tsdb.NewSeriesIDSetIterator(ss)
}

// createMeasurementIfNotExists returns a measurement by name.
// If no measurement exists it is created and inserted into f.mms. The new
// measurement keeps the name slice as given (it is not copied). The caller
// must hold f.mu for writing.
func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
	mm := f.mms[string(name)]
	if mm == nil {
		mm = &logMeasurement{
			name:   name,
			tagSet: make(map[string]logTagKey),
			series: make(map[uint64]struct{}),
		}
		f.mms[string(name)] = mm
	}
	return mm
}

// MeasurementIterator returns an iterator over all the measurements in the file,
// sorted by name.
// NOTE(review): the iterator holds shallow copies of each logMeasurement, so
// their maps and seriesSet are shared with the live index and are read after
// the read lock is released. Confirm that callers never iterate concurrently
// with writes.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	var itr logMeasurementIterator
	for _, mm := range f.mms {
		itr.mms = append(itr.mms, *mm)
	}
	sort.Sort(logMeasurementSlice(itr.mms))
	return &itr
}

// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
// It returns nil if the measurement is unknown or has no series. The iterator
// runs over a copy of the measurement's series set.
func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm := f.mms[string(name)]
	if mm == nil || mm.cardinality() == 0 {
		return nil
	}
	return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
}

// CompactTo compacts the log file and writes it to w.
//
// Output layout, in order: the file signature, one tag block per measurement
// (sorted by name), the measurement block, the series ID set, the tombstone
// series ID set, the series sketch, the tombstone series sketch and the
// trailer, which records the offset and size of each section. n is the total
// number of bytes written. ErrCompactionInterrupted is returned if cancel
// fires before or during the work. The parameters m and k are not used in this
// body.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap w in a buffered writer with a fixed indexFileBufferSize buffer.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush the signature through to w before the tag blocks are written.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches. n is advanced by len(data) rather than by the
	// count returned from bw.Write.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}

// writeTagsetsTo writes one tag block per measurement in names order,
// advancing *n and recording each block's offset/size in info.
func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	for _, name := range names {
		if err := f.writeTagsetTo(w, name, info, n); err != nil {
			return err
		}
	}
	return nil
}

// writeTagsetTo writes a single tagset to w and saves the tagset offset.
// Keys are written in sorted order. A deleted key is written without its
// values; otherwise its values are written in sorted order with their series
// ID sets. Cancellation is checked on entry and every 1000 values.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Record the tagset's starting offset. *n has not yet been advanced for
	// this tag block; the encoder's byte count is added after Close below.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Compute the tag block size and save offset/size for the measurement block.
	size := *n - offset

	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}

// writeMeasurementBlockTo writes the measurement block for names to w and
// advances *n by the bytes written. Every name must already have tag block
// info recorded by writeTagsetTo; the assert helper (defined elsewhere) is
// presumably fatal otherwise.
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]
		mmInfo := info.mms[name]
		assert(mmInfo != nil, "measurement info not found")
		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
	}

	// Flush data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	return err
}

// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
	cancel <-chan struct{}                            // closed to interrupt compaction
	mms    map[string]*logFileMeasurementCompactInfo // tag block position per measurement name
}

// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
func newLogFileCompactInfo() *logFileCompactInfo {
	return &logFileCompactInfo{
		mms: make(map[string]*logFileMeasurementCompactInfo),
	}
}

// logFileMeasurementCompactInfo records where a measurement's tag block was
// written during compaction.
type logFileMeasurementCompactInfo struct {
	offset int64 // byte offset of the tag block in the output
	size   int64 // tag block length in bytes
}

// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
1019func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1020 f.mu.RLock() 1021 defer f.mu.RUnlock() 1022 return f.measurementsSketches() 1023} 1024 1025func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1026 sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() 1027 for _, mm := range f.mms { 1028 if mm.deleted { 1029 tSketch.Add(mm.name) 1030 } else { 1031 sketch.Add(mm.name) 1032 } 1033 } 1034 return sketch, tSketch, nil 1035} 1036 1037// SeriesSketches returns sketches for existing and tombstoned series. 1038func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1039 f.mu.RLock() 1040 defer f.mu.RUnlock() 1041 return f.seriesSketches() 1042} 1043 1044func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1045 sketch = hll.NewDefaultPlus() 1046 f.seriesIDSet.ForEach(func(id uint64) { 1047 name, keys := f.sfile.Series(id) 1048 sketch.Add(models.MakeKey(name, keys)) 1049 }) 1050 1051 tSketch = hll.NewDefaultPlus() 1052 f.tombstoneSeriesIDSet.ForEach(func(id uint64) { 1053 name, keys := f.sfile.Series(id) 1054 tSketch.Add(models.MakeKey(name, keys)) 1055 }) 1056 return sketch, tSketch, nil 1057} 1058 1059// LogEntry represents a single log entry in the write-ahead log. 1060type LogEntry struct { 1061 Flag byte // flag 1062 SeriesID uint64 // series id 1063 Name []byte // measurement name 1064 Key []byte // tag key 1065 Value []byte // tag value 1066 Checksum uint32 // checksum of flag/name/tags. 1067 Size int // total size of record, in bytes. 1068 1069 cached bool // Hint to LogFile that series data is already parsed 1070 name []byte // series naem, this is a cached copy of the parsed measurement name 1071 tags models.Tags // series tags, this is a cached copied of the parsed tags 1072 batchidx int // position of entry in batch. 1073} 1074 1075// UnmarshalBinary unmarshals data into e. 
1076func (e *LogEntry) UnmarshalBinary(data []byte) error { 1077 var sz uint64 1078 var n int 1079 var seriesID uint64 1080 var err error 1081 1082 orig := data 1083 start := len(data) 1084 1085 // Parse flag data. 1086 if len(data) < 1 { 1087 return io.ErrShortBuffer 1088 } 1089 e.Flag, data = data[0], data[1:] 1090 1091 // Parse series id. 1092 if seriesID, n, err = uvarint(data); err != nil { 1093 return err 1094 } 1095 e.SeriesID, data = seriesID, data[n:] 1096 1097 // Parse name length. 1098 if sz, n, err = uvarint(data); err != nil { 1099 return err 1100 } 1101 1102 // Read name data. 1103 if len(data) < n+int(sz) { 1104 return io.ErrShortBuffer 1105 } 1106 e.Name, data = data[n:n+int(sz)], data[n+int(sz):] 1107 1108 // Parse key length. 1109 if sz, n, err = uvarint(data); err != nil { 1110 return err 1111 } 1112 1113 // Read key data. 1114 if len(data) < n+int(sz) { 1115 return io.ErrShortBuffer 1116 } 1117 e.Key, data = data[n:n+int(sz)], data[n+int(sz):] 1118 1119 // Parse value length. 1120 if sz, n, err = uvarint(data); err != nil { 1121 return err 1122 } 1123 1124 // Read value data. 1125 if len(data) < n+int(sz) { 1126 return io.ErrShortBuffer 1127 } 1128 e.Value, data = data[n:n+int(sz)], data[n+int(sz):] 1129 1130 // Compute checksum. 1131 chk := crc32.ChecksumIEEE(orig[:start-len(data)]) 1132 1133 // Parse checksum. 1134 if len(data) < 4 { 1135 return io.ErrShortBuffer 1136 } 1137 e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:] 1138 1139 // Verify checksum. 1140 if chk != e.Checksum { 1141 return ErrLogEntryChecksumMismatch 1142 } 1143 1144 // Save length of elem. 1145 e.Size = start - len(data) 1146 1147 return nil 1148} 1149 1150// appendLogEntry appends to dst and returns the new buffer. 1151// This updates the checksum on the entry. 1152func appendLogEntry(dst []byte, e *LogEntry) []byte { 1153 var buf [binary.MaxVarintLen64]byte 1154 start := len(dst) 1155 1156 // Append flag. 
1157 dst = append(dst, e.Flag) 1158 1159 // Append series id. 1160 n := binary.PutUvarint(buf[:], uint64(e.SeriesID)) 1161 dst = append(dst, buf[:n]...) 1162 1163 // Append name. 1164 n = binary.PutUvarint(buf[:], uint64(len(e.Name))) 1165 dst = append(dst, buf[:n]...) 1166 dst = append(dst, e.Name...) 1167 1168 // Append key. 1169 n = binary.PutUvarint(buf[:], uint64(len(e.Key))) 1170 dst = append(dst, buf[:n]...) 1171 dst = append(dst, e.Key...) 1172 1173 // Append value. 1174 n = binary.PutUvarint(buf[:], uint64(len(e.Value))) 1175 dst = append(dst, buf[:n]...) 1176 dst = append(dst, e.Value...) 1177 1178 // Calculate checksum. 1179 e.Checksum = crc32.ChecksumIEEE(dst[start:]) 1180 1181 // Append checksum. 1182 binary.BigEndian.PutUint32(buf[:4], e.Checksum) 1183 dst = append(dst, buf[:4]...) 1184 1185 return dst 1186} 1187 1188// logMeasurements represents a map of measurement names to measurements. 1189type logMeasurements map[string]*logMeasurement 1190 1191// bytes estimates the memory footprint of this logMeasurements, in bytes. 1192func (mms *logMeasurements) bytes() int { 1193 var b int 1194 for k, v := range *mms { 1195 b += len(k) 1196 b += v.bytes() 1197 } 1198 b += int(unsafe.Sizeof(*mms)) 1199 return b 1200} 1201 1202type logMeasurement struct { 1203 name []byte 1204 tagSet map[string]logTagKey 1205 deleted bool 1206 series map[uint64]struct{} 1207 seriesSet *tsdb.SeriesIDSet 1208} 1209 1210// bytes estimates the memory footprint of this logMeasurement, in bytes. 
1211func (m *logMeasurement) bytes() int { 1212 var b int 1213 b += len(m.name) 1214 for k, v := range m.tagSet { 1215 b += len(k) 1216 b += v.bytes() 1217 } 1218 b += (int(m.cardinality()) * 8) 1219 b += int(unsafe.Sizeof(*m)) 1220 return b 1221} 1222 1223func (m *logMeasurement) addSeriesID(x uint64) { 1224 if m.seriesSet != nil { 1225 m.seriesSet.AddNoLock(x) 1226 return 1227 } 1228 1229 m.series[x] = struct{}{} 1230 1231 // If the map is getting too big it can be converted into a roaring seriesSet. 1232 if len(m.series) > 25 { 1233 m.seriesSet = tsdb.NewSeriesIDSet() 1234 for id := range m.series { 1235 m.seriesSet.AddNoLock(id) 1236 } 1237 m.series = nil 1238 } 1239} 1240 1241func (m *logMeasurement) removeSeriesID(x uint64) { 1242 if m.seriesSet != nil { 1243 m.seriesSet.RemoveNoLock(x) 1244 return 1245 } 1246 delete(m.series, x) 1247} 1248 1249func (m *logMeasurement) cardinality() int64 { 1250 if m.seriesSet != nil { 1251 return int64(m.seriesSet.Cardinality()) 1252 } 1253 return int64(len(m.series)) 1254} 1255 1256// forEach applies fn to every series ID in the logMeasurement. 1257func (m *logMeasurement) forEach(fn func(uint64)) { 1258 if m.seriesSet != nil { 1259 m.seriesSet.ForEachNoLock(fn) 1260 return 1261 } 1262 1263 for seriesID := range m.series { 1264 fn(seriesID) 1265 } 1266} 1267 1268// seriesIDs returns a sorted set of seriesIDs. 1269func (m *logMeasurement) seriesIDs() []uint64 { 1270 a := make([]uint64, 0, m.cardinality()) 1271 if m.seriesSet != nil { 1272 m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) }) 1273 return a // IDs are already sorted. 
1274 } 1275 1276 for seriesID := range m.series { 1277 a = append(a, seriesID) 1278 } 1279 sort.Sort(uint64Slice(a)) 1280 return a 1281} 1282 1283// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1284// one 1285func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet { 1286 if m.seriesSet != nil { 1287 return m.seriesSet.CloneNoLock() 1288 } 1289 1290 ss := tsdb.NewSeriesIDSet() 1291 for seriesID := range m.series { 1292 ss.AddNoLock(seriesID) 1293 } 1294 return ss 1295} 1296 1297func (m *logMeasurement) Name() []byte { return m.name } 1298func (m *logMeasurement) Deleted() bool { return m.deleted } 1299 1300func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { 1301 ts, ok := m.tagSet[string(key)] 1302 if !ok { 1303 ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)} 1304 } 1305 return ts 1306} 1307 1308// keys returns a sorted list of tag keys. 1309func (m *logMeasurement) keys() []string { 1310 a := make([]string, 0, len(m.tagSet)) 1311 for k := range m.tagSet { 1312 a = append(a, k) 1313 } 1314 sort.Strings(a) 1315 return a 1316} 1317 1318// logMeasurementSlice is a sortable list of log measurements. 1319type logMeasurementSlice []logMeasurement 1320 1321func (a logMeasurementSlice) Len() int { return len(a) } 1322func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1323func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1324 1325// logMeasurementIterator represents an iterator over a slice of measurements. 1326type logMeasurementIterator struct { 1327 mms []logMeasurement 1328} 1329 1330// Next returns the next element in the iterator. 
1331func (itr *logMeasurementIterator) Next() (e MeasurementElem) { 1332 if len(itr.mms) == 0 { 1333 return nil 1334 } 1335 e, itr.mms = &itr.mms[0], itr.mms[1:] 1336 return e 1337} 1338 1339type logTagKey struct { 1340 name []byte 1341 deleted bool 1342 tagValues map[string]logTagValue 1343} 1344 1345// bytes estimates the memory footprint of this logTagKey, in bytes. 1346func (tk *logTagKey) bytes() int { 1347 var b int 1348 b += len(tk.name) 1349 for k, v := range tk.tagValues { 1350 b += len(k) 1351 b += v.bytes() 1352 } 1353 b += int(unsafe.Sizeof(*tk)) 1354 return b 1355} 1356 1357func (tk *logTagKey) Key() []byte { return tk.name } 1358func (tk *logTagKey) Deleted() bool { return tk.deleted } 1359 1360func (tk *logTagKey) TagValueIterator() TagValueIterator { 1361 a := make([]logTagValue, 0, len(tk.tagValues)) 1362 for _, v := range tk.tagValues { 1363 a = append(a, v) 1364 } 1365 return newLogTagValueIterator(a) 1366} 1367 1368func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue { 1369 tv, ok := tk.tagValues[string(value)] 1370 if !ok { 1371 tv = logTagValue{name: value, series: make(map[uint64]struct{})} 1372 } 1373 return tv 1374} 1375 1376// logTagKey is a sortable list of log tag keys. 1377type logTagKeySlice []logTagKey 1378 1379func (a logTagKeySlice) Len() int { return len(a) } 1380func (a logTagKeySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1381func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1382 1383type logTagValue struct { 1384 name []byte 1385 deleted bool 1386 series map[uint64]struct{} 1387 seriesSet *tsdb.SeriesIDSet 1388} 1389 1390// bytes estimates the memory footprint of this logTagValue, in bytes. 
1391func (tv *logTagValue) bytes() int { 1392 var b int 1393 b += len(tv.name) 1394 b += int(unsafe.Sizeof(*tv)) 1395 b += (int(tv.cardinality()) * 8) 1396 return b 1397} 1398 1399func (tv *logTagValue) addSeriesID(x uint64) { 1400 if tv.seriesSet != nil { 1401 tv.seriesSet.AddNoLock(x) 1402 return 1403 } 1404 1405 tv.series[x] = struct{}{} 1406 1407 // If the map is getting too big it can be converted into a roaring seriesSet. 1408 if len(tv.series) > 25 { 1409 tv.seriesSet = tsdb.NewSeriesIDSet() 1410 for id := range tv.series { 1411 tv.seriesSet.AddNoLock(id) 1412 } 1413 tv.series = nil 1414 } 1415} 1416 1417func (tv *logTagValue) removeSeriesID(x uint64) { 1418 if tv.seriesSet != nil { 1419 tv.seriesSet.RemoveNoLock(x) 1420 return 1421 } 1422 delete(tv.series, x) 1423} 1424 1425func (tv *logTagValue) cardinality() int64 { 1426 if tv.seriesSet != nil { 1427 return int64(tv.seriesSet.Cardinality()) 1428 } 1429 return int64(len(tv.series)) 1430} 1431 1432// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1433// one 1434func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet { 1435 if tv.seriesSet != nil { 1436 return tv.seriesSet.CloneNoLock() 1437 } 1438 1439 ss := tsdb.NewSeriesIDSet() 1440 for seriesID := range tv.series { 1441 ss.AddNoLock(seriesID) 1442 } 1443 return ss 1444} 1445 1446func (tv *logTagValue) Value() []byte { return tv.name } 1447func (tv *logTagValue) Deleted() bool { return tv.deleted } 1448 1449// logTagValue is a sortable list of log tag values. 1450type logTagValueSlice []logTagValue 1451 1452func (a logTagValueSlice) Len() int { return len(a) } 1453func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1454func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1455 1456// logTagKeyIterator represents an iterator over a slice of tag keys. 
1457type logTagKeyIterator struct { 1458 a []logTagKey 1459} 1460 1461// newLogTagKeyIterator returns a new instance of logTagKeyIterator. 1462func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator { 1463 sort.Sort(logTagKeySlice(a)) 1464 return &logTagKeyIterator{a: a} 1465} 1466 1467// Next returns the next element in the iterator. 1468func (itr *logTagKeyIterator) Next() (e TagKeyElem) { 1469 if len(itr.a) == 0 { 1470 return nil 1471 } 1472 e, itr.a = &itr.a[0], itr.a[1:] 1473 return e 1474} 1475 1476// logTagValueIterator represents an iterator over a slice of tag values. 1477type logTagValueIterator struct { 1478 a []logTagValue 1479} 1480 1481// newLogTagValueIterator returns a new instance of logTagValueIterator. 1482func newLogTagValueIterator(a []logTagValue) *logTagValueIterator { 1483 sort.Sort(logTagValueSlice(a)) 1484 return &logTagValueIterator{a: a} 1485} 1486 1487// Next returns the next element in the iterator. 1488func (itr *logTagValueIterator) Next() (e TagValueElem) { 1489 if len(itr.a) == 0 { 1490 return nil 1491 } 1492 e, itr.a = &itr.a[0], itr.a[1:] 1493 return e 1494} 1495 1496// FormatLogFileName generates a log filename for the given index. 1497func FormatLogFileName(id int) string { 1498 return fmt.Sprintf("L0-%08d%s", id, LogFileExt) 1499} 1500