1package tsm1 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9 "os" 10 "runtime" 11 "sort" 12 "sync" 13 "sync/atomic" 14 15 "github.com/influxdata/influxdb/pkg/bytesutil" 16 "github.com/influxdata/influxdb/pkg/file" 17 "github.com/influxdata/influxdb/tsdb" 18) 19 20// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used. 21var ErrFileInUse = fmt.Errorf("file still in use") 22 23// nilOffset is the value written to the offsets to indicate that position is deleted. The value is the max 24// uint32 which is an invalid position. We don't use 0 as 0 is actually a valid position. 25var nilOffset = []byte{255, 255, 255, 255} 26 27// TSMReader is a reader for a TSM file. 28type TSMReader struct { 29 // refs is the count of active references to this reader. 30 refs int64 31 refsWG sync.WaitGroup 32 33 madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED. 34 mu sync.RWMutex 35 36 // accessor provides access and decoding of blocks for the reader. 37 accessor blockAccessor 38 39 // index is the index of all blocks. 40 index TSMIndex 41 42 // tombstoner ensures tombstoned keys are not available by the index. 43 tombstoner *Tombstoner 44 45 // size is the size of the file on disk. 46 size int64 47 48 // lastModified is the last time this file was modified on disk 49 lastModified int64 50 51 // deleteMu limits concurrent deletes 52 deleteMu sync.Mutex 53} 54 55// TSMIndex represent the index section of a TSM file. The index records all 56// blocks, their locations, sizes, min and max times. 57type TSMIndex interface { 58 // Delete removes the given keys from the index. 59 Delete(keys [][]byte) 60 61 // DeleteRange removes the given keys with data between minTime and maxTime from the index. 62 DeleteRange(keys [][]byte, minTime, maxTime int64) 63 64 // ContainsKey returns true if the given key may exist in the index. This func is faster than 65 // Contains but, may return false positives. 
66 ContainsKey(key []byte) bool 67 68 // Contains return true if the given key exists in the index. 69 Contains(key []byte) bool 70 71 // ContainsValue returns true if key and time might exist in this file. This function could 72 // return true even though the actual point does not exists. For example, the key may 73 // exist in this file, but not have a point exactly at time t. 74 ContainsValue(key []byte, timestamp int64) bool 75 76 // Entries returns all index entries for a key. 77 Entries(key []byte) []IndexEntry 78 79 // ReadEntries reads the index entries for key into entries. 80 ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry 81 82 // Entry returns the index entry for the specified key and timestamp. If no entry 83 // matches the key and timestamp, nil is returned. 84 Entry(key []byte, timestamp int64) *IndexEntry 85 86 // Key returns the key in the index at the given position, using entries to avoid allocations. 87 Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) 88 89 // KeyAt returns the key in the index at the given position. 90 KeyAt(index int) ([]byte, byte) 91 92 // KeyCount returns the count of unique keys in the index. 93 KeyCount() int 94 95 // Seek returns the position in the index where key <= value in the index. 96 Seek(key []byte) int 97 98 // OverlapsTimeRange returns true if the time range of the file intersect min and max. 99 OverlapsTimeRange(min, max int64) bool 100 101 // OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 102 OverlapsKeyRange(min, max []byte) bool 103 104 // Size returns the size of the current index in bytes. 105 Size() uint32 106 107 // TimeRange returns the min and max time across all keys in the file. 108 TimeRange() (int64, int64) 109 110 // TombstoneRange returns ranges of time that are deleted for the given key. 111 TombstoneRange(key []byte) []TimeRange 112 113 // KeyRange returns the min and max keys in the file. 
114 KeyRange() ([]byte, []byte) 115 116 // Type returns the block type of the values stored for the key. Returns one of 117 // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, 118 // an error is returned. 119 Type(key []byte) (byte, error) 120 121 // UnmarshalBinary populates an index from an encoded byte slice 122 // representation of an index. 123 UnmarshalBinary(b []byte) error 124 125 // Close closes the index and releases any resources. 126 Close() error 127} 128 129// BlockIterator allows iterating over each block in a TSM file in order. It provides 130// raw access to the block bytes without decoding them. 131type BlockIterator struct { 132 r *TSMReader 133 134 // i is the current key index 135 i int 136 137 // n is the total number of keys 138 n int 139 140 key []byte 141 cache []IndexEntry 142 entries []IndexEntry 143 err error 144 typ byte 145} 146 147// PeekNext returns the next key to be iterated or an empty string. 148func (b *BlockIterator) PeekNext() []byte { 149 if len(b.entries) > 1 { 150 return b.key 151 } else if b.n-b.i > 1 { 152 key, _ := b.r.KeyAt(b.i + 1) 153 return key 154 } 155 return nil 156} 157 158// Next returns true if there are more blocks to iterate through. 159func (b *BlockIterator) Next() bool { 160 if b.err != nil { 161 return false 162 } 163 164 if b.n-b.i == 0 && len(b.entries) == 0 { 165 return false 166 } 167 168 if len(b.entries) > 0 { 169 b.entries = b.entries[1:] 170 if len(b.entries) > 0 { 171 return true 172 } 173 } 174 175 if b.n-b.i > 0 { 176 b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache) 177 b.i++ 178 179 // If there were deletes on the TSMReader, then our index is now off and we 180 // can't proceed. What we just read may not actually the next block. 
181 if b.n != b.r.KeyCount() { 182 b.err = fmt.Errorf("delete during iteration") 183 return false 184 } 185 186 if len(b.entries) > 0 { 187 return true 188 } 189 } 190 191 return false 192} 193 194// Read reads information about the next block to be iterated. 195func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { 196 if b.err != nil { 197 return nil, 0, 0, 0, 0, nil, b.err 198 } 199 checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) 200 if err != nil { 201 return nil, 0, 0, 0, 0, nil, err 202 } 203 return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err 204} 205 206// Err returns any errors encounter during iteration. 207func (b *BlockIterator) Err() error { 208 return b.err 209} 210 211type tsmReaderOption func(*TSMReader) 212 213// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. 214var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { 215 return func(r *TSMReader) { 216 r.madviseWillNeed = willNeed 217 } 218} 219 220// NewTSMReader returns a new TSMReader from the given file. 221func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { 222 t := &TSMReader{} 223 for _, option := range options { 224 option(t) 225 } 226 227 stat, err := f.Stat() 228 if err != nil { 229 return nil, err 230 } 231 t.size = stat.Size() 232 t.lastModified = stat.ModTime().UnixNano() 233 t.accessor = &mmapAccessor{ 234 f: f, 235 mmapWillNeed: t.madviseWillNeed, 236 } 237 238 index, err := t.accessor.init() 239 if err != nil { 240 return nil, err 241 } 242 243 t.index = index 244 t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey) 245 246 if err := t.applyTombstones(); err != nil { 247 return nil, err 248 } 249 250 return t, nil 251} 252 253// WithObserver sets the observer for the TSM reader. 
254func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) { 255 t.tombstoner.WithObserver(obs) 256} 257 258func (t *TSMReader) applyTombstones() error { 259 var cur, prev Tombstone 260 batch := make([][]byte, 0, 4096) 261 262 if err := t.tombstoner.Walk(func(ts Tombstone) error { 263 cur = ts 264 if len(batch) > 0 { 265 if prev.Min != cur.Min || prev.Max != cur.Max { 266 t.index.DeleteRange(batch, prev.Min, prev.Max) 267 batch = batch[:0] 268 } 269 } 270 271 // Copy the tombstone key and re-use the buffers to avoid allocations 272 n := len(batch) 273 batch = batch[:n+1] 274 if cap(batch[n]) < len(ts.Key) { 275 batch[n] = make([]byte, len(ts.Key)) 276 } else { 277 batch[n] = batch[n][:len(ts.Key)] 278 } 279 copy(batch[n], ts.Key) 280 281 if len(batch) >= 4096 { 282 t.index.DeleteRange(batch, prev.Min, prev.Max) 283 batch = batch[:0] 284 } 285 286 prev = ts 287 return nil 288 }); err != nil { 289 return fmt.Errorf("init: read tombstones: %v", err) 290 } 291 292 if len(batch) > 0 { 293 t.index.DeleteRange(batch, cur.Min, cur.Max) 294 } 295 return nil 296} 297 298func (t *TSMReader) Free() error { 299 t.mu.RLock() 300 defer t.mu.RUnlock() 301 return t.accessor.free() 302} 303 304// Path returns the path of the file the TSMReader was initialized with. 305func (t *TSMReader) Path() string { 306 t.mu.RLock() 307 p := t.accessor.path() 308 t.mu.RUnlock() 309 return p 310} 311 312// Key returns the key and the underlying entry at the numeric index. 313func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 314 return t.index.Key(index, entries) 315} 316 317// KeyAt returns the key and key type at position idx in the index. 318func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { 319 return t.index.KeyAt(idx) 320} 321 322func (t *TSMReader) Seek(key []byte) int { 323 return t.index.Seek(key) 324} 325 326// ReadAt returns the values corresponding to the given index entry. 
327func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { 328 t.mu.RLock() 329 v, err := t.accessor.readBlock(entry, vals) 330 t.mu.RUnlock() 331 return v, err 332} 333 334// Read returns the values corresponding to the block at the given key and timestamp. 335func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) { 336 t.mu.RLock() 337 v, err := t.accessor.read(key, timestamp) 338 t.mu.RUnlock() 339 return v, err 340} 341 342// ReadAll returns all values for a key in all blocks. 343func (t *TSMReader) ReadAll(key []byte) ([]Value, error) { 344 t.mu.RLock() 345 v, err := t.accessor.readAll(key) 346 t.mu.RUnlock() 347 return v, err 348} 349 350func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) { 351 t.mu.RLock() 352 n, v, err := t.accessor.readBytes(e, b) 353 t.mu.RUnlock() 354 return n, v, err 355} 356 357// Type returns the type of values stored at the given key. 358func (t *TSMReader) Type(key []byte) (byte, error) { 359 return t.index.Type(key) 360} 361 362// Close closes the TSMReader. 363func (t *TSMReader) Close() error { 364 t.refsWG.Wait() 365 366 t.mu.Lock() 367 defer t.mu.Unlock() 368 369 if err := t.accessor.close(); err != nil { 370 return err 371 } 372 373 return t.index.Close() 374} 375 376// Ref records a usage of this TSMReader. If there are active references 377// when the reader is closed or removed, the reader will remain open until 378// there are no more references. 379func (t *TSMReader) Ref() { 380 atomic.AddInt64(&t.refs, 1) 381 t.refsWG.Add(1) 382} 383 384// Unref removes a usage record of this TSMReader. If the Reader was closed 385// by another goroutine while there were active references, the file will 386// be closed and remove 387func (t *TSMReader) Unref() { 388 atomic.AddInt64(&t.refs, -1) 389 t.refsWG.Done() 390} 391 392// InUse returns whether the TSMReader currently has any active references. 
393func (t *TSMReader) InUse() bool { 394 refs := atomic.LoadInt64(&t.refs) 395 return refs > 0 396} 397 398// Remove removes any underlying files stored on disk for this reader. 399func (t *TSMReader) Remove() error { 400 t.mu.Lock() 401 defer t.mu.Unlock() 402 return t.remove() 403} 404 405// Rename renames the underlying file to the new path. 406func (t *TSMReader) Rename(path string) error { 407 t.mu.Lock() 408 defer t.mu.Unlock() 409 return t.accessor.rename(path) 410} 411 412// Remove removes any underlying files stored on disk for this reader. 413func (t *TSMReader) remove() error { 414 path := t.accessor.path() 415 416 if t.InUse() { 417 return ErrFileInUse 418 } 419 420 if path != "" { 421 err := os.RemoveAll(path) 422 if err != nil { 423 return err 424 } 425 } 426 427 if err := t.tombstoner.Delete(); err != nil { 428 return err 429 } 430 return nil 431} 432 433// Contains returns whether the given key is present in the index. 434func (t *TSMReader) Contains(key []byte) bool { 435 return t.index.Contains(key) 436} 437 438// ContainsValue returns true if key and time might exists in this file. This function could 439// return true even though the actual point does not exist. For example, the key may 440// exist in this file, but not have a point exactly at time t. 441func (t *TSMReader) ContainsValue(key []byte, ts int64) bool { 442 return t.index.ContainsValue(key, ts) 443} 444 445// DeleteRange removes the given points for keys between minTime and maxTime. The series 446// keys passed in must be sorted. 447func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 448 if len(keys) == 0 { 449 return nil 450 } 451 452 batch := t.BatchDelete() 453 if err := batch.DeleteRange(keys, minTime, maxTime); err != nil { 454 batch.Rollback() 455 return err 456 } 457 return batch.Commit() 458} 459 460// Delete deletes blocks indicated by keys. 
func (t *TSMReader) Delete(keys [][]byte) error {
	// Record the tombstones and flush them before touching the index, so the
	// index is only changed once both tombstoner calls have succeeded.
	if err := t.tombstoner.Add(keys); err != nil {
		return err
	}

	if err := t.tombstoner.Flush(); err != nil {
		return err
	}

	t.index.Delete(keys)
	return nil
}

// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
	return t.index.OverlapsTimeRange(min, max)
}

// OverlapsKeyRange returns true if the key range of the file intersects min and max.
func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
	return t.index.OverlapsKeyRange(min, max)
}

// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
	return t.index.TimeRange()
}

