1package tsm1 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9 "os" 10 "runtime" 11 "sort" 12 "sync" 13 "sync/atomic" 14 15 "github.com/influxdata/influxdb/pkg/bytesutil" 16 "github.com/influxdata/influxdb/pkg/file" 17 "github.com/influxdata/influxdb/tsdb" 18) 19 20// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used. 21var ErrFileInUse = fmt.Errorf("file still in use") 22 23// nilOffset is the value written to the offsets to indicate that position is deleted. The value is the max 24// uint32 which is an invalid position. We don't use 0 as 0 is actually a valid position. 25var nilOffset = []byte{255, 255, 255, 255} 26 27// TSMReader is a reader for a TSM file. 28type TSMReader struct { 29 // refs is the count of active references to this reader. 30 refs int64 31 refsWG sync.WaitGroup 32 33 madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED. 34 mu sync.RWMutex 35 36 // accessor provides access and decoding of blocks for the reader. 37 accessor blockAccessor 38 39 // index is the index of all blocks. 40 index TSMIndex 41 42 // tombstoner ensures tombstoned keys are not available by the index. 43 tombstoner *Tombstoner 44 45 // size is the size of the file on disk. 46 size int64 47 48 // lastModified is the last time this file was modified on disk 49 lastModified int64 50 51 // deleteMu limits concurrent deletes 52 deleteMu sync.Mutex 53} 54 55// TSMIndex represent the index section of a TSM file. The index records all 56// blocks, their locations, sizes, min and max times. 57type TSMIndex interface { 58 // Delete removes the given keys from the index. 59 Delete(keys [][]byte) 60 61 // DeleteRange removes the given keys with data between minTime and maxTime from the index. 62 DeleteRange(keys [][]byte, minTime, maxTime int64) 63 64 // ContainsKey returns true if the given key may exist in the index. This func is faster than 65 // Contains but, may return false positives. 
66 ContainsKey(key []byte) bool 67 68 // Contains return true if the given key exists in the index. 69 Contains(key []byte) bool 70 71 // ContainsValue returns true if key and time might exist in this file. This function could 72 // return true even though the actual point does not exists. For example, the key may 73 // exist in this file, but not have a point exactly at time t. 74 ContainsValue(key []byte, timestamp int64) bool 75 76 // Entries returns all index entries for a key. 77 Entries(key []byte) []IndexEntry 78 79 // ReadEntries reads the index entries for key into entries. 80 ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry 81 82 // Entry returns the index entry for the specified key and timestamp. If no entry 83 // matches the key and timestamp, nil is returned. 84 Entry(key []byte, timestamp int64) *IndexEntry 85 86 // Key returns the key in the index at the given position, using entries to avoid allocations. 87 Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) 88 89 // KeyAt returns the key in the index at the given position. 90 KeyAt(index int) ([]byte, byte) 91 92 // KeyCount returns the count of unique keys in the index. 93 KeyCount() int 94 95 // Seek returns the position in the index where key <= value in the index. 96 Seek(key []byte) int 97 98 // OverlapsTimeRange returns true if the time range of the file intersect min and max. 99 OverlapsTimeRange(min, max int64) bool 100 101 // OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 102 OverlapsKeyRange(min, max []byte) bool 103 104 // Size returns the size of the current index in bytes. 105 Size() uint32 106 107 // TimeRange returns the min and max time across all keys in the file. 108 TimeRange() (int64, int64) 109 110 // TombstoneRange returns ranges of time that are deleted for the given key. 111 TombstoneRange(key []byte) []TimeRange 112 113 // KeyRange returns the min and max keys in the file. 
114 KeyRange() ([]byte, []byte) 115 116 // Type returns the block type of the values stored for the key. Returns one of 117 // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, 118 // an error is returned. 119 Type(key []byte) (byte, error) 120 121 // UnmarshalBinary populates an index from an encoded byte slice 122 // representation of an index. 123 UnmarshalBinary(b []byte) error 124 125 // Close closes the index and releases any resources. 126 Close() error 127} 128 129// BlockIterator allows iterating over each block in a TSM file in order. It provides 130// raw access to the block bytes without decoding them. 131type BlockIterator struct { 132 r *TSMReader 133 134 // i is the current key index 135 i int 136 137 // n is the total number of keys 138 n int 139 140 key []byte 141 cache []IndexEntry 142 entries []IndexEntry 143 err error 144 typ byte 145} 146 147// PeekNext returns the next key to be iterated or an empty string. 148func (b *BlockIterator) PeekNext() []byte { 149 if len(b.entries) > 1 { 150 return b.key 151 } else if b.n-b.i > 1 { 152 key, _ := b.r.KeyAt(b.i + 1) 153 return key 154 } 155 return nil 156} 157 158// Next returns true if there are more blocks to iterate through. 159func (b *BlockIterator) Next() bool { 160 if b.err != nil { 161 return false 162 } 163 164 if b.n-b.i == 0 && len(b.entries) == 0 { 165 return false 166 } 167 168 if len(b.entries) > 0 { 169 b.entries = b.entries[1:] 170 if len(b.entries) > 0 { 171 return true 172 } 173 } 174 175 if b.n-b.i > 0 { 176 b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache) 177 b.i++ 178 179 // If there were deletes on the TSMReader, then our index is now off and we 180 // can't proceed. What we just read may not actually the next block. 
181 if b.n != b.r.KeyCount() { 182 b.err = fmt.Errorf("delete during iteration") 183 return false 184 } 185 186 if len(b.entries) > 0 { 187 return true 188 } 189 } 190 191 return false 192} 193 194// Read reads information about the next block to be iterated. 195func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { 196 if b.err != nil { 197 return nil, 0, 0, 0, 0, nil, b.err 198 } 199 checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) 200 if err != nil { 201 b.err = err 202 return nil, 0, 0, 0, 0, nil, err 203 } 204 return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err 205} 206 207// Err returns any errors encounter during iteration. 208func (b *BlockIterator) Err() error { 209 return b.err 210} 211 212type tsmReaderOption func(*TSMReader) 213 214// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. 215var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { 216 return func(r *TSMReader) { 217 r.madviseWillNeed = willNeed 218 } 219} 220 221// NewTSMReader returns a new TSMReader from the given file. 222func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { 223 t := &TSMReader{} 224 for _, option := range options { 225 option(t) 226 } 227 228 stat, err := f.Stat() 229 if err != nil { 230 return nil, err 231 } 232 t.size = stat.Size() 233 t.lastModified = stat.ModTime().UnixNano() 234 t.accessor = &mmapAccessor{ 235 f: f, 236 mmapWillNeed: t.madviseWillNeed, 237 } 238 239 index, err := t.accessor.init() 240 if err != nil { 241 return nil, err 242 } 243 244 t.index = index 245 t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey) 246 247 if err := t.applyTombstones(); err != nil { 248 return nil, err 249 } 250 251 return t, nil 252} 253 254// WithObserver sets the observer for the TSM reader. 
255func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) { 256 t.tombstoner.WithObserver(obs) 257} 258 259func (t *TSMReader) applyTombstones() error { 260 var cur, prev Tombstone 261 batch := make([][]byte, 0, 4096) 262 263 if err := t.tombstoner.Walk(func(ts Tombstone) error { 264 cur = ts 265 if len(batch) > 0 { 266 if prev.Min != cur.Min || prev.Max != cur.Max { 267 t.index.DeleteRange(batch, prev.Min, prev.Max) 268 batch = batch[:0] 269 } 270 } 271 272 // Copy the tombstone key and re-use the buffers to avoid allocations 273 n := len(batch) 274 batch = batch[:n+1] 275 if cap(batch[n]) < len(ts.Key) { 276 batch[n] = make([]byte, len(ts.Key)) 277 } else { 278 batch[n] = batch[n][:len(ts.Key)] 279 } 280 copy(batch[n], ts.Key) 281 282 if len(batch) >= 4096 { 283 t.index.DeleteRange(batch, prev.Min, prev.Max) 284 batch = batch[:0] 285 } 286 287 prev = ts 288 return nil 289 }); err != nil { 290 return fmt.Errorf("init: read tombstones: %v", err) 291 } 292 293 if len(batch) > 0 { 294 t.index.DeleteRange(batch, cur.Min, cur.Max) 295 } 296 return nil 297} 298 299func (t *TSMReader) Free() error { 300 t.mu.RLock() 301 defer t.mu.RUnlock() 302 return t.accessor.free() 303} 304 305// Path returns the path of the file the TSMReader was initialized with. 306func (t *TSMReader) Path() string { 307 t.mu.RLock() 308 p := t.accessor.path() 309 t.mu.RUnlock() 310 return p 311} 312 313// Key returns the key and the underlying entry at the numeric index. 314func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 315 return t.index.Key(index, entries) 316} 317 318// KeyAt returns the key and key type at position idx in the index. 319func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { 320 return t.index.KeyAt(idx) 321} 322 323func (t *TSMReader) Seek(key []byte) int { 324 return t.index.Seek(key) 325} 326 327// ReadAt returns the values corresponding to the given index entry. 
328func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { 329 t.mu.RLock() 330 v, err := t.accessor.readBlock(entry, vals) 331 t.mu.RUnlock() 332 return v, err 333} 334 335// Read returns the values corresponding to the block at the given key and timestamp. 336func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) { 337 t.mu.RLock() 338 v, err := t.accessor.read(key, timestamp) 339 t.mu.RUnlock() 340 return v, err 341} 342 343// ReadAll returns all values for a key in all blocks. 344func (t *TSMReader) ReadAll(key []byte) ([]Value, error) { 345 t.mu.RLock() 346 v, err := t.accessor.readAll(key) 347 t.mu.RUnlock() 348 return v, err 349} 350 351func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) { 352 t.mu.RLock() 353 n, v, err := t.accessor.readBytes(e, b) 354 t.mu.RUnlock() 355 return n, v, err 356} 357 358// Type returns the type of values stored at the given key. 359func (t *TSMReader) Type(key []byte) (byte, error) { 360 return t.index.Type(key) 361} 362 363// Close closes the TSMReader. 364func (t *TSMReader) Close() error { 365 t.refsWG.Wait() 366 367 t.mu.Lock() 368 defer t.mu.Unlock() 369 370 if err := t.accessor.close(); err != nil { 371 return err 372 } 373 374 return t.index.Close() 375} 376 377// Ref records a usage of this TSMReader. If there are active references 378// when the reader is closed or removed, the reader will remain open until 379// there are no more references. 380func (t *TSMReader) Ref() { 381 atomic.AddInt64(&t.refs, 1) 382 t.refsWG.Add(1) 383} 384 385// Unref removes a usage record of this TSMReader. If the Reader was closed 386// by another goroutine while there were active references, the file will 387// be closed and remove 388func (t *TSMReader) Unref() { 389 atomic.AddInt64(&t.refs, -1) 390 t.refsWG.Done() 391} 392 393// InUse returns whether the TSMReader currently has any active references. 
394func (t *TSMReader) InUse() bool { 395 refs := atomic.LoadInt64(&t.refs) 396 return refs > 0 397} 398 399// Remove removes any underlying files stored on disk for this reader. 400func (t *TSMReader) Remove() error { 401 t.mu.Lock() 402 defer t.mu.Unlock() 403 return t.remove() 404} 405 406// Rename renames the underlying file to the new path. 407func (t *TSMReader) Rename(path string) error { 408 t.mu.Lock() 409 defer t.mu.Unlock() 410 return t.accessor.rename(path) 411} 412 413// Remove removes any underlying files stored on disk for this reader. 414func (t *TSMReader) remove() error { 415 path := t.accessor.path() 416 417 if t.InUse() { 418 return ErrFileInUse 419 } 420 421 if path != "" { 422 err := os.RemoveAll(path) 423 if err != nil { 424 return err 425 } 426 } 427 428 if err := t.tombstoner.Delete(); err != nil { 429 return err 430 } 431 return nil 432} 433 434// Contains returns whether the given key is present in the index. 435func (t *TSMReader) Contains(key []byte) bool { 436 return t.index.Contains(key) 437} 438 439// ContainsValue returns true if key and time might exists in this file. This function could 440// return true even though the actual point does not exist. For example, the key may 441// exist in this file, but not have a point exactly at time t. 442func (t *TSMReader) ContainsValue(key []byte, ts int64) bool { 443 return t.index.ContainsValue(key, ts) 444} 445 446// DeleteRange removes the given points for keys between minTime and maxTime. The series 447// keys passed in must be sorted. 448func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 449 if len(keys) == 0 { 450 return nil 451 } 452 453 batch := t.BatchDelete() 454 if err := batch.DeleteRange(keys, minTime, maxTime); err != nil { 455 batch.Rollback() 456 return err 457 } 458 return batch.Commit() 459} 460 461// Delete deletes blocks indicated by keys. 
462func (t *TSMReader) Delete(keys [][]byte) error { 463 if err := t.tombstoner.Add(keys); err != nil { 464 return err 465 } 466 467 if err := t.tombstoner.Flush(); err != nil { 468 return err 469 } 470 471 t.index.Delete(keys) 472 return nil 473} 474 475// OverlapsTimeRange returns true if the time range of the file intersect min and max. 476func (t *TSMReader) OverlapsTimeRange(min, max int64) bool { 477 return t.index.OverlapsTimeRange(min, max) 478} 479 480// OverlapsKeyRange returns true if the key range of the file intersect min and max. 481func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool { 482 return t.index.OverlapsKeyRange(min, max) 483} 484 485// TimeRange returns the min and max time across all keys in the file. 486func (t *TSMReader) TimeRange() (int64, int64) { 487 return t.index.TimeRange() 488} 489 490// KeyRange returns the min and max key across all keys in the file. 491func (t *TSMReader) KeyRange() ([]byte, []byte) { 492 return t.index.KeyRange() 493} 494 495// KeyCount returns the count of unique keys in the TSMReader. 496func (t *TSMReader) KeyCount() int { 497 return t.index.KeyCount() 498} 499 500// Entries returns all index entries for key. 501func (t *TSMReader) Entries(key []byte) []IndexEntry { 502 return t.index.Entries(key) 503} 504 505// ReadEntries reads the index entries for key into entries. 506func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 507 return t.index.ReadEntries(key, entries) 508} 509 510// IndexSize returns the size of the index in bytes. 511func (t *TSMReader) IndexSize() uint32 { 512 return t.index.Size() 513} 514 515// Size returns the size of the underlying file in bytes. 516func (t *TSMReader) Size() uint32 { 517 t.mu.RLock() 518 size := t.size 519 t.mu.RUnlock() 520 return uint32(size) 521} 522 523// LastModified returns the last time the underlying file was modified. 
524func (t *TSMReader) LastModified() int64 { 525 t.mu.RLock() 526 lm := t.lastModified 527 for _, ts := range t.tombstoner.TombstoneFiles() { 528 if ts.LastModified > lm { 529 lm = ts.LastModified 530 } 531 } 532 t.mu.RUnlock() 533 return lm 534} 535 536// HasTombstones return true if there are any tombstone entries recorded. 537func (t *TSMReader) HasTombstones() bool { 538 t.mu.RLock() 539 b := t.tombstoner.HasTombstones() 540 t.mu.RUnlock() 541 return b 542} 543 544// TombstoneFiles returns any tombstone files associated with this TSM file. 545func (t *TSMReader) TombstoneFiles() []FileStat { 546 t.mu.RLock() 547 fs := t.tombstoner.TombstoneFiles() 548 t.mu.RUnlock() 549 return fs 550} 551 552// TombstoneRange returns ranges of time that are deleted for the given key. 553func (t *TSMReader) TombstoneRange(key []byte) []TimeRange { 554 t.mu.RLock() 555 tr := t.index.TombstoneRange(key) 556 t.mu.RUnlock() 557 return tr 558} 559 560// Stats returns the FileStat for the TSMReader's underlying file. 561func (t *TSMReader) Stats() FileStat { 562 minTime, maxTime := t.index.TimeRange() 563 minKey, maxKey := t.index.KeyRange() 564 return FileStat{ 565 Path: t.Path(), 566 Size: t.Size(), 567 LastModified: t.LastModified(), 568 MinTime: minTime, 569 MaxTime: maxTime, 570 MinKey: minKey, 571 MaxKey: maxKey, 572 HasTombstone: t.tombstoner.HasTombstones(), 573 } 574} 575 576// BlockIterator returns a BlockIterator for the underlying TSM file. 577func (t *TSMReader) BlockIterator() *BlockIterator { 578 return &BlockIterator{ 579 r: t, 580 n: t.index.KeyCount(), 581 } 582} 583 584type BatchDeleter interface { 585 DeleteRange(keys [][]byte, min, max int64) error 586 Commit() error 587 Rollback() error 588} 589 590type batchDelete struct { 591 r *TSMReader 592} 593 594func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 595 if len(keys) == 0 { 596 return nil 597 } 598 599 // If the keys can't exist in this TSM file, skip it. 
600 minKey, maxKey := keys[0], keys[len(keys)-1] 601 if !b.r.index.OverlapsKeyRange(minKey, maxKey) { 602 return nil 603 } 604 605 // If the timerange can't exist in this TSM file, skip it. 606 if !b.r.index.OverlapsTimeRange(minTime, maxTime) { 607 return nil 608 } 609 610 if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil { 611 return err 612 } 613 614 return nil 615} 616 617func (b *batchDelete) Commit() error { 618 defer b.r.deleteMu.Unlock() 619 if err := b.r.tombstoner.Flush(); err != nil { 620 return err 621 } 622 623 return b.r.applyTombstones() 624} 625 626func (b *batchDelete) Rollback() error { 627 defer b.r.deleteMu.Unlock() 628 return b.r.tombstoner.Rollback() 629} 630 631// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. 632// Callers must either Commit or Rollback the operation. 633func (r *TSMReader) BatchDelete() BatchDeleter { 634 r.deleteMu.Lock() 635 return &batchDelete{r: r} 636} 637 638type BatchDeleters []BatchDeleter 639 640func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error { 641 errC := make(chan error, len(a)) 642 for _, b := range a { 643 go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b) 644 } 645 646 var err error 647 for i := 0; i < len(a); i++ { 648 dErr := <-errC 649 if dErr != nil { 650 err = dErr 651 } 652 } 653 return err 654} 655 656func (a BatchDeleters) Commit() error { 657 errC := make(chan error, len(a)) 658 for _, b := range a { 659 go func(b BatchDeleter) { errC <- b.Commit() }(b) 660 } 661 662 var err error 663 for i := 0; i < len(a); i++ { 664 dErr := <-errC 665 if dErr != nil { 666 err = dErr 667 } 668 } 669 return err 670} 671 672func (a BatchDeleters) Rollback() error { 673 errC := make(chan error, len(a)) 674 for _, b := range a { 675 go func(b BatchDeleter) { errC <- b.Rollback() }(b) 676 } 677 678 var err error 679 for i := 0; i < len(a); i++ { 680 dErr := <-errC 681 if dErr != nil { 682 err = dErr 683 } 684 } 685 
return err 686} 687 688// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This 689// implementation can be used for indexes that may be MMAPed into memory. 690type indirectIndex struct { 691 mu sync.RWMutex 692 693 // indirectIndex works a follows. Assuming we have an index structure in memory as 694 // the diagram below: 695 // 696 // ┌────────────────────────────────────────────────────────────────────┐ 697 // │ Index │ 698 // ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘ 699 // │0│ │62│ │145│ 700 // ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐ 701 // │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │ 702 // │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │ 703 // └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘ 704 705 // We would build an `offsets` slices where each element pointers to the byte location 706 // for the first key in the index slice. 707 708 // ┌────────────────────────────────────────────────────────────────────┐ 709 // │ Offsets │ 710 // ├────┬────┬────┬─────────────────────────────────────────────────────┘ 711 // │ 0 │ 62 │145 │ 712 // └────┴────┴────┘ 713 714 // Using this offset slice we can find `Key 2` by doing a binary search 715 // over the offsets slice. Instead of comparing the value in the offsets 716 // (e.g. `62`), we use that as an index into the underlying index to 717 // retrieve the key at position `62` and perform our comparisons with that. 718 719 // When we have identified the correct position in the index for a given 720 // key, we could perform another binary search or a linear scan. This 721 // should be fast as well since each index entry is 28 bytes and all 722 // contiguous in memory. The current implementation uses a linear scan since the 723 // number of block entries is expected to be < 100 per key. 724 725 // b is the underlying index byte slice. 
This could be a copy on the heap or an MMAP 726 // slice reference 727 b []byte 728 729 // offsets contains the positions in b for each key. It points to the 2 byte length of 730 // key. 731 offsets []byte 732 733 // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the 734 // file 735 minKey, maxKey []byte 736 737 // minTime, maxTime are the minimum and maximum times contained in the file across all 738 // series. 739 minTime, maxTime int64 740 741 // tombstones contains only the tombstoned keys with subset of time values deleted. An 742 // entry would exist here if a subset of the points for a key were deleted and the file 743 // had not be re-compacted to remove the points on disk. 744 tombstones map[string][]TimeRange 745} 746 747// TimeRange holds a min and max timestamp. 748type TimeRange struct { 749 Min, Max int64 750} 751 752func (t TimeRange) Overlaps(min, max int64) bool { 753 return t.Min <= max && t.Max >= min 754} 755 756// NewIndirectIndex returns a new indirect index. 757func NewIndirectIndex() *indirectIndex { 758 return &indirectIndex{ 759 tombstones: make(map[string][]TimeRange), 760 } 761} 762 763func (d *indirectIndex) offset(i int) int { 764 if i < 0 || i+4 > len(d.offsets) { 765 return -1 766 } 767 return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])) 768} 769 770func (d *indirectIndex) Seek(key []byte) int { 771 d.mu.RLock() 772 defer d.mu.RUnlock() 773 return d.searchOffset(key) 774} 775 776// searchOffset searches the offsets slice for key and returns the position in 777// offsets where key would exist. 778func (d *indirectIndex) searchOffset(key []byte) int { 779 // We use a binary search across our indirect offsets (pointers to all the keys 780 // in the index slice). 
781 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 782 // i is the position in offsets we are at so get offset it points to 783 offset := int32(binary.BigEndian.Uint32(x)) 784 785 // It's pointing to the start of the key which is a 2 byte length 786 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 787 788 // See if it matches 789 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 790 }) 791 792 // See if we might have found the right index 793 if i < len(d.offsets) { 794 return int(i / 4) 795 } 796 797 // The key is not in the index. i is the index where it would be inserted so return 798 // a value outside our offset range. 799 return int(len(d.offsets)) / 4 800} 801 802// search returns the byte position of key in the index. If key is not 803// in the index, len(index) is returned. 804func (d *indirectIndex) search(key []byte) int { 805 if !d.ContainsKey(key) { 806 return len(d.b) 807 } 808 809 // We use a binary search across our indirect offsets (pointers to all the keys 810 // in the index slice). 811 // TODO(sgc): this should be inlined to `indirectIndex` as it is only used here 812 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 813 // i is the position in offsets we are at so get offset it points to 814 offset := int32(binary.BigEndian.Uint32(x)) 815 816 // It's pointing to the start of the key which is a 2 byte length 817 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 818 819 // See if it matches 820 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 821 }) 822 823 // See if we might have found the right index 824 if i < len(d.offsets) { 825 ofs := binary.BigEndian.Uint32(d.offsets[i : i+4]) 826 _, k := readKey(d.b[ofs:]) 827 828 // The search may have returned an i == 0 which could indicated that the value 829 // searched should be inserted at position 0. Make sure the key in the index 830 // matches the search value. 
831 if !bytes.Equal(key, k) { 832 return len(d.b) 833 } 834 835 return int(ofs) 836 } 837 838 // The key is not in the index. i is the index where it would be inserted so return 839 // a value outside our offset range. 840 return len(d.b) 841} 842 843// ContainsKey returns true of key may exist in this index. 844func (d *indirectIndex) ContainsKey(key []byte) bool { 845 return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 846} 847 848// Entries returns all index entries for a key. 849func (d *indirectIndex) Entries(key []byte) []IndexEntry { 850 return d.ReadEntries(key, nil) 851} 852 853func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) { 854 n, k := readKey(d.b[ofs:]) 855 856 // Read and return all the entries 857 ofs += n 858 var ie indexEntries 859 if entries != nil { 860 ie.entries = *entries 861 } 862 if _, err := readEntries(d.b[ofs:], &ie); err != nil { 863 panic(fmt.Sprintf("error reading entries: %v", err)) 864 } 865 if entries != nil { 866 *entries = ie.entries 867 } 868 return k, ie.entries 869} 870 871// ReadEntries returns all index entries for a key. 872func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 873 d.mu.RLock() 874 defer d.mu.RUnlock() 875 876 ofs := d.search(key) 877 if ofs < len(d.b) { 878 k, entries := d.readEntriesAt(ofs, entries) 879 // The search may have returned an i == 0 which could indicated that the value 880 // searched should be inserted at position 0. Make sure the key in the index 881 // matches the search value. 882 if !bytes.Equal(key, k) { 883 return nil 884 } 885 886 return entries 887 } 888 889 // The key is not in the index. i is the index where it would be inserted. 890 return nil 891} 892 893// Entry returns the index entry for the specified key and timestamp. If no entry 894// matches the key an timestamp, nil is returned. 
895func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { 896 entries := d.Entries(key) 897 for _, entry := range entries { 898 if entry.Contains(timestamp) { 899 return &entry 900 } 901 } 902 return nil 903} 904 905// Key returns the key in the index at the given position. 906func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 907 d.mu.RLock() 908 defer d.mu.RUnlock() 909 910 if idx < 0 || idx*4+4 > len(d.offsets) { 911 return nil, 0, nil 912 } 913 ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]) 914 n, key := readKey(d.b[ofs:]) 915 916 typ := d.b[int(ofs)+n] 917 918 var ie indexEntries 919 if entries != nil { 920 ie.entries = *entries 921 } 922 if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil { 923 return nil, 0, nil 924 } 925 if entries != nil { 926 *entries = ie.entries 927 } 928 929 return key, typ, ie.entries 930} 931 932// KeyAt returns the key in the index at the given position. 933func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { 934 d.mu.RLock() 935 936 if idx < 0 || idx*4+4 > len(d.offsets) { 937 d.mu.RUnlock() 938 return nil, 0 939 } 940 ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])) 941 942 n, key := readKey(d.b[ofs:]) 943 ofs = ofs + int32(n) 944 typ := d.b[ofs] 945 d.mu.RUnlock() 946 return key, typ 947} 948 949// KeyCount returns the count of unique keys in the index. 950func (d *indirectIndex) KeyCount() int { 951 d.mu.RLock() 952 n := len(d.offsets) / 4 953 d.mu.RUnlock() 954 return n 955} 956 957// Delete removes the given keys from the index. 958func (d *indirectIndex) Delete(keys [][]byte) { 959 if len(keys) == 0 { 960 return 961 } 962 963 if !bytesutil.IsSorted(keys) { 964 bytesutil.Sort(keys) 965 } 966 967 // Both keys and offsets are sorted. Walk both in order and skip 968 // any keys that exist in both. 
	d.mu.Lock()
	// Merge-walk the index offsets (starting at the first candidate position for
	// keys[0]) against the sorted keys, marking matching slots with nilOffset.
	start := d.searchOffset(keys[0])
	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, indexKey := readKey(d.b[offset:])

		// Drop requested keys that sort before the current index key; they are not in the index.
		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
			keys = keys[1:]
		}

		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
			keys = keys[1:]
			copy(d.offsets[i:i+4], nilOffset)
		}
	}
	// Pack with width 4 and filler 255 presumably compacts away the nilOffset
	// (0xFFFFFFFF) slots marked above — TODO confirm against bytesutil.Pack.
	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
	d.mu.Unlock()
}

