1package tsi1 2 3import ( 4 "bufio" 5 "bytes" 6 "encoding/binary" 7 "errors" 8 "fmt" 9 "hash/crc32" 10 "io" 11 "os" 12 "sort" 13 "sync" 14 "time" 15 "unsafe" 16 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/pkg/bloom" 19 "github.com/influxdata/influxdb/pkg/estimator" 20 "github.com/influxdata/influxdb/pkg/estimator/hll" 21 "github.com/influxdata/influxdb/pkg/mmap" 22 "github.com/influxdata/influxdb/tsdb" 23) 24 25// Log errors. 26var ( 27 ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch") 28) 29 30// Log entry flag constants. 31const ( 32 LogEntrySeriesTombstoneFlag = 0x01 33 LogEntryMeasurementTombstoneFlag = 0x02 34 LogEntryTagKeyTombstoneFlag = 0x04 35 LogEntryTagValueTombstoneFlag = 0x08 36) 37 38// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered 39// writer uses. If the LogFile does not have an explicit buffer size set then 40// this is the size of the buffer; it is equal to the default buffer size used 41// by a bufio.Writer. 42const defaultLogFileBufferSize = 4096 43 44// indexFileBufferSize is the buffer size used when compacting the LogFile down 45// into a .tsi file. 46const indexFileBufferSize = 1 << 17 // 128K 47 48// LogFile represents an on-disk write-ahead log file. 49type LogFile struct { 50 mu sync.RWMutex 51 wg sync.WaitGroup // ref count 52 id int // file sequence identifier 53 data []byte // mmap 54 file *os.File // writer 55 w *bufio.Writer // buffered writer 56 bufferSize int // The size of the buffer used by the buffered writer 57 nosync bool // Disables buffer flushing and file syncing. Useful for offline tooling. 58 buf []byte // marshaling buffer 59 keyBuf []byte 60 61 sfile *tsdb.SeriesFile // series lookup 62 size int64 // tracks current file size 63 modTime time.Time // tracks last time write occurred 64 65 // In-memory series existence/tombstone sets. 66 seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet 67 68 // In-memory index. 69 mms logMeasurements 70 71 // Filepath to the log file. 72 path string 73} 74 75// NewLogFile returns a new instance of LogFile. 76func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile { 77 return &LogFile{ 78 sfile: sfile, 79 path: path, 80 mms: make(logMeasurements), 81 82 seriesIDSet: tsdb.NewSeriesIDSet(), 83 tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(), 84 } 85} 86 87// bytes estimates the memory footprint of this LogFile, in bytes. 88func (f *LogFile) bytes() int { 89 var b int 90 b += 24 // mu RWMutex is 24 bytes 91 b += 16 // wg WaitGroup is 16 bytes 92 b += int(unsafe.Sizeof(f.id)) 93 // Do not include f.data because it is mmap'd 94 // TODO(jacobmarble): Uncomment when we are using go >= 1.10.0 95 //b += int(unsafe.Sizeof(f.w)) + f.w.Size() 96 b += int(unsafe.Sizeof(f.buf)) + len(f.buf) 97 b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf) 98 // Do not count SeriesFile because it belongs to the code that constructed this Index. 99 b += int(unsafe.Sizeof(f.size)) 100 b += int(unsafe.Sizeof(f.modTime)) 101 b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes() 102 b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes() 103 b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes() 104 b += int(unsafe.Sizeof(f.path)) + len(f.path) 105 return b 106} 107 108// Open reads the log from a file and validates all the checksums. 109func (f *LogFile) Open() error { 110 if err := f.open(); err != nil { 111 f.Close() 112 return err 113 } 114 return nil 115} 116 117func (f *LogFile) open() error { 118 f.id, _ = ParseFilename(f.path) 119 120 // Open file for appending. 121 file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666) 122 if err != nil { 123 return err 124 } 125 f.file = file 126 127 if f.bufferSize == 0 { 128 f.bufferSize = defaultLogFileBufferSize 129 } 130 f.w = bufio.NewWriterSize(f.file, f.bufferSize) 131 132 // Finish opening if file is empty. 133 fi, err := file.Stat() 134 if err != nil { 135 return err 136 } else if fi.Size() == 0 { 137 return nil 138 } 139 f.size = fi.Size() 140 f.modTime = fi.ModTime() 141 142 // Open a read-only memory map of the existing data. 143 data, err := mmap.Map(f.Path(), 0) 144 if err != nil { 145 return err 146 } 147 f.data = data 148 149 // Read log entries from mmap. 150 var n int64 151 for buf := f.data; len(buf) > 0; { 152 // Read next entry. Truncate partial writes. 153 var e LogEntry 154 if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch { 155 break 156 } else if err != nil { 157 return err 158 } 159 160 // Execute entry against in-memory index. 161 f.execEntry(&e) 162 163 // Move buffer forward. 164 n += int64(e.Size) 165 buf = buf[e.Size:] 166 } 167 168 // Move to the end of the file. 169 f.size = n 170 _, err = file.Seek(n, io.SeekStart) 171 return err 172} 173 174// Close shuts down the file handle and mmap. 175func (f *LogFile) Close() error { 176 // Wait until the file has no more references. 177 f.wg.Wait() 178 179 if f.w != nil { 180 f.w.Flush() 181 f.w = nil 182 } 183 184 if f.file != nil { 185 f.file.Close() 186 f.file = nil 187 } 188 189 if f.data != nil { 190 mmap.Unmap(f.data) 191 } 192 193 f.mms = make(logMeasurements) 194 return nil 195} 196 197// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file. 198// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op. 199func (f *LogFile) FlushAndSync() error { 200 if f.nosync { 201 return nil 202 } 203 204 if f.w != nil { 205 if err := f.w.Flush(); err != nil { 206 return err 207 } 208 } 209 210 if f.file == nil { 211 return nil 212 } 213 return f.file.Sync() 214} 215 216// ID returns the file sequence identifier. 217func (f *LogFile) ID() int { return f.id } 218 219// Path returns the file path. 220func (f *LogFile) Path() string { return f.path } 221 222// SetPath sets the log file's path. 223func (f *LogFile) SetPath(path string) { f.path = path } 224 225// Level returns the log level of the file. 226func (f *LogFile) Level() int { return 0 } 227 228// Filter returns the bloom filter for the file. 229func (f *LogFile) Filter() *bloom.Filter { return nil } 230 231// Retain adds a reference count to the file. 232func (f *LogFile) Retain() { f.wg.Add(1) } 233 234// Release removes a reference count from the file. 235func (f *LogFile) Release() { f.wg.Done() } 236 237// Stat returns size and last modification time of the file. 238func (f *LogFile) Stat() (int64, time.Time) { 239 f.mu.RLock() 240 size, modTime := f.size, f.modTime 241 f.mu.RUnlock() 242 return size, modTime 243} 244 245// SeriesIDSet returns the series existence set. 246func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) { 247 return f.seriesIDSet, nil 248} 249 250// TombstoneSeriesIDSet returns the series tombstone set. 251func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) { 252 return f.tombstoneSeriesIDSet, nil 253} 254 255// Size returns the size of the file, in bytes. 256func (f *LogFile) Size() int64 { 257 f.mu.RLock() 258 v := f.size 259 f.mu.RUnlock() 260 return v 261} 262 263// Measurement returns a measurement element. 264func (f *LogFile) Measurement(name []byte) MeasurementElem { 265 f.mu.RLock() 266 defer f.mu.RUnlock() 267 268 mm, ok := f.mms[string(name)] 269 if !ok { 270 return nil 271 } 272 273 return mm 274} 275 276func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool { 277 f.mu.RLock() 278 defer f.mu.RUnlock() 279 280 mm, ok := f.mms[string(name)] 281 if !ok { 282 return false 283 } 284 285 // TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection. 286 for _, id := range mm.seriesIDs() { 287 if ss.Contains(id) { 288 return true 289 } 290 } 291 return false 292} 293 294// MeasurementNames returns an ordered list of measurement names. 295func (f *LogFile) MeasurementNames() []string { 296 f.mu.RLock() 297 defer f.mu.RUnlock() 298 return f.measurementNames() 299} 300 301func (f *LogFile) measurementNames() []string { 302 a := make([]string, 0, len(f.mms)) 303 for name := range f.mms { 304 a = append(a, name) 305 } 306 sort.Strings(a) 307 return a 308} 309 310// DeleteMeasurement adds a tombstone for a measurement to the log file. 311func (f *LogFile) DeleteMeasurement(name []byte) error { 312 f.mu.Lock() 313 defer f.mu.Unlock() 314 315 e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name} 316 if err := f.appendEntry(&e); err != nil { 317 return err 318 } 319 f.execEntry(&e) 320 321 // Flush buffer and sync to disk. 322 return f.FlushAndSync() 323} 324 325// TagKeySeriesIDIterator returns a series iterator for a tag key. 326func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) { 327 f.mu.RLock() 328 defer f.mu.RUnlock() 329 330 mm, ok := f.mms[string(name)] 331 if !ok { 332 return nil, nil 333 } 334 335 tk, ok := mm.tagSet[string(key)] 336 if !ok { 337 return nil, nil 338 } 339 340 // Combine iterators across all tag keys. 341 itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues)) 342 for _, tv := range tk.tagValues { 343 if tv.cardinality() == 0 { 344 continue 345 } 346 if itr := tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()); itr != nil { 347 itrs = append(itrs, itr) 348 } 349 } 350 351 return tsdb.MergeSeriesIDIterators(itrs...), nil 352} 353 354// TagKeyIterator returns a value iterator for a measurement. 355func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator { 356 f.mu.RLock() 357 defer f.mu.RUnlock() 358 359 mm, ok := f.mms[string(name)] 360 if !ok { 361 return nil 362 } 363 364 a := make([]logTagKey, 0, len(mm.tagSet)) 365 for _, k := range mm.tagSet { 366 a = append(a, k) 367 } 368 return newLogTagKeyIterator(a) 369} 370 371// TagKey returns a tag key element. 372func (f *LogFile) TagKey(name, key []byte) TagKeyElem { 373 f.mu.RLock() 374 defer f.mu.RUnlock() 375 376 mm, ok := f.mms[string(name)] 377 if !ok { 378 return nil 379 } 380 381 tk, ok := mm.tagSet[string(key)] 382 if !ok { 383 return nil 384 } 385 386 return &tk 387} 388 389// TagValue returns a tag value element. 390func (f *LogFile) TagValue(name, key, value []byte) TagValueElem { 391 f.mu.RLock() 392 defer f.mu.RUnlock() 393 394 mm, ok := f.mms[string(name)] 395 if !ok { 396 return nil 397 } 398 399 tk, ok := mm.tagSet[string(key)] 400 if !ok { 401 return nil 402 } 403 404 tv, ok := tk.tagValues[string(value)] 405 if !ok { 406 return nil 407 } 408 409 return &tv 410} 411 412// TagValueIterator returns a value iterator for a tag key. 413func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator { 414 f.mu.RLock() 415 defer f.mu.RUnlock() 416 417 mm, ok := f.mms[string(name)] 418 if !ok { 419 return nil 420 } 421 422 tk, ok := mm.tagSet[string(key)] 423 if !ok { 424 return nil 425 } 426 return tk.TagValueIterator() 427} 428 429// DeleteTagKey adds a tombstone for a tag key to the log file. 430func (f *LogFile) DeleteTagKey(name, key []byte) error { 431 f.mu.Lock() 432 defer f.mu.Unlock() 433 434 e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key} 435 if err := f.appendEntry(&e); err != nil { 436 return err 437 } 438 f.execEntry(&e) 439 440 // Flush buffer and sync to disk. 441 return f.FlushAndSync() 442} 443 444// TagValueSeriesIDSet returns a series iterator for a tag value. 445func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) { 446 f.mu.RLock() 447 defer f.mu.RUnlock() 448 449 mm, ok := f.mms[string(name)] 450 if !ok { 451 return nil, nil 452 } 453 454 tk, ok := mm.tagSet[string(key)] 455 if !ok { 456 return nil, nil 457 } 458 459 tv, ok := tk.tagValues[string(value)] 460 if !ok { 461 return nil, nil 462 } else if tv.cardinality() == 0 { 463 return nil, nil 464 } 465 466 return tv.seriesIDSet(), nil 467} 468 469// MeasurementN returns the total number of measurements. 470func (f *LogFile) MeasurementN() (n uint64) { 471 f.mu.RLock() 472 defer f.mu.RUnlock() 473 return uint64(len(f.mms)) 474} 475 476// TagKeyN returns the total number of keys. 477func (f *LogFile) TagKeyN() (n uint64) { 478 f.mu.RLock() 479 defer f.mu.RUnlock() 480 for _, mm := range f.mms { 481 n += uint64(len(mm.tagSet)) 482 } 483 return n 484} 485 486// TagValueN returns the total number of values. 487func (f *LogFile) TagValueN() (n uint64) { 488 f.mu.RLock() 489 defer f.mu.RUnlock() 490 for _, mm := range f.mms { 491 for _, k := range mm.tagSet { 492 n += uint64(len(k.tagValues)) 493 } 494 } 495 return n 496} 497 498// DeleteTagValue adds a tombstone for a tag value to the log file. 499func (f *LogFile) DeleteTagValue(name, key, value []byte) error { 500 f.mu.Lock() 501 defer f.mu.Unlock() 502 503 e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value} 504 if err := f.appendEntry(&e); err != nil { 505 return err 506 } 507 f.execEntry(&e) 508 509 // Flush buffer and sync to disk. 510 return f.FlushAndSync() 511} 512 513// AddSeriesList adds a list of series to the log file in bulk. 514func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) ([]uint64, error) { 515 seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice, tracker) 516 if err != nil { 517 return nil, err 518 } 519 520 var writeRequired bool 521 entries := make([]LogEntry, 0, len(names)) 522 seriesSet.RLock() 523 for i := range names { 524 if seriesSet.ContainsNoLock(seriesIDs[i]) { 525 // We don't need to allocate anything for this series. 526 seriesIDs[i] = 0 527 continue 528 } 529 writeRequired = true 530 entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i}) 531 } 532 seriesSet.RUnlock() 533 534 // Exit if all series already exist. 535 if !writeRequired { 536 return seriesIDs, nil 537 } 538 539 f.mu.Lock() 540 defer f.mu.Unlock() 541 542 seriesSet.Lock() 543 defer seriesSet.Unlock() 544 545 for i := range entries { // NB - this doesn't evaluate all series ids returned from series file. 546 entry := &entries[i] 547 if seriesSet.ContainsNoLock(entry.SeriesID) { 548 // We don't need to allocate anything for this series. 549 seriesIDs[entry.batchidx] = 0 550 continue 551 } 552 if err := f.appendEntry(entry); err != nil { 553 return nil, err 554 } 555 f.execEntry(entry) 556 seriesSet.AddNoLock(entry.SeriesID) 557 } 558 559 // Flush buffer and sync to disk. 560 if err := f.FlushAndSync(); err != nil { 561 return nil, err 562 } 563 return seriesIDs, nil 564} 565 566// DeleteSeriesID adds a tombstone for a series id. 567func (f *LogFile) DeleteSeriesID(id uint64) error { 568 f.mu.Lock() 569 defer f.mu.Unlock() 570 571 e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id} 572 if err := f.appendEntry(&e); err != nil { 573 return err 574 } 575 f.execEntry(&e) 576 577 // Flush buffer and sync to disk. 578 return f.FlushAndSync() 579} 580 581// DeleteSeriesIDList adds a tombstone for seriesIDList 582func (f *LogFile) DeleteSeriesIDList(ids []uint64) error { 583 f.mu.Lock() 584 defer f.mu.Unlock() 585 586 for _, id := range ids { 587 e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id} 588 if err := f.appendEntry(&e); err != nil { 589 return err 590 } 591 f.execEntry(&e) 592 } 593 594 // Flush buffer and sync to disk. 595 return f.FlushAndSync() 596} 597 598// SeriesN returns the total number of series in the file. 599func (f *LogFile) SeriesN() (n uint64) { 600 f.mu.RLock() 601 defer f.mu.RUnlock() 602 603 for _, mm := range f.mms { 604 n += uint64(mm.cardinality()) 605 } 606 return n 607} 608 609// appendEntry adds a log entry to the end of the file. 610func (f *LogFile) appendEntry(e *LogEntry) error { 611 // Marshal entry to the local buffer. 612 f.buf = appendLogEntry(f.buf[:0], e) 613 614 // Save the size of the record. 615 e.Size = len(f.buf) 616 617 // Write record to file. 618 n, err := f.w.Write(f.buf) 619 if err != nil { 620 // Move position backwards over partial entry. 621 // Log should be reopened if seeking cannot be completed. 622 if n > 0 { 623 f.w.Reset(f.file) 624 if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil { 625 f.Close() 626 } 627 } 628 return err 629 } 630 631 // Update in-memory file size & modification time. 632 f.size += int64(n) 633 f.modTime = time.Now() 634 635 return nil 636} 637 638// execEntry executes a log entry against the in-memory index. 639// This is done after appending and on replay of the log. 640func (f *LogFile) execEntry(e *LogEntry) { 641 switch e.Flag { 642 case LogEntryMeasurementTombstoneFlag: 643 f.execDeleteMeasurementEntry(e) 644 case LogEntryTagKeyTombstoneFlag: 645 f.execDeleteTagKeyEntry(e) 646 case LogEntryTagValueTombstoneFlag: 647 f.execDeleteTagValueEntry(e) 648 default: 649 f.execSeriesEntry(e) 650 } 651} 652 653func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) { 654 mm := f.createMeasurementIfNotExists(e.Name) 655 mm.deleted = true 656 mm.tagSet = make(map[string]logTagKey) 657 mm.series = make(map[uint64]struct{}) 658 mm.seriesSet = nil 659} 660 661func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) { 662 mm := f.createMeasurementIfNotExists(e.Name) 663 ts := mm.createTagSetIfNotExists(e.Key) 664 665 ts.deleted = true 666 667 mm.tagSet[string(e.Key)] = ts 668} 669 670func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) { 671 mm := f.createMeasurementIfNotExists(e.Name) 672 ts := mm.createTagSetIfNotExists(e.Key) 673 tv := ts.createTagValueIfNotExists(e.Value) 674 675 tv.deleted = true 676 677 ts.tagValues[string(e.Value)] = tv 678 mm.tagSet[string(e.Key)] = ts 679} 680 681func (f *LogFile) execSeriesEntry(e *LogEntry) { 682 var seriesKey []byte 683 if e.cached { 684 sz := tsdb.SeriesKeySize(e.name, e.tags) 685 if len(f.keyBuf) < sz { 686 f.keyBuf = make([]byte, 0, sz) 687 } 688 seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags) 689 } else { 690 seriesKey = f.sfile.SeriesKey(e.SeriesID) 691 } 692 693 // Series keys can be removed if the series has been deleted from 694 // the entire database and the server is restarted. This would cause 695 // the log to replay its insert but the key cannot be found. 696 // 697 // https://github.com/influxdata/influxdb/issues/9444 698 if seriesKey == nil { 699 return 700 } 701 702 // Check if deleted. 703 deleted := e.Flag == LogEntrySeriesTombstoneFlag 704 705 // Read key size. 706 _, remainder := tsdb.ReadSeriesKeyLen(seriesKey) 707 708 // Read measurement name. 709 name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder) 710 mm := f.createMeasurementIfNotExists(name) 711 mm.deleted = false 712 if !deleted { 713 mm.addSeriesID(e.SeriesID) 714 } else { 715 mm.removeSeriesID(e.SeriesID) 716 } 717 718 // Read tag count. 719 tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder) 720 721 // Save tags. 722 var k, v []byte 723 for i := 0; i < tagN; i++ { 724 k, v, remainder = tsdb.ReadSeriesKeyTag(remainder) 725 ts := mm.createTagSetIfNotExists(k) 726 tv := ts.createTagValueIfNotExists(v) 727 728 // Add/remove a reference to the series on the tag value. 729 if !deleted { 730 tv.addSeriesID(e.SeriesID) 731 } else { 732 tv.removeSeriesID(e.SeriesID) 733 } 734 735 ts.tagValues[string(v)] = tv 736 mm.tagSet[string(k)] = ts 737 } 738 739 // Add/remove from appropriate series id sets. 740 if !deleted { 741 f.seriesIDSet.Add(e.SeriesID) 742 f.tombstoneSeriesIDSet.Remove(e.SeriesID) 743 } else { 744 f.seriesIDSet.Remove(e.SeriesID) 745 f.tombstoneSeriesIDSet.Add(e.SeriesID) 746 } 747} 748 749// SeriesIDIterator returns an iterator over all series in the log file. 750func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator { 751 f.mu.RLock() 752 defer f.mu.RUnlock() 753 754 ss := tsdb.NewSeriesIDSet() 755 allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms)) 756 757 for _, mm := range f.mms { 758 if mm.seriesSet != nil { 759 allSeriesSets = append(allSeriesSets, mm.seriesSet) 760 continue 761 } 762 763 // measurement is not using seriesSet to store series IDs. 764 mm.forEach(func(seriesID uint64) { 765 ss.AddNoLock(seriesID) 766 }) 767 } 768 769 // Fast merge all seriesSets. 770 if len(allSeriesSets) > 0 { 771 ss.Merge(allSeriesSets...) 772 } 773 774 return tsdb.NewSeriesIDSetIterator(ss) 775} 776 777// createMeasurementIfNotExists returns a measurement by name. 778func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement { 779 mm := f.mms[string(name)] 780 if mm == nil { 781 mm = &logMeasurement{ 782 name: name, 783 tagSet: make(map[string]logTagKey), 784 series: make(map[uint64]struct{}), 785 } 786 f.mms[string(name)] = mm 787 } 788 return mm 789} 790 791// MeasurementIterator returns an iterator over all the measurements in the file. 792func (f *LogFile) MeasurementIterator() MeasurementIterator { 793 f.mu.RLock() 794 defer f.mu.RUnlock() 795 796 var itr logMeasurementIterator 797 for _, mm := range f.mms { 798 itr.mms = append(itr.mms, *mm) 799 } 800 sort.Sort(logMeasurementSlice(itr.mms)) 801 return &itr 802} 803 804// MeasurementSeriesIDIterator returns an iterator over all series for a measurement. 805func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator { 806 f.mu.RLock() 807 defer f.mu.RUnlock() 808 809 mm := f.mms[string(name)] 810 if mm == nil || mm.cardinality() == 0 { 811 return nil 812 } 813 return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet()) 814} 815 816// CompactTo compacts the log file and writes it to w. 817func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) { 818 f.mu.RLock() 819 defer f.mu.RUnlock() 820 821 // Check for cancellation. 822 select { 823 case <-cancel: 824 return n, ErrCompactionInterrupted 825 default: 826 } 827 828 // Wrap in bufferred writer with a buffer equivalent to the LogFile size. 829 bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K 830 831 // Setup compaction offset tracking data. 832 var t IndexFileTrailer 833 info := newLogFileCompactInfo() 834 info.cancel = cancel 835 836 // Write magic number. 837 if err := writeTo(bw, []byte(FileSignature), &n); err != nil { 838 return n, err 839 } 840 841 // Retreve measurement names in order. 842 names := f.measurementNames() 843 844 // Flush buffer & mmap series block. 845 if err := bw.Flush(); err != nil { 846 return n, err 847 } 848 849 // Write tagset blocks in measurement order. 850 if err := f.writeTagsetsTo(bw, names, info, &n); err != nil { 851 return n, err 852 } 853 854 // Write measurement block. 855 t.MeasurementBlock.Offset = n 856 if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil { 857 return n, err 858 } 859 t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset 860 861 // Write series set. 862 t.SeriesIDSet.Offset = n 863 nn, err := f.seriesIDSet.WriteTo(bw) 864 if n += nn; err != nil { 865 return n, err 866 } 867 t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset 868 869 // Write tombstone series set. 870 t.TombstoneSeriesIDSet.Offset = n 871 nn, err = f.tombstoneSeriesIDSet.WriteTo(bw) 872 if n += nn; err != nil { 873 return n, err 874 } 875 t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset 876 877 // Build series sketches. 878 sSketch, sTSketch, err := f.seriesSketches() 879 if err != nil { 880 return n, err 881 } 882 883 // Write series sketches. 884 t.SeriesSketch.Offset = n 885 data, err := sSketch.MarshalBinary() 886 if err != nil { 887 return n, err 888 } else if _, err := bw.Write(data); err != nil { 889 return n, err 890 } 891 t.SeriesSketch.Size = int64(len(data)) 892 n += t.SeriesSketch.Size 893 894 t.TombstoneSeriesSketch.Offset = n 895 if data, err = sTSketch.MarshalBinary(); err != nil { 896 return n, err 897 } else if _, err := bw.Write(data); err != nil { 898 return n, err 899 } 900 t.TombstoneSeriesSketch.Size = int64(len(data)) 901 n += t.TombstoneSeriesSketch.Size 902 903 // Write trailer. 904 nn, err = t.WriteTo(bw) 905 n += nn 906 if err != nil { 907 return n, err 908 } 909 910 // Flush buffer. 911 if err := bw.Flush(); err != nil { 912 return n, err 913 } 914 915 return n, nil 916} 917 918func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error { 919 for _, name := range names { 920 if err := f.writeTagsetTo(w, name, info, n); err != nil { 921 return err 922 } 923 } 924 return nil 925} 926 927// writeTagsetTo writes a single tagset to w and saves the tagset offset. 928func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error { 929 mm := f.mms[name] 930 931 // Check for cancellation. 932 select { 933 case <-info.cancel: 934 return ErrCompactionInterrupted 935 default: 936 } 937 938 enc := NewTagBlockEncoder(w) 939 var valueN int 940 for _, k := range mm.keys() { 941 tag := mm.tagSet[k] 942 943 // Encode tag. Skip values if tag is deleted. 944 if err := enc.EncodeKey(tag.name, tag.deleted); err != nil { 945 return err 946 } else if tag.deleted { 947 continue 948 } 949 950 // Sort tag values. 951 values := make([]string, 0, len(tag.tagValues)) 952 for v := range tag.tagValues { 953 values = append(values, v) 954 } 955 sort.Strings(values) 956 957 // Add each value. 958 for _, v := range values { 959 value := tag.tagValues[v] 960 if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil { 961 return err 962 } 963 964 // Check for cancellation periodically. 965 if valueN++; valueN%1000 == 0 { 966 select { 967 case <-info.cancel: 968 return ErrCompactionInterrupted 969 default: 970 } 971 } 972 } 973 } 974 975 // Save tagset offset to measurement. 976 offset := *n 977 978 // Flush tag block. 979 err := enc.Close() 980 *n += enc.N() 981 if err != nil { 982 return err 983 } 984 985 // Save tagset offset to measurement. 986 size := *n - offset 987 988 info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size} 989 990 return nil 991} 992 993func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error { 994 mw := NewMeasurementBlockWriter() 995 996 // Check for cancellation. 997 select { 998 case <-info.cancel: 999 return ErrCompactionInterrupted 1000 default: 1001 } 1002 1003 // Add measurement data. 1004 for _, name := range names { 1005 mm := f.mms[name] 1006 mmInfo := info.mms[name] 1007 assert(mmInfo != nil, "measurement info not found") 1008 mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs()) 1009 } 1010 1011 // Flush data to writer. 1012 nn, err := mw.WriteTo(w) 1013 *n += nn 1014 return err 1015} 1016 1017// logFileCompactInfo is a context object to track compaction position info. 1018type logFileCompactInfo struct { 1019 cancel <-chan struct{} 1020 mms map[string]*logFileMeasurementCompactInfo 1021} 1022 1023// newLogFileCompactInfo returns a new instance of logFileCompactInfo. 1024func newLogFileCompactInfo() *logFileCompactInfo { 1025 return &logFileCompactInfo{ 1026 mms: make(map[string]*logFileMeasurementCompactInfo), 1027 } 1028} 1029 1030type logFileMeasurementCompactInfo struct { 1031 offset int64 1032 size int64 1033} 1034 1035// MeasurementsSketches returns sketches for existing and tombstoned measurement names. 1036func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1037 f.mu.RLock() 1038 defer f.mu.RUnlock() 1039 return f.measurementsSketches() 1040} 1041 1042func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) { 1043 sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus() 1044 for _, mm := range f.mms { 1045 if mm.deleted { 1046 tSketch.Add(mm.name) 1047 } else { 1048 sketch.Add(mm.name) 1049 } 1050 } 1051 return sketch, tSketch, nil 1052} 1053 1054// SeriesSketches returns sketches for existing and tombstoned series. 1055func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1056 f.mu.RLock() 1057 defer f.mu.RUnlock() 1058 return f.seriesSketches() 1059} 1060 1061func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) { 1062 sketch = hll.NewDefaultPlus() 1063 f.seriesIDSet.ForEach(func(id uint64) { 1064 name, keys := f.sfile.Series(id) 1065 sketch.Add(models.MakeKey(name, keys)) 1066 }) 1067 1068 tSketch = hll.NewDefaultPlus() 1069 f.tombstoneSeriesIDSet.ForEach(func(id uint64) { 1070 name, keys := f.sfile.Series(id) 1071 tSketch.Add(models.MakeKey(name, keys)) 1072 }) 1073 return sketch, tSketch, nil 1074} 1075 1076func (f *LogFile) ExecEntries(entries []LogEntry) error { 1077 f.mu.Lock() 1078 defer f.mu.Unlock() 1079 for i := range entries { 1080 entry := &entries[i] 1081 if err := f.appendEntry(entry); err != nil { 1082 return err 1083 } 1084 f.execEntry(entry) 1085 } 1086 return nil 1087} 1088 1089func (f *LogFile) Writes(entries []LogEntry) error { 1090 if err := f.ExecEntries(entries); err != nil { 1091 return err 1092 } 1093 1094 // Flush buffer and sync to disk. 1095 return f.FlushAndSync() 1096} 1097 1098// LogEntry represents a single log entry in the write-ahead log. 1099type LogEntry struct { 1100 Flag byte // flag 1101 SeriesID uint64 // series id 1102 Name []byte // measurement name 1103 Key []byte // tag key 1104 Value []byte // tag value 1105 Checksum uint32 // checksum of flag/name/tags. 1106 Size int // total size of record, in bytes. 1107 1108 cached bool // Hint to LogFile that series data is already parsed 1109 name []byte // series naem, this is a cached copy of the parsed measurement name 1110 tags models.Tags // series tags, this is a cached copied of the parsed tags 1111 batchidx int // position of entry in batch. 1112} 1113 1114// UnmarshalBinary unmarshals data into e. 1115func (e *LogEntry) UnmarshalBinary(data []byte) error { 1116 var sz uint64 1117 var n int 1118 var seriesID uint64 1119 var err error 1120 1121 orig := data 1122 start := len(data) 1123 1124 // Parse flag data. 1125 if len(data) < 1 { 1126 return io.ErrShortBuffer 1127 } 1128 e.Flag, data = data[0], data[1:] 1129 1130 // Parse series id. 1131 if seriesID, n, err = uvarint(data); err != nil { 1132 return err 1133 } 1134 e.SeriesID, data = seriesID, data[n:] 1135 1136 // Parse name length. 1137 if sz, n, err = uvarint(data); err != nil { 1138 return err 1139 } 1140 1141 // Read name data. 1142 if len(data) < n+int(sz) { 1143 return io.ErrShortBuffer 1144 } 1145 e.Name, data = data[n:n+int(sz)], data[n+int(sz):] 1146 1147 // Parse key length. 1148 if sz, n, err = uvarint(data); err != nil { 1149 return err 1150 } 1151 1152 // Read key data. 1153 if len(data) < n+int(sz) { 1154 return io.ErrShortBuffer 1155 } 1156 e.Key, data = data[n:n+int(sz)], data[n+int(sz):] 1157 1158 // Parse value length. 1159 if sz, n, err = uvarint(data); err != nil { 1160 return err 1161 } 1162 1163 // Read value data. 1164 if len(data) < n+int(sz) { 1165 return io.ErrShortBuffer 1166 } 1167 e.Value, data = data[n:n+int(sz)], data[n+int(sz):] 1168 1169 // Compute checksum. 1170 chk := crc32.ChecksumIEEE(orig[:start-len(data)]) 1171 1172 // Parse checksum. 1173 if len(data) < 4 { 1174 return io.ErrShortBuffer 1175 } 1176 e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:] 1177 1178 // Verify checksum. 1179 if chk != e.Checksum { 1180 return ErrLogEntryChecksumMismatch 1181 } 1182 1183 // Save length of elem. 1184 e.Size = start - len(data) 1185 1186 return nil 1187} 1188 1189// appendLogEntry appends to dst and returns the new buffer. 1190// This updates the checksum on the entry. 1191func appendLogEntry(dst []byte, e *LogEntry) []byte { 1192 var buf [binary.MaxVarintLen64]byte 1193 start := len(dst) 1194 1195 // Append flag. 1196 dst = append(dst, e.Flag) 1197 1198 // Append series id. 1199 n := binary.PutUvarint(buf[:], uint64(e.SeriesID)) 1200 dst = append(dst, buf[:n]...) 1201 1202 // Append name. 1203 n = binary.PutUvarint(buf[:], uint64(len(e.Name))) 1204 dst = append(dst, buf[:n]...) 1205 dst = append(dst, e.Name...) 1206 1207 // Append key. 1208 n = binary.PutUvarint(buf[:], uint64(len(e.Key))) 1209 dst = append(dst, buf[:n]...) 1210 dst = append(dst, e.Key...) 1211 1212 // Append value. 1213 n = binary.PutUvarint(buf[:], uint64(len(e.Value))) 1214 dst = append(dst, buf[:n]...) 1215 dst = append(dst, e.Value...) 1216 1217 // Calculate checksum. 1218 e.Checksum = crc32.ChecksumIEEE(dst[start:]) 1219 1220 // Append checksum. 1221 binary.BigEndian.PutUint32(buf[:4], e.Checksum) 1222 dst = append(dst, buf[:4]...) 1223 1224 return dst 1225} 1226 1227// logMeasurements represents a map of measurement names to measurements. 1228type logMeasurements map[string]*logMeasurement 1229 1230// bytes estimates the memory footprint of this logMeasurements, in bytes. 1231func (mms *logMeasurements) bytes() int { 1232 var b int 1233 for k, v := range *mms { 1234 b += len(k) 1235 b += v.bytes() 1236 } 1237 b += int(unsafe.Sizeof(*mms)) 1238 return b 1239} 1240 1241type logMeasurement struct { 1242 name []byte 1243 tagSet map[string]logTagKey 1244 deleted bool 1245 series map[uint64]struct{} 1246 seriesSet *tsdb.SeriesIDSet 1247} 1248 1249// bytes estimates the memory footprint of this logMeasurement, in bytes. 1250func (m *logMeasurement) bytes() int { 1251 var b int 1252 b += len(m.name) 1253 for k, v := range m.tagSet { 1254 b += len(k) 1255 b += v.bytes() 1256 } 1257 b += (int(m.cardinality()) * 8) 1258 b += int(unsafe.Sizeof(*m)) 1259 return b 1260} 1261 1262func (m *logMeasurement) addSeriesID(x uint64) { 1263 if m.seriesSet != nil { 1264 m.seriesSet.AddNoLock(x) 1265 return 1266 } 1267 1268 m.series[x] = struct{}{} 1269 1270 // If the map is getting too big it can be converted into a roaring seriesSet. 1271 if len(m.series) > 25 { 1272 m.seriesSet = tsdb.NewSeriesIDSet() 1273 for id := range m.series { 1274 m.seriesSet.AddNoLock(id) 1275 } 1276 m.series = nil 1277 } 1278} 1279 1280func (m *logMeasurement) removeSeriesID(x uint64) { 1281 if m.seriesSet != nil { 1282 m.seriesSet.RemoveNoLock(x) 1283 return 1284 } 1285 delete(m.series, x) 1286} 1287 1288func (m *logMeasurement) cardinality() int64 { 1289 if m.seriesSet != nil { 1290 return int64(m.seriesSet.Cardinality()) 1291 } 1292 return int64(len(m.series)) 1293} 1294 1295// forEach applies fn to every series ID in the logMeasurement. 1296func (m *logMeasurement) forEach(fn func(uint64)) { 1297 if m.seriesSet != nil { 1298 m.seriesSet.ForEachNoLock(fn) 1299 return 1300 } 1301 1302 for seriesID := range m.series { 1303 fn(seriesID) 1304 } 1305} 1306 1307// seriesIDs returns a sorted set of seriesIDs. 1308func (m *logMeasurement) seriesIDs() []uint64 { 1309 a := make([]uint64, 0, m.cardinality()) 1310 if m.seriesSet != nil { 1311 m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) }) 1312 return a // IDs are already sorted. 1313 } 1314 1315 for seriesID := range m.series { 1316 a = append(a, seriesID) 1317 } 1318 sort.Sort(uint64Slice(a)) 1319 return a 1320} 1321 1322// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1323// one 1324func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet { 1325 if m.seriesSet != nil { 1326 return m.seriesSet.CloneNoLock() 1327 } 1328 1329 ss := tsdb.NewSeriesIDSet() 1330 for seriesID := range m.series { 1331 ss.AddNoLock(seriesID) 1332 } 1333 return ss 1334} 1335 1336func (m *logMeasurement) Name() []byte { return m.name } 1337func (m *logMeasurement) Deleted() bool { return m.deleted } 1338 1339func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { 1340 ts, ok := m.tagSet[string(key)] 1341 if !ok { 1342 ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)} 1343 } 1344 return ts 1345} 1346 1347// keys returns a sorted list of tag keys. 1348func (m *logMeasurement) keys() []string { 1349 a := make([]string, 0, len(m.tagSet)) 1350 for k := range m.tagSet { 1351 a = append(a, k) 1352 } 1353 sort.Strings(a) 1354 return a 1355} 1356 1357// logMeasurementSlice is a sortable list of log measurements. 1358type logMeasurementSlice []logMeasurement 1359 1360func (a logMeasurementSlice) Len() int { return len(a) } 1361func (a logMeasurementSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1362func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1363 1364// logMeasurementIterator represents an iterator over a slice of measurements. 1365type logMeasurementIterator struct { 1366 mms []logMeasurement 1367} 1368 1369// Next returns the next element in the iterator. 1370func (itr *logMeasurementIterator) Next() (e MeasurementElem) { 1371 if len(itr.mms) == 0 { 1372 return nil 1373 } 1374 e, itr.mms = &itr.mms[0], itr.mms[1:] 1375 return e 1376} 1377 1378type logTagKey struct { 1379 name []byte 1380 deleted bool 1381 tagValues map[string]logTagValue 1382} 1383 1384// bytes estimates the memory footprint of this logTagKey, in bytes. 1385func (tk *logTagKey) bytes() int { 1386 var b int 1387 b += len(tk.name) 1388 for k, v := range tk.tagValues { 1389 b += len(k) 1390 b += v.bytes() 1391 } 1392 b += int(unsafe.Sizeof(*tk)) 1393 return b 1394} 1395 1396func (tk *logTagKey) Key() []byte { return tk.name } 1397func (tk *logTagKey) Deleted() bool { return tk.deleted } 1398 1399func (tk *logTagKey) TagValueIterator() TagValueIterator { 1400 a := make([]logTagValue, 0, len(tk.tagValues)) 1401 for _, v := range tk.tagValues { 1402 a = append(a, v) 1403 } 1404 return newLogTagValueIterator(a) 1405} 1406 1407func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue { 1408 tv, ok := tk.tagValues[string(value)] 1409 if !ok { 1410 tv = logTagValue{name: value, series: make(map[uint64]struct{})} 1411 } 1412 return tv 1413} 1414 1415// logTagKey is a sortable list of log tag keys. 1416type logTagKeySlice []logTagKey 1417 1418func (a logTagKeySlice) Len() int { return len(a) } 1419func (a logTagKeySlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1420func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1421 1422type logTagValue struct { 1423 name []byte 1424 deleted bool 1425 series map[uint64]struct{} 1426 seriesSet *tsdb.SeriesIDSet 1427} 1428 1429// bytes estimates the memory footprint of this logTagValue, in bytes. 1430func (tv *logTagValue) bytes() int { 1431 var b int 1432 b += len(tv.name) 1433 b += int(unsafe.Sizeof(*tv)) 1434 b += (int(tv.cardinality()) * 8) 1435 return b 1436} 1437 1438func (tv *logTagValue) addSeriesID(x uint64) { 1439 if tv.seriesSet != nil { 1440 tv.seriesSet.AddNoLock(x) 1441 return 1442 } 1443 1444 tv.series[x] = struct{}{} 1445 1446 // If the map is getting too big it can be converted into a roaring seriesSet. 1447 if len(tv.series) > 25 { 1448 tv.seriesSet = tsdb.NewSeriesIDSet() 1449 for id := range tv.series { 1450 tv.seriesSet.AddNoLock(id) 1451 } 1452 tv.series = nil 1453 } 1454} 1455 1456func (tv *logTagValue) removeSeriesID(x uint64) { 1457 if tv.seriesSet != nil { 1458 tv.seriesSet.RemoveNoLock(x) 1459 return 1460 } 1461 delete(tv.series, x) 1462} 1463 1464func (tv *logTagValue) cardinality() int64 { 1465 if tv.seriesSet != nil { 1466 return int64(tv.seriesSet.Cardinality()) 1467 } 1468 return int64(len(tv.series)) 1469} 1470 1471// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new 1472// one 1473func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet { 1474 if tv.seriesSet != nil { 1475 return tv.seriesSet.CloneNoLock() 1476 } 1477 1478 ss := tsdb.NewSeriesIDSet() 1479 for seriesID := range tv.series { 1480 ss.AddNoLock(seriesID) 1481 } 1482 return ss 1483} 1484 1485func (tv *logTagValue) Value() []byte { return tv.name } 1486func (tv *logTagValue) Deleted() bool { return tv.deleted } 1487 1488// logTagValue is a sortable list of log tag values. 1489type logTagValueSlice []logTagValue 1490 1491func (a logTagValueSlice) Len() int { return len(a) } 1492func (a logTagValueSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1493func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 } 1494 1495// logTagKeyIterator represents an iterator over a slice of tag keys. 1496type logTagKeyIterator struct { 1497 a []logTagKey 1498} 1499 1500// newLogTagKeyIterator returns a new instance of logTagKeyIterator. 1501func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator { 1502 sort.Sort(logTagKeySlice(a)) 1503 return &logTagKeyIterator{a: a} 1504} 1505 1506// Next returns the next element in the iterator. 1507func (itr *logTagKeyIterator) Next() (e TagKeyElem) { 1508 if len(itr.a) == 0 { 1509 return nil 1510 } 1511 e, itr.a = &itr.a[0], itr.a[1:] 1512 return e 1513} 1514 1515// logTagValueIterator represents an iterator over a slice of tag values. 1516type logTagValueIterator struct { 1517 a []logTagValue 1518} 1519 1520// newLogTagValueIterator returns a new instance of logTagValueIterator. 1521func newLogTagValueIterator(a []logTagValue) *logTagValueIterator { 1522 sort.Sort(logTagValueSlice(a)) 1523 return &logTagValueIterator{a: a} 1524} 1525 1526// Next returns the next element in the iterator. 1527func (itr *logTagValueIterator) Next() (e TagValueElem) { 1528 if len(itr.a) == 0 { 1529 return nil 1530 } 1531 e, itr.a = &itr.a[0], itr.a[1:] 1532 return e 1533} 1534 1535// FormatLogFileName generates a log filename for the given index. 1536func FormatLogFileName(id int) string { 1537 return fmt.Sprintf("L0-%08d%s", id, LogFileExt) 1538} 1539