// KeyRange returns the min and max key across all keys in the file.
func (t *TSMReader) KeyRange() ([]byte, []byte) {
	return t.index.KeyRange()
}

// KeyCount returns the count of unique keys in the TSMReader.
func (t *TSMReader) KeyCount() int {
	return t.index.KeyCount()
}

// Entries returns all index entries for key.
func (t *TSMReader) Entries(key []byte) []IndexEntry {
	return t.index.Entries(key)
}

// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	return t.index.ReadEntries(key, entries)
}

// IndexSize returns the size of the index in bytes.
func (t *TSMReader) IndexSize() uint32 {
	return t.index.Size()
}

// Size returns the size of the underlying file in bytes.
//
// NOTE(review): the size is stored as an int64 and converted to uint32 here,
// so a value that does not fit in 32 bits is truncated by the conversion.
func (t *TSMReader) Size() uint32 {
	t.mu.RLock()
	size := t.size
	t.mu.RUnlock()
	return uint32(size)
}

// LastModified returns the last time the underlying file was modified.
523func (t *TSMReader) LastModified() int64 { 524 t.mu.RLock() 525 lm := t.lastModified 526 for _, ts := range t.tombstoner.TombstoneFiles() { 527 if ts.LastModified > lm { 528 lm = ts.LastModified 529 } 530 } 531 t.mu.RUnlock() 532 return lm 533} 534 535// HasTombstones return true if there are any tombstone entries recorded. 536func (t *TSMReader) HasTombstones() bool { 537 t.mu.RLock() 538 b := t.tombstoner.HasTombstones() 539 t.mu.RUnlock() 540 return b 541} 542 543// TombstoneFiles returns any tombstone files associated with this TSM file. 544func (t *TSMReader) TombstoneFiles() []FileStat { 545 t.mu.RLock() 546 fs := t.tombstoner.TombstoneFiles() 547 t.mu.RUnlock() 548 return fs 549} 550 551// TombstoneRange returns ranges of time that are deleted for the given key. 552func (t *TSMReader) TombstoneRange(key []byte) []TimeRange { 553 t.mu.RLock() 554 tr := t.index.TombstoneRange(key) 555 t.mu.RUnlock() 556 return tr 557} 558 559// Stats returns the FileStat for the TSMReader's underlying file. 560func (t *TSMReader) Stats() FileStat { 561 minTime, maxTime := t.index.TimeRange() 562 minKey, maxKey := t.index.KeyRange() 563 return FileStat{ 564 Path: t.Path(), 565 Size: t.Size(), 566 LastModified: t.LastModified(), 567 MinTime: minTime, 568 MaxTime: maxTime, 569 MinKey: minKey, 570 MaxKey: maxKey, 571 HasTombstone: t.tombstoner.HasTombstones(), 572 } 573} 574 575// BlockIterator returns a BlockIterator for the underlying TSM file. 576func (t *TSMReader) BlockIterator() *BlockIterator { 577 return &BlockIterator{ 578 r: t, 579 n: t.index.KeyCount(), 580 } 581} 582 583type BatchDeleter interface { 584 DeleteRange(keys [][]byte, min, max int64) error 585 Commit() error 586 Rollback() error 587} 588 589type batchDelete struct { 590 r *TSMReader 591} 592 593func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 594 if len(keys) == 0 { 595 return nil 596 } 597 598 // If the keys can't exist in this TSM file, skip it. 
599 minKey, maxKey := keys[0], keys[len(keys)-1] 600 if !b.r.index.OverlapsKeyRange(minKey, maxKey) { 601 return nil 602 } 603 604 // If the timerange can't exist in this TSM file, skip it. 605 if !b.r.index.OverlapsTimeRange(minTime, maxTime) { 606 return nil 607 } 608 609 if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil { 610 return err 611 } 612 613 return nil 614} 615 616func (b *batchDelete) Commit() error { 617 defer b.r.deleteMu.Unlock() 618 if err := b.r.tombstoner.Flush(); err != nil { 619 return err 620 } 621 622 return b.r.applyTombstones() 623} 624 625func (b *batchDelete) Rollback() error { 626 defer b.r.deleteMu.Unlock() 627 return b.r.tombstoner.Rollback() 628} 629 630// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. 631// Callers must either Commit or Rollback the operation. 632func (r *TSMReader) BatchDelete() BatchDeleter { 633 r.deleteMu.Lock() 634 return &batchDelete{r: r} 635} 636 637type BatchDeleters []BatchDeleter 638 639func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error { 640 errC := make(chan error, len(a)) 641 for _, b := range a { 642 go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b) 643 } 644 645 var err error 646 for i := 0; i < len(a); i++ { 647 dErr := <-errC 648 if dErr != nil { 649 err = dErr 650 } 651 } 652 return err 653} 654 655func (a BatchDeleters) Commit() error { 656 errC := make(chan error, len(a)) 657 for _, b := range a { 658 go func(b BatchDeleter) { errC <- b.Commit() }(b) 659 } 660 661 var err error 662 for i := 0; i < len(a); i++ { 663 dErr := <-errC 664 if dErr != nil { 665 err = dErr 666 } 667 } 668 return err 669} 670 671func (a BatchDeleters) Rollback() error { 672 errC := make(chan error, len(a)) 673 for _, b := range a { 674 go func(b BatchDeleter) { errC <- b.Rollback() }(b) 675 } 676 677 var err error 678 for i := 0; i < len(a); i++ { 679 dErr := <-errC 680 if dErr != nil { 681 err = dErr 682 } 683 } 684 
return err 685} 686 687// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This 688// implementation can be used for indexes that may be MMAPed into memory. 689type indirectIndex struct { 690 mu sync.RWMutex 691 692 // indirectIndex works a follows. Assuming we have an index structure in memory as 693 // the diagram below: 694 // 695 // ┌────────────────────────────────────────────────────────────────────┐ 696 // │ Index │ 697 // ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘ 698 // │0│ │62│ │145│ 699 // ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐ 700 // │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │ 701 // │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │ 702 // └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘ 703 704 // We would build an `offsets` slices where each element pointers to the byte location 705 // for the first key in the index slice. 706 707 // ┌────────────────────────────────────────────────────────────────────┐ 708 // │ Offsets │ 709 // ├────┬────┬────┬─────────────────────────────────────────────────────┘ 710 // │ 0 │ 62 │145 │ 711 // └────┴────┴────┘ 712 713 // Using this offset slice we can find `Key 2` by doing a binary search 714 // over the offsets slice. Instead of comparing the value in the offsets 715 // (e.g. `62`), we use that as an index into the underlying index to 716 // retrieve the key at position `62` and perform our comparisons with that. 717 718 // When we have identified the correct position in the index for a given 719 // key, we could perform another binary search or a linear scan. This 720 // should be fast as well since each index entry is 28 bytes and all 721 // contiguous in memory. The current implementation uses a linear scan since the 722 // number of block entries is expected to be < 100 per key. 723 724 // b is the underlying index byte slice. 
This could be a copy on the heap or an MMAP 725 // slice reference 726 b []byte 727 728 // offsets contains the positions in b for each key. It points to the 2 byte length of 729 // key. 730 offsets []byte 731 732 // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the 733 // file 734 minKey, maxKey []byte 735 736 // minTime, maxTime are the minimum and maximum times contained in the file across all 737 // series. 738 minTime, maxTime int64 739 740 // tombstones contains only the tombstoned keys with subset of time values deleted. An 741 // entry would exist here if a subset of the points for a key were deleted and the file 742 // had not be re-compacted to remove the points on disk. 743 tombstones map[string][]TimeRange 744} 745 746// TimeRange holds a min and max timestamp. 747type TimeRange struct { 748 Min, Max int64 749} 750 751func (t TimeRange) Overlaps(min, max int64) bool { 752 return t.Min <= max && t.Max >= min 753} 754 755// NewIndirectIndex returns a new indirect index. 756func NewIndirectIndex() *indirectIndex { 757 return &indirectIndex{ 758 tombstones: make(map[string][]TimeRange), 759 } 760} 761 762func (d *indirectIndex) offset(i int) int { 763 if i < 0 || i+4 > len(d.offsets) { 764 return -1 765 } 766 return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])) 767} 768 769func (d *indirectIndex) Seek(key []byte) int { 770 d.mu.RLock() 771 defer d.mu.RUnlock() 772 return d.searchOffset(key) 773} 774 775// searchOffset searches the offsets slice for key and returns the position in 776// offsets where key would exist. 777func (d *indirectIndex) searchOffset(key []byte) int { 778 // We use a binary search across our indirect offsets (pointers to all the keys 779 // in the index slice). 
780 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 781 // i is the position in offsets we are at so get offset it points to 782 offset := int32(binary.BigEndian.Uint32(x)) 783 784 // It's pointing to the start of the key which is a 2 byte length 785 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 786 787 // See if it matches 788 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 789 }) 790 791 // See if we might have found the right index 792 if i < len(d.offsets) { 793 return int(i / 4) 794 } 795 796 // The key is not in the index. i is the index where it would be inserted so return 797 // a value outside our offset range. 798 return int(len(d.offsets)) / 4 799} 800 801// search returns the byte position of key in the index. If key is not 802// in the index, len(index) is returned. 803func (d *indirectIndex) search(key []byte) int { 804 if !d.ContainsKey(key) { 805 return len(d.b) 806 } 807 808 // We use a binary search across our indirect offsets (pointers to all the keys 809 // in the index slice). 810 // TODO(sgc): this should be inlined to `indirectIndex` as it is only used here 811 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 812 // i is the position in offsets we are at so get offset it points to 813 offset := int32(binary.BigEndian.Uint32(x)) 814 815 // It's pointing to the start of the key which is a 2 byte length 816 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 817 818 // See if it matches 819 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 820 }) 821 822 // See if we might have found the right index 823 if i < len(d.offsets) { 824 ofs := binary.BigEndian.Uint32(d.offsets[i : i+4]) 825 _, k := readKey(d.b[ofs:]) 826 827 // The search may have returned an i == 0 which could indicated that the value 828 // searched should be inserted at position 0. Make sure the key in the index 829 // matches the search value. 
830 if !bytes.Equal(key, k) { 831 return len(d.b) 832 } 833 834 return int(ofs) 835 } 836 837 // The key is not in the index. i is the index where it would be inserted so return 838 // a value outside our offset range. 839 return len(d.b) 840} 841 842// ContainsKey returns true of key may exist in this index. 843func (d *indirectIndex) ContainsKey(key []byte) bool { 844 return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 845} 846 847// Entries returns all index entries for a key. 848func (d *indirectIndex) Entries(key []byte) []IndexEntry { 849 return d.ReadEntries(key, nil) 850} 851 852func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) { 853 n, k := readKey(d.b[ofs:]) 854 855 // Read and return all the entries 856 ofs += n 857 var ie indexEntries 858 if entries != nil { 859 ie.entries = *entries 860 } 861 if _, err := readEntries(d.b[ofs:], &ie); err != nil { 862 panic(fmt.Sprintf("error reading entries: %v", err)) 863 } 864 if entries != nil { 865 *entries = ie.entries 866 } 867 return k, ie.entries 868} 869 870// ReadEntries returns all index entries for a key. 871func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 872 d.mu.RLock() 873 defer d.mu.RUnlock() 874 875 ofs := d.search(key) 876 if ofs < len(d.b) { 877 k, entries := d.readEntriesAt(ofs, entries) 878 // The search may have returned an i == 0 which could indicated that the value 879 // searched should be inserted at position 0. Make sure the key in the index 880 // matches the search value. 881 if !bytes.Equal(key, k) { 882 return nil 883 } 884 885 return entries 886 } 887 888 // The key is not in the index. i is the index where it would be inserted. 889 return nil 890} 891 892// Entry returns the index entry for the specified key and timestamp. If no entry 893// matches the key an timestamp, nil is returned. 
894func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { 895 entries := d.Entries(key) 896 for _, entry := range entries { 897 if entry.Contains(timestamp) { 898 return &entry 899 } 900 } 901 return nil 902} 903 904// Key returns the key in the index at the given position. 905func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 906 d.mu.RLock() 907 defer d.mu.RUnlock() 908 909 if idx < 0 || idx*4+4 > len(d.offsets) { 910 return nil, 0, nil 911 } 912 ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]) 913 n, key := readKey(d.b[ofs:]) 914 915 typ := d.b[int(ofs)+n] 916 917 var ie indexEntries 918 if entries != nil { 919 ie.entries = *entries 920 } 921 if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil { 922 return nil, 0, nil 923 } 924 if entries != nil { 925 *entries = ie.entries 926 } 927 928 return key, typ, ie.entries 929} 930 931// KeyAt returns the key in the index at the given position. 932func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { 933 d.mu.RLock() 934 935 if idx < 0 || idx*4+4 > len(d.offsets) { 936 d.mu.RUnlock() 937 return nil, 0 938 } 939 ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])) 940 941 n, key := readKey(d.b[ofs:]) 942 ofs = ofs + int32(n) 943 typ := d.b[ofs] 944 d.mu.RUnlock() 945 return key, typ 946} 947 948// KeyCount returns the count of unique keys in the index. 949func (d *indirectIndex) KeyCount() int { 950 d.mu.RLock() 951 n := len(d.offsets) / 4 952 d.mu.RUnlock() 953 return n 954} 955 956// Delete removes the given keys from the index. 957func (d *indirectIndex) Delete(keys [][]byte) { 958 if len(keys) == 0 { 959 return 960 } 961 962 if !bytesutil.IsSorted(keys) { 963 bytesutil.Sort(keys) 964 } 965 966 // Both keys and offsets are sorted. Walk both in order and skip 967 // any keys that exist in both. 
968 d.mu.Lock() 969 start := d.searchOffset(keys[0]) 970 for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 { 971 offset := binary.BigEndian.Uint32(d.offsets[i : i+4]) 972 _, indexKey := readKey(d.b[offset:]) 973 974 for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 { 975 keys = keys[1:] 976 } 977 978 if len(keys) > 0 && bytes.Equal(keys[0], indexKey) { 979 keys = keys[1:] 980 copy(d.offsets[i:i+4], nilOffset) 981 } 982 } 983 d.offsets = bytesutil.Pack(d.offsets, 4, 255) 984 d.mu.Unlock() 985} 986 987// DeleteRange removes the given keys with data between minTime and maxTime from the index. 988func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { 989 // No keys, nothing to do 990 if len(keys) == 0 { 991 return 992 } 993 994 if !bytesutil.IsSorted(keys) { 995 bytesutil.Sort(keys) 996 } 997 998 // If we're deleting the max time range, just use tombstoning to remove the 999 // key from the offsets slice 1000 if minTime == math.MinInt64 && maxTime == math.MaxInt64 { 1001 d.Delete(keys) 1002 return 1003 } 1004 1005 // Is the range passed in outside of the time range for the file? 1006 min, max := d.TimeRange() 1007 if minTime > max || maxTime < min { 1008 return 1009 } 1010 1011 fullKeys := make([][]byte, 0, len(keys)) 1012 tombstones := map[string][]TimeRange{} 1013 var ie []IndexEntry 1014 1015 for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ { 1016 k, entries := d.readEntriesAt(d.offset(i), &ie) 1017 1018 // Skip any keys that don't exist. These are less than the current key. 1019 for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 { 1020 keys = keys[1:] 1021 } 1022 1023 // No more keys to delete, we're done. 1024 if len(keys) == 0 { 1025 break 1026 } 1027 1028 // If the current key is greater than the index one, continue to the next 1029 // index key. 
1030 if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 { 1031 continue 1032 } 1033 1034 // If multiple tombstones are saved for the same key 1035 if len(entries) == 0 { 1036 continue 1037 } 1038 1039 // Is the time range passed outside of the time range we've have stored for this key? 1040 min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime 1041 if minTime > max || maxTime < min { 1042 continue 1043 } 1044 1045 // Does the range passed in cover every value for the key? 1046 if minTime <= min && maxTime >= max { 1047 fullKeys = append(fullKeys, keys[0]) 1048 keys = keys[1:] 1049 continue 1050 } 1051 1052 d.mu.RLock() 1053 existing := d.tombstones[string(k)] 1054 d.mu.RUnlock() 1055 1056 // Append the new tombonstes to the existing ones 1057 newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...) 1058 fn := func(i, j int) bool { 1059 a, b := newTs[i], newTs[j] 1060 if a.Min == b.Min { 1061 return a.Max <= b.Max 1062 } 1063 return a.Min < b.Min 1064 } 1065 1066 // Sort the updated tombstones if necessary 1067 if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) { 1068 sort.Slice(newTs, fn) 1069 } 1070 1071 tombstones[string(k)] = newTs 1072 1073 // We need to see if all the tombstones end up deleting the entire series. This 1074 // could happen if their is one tombstore with min,max time spanning all the block 1075 // time ranges or from multiple smaller tombstones the delete segments. To detect 1076 // this cases, we use a window starting at the first tombstone and grow it be each 1077 // tombstone that is immediately adjacent to the current window or if it overlaps. 1078 // If there are any gaps, we abort. 1079 minTs, maxTs := newTs[0].Min, newTs[0].Max 1080 for j := 1; j < len(newTs); j++ { 1081 prevTs := newTs[j-1] 1082 ts := newTs[j] 1083 1084 // Make sure all the tombstone line up for a continuous range. We don't 1085 // want to have two small deletes on each edges end up causing us to 1086 // remove the full key. 
1087 if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) { 1088 minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64) 1089 break 1090 } 1091 1092 if ts.Min < minTs { 1093 minTs = ts.Min 1094 } 1095 if ts.Max > maxTs { 1096 maxTs = ts.Max 1097 } 1098 } 1099 1100 // If we have a fully deleted series, delete it all of it. 1101 if minTs <= min && maxTs >= max { 1102 fullKeys = append(fullKeys, keys[0]) 1103 keys = keys[1:] 1104 continue 1105 } 1106 } 1107 1108 // Delete all the keys that fully deleted in bulk 1109 if len(fullKeys) > 0 { 1110 d.Delete(fullKeys) 1111 } 1112 1113 if len(tombstones) == 0 { 1114 return 1115 } 1116 1117 d.mu.Lock() 1118 for k, v := range tombstones { 1119 d.tombstones[k] = v 1120 } 1121 d.mu.Unlock() 1122} 1123 1124// TombstoneRange returns ranges of time that are deleted for the given key. 1125func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange { 1126 d.mu.RLock() 1127 r := d.tombstones[string(key)] 1128 d.mu.RUnlock() 1129 return r 1130} 1131 1132// Contains return true if the given key exists in the index. 1133func (d *indirectIndex) Contains(key []byte) bool { 1134 return len(d.Entries(key)) > 0 1135} 1136 1137// ContainsValue returns true if key and time might exist in this file. 1138func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { 1139 entry := d.Entry(key, timestamp) 1140 if entry == nil { 1141 return false 1142 } 1143 1144 d.mu.RLock() 1145 tombstones := d.tombstones[string(key)] 1146 d.mu.RUnlock() 1147 1148 for _, t := range tombstones { 1149 if t.Min <= timestamp && t.Max >= timestamp { 1150 return false 1151 } 1152 } 1153 return true 1154} 1155 1156// Type returns the block type of the values stored for the key. 
1157func (d *indirectIndex) Type(key []byte) (byte, error) { 1158 d.mu.RLock() 1159 defer d.mu.RUnlock() 1160 1161 ofs := d.search(key) 1162 if ofs < len(d.b) { 1163 n, _ := readKey(d.b[ofs:]) 1164 ofs += n 1165 return d.b[ofs], nil 1166 } 1167 return 0, fmt.Errorf("key does not exist: %s", key) 1168} 1169 1170// OverlapsTimeRange returns true if the time range of the file intersect min and max. 1171func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { 1172 return d.minTime <= max && d.maxTime >= min 1173} 1174 1175// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 1176func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { 1177 return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 1178} 1179 1180// KeyRange returns the min and max keys in the index. 1181func (d *indirectIndex) KeyRange() ([]byte, []byte) { 1182 return d.minKey, d.maxKey 1183} 1184 1185// TimeRange returns the min and max time across all keys in the index. 1186func (d *indirectIndex) TimeRange() (int64, int64) { 1187 return d.minTime, d.maxTime 1188} 1189 1190// MarshalBinary returns a byte slice encoded version of the index. 1191func (d *indirectIndex) MarshalBinary() ([]byte, error) { 1192 d.mu.RLock() 1193 defer d.mu.RUnlock() 1194 1195 return d.b, nil 1196} 1197 1198// UnmarshalBinary populates an index from an encoded byte slice 1199// representation of an index. 1200func (d *indirectIndex) UnmarshalBinary(b []byte) error { 1201 d.mu.Lock() 1202 defer d.mu.Unlock() 1203 1204 // Keep a reference to the actual index bytes 1205 d.b = b 1206 if len(b) == 0 { 1207 return nil 1208 } 1209 1210 //var minKey, maxKey []byte 1211 var minTime, maxTime int64 = math.MaxInt64, 0 1212 1213 // To create our "indirect" index, we need to find the location of all the keys in 1214 // the raw byte slice. The keys are listed once each (in sorted order). 
Following 1215 // each key is a time ordered list of index entry blocks for that key. The loop below 1216 // basically skips across the slice keeping track of the counter when we are at a key 1217 // field. 1218 var i int32 1219 var offsets []int32 1220 iMax := int32(len(b)) 1221 for i < iMax { 1222 offsets = append(offsets, i) 1223 1224 // Skip to the start of the values 1225 // key length value (2) + type (1) + length of key 1226 if i+2 >= iMax { 1227 return fmt.Errorf("indirectIndex: not enough data for key length value") 1228 } 1229 i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2])) 1230 1231 // count of index entries 1232 if i+indexCountSize >= iMax { 1233 return fmt.Errorf("indirectIndex: not enough data for index entries count") 1234 } 1235 count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) 1236 i += indexCountSize 1237 1238 // Find the min time for the block 1239 if i+8 >= iMax { 1240 return fmt.Errorf("indirectIndex: not enough data for min time") 1241 } 1242 minT := int64(binary.BigEndian.Uint64(b[i : i+8])) 1243 if minT < minTime { 1244 minTime = minT 1245 } 1246 1247 i += (count - 1) * indexEntrySize 1248 1249 // Find the max time for the block 1250 if i+16 >= iMax { 1251 return fmt.Errorf("indirectIndex: not enough data for max time") 1252 } 1253 maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) 1254 if maxT > maxTime { 1255 maxTime = maxT 1256 } 1257 1258 i += indexEntrySize 1259 } 1260 1261 firstOfs := offsets[0] 1262 _, key := readKey(b[firstOfs:]) 1263 d.minKey = key 1264 1265 lastOfs := offsets[len(offsets)-1] 1266 _, key = readKey(b[lastOfs:]) 1267 d.maxKey = key 1268 1269 d.minTime = minTime 1270 d.maxTime = maxTime 1271 1272 var err error 1273 d.offsets, err = mmap(nil, 0, len(offsets)*4) 1274 if err != nil { 1275 return err 1276 } 1277 for i, v := range offsets { 1278 binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v)) 1279 } 1280 1281 return nil 1282} 1283 1284// Size returns the size of the current index in bytes. 
1285func (d *indirectIndex) Size() uint32 { 1286 d.mu.RLock() 1287 defer d.mu.RUnlock() 1288 1289 return uint32(len(d.b)) 1290} 1291 1292func (d *indirectIndex) Close() error { 1293 // Windows doesn't use the anonymous map for the offsets index 1294 if runtime.GOOS == "windows" { 1295 return nil 1296 } 1297 return munmap(d.offsets[:cap(d.offsets)]) 1298} 1299 1300// mmapAccess is mmap based block accessor. It access blocks through an 1301// MMAP file interface. 1302type mmapAccessor struct { 1303 accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed 1304 freeCount uint64 // Counter to determine whether the accessor can free its resources 1305 1306 mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b. 1307 1308 mu sync.RWMutex 1309 b []byte 1310 f *os.File 1311 1312 index *indirectIndex 1313} 1314 1315func (m *mmapAccessor) init() (*indirectIndex, error) { 1316 m.mu.Lock() 1317 defer m.mu.Unlock() 1318 1319 if err := verifyVersion(m.f); err != nil { 1320 return nil, err 1321 } 1322 1323 var err error 1324 1325 if _, err := m.f.Seek(0, 0); err != nil { 1326 return nil, err 1327 } 1328 1329 stat, err := m.f.Stat() 1330 if err != nil { 1331 return nil, err 1332 } 1333 1334 m.b, err = mmap(m.f, 0, int(stat.Size())) 1335 if err != nil { 1336 return nil, err 1337 } 1338 if len(m.b) < 8 { 1339 return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex") 1340 } 1341 1342 // Hint to the kernel that we will be reading the file. It would be better to hint 1343 // that we will be reading the index section, but that's not been 1344 // implemented as yet. 
1345 if m.mmapWillNeed { 1346 if err := madviseWillNeed(m.b); err != nil { 1347 return nil, err 1348 } 1349 } 1350 1351 indexOfsPos := len(m.b) - 8 1352 indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8]) 1353 if indexStart >= uint64(indexOfsPos) { 1354 return nil, fmt.Errorf("mmapAccessor: invalid indexStart") 1355 } 1356 1357 m.index = NewIndirectIndex() 1358 if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { 1359 return nil, err 1360 } 1361 1362 // Allow resources to be freed immediately if requested 1363 m.incAccess() 1364 atomic.StoreUint64(&m.freeCount, 1) 1365 1366 return m.index, nil 1367} 1368 1369func (m *mmapAccessor) free() error { 1370 accessCount := atomic.LoadUint64(&m.accessCount) 1371 freeCount := atomic.LoadUint64(&m.freeCount) 1372 1373 // Already freed everything. 1374 if freeCount == 0 && accessCount == 0 { 1375 return nil 1376 } 1377 1378 // Were there accesses after the last time we tried to free? 1379 // If so, don't free anything and record the access count that we 1380 // see now for the next check. 1381 if accessCount != freeCount { 1382 atomic.StoreUint64(&m.freeCount, accessCount) 1383 return nil 1384 } 1385 1386 // Reset both counters to zero to indicate that we have freed everything. 
1387 atomic.StoreUint64(&m.accessCount, 0) 1388 atomic.StoreUint64(&m.freeCount, 0) 1389 1390 m.mu.RLock() 1391 defer m.mu.RUnlock() 1392 1393 return madviseDontNeed(m.b) 1394} 1395 1396func (m *mmapAccessor) incAccess() { 1397 atomic.AddUint64(&m.accessCount, 1) 1398} 1399 1400func (m *mmapAccessor) rename(path string) error { 1401 m.incAccess() 1402 1403 m.mu.Lock() 1404 defer m.mu.Unlock() 1405 1406 err := munmap(m.b) 1407 if err != nil { 1408 return err 1409 } 1410 1411 if err := m.f.Close(); err != nil { 1412 return err 1413 } 1414 1415 if err := file.RenameFile(m.f.Name(), path); err != nil { 1416 return err 1417 } 1418 1419 m.f, err = os.Open(path) 1420 if err != nil { 1421 return err 1422 } 1423 1424 if _, err := m.f.Seek(0, 0); err != nil { 1425 return err 1426 } 1427 1428 stat, err := m.f.Stat() 1429 if err != nil { 1430 return err 1431 } 1432 1433 m.b, err = mmap(m.f, 0, int(stat.Size())) 1434 if err != nil { 1435 return err 1436 } 1437 1438 if m.mmapWillNeed { 1439 return madviseWillNeed(m.b) 1440 } 1441 return nil 1442} 1443 1444func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) { 1445 entry := m.index.Entry(key, timestamp) 1446 if entry == nil { 1447 return nil, nil 1448 } 1449 1450 return m.readBlock(entry, nil) 1451} 1452 1453func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { 1454 m.incAccess() 1455 1456 m.mu.RLock() 1457 defer m.mu.RUnlock() 1458 1459 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1460 return nil, ErrTSMClosed 1461 } 1462 //TODO: Validate checksum 1463 var err error 1464 values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1465 if err != nil { 1466 return nil, err 1467 } 1468 1469 return values, nil 1470} 1471 1472func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) { 1473 m.incAccess() 1474 1475 m.mu.RLock() 1476 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1477 m.mu.RUnlock() 1478 return 0, 
nil, ErrTSMClosed 1479 } 1480 1481 // return the bytes after the 4 byte checksum 1482 crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)] 1483 m.mu.RUnlock() 1484 1485 return crc, block, nil 1486} 1487 1488// readAll returns all values for a key in all blocks. 1489func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { 1490 m.incAccess() 1491 1492 blocks := m.index.Entries(key) 1493 if len(blocks) == 0 { 1494 return nil, nil 1495 } 1496 1497 tombstones := m.index.TombstoneRange(key) 1498 1499 m.mu.RLock() 1500 defer m.mu.RUnlock() 1501 1502 var temp []Value 1503 var err error 1504 var values []Value 1505 for _, block := range blocks { 1506 var skip bool 1507 for _, t := range tombstones { 1508 // Should we skip this block because it contains points that have been deleted 1509 if t.Min <= block.MinTime && t.Max >= block.MaxTime { 1510 skip = true 1511 break 1512 } 1513 } 1514 1515 if skip { 1516 continue 1517 } 1518 //TODO: Validate checksum 1519 temp = temp[:0] 1520 // The +4 is the 4 byte checksum length 1521 temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp) 1522 if err != nil { 1523 return nil, err 1524 } 1525 1526 // Filter out any values that were deleted 1527 for _, t := range tombstones { 1528 temp = Values(temp).Exclude(t.Min, t.Max) 1529 } 1530 1531 values = append(values, temp...) 
1532 } 1533 1534 return values, nil 1535} 1536 1537func (m *mmapAccessor) path() string { 1538 m.mu.RLock() 1539 path := m.f.Name() 1540 m.mu.RUnlock() 1541 return path 1542} 1543 1544func (m *mmapAccessor) close() error { 1545 m.mu.Lock() 1546 defer m.mu.Unlock() 1547 1548 if m.b == nil { 1549 return nil 1550 } 1551 1552 err := munmap(m.b) 1553 if err != nil { 1554 return err 1555 } 1556 1557 m.b = nil 1558 return m.f.Close() 1559} 1560 1561type indexEntries struct { 1562 Type byte 1563 entries []IndexEntry 1564} 1565 1566func (a *indexEntries) Len() int { return len(a.entries) } 1567func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] } 1568func (a *indexEntries) Less(i, j int) bool { 1569 return a.entries[i].MinTime < a.entries[j].MinTime 1570} 1571 1572func (a *indexEntries) MarshalBinary() ([]byte, error) { 1573 buf := make([]byte, len(a.entries)*indexEntrySize) 1574 1575 for i, entry := range a.entries { 1576 entry.AppendTo(buf[indexEntrySize*i:]) 1577 } 1578 1579 return buf, nil 1580} 1581 1582func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { 1583 var buf [indexEntrySize]byte 1584 var n int 1585 1586 for _, entry := range a.entries { 1587 entry.AppendTo(buf[:]) 1588 n, err = w.Write(buf[:]) 1589 total += int64(n) 1590 if err != nil { 1591 return total, err 1592 } 1593 } 1594 1595 return total, nil 1596} 1597 1598func readKey(b []byte) (n int, key []byte) { 1599 // 2 byte size of key 1600 n, size := 2, int(binary.BigEndian.Uint16(b[:2])) 1601 1602 // N byte key 1603 key = b[n : n+size] 1604 1605 n += len(key) 1606 return 1607} 1608 1609func readEntries(b []byte, entries *indexEntries) (n int, err error) { 1610 if len(b) < 1+indexCountSize { 1611 return 0, fmt.Errorf("readEntries: data too short for headers") 1612 } 1613 1614 // 1 byte block type 1615 entries.Type = b[n] 1616 n++ 1617 1618 // 2 byte count of index entries 1619 count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize])) 1620 n 
+= indexCountSize 1621 1622 if cap(entries.entries) < count { 1623 entries.entries = make([]IndexEntry, count) 1624 } else { 1625 entries.entries = entries.entries[:count] 1626 } 1627 1628 b = b[indexCountSize+indexTypeSize:] 1629 for i := 0; i < len(entries.entries); i++ { 1630 if err = entries.entries[i].UnmarshalBinary(b); err != nil { 1631 return 0, fmt.Errorf("readEntries: unmarshal error: %v", err) 1632 } 1633 b = b[indexEntrySize:] 1634 } 1635 1636 n += count * indexEntrySize 1637 1638 return 1639} 1640