// DeleteRange removes the given keys with data between minTime and maxTime from the index.
//
// keys is sorted in place if it is not already sorted. A key whose stored data lies
// entirely inside [minTime, maxTime], or whose accumulated tombstones cover all of its
// data without gaps, is removed outright via Delete. Otherwise the range is recorded
// in d.tombstones for that key so readers can filter the deleted points.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
	// No keys, nothing to do
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// If we're deleting the max time range, just use tombstoning to remove the
	// key from the offsets slice
	if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
		d.Delete(keys)
		return
	}

	// Is the range passed in outside of the time range for the file?
	min, max := d.TimeRange()
	if minTime > max || maxTime < min {
		return
	}

	fullKeys := make([][]byte, 0, len(keys))
	tombstones := map[string][]TimeRange{}
	var ie []IndexEntry

	// Walk the index keys in order alongside the sorted request keys.
	for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ {
		k, entries := d.readEntriesAt(d.offset(i), &ie)

		// Skip any keys that don't exist. These are less than the current key.
		for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
			keys = keys[1:]
		}

		// No more keys to delete, we're done.
		if len(keys) == 0 {
			break
		}

		// If the requested key sorts after the current index key, move on to the
		// next index key.
		if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {
			continue
		}

		// The key has no index entries, so there is nothing to tombstone.
		if len(entries) == 0 {
			continue
		}

		// Is the time range passed outside of the time range we have stored for this key?
		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
		if minTime > max || maxTime < min {
			continue
		}

		// Does the range passed in cover every value for the key?
		if minTime <= min && maxTime >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}

		d.mu.RLock()
		existing := d.tombstones[string(k)]
		d.mu.RUnlock()

		// Append the new tombstones to the existing ones.
		//
		// NOTE(review): `existing` is the slice stored in d.tombstones, and it is used
		// here after the read lock is released. When it has spare capacity, append
		// writes into its backing array, and the in-place sort.Slice below can reorder
		// elements that a reader got from TombstoneRange/ContainsValue (which return
		// or iterate the stored slice without copying). Building newTs in a freshly
		// allocated slice would avoid that aliasing. Confirm whether TSMReader.deleteMu
		// plus the callers' usage makes this safe in practice.
		newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...)
		// Order by Min, then by Max.
		fn := func(i, j int) bool {
			a, b := newTs[i], newTs[j]
			if a.Min == b.Min {
				return a.Max <= b.Max
			}
			return a.Min < b.Min
		}

		// Sort the updated tombstones if necessary
		if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
			sort.Slice(newTs, fn)
		}

		tombstones[string(k)] = newTs

		// We need to see if all the tombstones end up deleting the entire series. This
		// could happen if there is one tombstone with a min,max time spanning all the block
		// time ranges, or multiple smaller tombstones that together delete every segment.
		// To detect these cases, we use a window starting at the first tombstone and grow it
		// by each tombstone that is immediately adjacent to, or overlaps, the previous one.
		// If there are any gaps, we abort.
		minTs, maxTs := newTs[0].Min, newTs[0].Max
		for j := 1; j < len(newTs); j++ {
			prevTs := newTs[j-1]
			ts := newTs[j]

			// Make sure all the tombstones line up for a continuous range. We don't
			// want to have two small deletes on each edge end up causing us to
			// remove the full key.
			//
			// NOTE(review): this compares ts only with its predecessor, not with the
			// accumulated window [minTs, maxTs]. For example, [0,100],[10,20],[50,200]
			// is reported as a gap at [10,20]→[50,200] even though [0,100] covers it.
			// This assumes TimeRange.Overlaps tests intersection. The outcome is
			// conservative: the key stays tombstoned instead of being fully deleted.
			if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
				minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)
				break
			}

			if ts.Min < minTs {
				minTs = ts.Min
			}
			if ts.Max > maxTs {
				maxTs = ts.Max
			}
		}

		// If we have a fully deleted series, delete all of it.
		if minTs <= min && maxTs >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}
	}

	// Delete all the fully deleted keys in bulk.
	if len(fullKeys) > 0 {
		d.Delete(fullKeys)
	}

	if len(tombstones) == 0 {
		return
	}

	// Publish the merged tombstone lists under the write lock.
	d.mu.Lock()
	for k, v := range tombstones {
		d.tombstones[k] = v
	}
	d.mu.Unlock()
}

