1package tsm1 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9 "os" 10 "runtime" 11 "sort" 12 "sync" 13 "sync/atomic" 14 15 "github.com/influxdata/influxdb/pkg/bytesutil" 16 "github.com/influxdata/influxdb/pkg/file" 17 "github.com/influxdata/influxdb/tsdb" 18) 19 20// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used. 21var ErrFileInUse = fmt.Errorf("file still in use") 22 23// nilOffset is the value written to the offsets to indicate that position is deleted. The value is the max 24// uint32 which is an invalid position. We don't use 0 as 0 is actually a valid position. 25var nilOffset = []byte{255, 255, 255, 255} 26 27// TSMReader is a reader for a TSM file. 28type TSMReader struct { 29 // refs is the count of active references to this reader. 30 refs int64 31 refsWG sync.WaitGroup 32 33 madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED. 34 mu sync.RWMutex 35 36 // accessor provides access and decoding of blocks for the reader. 37 accessor blockAccessor 38 39 // index is the index of all blocks. 40 index TSMIndex 41 42 // tombstoner ensures tombstoned keys are not available by the index. 43 tombstoner *Tombstoner 44 45 // size is the size of the file on disk. 46 size int64 47 48 // lastModified is the last time this file was modified on disk 49 lastModified int64 50 51 // deleteMu limits concurrent deletes 52 deleteMu sync.Mutex 53} 54 55// TSMIndex represent the index section of a TSM file. The index records all 56// blocks, their locations, sizes, min and max times. 57type TSMIndex interface { 58 // Delete removes the given keys from the index. 59 Delete(keys [][]byte) 60 61 // DeleteRange removes the given keys with data between minTime and maxTime from the index. 62 DeleteRange(keys [][]byte, minTime, maxTime int64) 63 64 // ContainsKey returns true if the given key may exist in the index. This func is faster than 65 // Contains but, may return false positives. 66 ContainsKey(key []byte) bool 67 68 // Contains return true if the given key exists in the index. 69 Contains(key []byte) bool 70 71 // ContainsValue returns true if key and time might exist in this file. This function could 72 // return true even though the actual point does not exists. For example, the key may 73 // exist in this file, but not have a point exactly at time t. 74 ContainsValue(key []byte, timestamp int64) bool 75 76 // Entries returns all index entries for a key. 77 Entries(key []byte) []IndexEntry 78 79 // ReadEntries reads the index entries for key into entries. 80 ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry 81 82 // Entry returns the index entry for the specified key and timestamp. If no entry 83 // matches the key and timestamp, nil is returned. 84 Entry(key []byte, timestamp int64) *IndexEntry 85 86 // Key returns the key in the index at the given position, using entries to avoid allocations. 87 Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) 88 89 // KeyAt returns the key in the index at the given position. 90 KeyAt(index int) ([]byte, byte) 91 92 // KeyCount returns the count of unique keys in the index. 93 KeyCount() int 94 95 // Seek returns the position in the index where key <= value in the index. 96 Seek(key []byte) int 97 98 // OverlapsTimeRange returns true if the time range of the file intersect min and max. 99 OverlapsTimeRange(min, max int64) bool 100 101 // OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 102 OverlapsKeyRange(min, max []byte) bool 103 104 // Size returns the size of the current index in bytes. 105 Size() uint32 106 107 // TimeRange returns the min and max time across all keys in the file. 108 TimeRange() (int64, int64) 109 110 // TombstoneRange returns ranges of time that are deleted for the given key. 111 TombstoneRange(key []byte) []TimeRange 112 113 // KeyRange returns the min and max keys in the file. 114 KeyRange() ([]byte, []byte) 115 116 // Type returns the block type of the values stored for the key. Returns one of 117 // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, 118 // an error is returned. 119 Type(key []byte) (byte, error) 120 121 // UnmarshalBinary populates an index from an encoded byte slice 122 // representation of an index. 123 UnmarshalBinary(b []byte) error 124 125 // Close closes the index and releases any resources. 126 Close() error 127} 128 129// BlockIterator allows iterating over each block in a TSM file in order. It provides 130// raw access to the block bytes without decoding them. 131type BlockIterator struct { 132 r *TSMReader 133 134 // i is the current key index 135 i int 136 137 // n is the total number of keys 138 n int 139 140 key []byte 141 cache []IndexEntry 142 entries []IndexEntry 143 err error 144 typ byte 145} 146 147// PeekNext returns the next key to be iterated or an empty string. 148func (b *BlockIterator) PeekNext() []byte { 149 if len(b.entries) > 1 { 150 return b.key 151 } else if b.n-b.i > 1 { 152 key, _ := b.r.KeyAt(b.i + 1) 153 return key 154 } 155 return nil 156} 157 158// Next returns true if there are more blocks to iterate through. 159func (b *BlockIterator) Next() bool { 160 if b.err != nil { 161 return false 162 } 163 164 if b.n-b.i == 0 && len(b.entries) == 0 { 165 return false 166 } 167 168 if len(b.entries) > 0 { 169 b.entries = b.entries[1:] 170 if len(b.entries) > 0 { 171 return true 172 } 173 } 174 175 if b.n-b.i > 0 { 176 b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache) 177 b.i++ 178 179 // If there were deletes on the TSMReader, then our index is now off and we 180 // can't proceed. What we just read may not actually the next block. 181 if b.n != b.r.KeyCount() { 182 b.err = fmt.Errorf("delete during iteration") 183 return false 184 } 185 186 if len(b.entries) > 0 { 187 return true 188 } 189 } 190 191 return false 192} 193 194// Read reads information about the next block to be iterated. 195func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { 196 if b.err != nil { 197 return nil, 0, 0, 0, 0, nil, b.err 198 } 199 checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) 200 if err != nil { 201 b.err = err 202 return nil, 0, 0, 0, 0, nil, err 203 } 204 return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err 205} 206 207// Err returns any errors encounter during iteration. 208func (b *BlockIterator) Err() error { 209 return b.err 210} 211 212type tsmReaderOption func(*TSMReader) 213 214// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. 215var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { 216 return func(r *TSMReader) { 217 r.madviseWillNeed = willNeed 218 } 219} 220 221// NewTSMReader returns a new TSMReader from the given file. 222func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { 223 t := &TSMReader{} 224 for _, option := range options { 225 option(t) 226 } 227 228 stat, err := f.Stat() 229 if err != nil { 230 return nil, err 231 } 232 t.size = stat.Size() 233 t.lastModified = stat.ModTime().UnixNano() 234 t.accessor = &mmapAccessor{ 235 f: f, 236 mmapWillNeed: t.madviseWillNeed, 237 } 238 239 index, err := t.accessor.init() 240 if err != nil { 241 return nil, err 242 } 243 244 t.index = index 245 t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey) 246 247 if err := t.applyTombstones(); err != nil { 248 return nil, err 249 } 250 251 return t, nil 252} 253 254// WithObserver sets the observer for the TSM reader. 255func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) { 256 t.tombstoner.WithObserver(obs) 257} 258 259func (t *TSMReader) applyTombstones() error { 260 var cur, prev Tombstone 261 batch := make([][]byte, 0, 4096) 262 263 if err := t.tombstoner.Walk(func(ts Tombstone) error { 264 cur = ts 265 if len(batch) > 0 { 266 if prev.Min != cur.Min || prev.Max != cur.Max { 267 t.index.DeleteRange(batch, prev.Min, prev.Max) 268 batch = batch[:0] 269 } 270 } 271 272 // Copy the tombstone key and re-use the buffers to avoid allocations 273 n := len(batch) 274 batch = batch[:n+1] 275 if cap(batch[n]) < len(ts.Key) { 276 batch[n] = make([]byte, len(ts.Key)) 277 } else { 278 batch[n] = batch[n][:len(ts.Key)] 279 } 280 copy(batch[n], ts.Key) 281 282 if len(batch) >= 4096 { 283 t.index.DeleteRange(batch, prev.Min, prev.Max) 284 batch = batch[:0] 285 } 286 287 prev = ts 288 return nil 289 }); err != nil { 290 return fmt.Errorf("init: read tombstones: %v", err) 291 } 292 293 if len(batch) > 0 { 294 t.index.DeleteRange(batch, cur.Min, cur.Max) 295 } 296 return nil 297} 298 299func (t *TSMReader) Free() error { 300 t.mu.RLock() 301 defer t.mu.RUnlock() 302 return t.accessor.free() 303} 304 305// Path returns the path of the file the TSMReader was initialized with. 306func (t *TSMReader) Path() string { 307 t.mu.RLock() 308 p := t.accessor.path() 309 t.mu.RUnlock() 310 return p 311} 312 313// Key returns the key and the underlying entry at the numeric index. 314func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 315 return t.index.Key(index, entries) 316} 317 318// KeyAt returns the key and key type at position idx in the index. 319func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { 320 return t.index.KeyAt(idx) 321} 322 323func (t *TSMReader) Seek(key []byte) int { 324 return t.index.Seek(key) 325} 326 327// ReadAt returns the values corresponding to the given index entry. 328func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { 329 t.mu.RLock() 330 v, err := t.accessor.readBlock(entry, vals) 331 t.mu.RUnlock() 332 return v, err 333} 334 335// Read returns the values corresponding to the block at the given key and timestamp. 336func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) { 337 t.mu.RLock() 338 v, err := t.accessor.read(key, timestamp) 339 t.mu.RUnlock() 340 return v, err 341} 342 343// ReadAll returns all values for a key in all blocks. 344func (t *TSMReader) ReadAll(key []byte) ([]Value, error) { 345 t.mu.RLock() 346 v, err := t.accessor.readAll(key) 347 t.mu.RUnlock() 348 return v, err 349} 350 351func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) { 352 t.mu.RLock() 353 n, v, err := t.accessor.readBytes(e, b) 354 t.mu.RUnlock() 355 return n, v, err 356} 357 358// Type returns the type of values stored at the given key. 359func (t *TSMReader) Type(key []byte) (byte, error) { 360 return t.index.Type(key) 361} 362 363// Close closes the TSMReader. 364func (t *TSMReader) Close() error { 365 t.refsWG.Wait() 366 367 t.mu.Lock() 368 defer t.mu.Unlock() 369 370 if err := t.accessor.close(); err != nil { 371 return err 372 } 373 374 return t.index.Close() 375} 376 377// Ref records a usage of this TSMReader. If there are active references 378// when the reader is closed or removed, the reader will remain open until 379// there are no more references. 380func (t *TSMReader) Ref() { 381 atomic.AddInt64(&t.refs, 1) 382 t.refsWG.Add(1) 383} 384 385// Unref removes a usage record of this TSMReader. If the Reader was closed 386// by another goroutine while there were active references, the file will 387// be closed and remove 388func (t *TSMReader) Unref() { 389 atomic.AddInt64(&t.refs, -1) 390 t.refsWG.Done() 391} 392 393// InUse returns whether the TSMReader currently has any active references. 394func (t *TSMReader) InUse() bool { 395 refs := atomic.LoadInt64(&t.refs) 396 return refs > 0 397} 398 399// Remove removes any underlying files stored on disk for this reader. 400func (t *TSMReader) Remove() error { 401 t.mu.Lock() 402 defer t.mu.Unlock() 403 return t.remove() 404} 405 406// Rename renames the underlying file to the new path. 407func (t *TSMReader) Rename(path string) error { 408 t.mu.Lock() 409 defer t.mu.Unlock() 410 return t.accessor.rename(path) 411} 412 413// Remove removes any underlying files stored on disk for this reader. 414func (t *TSMReader) remove() error { 415 path := t.accessor.path() 416 417 if t.InUse() { 418 return ErrFileInUse 419 } 420 421 if path != "" { 422 err := os.RemoveAll(path) 423 if err != nil { 424 return err 425 } 426 } 427 428 if err := t.tombstoner.Delete(); err != nil { 429 return err 430 } 431 return nil 432} 433 434// Contains returns whether the given key is present in the index. 435func (t *TSMReader) Contains(key []byte) bool { 436 return t.index.Contains(key) 437} 438 439// ContainsValue returns true if key and time might exists in this file. This function could 440// return true even though the actual point does not exist. For example, the key may 441// exist in this file, but not have a point exactly at time t. 442func (t *TSMReader) ContainsValue(key []byte, ts int64) bool { 443 return t.index.ContainsValue(key, ts) 444} 445 446// DeleteRange removes the given points for keys between minTime and maxTime. The series 447// keys passed in must be sorted. 448func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 449 if len(keys) == 0 { 450 return nil 451 } 452 453 batch := t.BatchDelete() 454 if err := batch.DeleteRange(keys, minTime, maxTime); err != nil { 455 batch.Rollback() 456 return err 457 } 458 return batch.Commit() 459} 460 461// Delete deletes blocks indicated by keys. 462func (t *TSMReader) Delete(keys [][]byte) error { 463 if err := t.tombstoner.Add(keys); err != nil { 464 return err 465 } 466 467 if err := t.tombstoner.Flush(); err != nil { 468 return err 469 } 470 471 t.index.Delete(keys) 472 return nil 473} 474 475// OverlapsTimeRange returns true if the time range of the file intersect min and max. 476func (t *TSMReader) OverlapsTimeRange(min, max int64) bool { 477 return t.index.OverlapsTimeRange(min, max) 478} 479 480// OverlapsKeyRange returns true if the key range of the file intersect min and max. 481func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool { 482 return t.index.OverlapsKeyRange(min, max) 483} 484 485// TimeRange returns the min and max time across all keys in the file. 486func (t *TSMReader) TimeRange() (int64, int64) { 487 return t.index.TimeRange() 488} 489 490// KeyRange returns the min and max key across all keys in the file. 491func (t *TSMReader) KeyRange() ([]byte, []byte) { 492 return t.index.KeyRange() 493} 494 495// KeyCount returns the count of unique keys in the TSMReader. 496func (t *TSMReader) KeyCount() int { 497 return t.index.KeyCount() 498} 499 500// Entries returns all index entries for key. 501func (t *TSMReader) Entries(key []byte) []IndexEntry { 502 return t.index.Entries(key) 503} 504 505// ReadEntries reads the index entries for key into entries. 506func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 507 return t.index.ReadEntries(key, entries) 508} 509 510// IndexSize returns the size of the index in bytes. 511func (t *TSMReader) IndexSize() uint32 { 512 return t.index.Size() 513} 514 515// Size returns the size of the underlying file in bytes. 516func (t *TSMReader) Size() uint32 { 517 t.mu.RLock() 518 size := t.size 519 t.mu.RUnlock() 520 return uint32(size) 521} 522 523// LastModified returns the last time the underlying file was modified. 524func (t *TSMReader) LastModified() int64 { 525 t.mu.RLock() 526 lm := t.lastModified 527 if ts := t.tombstoner.TombstoneStats(); ts.TombstoneExists { 528 if ts.LastModified > lm { 529 lm = ts.LastModified 530 } 531 } 532 t.mu.RUnlock() 533 return lm 534} 535 536// HasTombstones return true if there are any tombstone entries recorded. 537func (t *TSMReader) HasTombstones() bool { 538 t.mu.RLock() 539 b := t.tombstoner.HasTombstones() 540 t.mu.RUnlock() 541 return b 542} 543 544// TombstoneFiles returns any tombstone files associated with this TSM file. 545func (t *TSMReader) TombstoneStats() TombstoneStat { 546 t.mu.RLock() 547 fs := t.tombstoner.TombstoneStats() 548 t.mu.RUnlock() 549 return fs 550} 551 552// TombstoneRange returns ranges of time that are deleted for the given key. 553func (t *TSMReader) TombstoneRange(key []byte) []TimeRange { 554 t.mu.RLock() 555 tr := t.index.TombstoneRange(key) 556 t.mu.RUnlock() 557 return tr 558} 559 560// Stats returns the FileStat for the TSMReader's underlying file. 561func (t *TSMReader) Stats() FileStat { 562 minTime, maxTime := t.index.TimeRange() 563 minKey, maxKey := t.index.KeyRange() 564 return FileStat{ 565 Path: t.Path(), 566 Size: t.Size(), 567 LastModified: t.LastModified(), 568 MinTime: minTime, 569 MaxTime: maxTime, 570 MinKey: minKey, 571 MaxKey: maxKey, 572 HasTombstone: t.tombstoner.HasTombstones(), 573 } 574} 575 576// BlockIterator returns a BlockIterator for the underlying TSM file. 577func (t *TSMReader) BlockIterator() *BlockIterator { 578 return &BlockIterator{ 579 r: t, 580 n: t.index.KeyCount(), 581 } 582} 583 584type BatchDeleter interface { 585 DeleteRange(keys [][]byte, min, max int64) error 586 Commit() error 587 Rollback() error 588} 589 590type batchDelete struct { 591 r *TSMReader 592} 593 594func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 595 if len(keys) == 0 { 596 return nil 597 } 598 599 // If the keys can't exist in this TSM file, skip it. 600 minKey, maxKey := keys[0], keys[len(keys)-1] 601 if !b.r.index.OverlapsKeyRange(minKey, maxKey) { 602 return nil 603 } 604 605 // If the timerange can't exist in this TSM file, skip it. 606 if !b.r.index.OverlapsTimeRange(minTime, maxTime) { 607 return nil 608 } 609 610 if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil { 611 return err 612 } 613 614 return nil 615} 616 617func (b *batchDelete) Commit() error { 618 defer b.r.deleteMu.Unlock() 619 if err := b.r.tombstoner.Flush(); err != nil { 620 return err 621 } 622 623 return b.r.applyTombstones() 624} 625 626func (b *batchDelete) Rollback() error { 627 defer b.r.deleteMu.Unlock() 628 return b.r.tombstoner.Rollback() 629} 630 631// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. 632// Callers must either Commit or Rollback the operation. 633func (r *TSMReader) BatchDelete() BatchDeleter { 634 r.deleteMu.Lock() 635 return &batchDelete{r: r} 636} 637 638type BatchDeleters []BatchDeleter 639 640func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error { 641 errC := make(chan error, len(a)) 642 for _, b := range a { 643 go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b) 644 } 645 646 var err error 647 for i := 0; i < len(a); i++ { 648 dErr := <-errC 649 if dErr != nil { 650 err = dErr 651 } 652 } 653 return err 654} 655 656func (a BatchDeleters) Commit() error { 657 errC := make(chan error, len(a)) 658 for _, b := range a { 659 go func(b BatchDeleter) { errC <- b.Commit() }(b) 660 } 661 662 var err error 663 for i := 0; i < len(a); i++ { 664 dErr := <-errC 665 if dErr != nil { 666 err = dErr 667 } 668 } 669 return err 670} 671 672func (a BatchDeleters) Rollback() error { 673 errC := make(chan error, len(a)) 674 for _, b := range a { 675 go func(b BatchDeleter) { errC <- b.Rollback() }(b) 676 } 677 678 var err error 679 for i := 0; i < len(a); i++ { 680 dErr := <-errC 681 if dErr != nil { 682 err = dErr 683 } 684 } 685 return err 686} 687 688// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This 689// implementation can be used for indexes that may be MMAPed into memory. 690type indirectIndex struct { 691 mu sync.RWMutex 692 693 // indirectIndex works a follows. Assuming we have an index structure in memory as 694 // the diagram below: 695 // 696 // ┌────────────────────────────────────────────────────────────────────┐ 697 // │ Index │ 698 // ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘ 699 // │0│ │62│ │145│ 700 // ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐ 701 // │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │ 702 // │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │ 703 // └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘ 704 705 // We would build an `offsets` slices where each element pointers to the byte location 706 // for the first key in the index slice. 707 708 // ┌────────────────────────────────────────────────────────────────────┐ 709 // │ Offsets │ 710 // ├────┬────┬────┬─────────────────────────────────────────────────────┘ 711 // │ 0 │ 62 │145 │ 712 // └────┴────┴────┘ 713 714 // Using this offset slice we can find `Key 2` by doing a binary search 715 // over the offsets slice. Instead of comparing the value in the offsets 716 // (e.g. `62`), we use that as an index into the underlying index to 717 // retrieve the key at position `62` and perform our comparisons with that. 718 719 // When we have identified the correct position in the index for a given 720 // key, we could perform another binary search or a linear scan. This 721 // should be fast as well since each index entry is 28 bytes and all 722 // contiguous in memory. The current implementation uses a linear scan since the 723 // number of block entries is expected to be < 100 per key. 724 725 // b is the underlying index byte slice. This could be a copy on the heap or an MMAP 726 // slice reference 727 b []byte 728 729 // offsets contains the positions in b for each key. It points to the 2 byte length of 730 // key. 731 offsets []byte 732 733 // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the 734 // file 735 minKey, maxKey []byte 736 737 // minTime, maxTime are the minimum and maximum times contained in the file across all 738 // series. 739 minTime, maxTime int64 740 741 // tombstones contains only the tombstoned keys with subset of time values deleted. An 742 // entry would exist here if a subset of the points for a key were deleted and the file 743 // had not be re-compacted to remove the points on disk. 744 tombstones map[string][]TimeRange 745} 746 747// TimeRange holds a min and max timestamp. 748type TimeRange struct { 749 Min, Max int64 750} 751 752func (t TimeRange) Overlaps(min, max int64) bool { 753 return t.Min <= max && t.Max >= min 754} 755 756// NewIndirectIndex returns a new indirect index. 757func NewIndirectIndex() *indirectIndex { 758 return &indirectIndex{ 759 tombstones: make(map[string][]TimeRange), 760 } 761} 762 763func (d *indirectIndex) offset(i int) int { 764 if i < 0 || i+4 > len(d.offsets) { 765 return -1 766 } 767 return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])) 768} 769 770func (d *indirectIndex) Seek(key []byte) int { 771 d.mu.RLock() 772 defer d.mu.RUnlock() 773 return d.searchOffset(key) 774} 775 776// searchOffset searches the offsets slice for key and returns the position in 777// offsets where key would exist. 778func (d *indirectIndex) searchOffset(key []byte) int { 779 // We use a binary search across our indirect offsets (pointers to all the keys 780 // in the index slice). 781 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 782 // i is the position in offsets we are at so get offset it points to 783 offset := int32(binary.BigEndian.Uint32(x)) 784 785 // It's pointing to the start of the key which is a 2 byte length 786 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 787 788 // See if it matches 789 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 790 }) 791 792 // See if we might have found the right index 793 if i < len(d.offsets) { 794 return int(i / 4) 795 } 796 797 // The key is not in the index. i is the index where it would be inserted so return 798 // a value outside our offset range. 799 return int(len(d.offsets)) / 4 800} 801 802// search returns the byte position of key in the index. If key is not 803// in the index, len(index) is returned. 804func (d *indirectIndex) search(key []byte) int { 805 if !d.ContainsKey(key) { 806 return len(d.b) 807 } 808 809 // We use a binary search across our indirect offsets (pointers to all the keys 810 // in the index slice). 811 // TODO(sgc): this should be inlined to `indirectIndex` as it is only used here 812 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 813 // i is the position in offsets we are at so get offset it points to 814 offset := int32(binary.BigEndian.Uint32(x)) 815 816 // It's pointing to the start of the key which is a 2 byte length 817 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 818 819 // See if it matches 820 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 821 }) 822 823 // See if we might have found the right index 824 if i < len(d.offsets) { 825 ofs := binary.BigEndian.Uint32(d.offsets[i : i+4]) 826 _, k := readKey(d.b[ofs:]) 827 828 // The search may have returned an i == 0 which could indicated that the value 829 // searched should be inserted at position 0. Make sure the key in the index 830 // matches the search value. 831 if !bytes.Equal(key, k) { 832 return len(d.b) 833 } 834 835 return int(ofs) 836 } 837 838 // The key is not in the index. i is the index where it would be inserted so return 839 // a value outside our offset range. 840 return len(d.b) 841} 842 843// ContainsKey returns true of key may exist in this index. 844func (d *indirectIndex) ContainsKey(key []byte) bool { 845 return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 846} 847 848// Entries returns all index entries for a key. 849func (d *indirectIndex) Entries(key []byte) []IndexEntry { 850 return d.ReadEntries(key, nil) 851} 852 853func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) { 854 n, k := readKey(d.b[ofs:]) 855 856 // Read and return all the entries 857 ofs += n 858 var ie indexEntries 859 if entries != nil { 860 ie.entries = *entries 861 } 862 if _, err := readEntries(d.b[ofs:], &ie); err != nil { 863 panic(fmt.Sprintf("error reading entries: %v", err)) 864 } 865 if entries != nil { 866 *entries = ie.entries 867 } 868 return k, ie.entries 869} 870 871// ReadEntries returns all index entries for a key. 872func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 873 d.mu.RLock() 874 defer d.mu.RUnlock() 875 876 ofs := d.search(key) 877 if ofs < len(d.b) { 878 k, entries := d.readEntriesAt(ofs, entries) 879 // The search may have returned an i == 0 which could indicated that the value 880 // searched should be inserted at position 0. Make sure the key in the index 881 // matches the search value. 882 if !bytes.Equal(key, k) { 883 return nil 884 } 885 886 return entries 887 } 888 889 // The key is not in the index. i is the index where it would be inserted. 890 return nil 891} 892 893// Entry returns the index entry for the specified key and timestamp. If no entry 894// matches the key an timestamp, nil is returned. 895func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { 896 entries := d.Entries(key) 897 for _, entry := range entries { 898 if entry.Contains(timestamp) { 899 return &entry 900 } 901 } 902 return nil 903} 904 905// Key returns the key in the index at the given position. 906func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 907 d.mu.RLock() 908 defer d.mu.RUnlock() 909 910 if idx < 0 || idx*4+4 > len(d.offsets) { 911 return nil, 0, nil 912 } 913 ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]) 914 n, key := readKey(d.b[ofs:]) 915 916 typ := d.b[int(ofs)+n] 917 918 var ie indexEntries 919 if entries != nil { 920 ie.entries = *entries 921 } 922 if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil { 923 return nil, 0, nil 924 } 925 if entries != nil { 926 *entries = ie.entries 927 } 928 929 return key, typ, ie.entries 930} 931 932// KeyAt returns the key in the index at the given position. 933func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { 934 d.mu.RLock() 935 936 if idx < 0 || idx*4+4 > len(d.offsets) { 937 d.mu.RUnlock() 938 return nil, 0 939 } 940 ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])) 941 942 n, key := readKey(d.b[ofs:]) 943 ofs = ofs + int32(n) 944 typ := d.b[ofs] 945 d.mu.RUnlock() 946 return key, typ 947} 948 949// KeyCount returns the count of unique keys in the index. 950func (d *indirectIndex) KeyCount() int { 951 d.mu.RLock() 952 n := len(d.offsets) / 4 953 d.mu.RUnlock() 954 return n 955} 956 957// Delete removes the given keys from the index. 958func (d *indirectIndex) Delete(keys [][]byte) { 959 if len(keys) == 0 { 960 return 961 } 962 963 if !bytesutil.IsSorted(keys) { 964 bytesutil.Sort(keys) 965 } 966 967 // Both keys and offsets are sorted. Walk both in order and skip 968 // any keys that exist in both. 969 d.mu.Lock() 970 start := d.searchOffset(keys[0]) 971 for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 { 972 offset := binary.BigEndian.Uint32(d.offsets[i : i+4]) 973 _, indexKey := readKey(d.b[offset:]) 974 975 for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 { 976 keys = keys[1:] 977 } 978 979 if len(keys) > 0 && bytes.Equal(keys[0], indexKey) { 980 keys = keys[1:] 981 copy(d.offsets[i:i+4], nilOffset) 982 } 983 } 984 d.offsets = bytesutil.Pack(d.offsets, 4, 255) 985 d.mu.Unlock() 986} 987 988// DeleteRange removes the given keys with data between minTime and maxTime from the index. 989func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { 990 // No keys, nothing to do 991 if len(keys) == 0 { 992 return 993 } 994 995 if !bytesutil.IsSorted(keys) { 996 bytesutil.Sort(keys) 997 } 998 999 // If we're deleting the max time range, just use tombstoning to remove the 1000 // key from the offsets slice 1001 if minTime == math.MinInt64 && maxTime == math.MaxInt64 { 1002 d.Delete(keys) 1003 return 1004 } 1005 1006 // Is the range passed in outside of the time range for the file? 1007 min, max := d.TimeRange() 1008 if minTime > max || maxTime < min { 1009 return 1010 } 1011 1012 fullKeys := make([][]byte, 0, len(keys)) 1013 tombstones := map[string][]TimeRange{} 1014 var ie []IndexEntry 1015 1016 for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ { 1017 k, entries := d.readEntriesAt(d.offset(i), &ie) 1018 1019 // Skip any keys that don't exist. These are less than the current key. 1020 for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 { 1021 keys = keys[1:] 1022 } 1023 1024 // No more keys to delete, we're done. 1025 if len(keys) == 0 { 1026 break 1027 } 1028 1029 // If the current key is greater than the index one, continue to the next 1030 // index key. 1031 if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 { 1032 continue 1033 } 1034 1035 // If multiple tombstones are saved for the same key 1036 if len(entries) == 0 { 1037 continue 1038 } 1039 1040 // Is the time range passed outside of the time range we've have stored for this key? 1041 min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime 1042 if minTime > max || maxTime < min { 1043 continue 1044 } 1045 1046 // Does the range passed in cover every value for the key? 1047 if minTime <= min && maxTime >= max { 1048 fullKeys = append(fullKeys, keys[0]) 1049 keys = keys[1:] 1050 continue 1051 } 1052 1053 d.mu.RLock() 1054 existing := d.tombstones[string(k)] 1055 d.mu.RUnlock() 1056 1057 // Append the new tombonstes to the existing ones 1058 newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...) 1059 fn := func(i, j int) bool { 1060 a, b := newTs[i], newTs[j] 1061 if a.Min == b.Min { 1062 return a.Max <= b.Max 1063 } 1064 return a.Min < b.Min 1065 } 1066 1067 // Sort the updated tombstones if necessary 1068 if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) { 1069 sort.Slice(newTs, fn) 1070 } 1071 1072 tombstones[string(k)] = newTs 1073 1074 // We need to see if all the tombstones end up deleting the entire series. This 1075 // could happen if their is one tombstore with min,max time spanning all the block 1076 // time ranges or from multiple smaller tombstones the delete segments. To detect 1077 // this cases, we use a window starting at the first tombstone and grow it be each 1078 // tombstone that is immediately adjacent to the current window or if it overlaps. 1079 // If there are any gaps, we abort. 1080 minTs, maxTs := newTs[0].Min, newTs[0].Max 1081 for j := 1; j < len(newTs); j++ { 1082 prevTs := newTs[j-1] 1083 ts := newTs[j] 1084 1085 // Make sure all the tombstone line up for a continuous range. We don't 1086 // want to have two small deletes on each edges end up causing us to 1087 // remove the full key. 1088 if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) { 1089 minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64) 1090 break 1091 } 1092 1093 if ts.Min < minTs { 1094 minTs = ts.Min 1095 } 1096 if ts.Max > maxTs { 1097 maxTs = ts.Max 1098 } 1099 } 1100 1101 // If we have a fully deleted series, delete it all of it. 1102 if minTs <= min && maxTs >= max { 1103 fullKeys = append(fullKeys, keys[0]) 1104 keys = keys[1:] 1105 continue 1106 } 1107 } 1108 1109 // Delete all the keys that fully deleted in bulk 1110 if len(fullKeys) > 0 { 1111 d.Delete(fullKeys) 1112 } 1113 1114 if len(tombstones) == 0 { 1115 return 1116 } 1117 1118 d.mu.Lock() 1119 for k, v := range tombstones { 1120 d.tombstones[k] = v 1121 } 1122 d.mu.Unlock() 1123} 1124 1125// TombstoneRange returns ranges of time that are deleted for the given key. 1126func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange { 1127 d.mu.RLock() 1128 r := d.tombstones[string(key)] 1129 d.mu.RUnlock() 1130 return r 1131} 1132 1133// Contains return true if the given key exists in the index. 1134func (d *indirectIndex) Contains(key []byte) bool { 1135 return len(d.Entries(key)) > 0 1136} 1137 1138// ContainsValue returns true if key and time might exist in this file. 1139func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { 1140 entry := d.Entry(key, timestamp) 1141 if entry == nil { 1142 return false 1143 } 1144 1145 d.mu.RLock() 1146 tombstones := d.tombstones[string(key)] 1147 d.mu.RUnlock() 1148 1149 for _, t := range tombstones { 1150 if t.Min <= timestamp && t.Max >= timestamp { 1151 return false 1152 } 1153 } 1154 return true 1155} 1156 1157// Type returns the block type of the values stored for the key. 1158func (d *indirectIndex) Type(key []byte) (byte, error) { 1159 d.mu.RLock() 1160 defer d.mu.RUnlock() 1161 1162 ofs := d.search(key) 1163 if ofs < len(d.b) { 1164 n, _ := readKey(d.b[ofs:]) 1165 ofs += n 1166 return d.b[ofs], nil 1167 } 1168 return 0, fmt.Errorf("key does not exist: %s", key) 1169} 1170 1171// OverlapsTimeRange returns true if the time range of the file intersect min and max. 1172func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { 1173 return d.minTime <= max && d.maxTime >= min 1174} 1175 1176// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 1177func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { 1178 return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 1179} 1180 1181// KeyRange returns the min and max keys in the index. 1182func (d *indirectIndex) KeyRange() ([]byte, []byte) { 1183 return d.minKey, d.maxKey 1184} 1185 1186// TimeRange returns the min and max time across all keys in the index. 1187func (d *indirectIndex) TimeRange() (int64, int64) { 1188 return d.minTime, d.maxTime 1189} 1190 1191// MarshalBinary returns a byte slice encoded version of the index. 1192func (d *indirectIndex) MarshalBinary() ([]byte, error) { 1193 d.mu.RLock() 1194 defer d.mu.RUnlock() 1195 1196 return d.b, nil 1197} 1198 1199// UnmarshalBinary populates an index from an encoded byte slice 1200// representation of an index. 1201func (d *indirectIndex) UnmarshalBinary(b []byte) error { 1202 d.mu.Lock() 1203 defer d.mu.Unlock() 1204 1205 // Keep a reference to the actual index bytes 1206 d.b = b 1207 if len(b) == 0 { 1208 return nil 1209 } 1210 1211 //var minKey, maxKey []byte 1212 var minTime, maxTime int64 = math.MaxInt64, 0 1213 1214 // To create our "indirect" index, we need to find the location of all the keys in 1215 // the raw byte slice. The keys are listed once each (in sorted order). Following 1216 // each key is a time ordered list of index entry blocks for that key. The loop below 1217 // basically skips across the slice keeping track of the counter when we are at a key 1218 // field. 1219 var i int32 1220 var offsets []int32 1221 iMax := int32(len(b)) 1222 for i < iMax { 1223 offsets = append(offsets, i) 1224 1225 // Skip to the start of the values 1226 // key length value (2) + type (1) + length of key 1227 if i+2 >= iMax { 1228 return fmt.Errorf("indirectIndex: not enough data for key length value") 1229 } 1230 i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2])) 1231 1232 // count of index entries 1233 if i+indexCountSize >= iMax { 1234 return fmt.Errorf("indirectIndex: not enough data for index entries count") 1235 } 1236 count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) 1237 i += indexCountSize 1238 1239 // Find the min time for the block 1240 if i+8 >= iMax { 1241 return fmt.Errorf("indirectIndex: not enough data for min time") 1242 } 1243 minT := int64(binary.BigEndian.Uint64(b[i : i+8])) 1244 if minT < minTime { 1245 minTime = minT 1246 } 1247 1248 i += (count - 1) * indexEntrySize 1249 1250 // Find the max time for the block 1251 if i+16 >= iMax { 1252 return fmt.Errorf("indirectIndex: not enough data for max time") 1253 } 1254 maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) 1255 if maxT > maxTime { 1256 maxTime = maxT 1257 } 1258 1259 i += indexEntrySize 1260 } 1261 1262 firstOfs := offsets[0] 1263 _, key := readKey(b[firstOfs:]) 1264 d.minKey = key 1265 1266 lastOfs := offsets[len(offsets)-1] 1267 _, key = readKey(b[lastOfs:]) 1268 d.maxKey = key 1269 1270 d.minTime = minTime 1271 d.maxTime = maxTime 1272 1273 var err error 1274 d.offsets, err = mmap(nil, 0, len(offsets)*4) 1275 if err != nil { 1276 return err 1277 } 1278 for i, v := range offsets { 1279 binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v)) 1280 } 1281 1282 return nil 1283} 1284 1285// Size returns the size of the current index in bytes. 1286func (d *indirectIndex) Size() uint32 { 1287 d.mu.RLock() 1288 defer d.mu.RUnlock() 1289 1290 return uint32(len(d.b)) 1291} 1292 1293func (d *indirectIndex) Close() error { 1294 // Windows doesn't use the anonymous map for the offsets index 1295 if runtime.GOOS == "windows" { 1296 return nil 1297 } 1298 return munmap(d.offsets[:cap(d.offsets)]) 1299} 1300 1301// mmapAccess is mmap based block accessor. It access blocks through an 1302// MMAP file interface. 1303type mmapAccessor struct { 1304 accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed 1305 freeCount uint64 // Counter to determine whether the accessor can free its resources 1306 1307 mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b. 1308 1309 mu sync.RWMutex 1310 b []byte 1311 f *os.File 1312 1313 index *indirectIndex 1314} 1315 1316// verifyVersion verifies that the reader's bytes are a TSM byte 1317// stream of the correct version (1) 1318func verifyVersion(r io.Reader) error { 1319 // Attempt to read magic number. 1320 var magic uint32 1321 if err := binary.Read(r, binary.BigEndian, &magic); err != nil { 1322 return fmt.Errorf("init: error reading magic number of file: %v", err) 1323 } 1324 1325 // Attempt to read version. 1326 var version byte 1327 if err := binary.Read(r, binary.BigEndian, &version); err != nil { 1328 return fmt.Errorf("init: error reading version: %v", err) 1329 } 1330 1331 // Ensure magic matches expectations. 1332 if magic != MagicNumber { 1333 return fmt.Errorf("can only read from tsm file") 1334 } 1335 1336 // Ensure version matches expectations. 1337 if version != Version { 1338 return fmt.Errorf("init: file is version %b. expected %b", version, Version) 1339 } 1340 1341 return nil 1342} 1343 1344func (m *mmapAccessor) init() (*indirectIndex, error) { 1345 m.mu.Lock() 1346 defer m.mu.Unlock() 1347 1348 if _, err := m.f.Seek(0, 0); err != nil { 1349 return nil, fmt.Errorf("init: failed to seek: %v", err) 1350 } 1351 1352 if err := verifyVersion(m.f); err != nil { 1353 return nil, err 1354 } 1355 1356 var err error 1357 1358 if _, err := m.f.Seek(0, 0); err != nil { 1359 return nil, err 1360 } 1361 1362 stat, err := m.f.Stat() 1363 if err != nil { 1364 return nil, err 1365 } 1366 1367 m.b, err = mmap(m.f, 0, int(stat.Size())) 1368 if err != nil { 1369 return nil, err 1370 } 1371 if len(m.b) < 8 { 1372 return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex") 1373 } 1374 1375 // Hint to the kernel that we will be reading the file. It would be better to hint 1376 // that we will be reading the index section, but that's not been 1377 // implemented as yet. 1378 if m.mmapWillNeed { 1379 if err := madviseWillNeed(m.b); err != nil { 1380 return nil, err 1381 } 1382 } 1383 1384 indexOfsPos := len(m.b) - 8 1385 indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8]) 1386 if indexStart >= uint64(indexOfsPos) { 1387 return nil, fmt.Errorf("mmapAccessor: invalid indexStart") 1388 } 1389 1390 m.index = NewIndirectIndex() 1391 if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { 1392 return nil, err 1393 } 1394 1395 // Allow resources to be freed immediately if requested 1396 m.incAccess() 1397 atomic.StoreUint64(&m.freeCount, 1) 1398 1399 return m.index, nil 1400} 1401 1402func (m *mmapAccessor) free() error { 1403 accessCount := atomic.LoadUint64(&m.accessCount) 1404 freeCount := atomic.LoadUint64(&m.freeCount) 1405 1406 // Already freed everything. 1407 if freeCount == 0 && accessCount == 0 { 1408 return nil 1409 } 1410 1411 // Were there accesses after the last time we tried to free? 1412 // If so, don't free anything and record the access count that we 1413 // see now for the next check. 1414 if accessCount != freeCount { 1415 atomic.StoreUint64(&m.freeCount, accessCount) 1416 return nil 1417 } 1418 1419 // Reset both counters to zero to indicate that we have freed everything. 1420 atomic.StoreUint64(&m.accessCount, 0) 1421 atomic.StoreUint64(&m.freeCount, 0) 1422 1423 m.mu.RLock() 1424 defer m.mu.RUnlock() 1425 1426 return madviseDontNeed(m.b) 1427} 1428 1429func (m *mmapAccessor) incAccess() { 1430 atomic.AddUint64(&m.accessCount, 1) 1431} 1432 1433func (m *mmapAccessor) rename(path string) error { 1434 m.incAccess() 1435 1436 m.mu.Lock() 1437 defer m.mu.Unlock() 1438 1439 err := munmap(m.b) 1440 if err != nil { 1441 return err 1442 } 1443 1444 if err := m.f.Close(); err != nil { 1445 return err 1446 } 1447 1448 if err := file.RenameFile(m.f.Name(), path); err != nil { 1449 return err 1450 } 1451 1452 m.f, err = os.Open(path) 1453 if err != nil { 1454 return err 1455 } 1456 1457 if _, err := m.f.Seek(0, 0); err != nil { 1458 return err 1459 } 1460 1461 stat, err := m.f.Stat() 1462 if err != nil { 1463 return err 1464 } 1465 1466 m.b, err = mmap(m.f, 0, int(stat.Size())) 1467 if err != nil { 1468 return err 1469 } 1470 1471 if m.mmapWillNeed { 1472 return madviseWillNeed(m.b) 1473 } 1474 return nil 1475} 1476 1477func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) { 1478 entry := m.index.Entry(key, timestamp) 1479 if entry == nil { 1480 return nil, nil 1481 } 1482 1483 return m.readBlock(entry, nil) 1484} 1485 1486func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { 1487 m.incAccess() 1488 1489 m.mu.RLock() 1490 defer m.mu.RUnlock() 1491 1492 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1493 return nil, ErrTSMClosed 1494 } 1495 //TODO: Validate checksum 1496 var err error 1497 values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1498 if err != nil { 1499 return nil, err 1500 } 1501 1502 return values, nil 1503} 1504 1505func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) { 1506 m.incAccess() 1507 1508 m.mu.RLock() 1509 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1510 m.mu.RUnlock() 1511 return 0, nil, ErrTSMClosed 1512 } 1513 1514 // return the bytes after the 4 byte checksum 1515 crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)] 1516 m.mu.RUnlock() 1517 1518 return crc, block, nil 1519} 1520 1521// readAll returns all values for a key in all blocks. 1522func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { 1523 m.incAccess() 1524 1525 blocks := m.index.Entries(key) 1526 if len(blocks) == 0 { 1527 return nil, nil 1528 } 1529 1530 tombstones := m.index.TombstoneRange(key) 1531 1532 m.mu.RLock() 1533 defer m.mu.RUnlock() 1534 1535 var temp []Value 1536 var err error 1537 var values []Value 1538 for _, block := range blocks { 1539 var skip bool 1540 for _, t := range tombstones { 1541 // Should we skip this block because it contains points that have been deleted 1542 if t.Min <= block.MinTime && t.Max >= block.MaxTime { 1543 skip = true 1544 break 1545 } 1546 } 1547 1548 if skip { 1549 continue 1550 } 1551 //TODO: Validate checksum 1552 temp = temp[:0] 1553 // The +4 is the 4 byte checksum length 1554 temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp) 1555 if err != nil { 1556 return nil, err 1557 } 1558 1559 // Filter out any values that were deleted 1560 for _, t := range tombstones { 1561 temp = Values(temp).Exclude(t.Min, t.Max) 1562 } 1563 1564 values = append(values, temp...) 1565 } 1566 1567 return values, nil 1568} 1569 1570func (m *mmapAccessor) path() string { 1571 m.mu.RLock() 1572 path := m.f.Name() 1573 m.mu.RUnlock() 1574 return path 1575} 1576 1577func (m *mmapAccessor) close() error { 1578 m.mu.Lock() 1579 defer m.mu.Unlock() 1580 1581 if m.b == nil { 1582 return nil 1583 } 1584 1585 err := munmap(m.b) 1586 if err != nil { 1587 return err 1588 } 1589 1590 m.b = nil 1591 return m.f.Close() 1592} 1593 1594type indexEntries struct { 1595 Type byte 1596 entries []IndexEntry 1597} 1598 1599func (a *indexEntries) Len() int { return len(a.entries) } 1600func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] } 1601func (a *indexEntries) Less(i, j int) bool { 1602 return a.entries[i].MinTime < a.entries[j].MinTime 1603} 1604 1605func (a *indexEntries) MarshalBinary() ([]byte, error) { 1606 buf := make([]byte, len(a.entries)*indexEntrySize) 1607 1608 for i, entry := range a.entries { 1609 entry.AppendTo(buf[indexEntrySize*i:]) 1610 } 1611 1612 return buf, nil 1613} 1614 1615func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { 1616 var buf [indexEntrySize]byte 1617 var n int 1618 1619 for _, entry := range a.entries { 1620 entry.AppendTo(buf[:]) 1621 n, err = w.Write(buf[:]) 1622 total += int64(n) 1623 if err != nil { 1624 return total, err 1625 } 1626 } 1627 1628 return total, nil 1629} 1630 1631func readKey(b []byte) (n int, key []byte) { 1632 // 2 byte size of key 1633 n, size := 2, int(binary.BigEndian.Uint16(b[:2])) 1634 1635 // N byte key 1636 key = b[n : n+size] 1637 1638 n += len(key) 1639 return 1640} 1641 1642func readEntries(b []byte, entries *indexEntries) (n int, err error) { 1643 if len(b) < 1+indexCountSize { 1644 return 0, fmt.Errorf("readEntries: data too short for headers") 1645 } 1646 1647 // 1 byte block type 1648 entries.Type = b[n] 1649 n++ 1650 1651 // 2 byte count of index entries 1652 count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize])) 1653 n += indexCountSize 1654 1655 if cap(entries.entries) < count { 1656 entries.entries = make([]IndexEntry, count) 1657 } else { 1658 entries.entries = entries.entries[:count] 1659 } 1660 1661 b = b[indexCountSize+indexTypeSize:] 1662 for i := 0; i < len(entries.entries); i++ { 1663 if err = entries.entries[i].UnmarshalBinary(b); err != nil { 1664 return 0, fmt.Errorf("readEntries: unmarshal error: %v", err) 1665 } 1666 b = b[indexEntrySize:] 1667 } 1668 1669 n += count * indexEntrySize 1670 1671 return 1672} 1673