// TombstoneRange returns ranges of time that are deleted for the given key.
// The returned slice is the one stored in the index (not a copy); callers
// should treat it as read-only.
func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange {
	d.mu.RLock()
	r := d.tombstones[string(key)]
	d.mu.RUnlock()
	return r
}

// Contains return true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
	return len(d.Entries(key)) > 0
}

// ContainsValue returns true if key and time might exist in this file.
// It returns false when no index entry covers timestamp, or when timestamp
// falls inside one of the key's tombstoned ranges (bounds inclusive).
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
	entry := d.Entry(key, timestamp)
	if entry == nil {
		return false
	}

	d.mu.RLock()
	tombstones := d.tombstones[string(key)]
	d.mu.RUnlock()

	for _, t := range tombstones {
		if t.Min <= timestamp && t.Max >= timestamp {
			return false
		}
	}
	return true
}

// Type returns the block type of the values stored for the key.
1158func (d *indirectIndex) Type(key []byte) (byte, error) { 1159 d.mu.RLock() 1160 defer d.mu.RUnlock() 1161 1162 ofs := d.search(key) 1163 if ofs < len(d.b) { 1164 n, _ := readKey(d.b[ofs:]) 1165 ofs += n 1166 return d.b[ofs], nil 1167 } 1168 return 0, fmt.Errorf("key does not exist: %s", key) 1169} 1170 1171// OverlapsTimeRange returns true if the time range of the file intersect min and max. 1172func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { 1173 return d.minTime <= max && d.maxTime >= min 1174} 1175 1176// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 1177func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { 1178 return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 1179} 1180 1181// KeyRange returns the min and max keys in the index. 1182func (d *indirectIndex) KeyRange() ([]byte, []byte) { 1183 return d.minKey, d.maxKey 1184} 1185 1186// TimeRange returns the min and max time across all keys in the index. 1187func (d *indirectIndex) TimeRange() (int64, int64) { 1188 return d.minTime, d.maxTime 1189} 1190 1191// MarshalBinary returns a byte slice encoded version of the index. 1192func (d *indirectIndex) MarshalBinary() ([]byte, error) { 1193 d.mu.RLock() 1194 defer d.mu.RUnlock() 1195 1196 return d.b, nil 1197} 1198 1199// UnmarshalBinary populates an index from an encoded byte slice 1200// representation of an index. 1201func (d *indirectIndex) UnmarshalBinary(b []byte) error { 1202 d.mu.Lock() 1203 defer d.mu.Unlock() 1204 1205 // Keep a reference to the actual index bytes 1206 d.b = b 1207 if len(b) == 0 { 1208 return nil 1209 } 1210 1211 //var minKey, maxKey []byte 1212 var minTime, maxTime int64 = math.MaxInt64, 0 1213 1214 // To create our "indirect" index, we need to find the location of all the keys in 1215 // the raw byte slice. The keys are listed once each (in sorted order). 
Following 1216 // each key is a time ordered list of index entry blocks for that key. The loop below 1217 // basically skips across the slice keeping track of the counter when we are at a key 1218 // field. 1219 var i int32 1220 var offsets []int32 1221 iMax := int32(len(b)) 1222 for i < iMax { 1223 offsets = append(offsets, i) 1224 1225 // Skip to the start of the values 1226 // key length value (2) + type (1) + length of key 1227 if i+2 >= iMax { 1228 return fmt.Errorf("indirectIndex: not enough data for key length value") 1229 } 1230 i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2])) 1231 1232 // count of index entries 1233 if i+indexCountSize >= iMax { 1234 return fmt.Errorf("indirectIndex: not enough data for index entries count") 1235 } 1236 count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) 1237 i += indexCountSize 1238 1239 // Find the min time for the block 1240 if i+8 >= iMax { 1241 return fmt.Errorf("indirectIndex: not enough data for min time") 1242 } 1243 minT := int64(binary.BigEndian.Uint64(b[i : i+8])) 1244 if minT < minTime { 1245 minTime = minT 1246 } 1247 1248 i += (count - 1) * indexEntrySize 1249 1250 // Find the max time for the block 1251 if i+16 >= iMax { 1252 return fmt.Errorf("indirectIndex: not enough data for max time") 1253 } 1254 maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) 1255 if maxT > maxTime { 1256 maxTime = maxT 1257 } 1258 1259 i += indexEntrySize 1260 } 1261 1262 firstOfs := offsets[0] 1263 _, key := readKey(b[firstOfs:]) 1264 d.minKey = key 1265 1266 lastOfs := offsets[len(offsets)-1] 1267 _, key = readKey(b[lastOfs:]) 1268 d.maxKey = key 1269 1270 d.minTime = minTime 1271 d.maxTime = maxTime 1272 1273 var err error 1274 d.offsets, err = mmap(nil, 0, len(offsets)*4) 1275 if err != nil { 1276 return err 1277 } 1278 for i, v := range offsets { 1279 binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v)) 1280 } 1281 1282 return nil 1283} 1284 1285// Size returns the size of the current index in bytes. 
1286func (d *indirectIndex) Size() uint32 { 1287 d.mu.RLock() 1288 defer d.mu.RUnlock() 1289 1290 return uint32(len(d.b)) 1291} 1292 1293func (d *indirectIndex) Close() error { 1294 // Windows doesn't use the anonymous map for the offsets index 1295 if runtime.GOOS == "windows" { 1296 return nil 1297 } 1298 return munmap(d.offsets[:cap(d.offsets)]) 1299} 1300 1301// mmapAccess is mmap based block accessor. It access blocks through an 1302// MMAP file interface. 1303type mmapAccessor struct { 1304 accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed 1305 freeCount uint64 // Counter to determine whether the accessor can free its resources 1306 1307 mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b. 1308 1309 mu sync.RWMutex 1310 b []byte 1311 f *os.File 1312 1313 index *indirectIndex 1314} 1315 1316func (m *mmapAccessor) init() (*indirectIndex, error) { 1317 m.mu.Lock() 1318 defer m.mu.Unlock() 1319 1320 if err := verifyVersion(m.f); err != nil { 1321 return nil, err 1322 } 1323 1324 var err error 1325 1326 if _, err := m.f.Seek(0, 0); err != nil { 1327 return nil, err 1328 } 1329 1330 stat, err := m.f.Stat() 1331 if err != nil { 1332 return nil, err 1333 } 1334 1335 m.b, err = mmap(m.f, 0, int(stat.Size())) 1336 if err != nil { 1337 return nil, err 1338 } 1339 if len(m.b) < 8 { 1340 return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex") 1341 } 1342 1343 // Hint to the kernel that we will be reading the file. It would be better to hint 1344 // that we will be reading the index section, but that's not been 1345 // implemented as yet. 
1346 if m.mmapWillNeed { 1347 if err := madviseWillNeed(m.b); err != nil { 1348 return nil, err 1349 } 1350 } 1351 1352 indexOfsPos := len(m.b) - 8 1353 indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8]) 1354 if indexStart >= uint64(indexOfsPos) { 1355 return nil, fmt.Errorf("mmapAccessor: invalid indexStart") 1356 } 1357 1358 m.index = NewIndirectIndex() 1359 if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { 1360 return nil, err 1361 } 1362 1363 // Allow resources to be freed immediately if requested 1364 m.incAccess() 1365 atomic.StoreUint64(&m.freeCount, 1) 1366 1367 return m.index, nil 1368} 1369 1370func (m *mmapAccessor) free() error { 1371 accessCount := atomic.LoadUint64(&m.accessCount) 1372 freeCount := atomic.LoadUint64(&m.freeCount) 1373 1374 // Already freed everything. 1375 if freeCount == 0 && accessCount == 0 { 1376 return nil 1377 } 1378 1379 // Were there accesses after the last time we tried to free? 1380 // If so, don't free anything and record the access count that we 1381 // see now for the next check. 1382 if accessCount != freeCount { 1383 atomic.StoreUint64(&m.freeCount, accessCount) 1384 return nil 1385 } 1386 1387 // Reset both counters to zero to indicate that we have freed everything. 
1388 atomic.StoreUint64(&m.accessCount, 0) 1389 atomic.StoreUint64(&m.freeCount, 0) 1390 1391 m.mu.RLock() 1392 defer m.mu.RUnlock() 1393 1394 return madviseDontNeed(m.b) 1395} 1396 1397func (m *mmapAccessor) incAccess() { 1398 atomic.AddUint64(&m.accessCount, 1) 1399} 1400 1401func (m *mmapAccessor) rename(path string) error { 1402 m.incAccess() 1403 1404 m.mu.Lock() 1405 defer m.mu.Unlock() 1406 1407 err := munmap(m.b) 1408 if err != nil { 1409 return err 1410 } 1411 1412 if err := m.f.Close(); err != nil { 1413 return err 1414 } 1415 1416 if err := file.RenameFile(m.f.Name(), path); err != nil { 1417 return err 1418 } 1419 1420 m.f, err = os.Open(path) 1421 if err != nil { 1422 return err 1423 } 1424 1425 if _, err := m.f.Seek(0, 0); err != nil { 1426 return err 1427 } 1428 1429 stat, err := m.f.Stat() 1430 if err != nil { 1431 return err 1432 } 1433 1434 m.b, err = mmap(m.f, 0, int(stat.Size())) 1435 if err != nil { 1436 return err 1437 } 1438 1439 if m.mmapWillNeed { 1440 return madviseWillNeed(m.b) 1441 } 1442 return nil 1443} 1444 1445func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) { 1446 entry := m.index.Entry(key, timestamp) 1447 if entry == nil { 1448 return nil, nil 1449 } 1450 1451 return m.readBlock(entry, nil) 1452} 1453 1454func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { 1455 m.incAccess() 1456 1457 m.mu.RLock() 1458 defer m.mu.RUnlock() 1459 1460 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1461 return nil, ErrTSMClosed 1462 } 1463 //TODO: Validate checksum 1464 var err error 1465 values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1466 if err != nil { 1467 return nil, err 1468 } 1469 1470 return values, nil 1471} 1472 1473func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) { 1474 m.incAccess() 1475 1476 m.mu.RLock() 1477 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1478 m.mu.RUnlock() 1479 return 0, 
nil, ErrTSMClosed 1480 } 1481 1482 // return the bytes after the 4 byte checksum 1483 crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)] 1484 m.mu.RUnlock() 1485 1486 return crc, block, nil 1487} 1488 1489// readAll returns all values for a key in all blocks. 1490func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { 1491 m.incAccess() 1492 1493 blocks := m.index.Entries(key) 1494 if len(blocks) == 0 { 1495 return nil, nil 1496 } 1497 1498 tombstones := m.index.TombstoneRange(key) 1499 1500 m.mu.RLock() 1501 defer m.mu.RUnlock() 1502 1503 var temp []Value 1504 var err error 1505 var values []Value 1506 for _, block := range blocks { 1507 var skip bool 1508 for _, t := range tombstones { 1509 // Should we skip this block because it contains points that have been deleted 1510 if t.Min <= block.MinTime && t.Max >= block.MaxTime { 1511 skip = true 1512 break 1513 } 1514 } 1515 1516 if skip { 1517 continue 1518 } 1519 //TODO: Validate checksum 1520 temp = temp[:0] 1521 // The +4 is the 4 byte checksum length 1522 temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp) 1523 if err != nil { 1524 return nil, err 1525 } 1526 1527 // Filter out any values that were deleted 1528 for _, t := range tombstones { 1529 temp = Values(temp).Exclude(t.Min, t.Max) 1530 } 1531 1532 values = append(values, temp...) 
1533 } 1534 1535 return values, nil 1536} 1537 1538func (m *mmapAccessor) path() string { 1539 m.mu.RLock() 1540 path := m.f.Name() 1541 m.mu.RUnlock() 1542 return path 1543} 1544 1545func (m *mmapAccessor) close() error { 1546 m.mu.Lock() 1547 defer m.mu.Unlock() 1548 1549 if m.b == nil { 1550 return nil 1551 } 1552 1553 err := munmap(m.b) 1554 if err != nil { 1555 return err 1556 } 1557 1558 m.b = nil 1559 return m.f.Close() 1560} 1561 1562type indexEntries struct { 1563 Type byte 1564 entries []IndexEntry 1565} 1566 1567func (a *indexEntries) Len() int { return len(a.entries) } 1568func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] } 1569func (a *indexEntries) Less(i, j int) bool { 1570 return a.entries[i].MinTime < a.entries[j].MinTime 1571} 1572 1573func (a *indexEntries) MarshalBinary() ([]byte, error) { 1574 buf := make([]byte, len(a.entries)*indexEntrySize) 1575 1576 for i, entry := range a.entries { 1577 entry.AppendTo(buf[indexEntrySize*i:]) 1578 } 1579 1580 return buf, nil 1581} 1582 1583func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { 1584 var buf [indexEntrySize]byte 1585 var n int 1586 1587 for _, entry := range a.entries { 1588 entry.AppendTo(buf[:]) 1589 n, err = w.Write(buf[:]) 1590 total += int64(n) 1591 if err != nil { 1592 return total, err 1593 } 1594 } 1595 1596 return total, nil 1597} 1598 1599func readKey(b []byte) (n int, key []byte) { 1600 // 2 byte size of key 1601 n, size := 2, int(binary.BigEndian.Uint16(b[:2])) 1602 1603 // N byte key 1604 key = b[n : n+size] 1605 1606 n += len(key) 1607 return 1608} 1609 1610func readEntries(b []byte, entries *indexEntries) (n int, err error) { 1611 if len(b) < 1+indexCountSize { 1612 return 0, fmt.Errorf("readEntries: data too short for headers") 1613 } 1614 1615 // 1 byte block type 1616 entries.Type = b[n] 1617 n++ 1618 1619 // 2 byte count of index entries 1620 count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize])) 1621 n 
+= indexCountSize 1622 1623 if cap(entries.entries) < count { 1624 entries.entries = make([]IndexEntry, count) 1625 } else { 1626 entries.entries = entries.entries[:count] 1627 } 1628 1629 b = b[indexCountSize+indexTypeSize:] 1630 for i := 0; i < len(entries.entries); i++ { 1631 if err = entries.entries[i].UnmarshalBinary(b); err != nil { 1632 return 0, fmt.Errorf("readEntries: unmarshal error: %v", err) 1633 } 1634 b = b[indexEntrySize:] 1635 } 1636 1637 n += count * indexEntrySize 1638 1639 return 1640} 1641