1package tsm1 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9 "os" 10 "runtime" 11 "sort" 12 "sync" 13 "sync/atomic" 14 15 "github.com/influxdata/influxdb/pkg/bytesutil" 16 "github.com/influxdata/influxdb/tsdb" 17) 18 19// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used. 20var ErrFileInUse = fmt.Errorf("file still in use") 21 22// nilOffset is the value written to the offsets to indicate that position is deleted. The value is the max 23// uint32 which is an invalid position. We don't use 0 as 0 is actually a valid position. 24var nilOffset = []byte{255, 255, 255, 255} 25 26// TSMReader is a reader for a TSM file. 27type TSMReader struct { 28 // refs is the count of active references to this reader. 29 refs int64 30 refsWG sync.WaitGroup 31 32 madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED. 33 mu sync.RWMutex 34 35 // accessor provides access and decoding of blocks for the reader. 36 accessor blockAccessor 37 38 // index is the index of all blocks. 39 index TSMIndex 40 41 // tombstoner ensures tombstoned keys are not available by the index. 42 tombstoner *Tombstoner 43 44 // size is the size of the file on disk. 45 size int64 46 47 // lastModified is the last time this file was modified on disk 48 lastModified int64 49 50 // deleteMu limits concurrent deletes 51 deleteMu sync.Mutex 52} 53 54// TSMIndex represent the index section of a TSM file. The index records all 55// blocks, their locations, sizes, min and max times. 56type TSMIndex interface { 57 // Delete removes the given keys from the index. 58 Delete(keys [][]byte) 59 60 // DeleteRange removes the given keys with data between minTime and maxTime from the index. 61 DeleteRange(keys [][]byte, minTime, maxTime int64) 62 63 // ContainsKey returns true if the given key may exist in the index. This func is faster than 64 // Contains but, may return false positives. 65 ContainsKey(key []byte) bool 66 67 // Contains return true if the given key exists in the index. 68 Contains(key []byte) bool 69 70 // ContainsValue returns true if key and time might exist in this file. This function could 71 // return true even though the actual point does not exists. For example, the key may 72 // exist in this file, but not have a point exactly at time t. 73 ContainsValue(key []byte, timestamp int64) bool 74 75 // Entries returns all index entries for a key. 76 Entries(key []byte) []IndexEntry 77 78 // ReadEntries reads the index entries for key into entries. 79 ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry 80 81 // Entry returns the index entry for the specified key and timestamp. If no entry 82 // matches the key and timestamp, nil is returned. 83 Entry(key []byte, timestamp int64) *IndexEntry 84 85 // Key returns the key in the index at the given position, using entries to avoid allocations. 86 Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) 87 88 // KeyAt returns the key in the index at the given position. 89 KeyAt(index int) ([]byte, byte) 90 91 // KeyCount returns the count of unique keys in the index. 92 KeyCount() int 93 94 // Seek returns the position in the index where key <= value in the index. 95 Seek(key []byte) int 96 97 // OverlapsTimeRange returns true if the time range of the file intersect min and max. 98 OverlapsTimeRange(min, max int64) bool 99 100 // OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 101 OverlapsKeyRange(min, max []byte) bool 102 103 // Size returns the size of the current index in bytes. 104 Size() uint32 105 106 // TimeRange returns the min and max time across all keys in the file. 107 TimeRange() (int64, int64) 108 109 // TombstoneRange returns ranges of time that are deleted for the given key. 110 TombstoneRange(key []byte) []TimeRange 111 112 // KeyRange returns the min and max keys in the file. 113 KeyRange() ([]byte, []byte) 114 115 // Type returns the block type of the values stored for the key. Returns one of 116 // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, 117 // an error is returned. 118 Type(key []byte) (byte, error) 119 120 // UnmarshalBinary populates an index from an encoded byte slice 121 // representation of an index. 122 UnmarshalBinary(b []byte) error 123 124 // Close closes the index and releases any resources. 125 Close() error 126} 127 128// BlockIterator allows iterating over each block in a TSM file in order. It provides 129// raw access to the block bytes without decoding them. 130type BlockIterator struct { 131 r *TSMReader 132 133 // i is the current key index 134 i int 135 136 // n is the total number of keys 137 n int 138 139 key []byte 140 cache []IndexEntry 141 entries []IndexEntry 142 err error 143 typ byte 144} 145 146// PeekNext returns the next key to be iterated or an empty string. 147func (b *BlockIterator) PeekNext() []byte { 148 if len(b.entries) > 1 { 149 return b.key 150 } else if b.n-b.i > 1 { 151 key, _ := b.r.KeyAt(b.i + 1) 152 return key 153 } 154 return nil 155} 156 157// Next returns true if there are more blocks to iterate through. 158func (b *BlockIterator) Next() bool { 159 if b.err != nil { 160 return false 161 } 162 163 if b.n-b.i == 0 && len(b.entries) == 0 { 164 return false 165 } 166 167 if len(b.entries) > 0 { 168 b.entries = b.entries[1:] 169 if len(b.entries) > 0 { 170 return true 171 } 172 } 173 174 if b.n-b.i > 0 { 175 b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache) 176 b.i++ 177 178 // If there were deletes on the TSMReader, then our index is now off and we 179 // can't proceed. What we just read may not actually the next block. 180 if b.n != b.r.KeyCount() { 181 b.err = fmt.Errorf("delete during iteration") 182 return false 183 } 184 185 if len(b.entries) > 0 { 186 return true 187 } 188 } 189 190 return false 191} 192 193// Read reads information about the next block to be iterated. 194func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) { 195 if b.err != nil { 196 return nil, 0, 0, 0, 0, nil, b.err 197 } 198 checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil) 199 if err != nil { 200 return nil, 0, 0, 0, 0, nil, err 201 } 202 return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err 203} 204 205// Err returns any errors encounter during iteration. 206func (b *BlockIterator) Err() error { 207 return b.err 208} 209 210// blockAccessor abstracts a method of accessing blocks from a 211// TSM file. 212type blockAccessor interface { 213 init() (*indirectIndex, error) 214 read(key []byte, timestamp int64) ([]Value, error) 215 readAll(key []byte) ([]Value, error) 216 readBlock(entry *IndexEntry, values []Value) ([]Value, error) 217 readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) 218 readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) 219 readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) 220 readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) 221 readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) 222 readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) 223 rename(path string) error 224 path() string 225 close() error 226 free() error 227} 228 229type tsmReaderOption func(*TSMReader) 230 231// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. 232var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { 233 return func(r *TSMReader) { 234 r.madviseWillNeed = willNeed 235 } 236} 237 238// NewTSMReader returns a new TSMReader from the given file. 239func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { 240 t := &TSMReader{} 241 for _, option := range options { 242 option(t) 243 } 244 245 stat, err := f.Stat() 246 if err != nil { 247 return nil, err 248 } 249 t.size = stat.Size() 250 t.lastModified = stat.ModTime().UnixNano() 251 t.accessor = &mmapAccessor{ 252 f: f, 253 mmapWillNeed: t.madviseWillNeed, 254 } 255 256 index, err := t.accessor.init() 257 if err != nil { 258 return nil, err 259 } 260 261 t.index = index 262 t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey) 263 264 if err := t.applyTombstones(); err != nil { 265 return nil, err 266 } 267 268 return t, nil 269} 270 271// WithObserver sets the observer for the TSM reader. 272func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) { 273 t.tombstoner.WithObserver(obs) 274} 275 276func (t *TSMReader) applyTombstones() error { 277 var cur, prev Tombstone 278 batch := make([][]byte, 0, 4096) 279 280 if err := t.tombstoner.Walk(func(ts Tombstone) error { 281 cur = ts 282 if len(batch) > 0 { 283 if prev.Min != cur.Min || prev.Max != cur.Max { 284 t.index.DeleteRange(batch, prev.Min, prev.Max) 285 batch = batch[:0] 286 } 287 } 288 289 // Copy the tombstone key and re-use the buffers to avoid allocations 290 n := len(batch) 291 batch = batch[:n+1] 292 if cap(batch[n]) < len(ts.Key) { 293 batch[n] = make([]byte, len(ts.Key)) 294 } else { 295 batch[n] = batch[n][:len(ts.Key)] 296 } 297 copy(batch[n], ts.Key) 298 299 if len(batch) >= 4096 { 300 t.index.DeleteRange(batch, prev.Min, prev.Max) 301 batch = batch[:0] 302 } 303 304 prev = ts 305 return nil 306 }); err != nil { 307 return fmt.Errorf("init: read tombstones: %v", err) 308 } 309 310 if len(batch) > 0 { 311 t.index.DeleteRange(batch, cur.Min, cur.Max) 312 } 313 return nil 314} 315 316func (t *TSMReader) Free() error { 317 t.mu.RLock() 318 defer t.mu.RUnlock() 319 return t.accessor.free() 320} 321 322// Path returns the path of the file the TSMReader was initialized with. 323func (t *TSMReader) Path() string { 324 t.mu.RLock() 325 p := t.accessor.path() 326 t.mu.RUnlock() 327 return p 328} 329 330// Key returns the key and the underlying entry at the numeric index. 331func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 332 return t.index.Key(index, entries) 333} 334 335// KeyAt returns the key and key type at position idx in the index. 336func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { 337 return t.index.KeyAt(idx) 338} 339 340func (t *TSMReader) Seek(key []byte) int { 341 return t.index.Seek(key) 342} 343 344// ReadAt returns the values corresponding to the given index entry. 345func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) { 346 t.mu.RLock() 347 v, err := t.accessor.readBlock(entry, vals) 348 t.mu.RUnlock() 349 return v, err 350} 351 352// ReadFloatBlockAt returns the float values corresponding to the given index entry. 353func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) { 354 t.mu.RLock() 355 v, err := t.accessor.readFloatBlock(entry, vals) 356 t.mu.RUnlock() 357 return v, err 358} 359 360// ReadIntegerBlockAt returns the integer values corresponding to the given index entry. 361func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) { 362 t.mu.RLock() 363 v, err := t.accessor.readIntegerBlock(entry, vals) 364 t.mu.RUnlock() 365 return v, err 366} 367 368// ReadUnsignedBlockAt returns the unsigned integer values corresponding to the given index entry. 369func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) { 370 t.mu.RLock() 371 v, err := t.accessor.readUnsignedBlock(entry, vals) 372 t.mu.RUnlock() 373 return v, err 374} 375 376// ReadStringBlockAt returns the string values corresponding to the given index entry. 377func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) { 378 t.mu.RLock() 379 v, err := t.accessor.readStringBlock(entry, vals) 380 t.mu.RUnlock() 381 return v, err 382} 383 384// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry. 385func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) { 386 t.mu.RLock() 387 v, err := t.accessor.readBooleanBlock(entry, vals) 388 t.mu.RUnlock() 389 return v, err 390} 391 392// Read returns the values corresponding to the block at the given key and timestamp. 393func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) { 394 t.mu.RLock() 395 v, err := t.accessor.read(key, timestamp) 396 t.mu.RUnlock() 397 return v, err 398} 399 400// ReadAll returns all values for a key in all blocks. 401func (t *TSMReader) ReadAll(key []byte) ([]Value, error) { 402 t.mu.RLock() 403 v, err := t.accessor.readAll(key) 404 t.mu.RUnlock() 405 return v, err 406} 407 408func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) { 409 t.mu.RLock() 410 n, v, err := t.accessor.readBytes(e, b) 411 t.mu.RUnlock() 412 return n, v, err 413} 414 415// Type returns the type of values stored at the given key. 416func (t *TSMReader) Type(key []byte) (byte, error) { 417 return t.index.Type(key) 418} 419 420// Close closes the TSMReader. 421func (t *TSMReader) Close() error { 422 t.refsWG.Wait() 423 424 t.mu.Lock() 425 defer t.mu.Unlock() 426 427 if err := t.accessor.close(); err != nil { 428 return err 429 } 430 431 return t.index.Close() 432} 433 434// Ref records a usage of this TSMReader. If there are active references 435// when the reader is closed or removed, the reader will remain open until 436// there are no more references. 437func (t *TSMReader) Ref() { 438 atomic.AddInt64(&t.refs, 1) 439 t.refsWG.Add(1) 440} 441 442// Unref removes a usage record of this TSMReader. If the Reader was closed 443// by another goroutine while there were active references, the file will 444// be closed and remove 445func (t *TSMReader) Unref() { 446 atomic.AddInt64(&t.refs, -1) 447 t.refsWG.Done() 448} 449 450// InUse returns whether the TSMReader currently has any active references. 451func (t *TSMReader) InUse() bool { 452 refs := atomic.LoadInt64(&t.refs) 453 return refs > 0 454} 455 456// Remove removes any underlying files stored on disk for this reader. 457func (t *TSMReader) Remove() error { 458 t.mu.Lock() 459 defer t.mu.Unlock() 460 return t.remove() 461} 462 463// Rename renames the underlying file to the new path. 464func (t *TSMReader) Rename(path string) error { 465 t.mu.Lock() 466 defer t.mu.Unlock() 467 return t.accessor.rename(path) 468} 469 470// Remove removes any underlying files stored on disk for this reader. 471func (t *TSMReader) remove() error { 472 path := t.accessor.path() 473 474 if t.InUse() { 475 return ErrFileInUse 476 } 477 478 if path != "" { 479 err := os.RemoveAll(path) 480 if err != nil { 481 return err 482 } 483 } 484 485 if err := t.tombstoner.Delete(); err != nil { 486 return err 487 } 488 return nil 489} 490 491// Contains returns whether the given key is present in the index. 492func (t *TSMReader) Contains(key []byte) bool { 493 return t.index.Contains(key) 494} 495 496// ContainsValue returns true if key and time might exists in this file. This function could 497// return true even though the actual point does not exist. For example, the key may 498// exist in this file, but not have a point exactly at time t. 499func (t *TSMReader) ContainsValue(key []byte, ts int64) bool { 500 return t.index.ContainsValue(key, ts) 501} 502 503// DeleteRange removes the given points for keys between minTime and maxTime. The series 504// keys passed in must be sorted. 505func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 506 if len(keys) == 0 { 507 return nil 508 } 509 510 batch := t.BatchDelete() 511 if err := batch.DeleteRange(keys, minTime, maxTime); err != nil { 512 batch.Rollback() 513 return err 514 } 515 return batch.Commit() 516} 517 518// Delete deletes blocks indicated by keys. 519func (t *TSMReader) Delete(keys [][]byte) error { 520 if err := t.tombstoner.Add(keys); err != nil { 521 return err 522 } 523 524 if err := t.tombstoner.Flush(); err != nil { 525 return err 526 } 527 528 t.index.Delete(keys) 529 return nil 530} 531 532// OverlapsTimeRange returns true if the time range of the file intersect min and max. 533func (t *TSMReader) OverlapsTimeRange(min, max int64) bool { 534 return t.index.OverlapsTimeRange(min, max) 535} 536 537// OverlapsKeyRange returns true if the key range of the file intersect min and max. 538func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool { 539 return t.index.OverlapsKeyRange(min, max) 540} 541 542// TimeRange returns the min and max time across all keys in the file. 543func (t *TSMReader) TimeRange() (int64, int64) { 544 return t.index.TimeRange() 545} 546 547// KeyRange returns the min and max key across all keys in the file. 548func (t *TSMReader) KeyRange() ([]byte, []byte) { 549 return t.index.KeyRange() 550} 551 552// KeyCount returns the count of unique keys in the TSMReader. 553func (t *TSMReader) KeyCount() int { 554 return t.index.KeyCount() 555} 556 557// Entries returns all index entries for key. 558func (t *TSMReader) Entries(key []byte) []IndexEntry { 559 return t.index.Entries(key) 560} 561 562// ReadEntries reads the index entries for key into entries. 563func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 564 return t.index.ReadEntries(key, entries) 565} 566 567// IndexSize returns the size of the index in bytes. 568func (t *TSMReader) IndexSize() uint32 { 569 return t.index.Size() 570} 571 572// Size returns the size of the underlying file in bytes. 573func (t *TSMReader) Size() uint32 { 574 t.mu.RLock() 575 size := t.size 576 t.mu.RUnlock() 577 return uint32(size) 578} 579 580// LastModified returns the last time the underlying file was modified. 581func (t *TSMReader) LastModified() int64 { 582 t.mu.RLock() 583 lm := t.lastModified 584 for _, ts := range t.tombstoner.TombstoneFiles() { 585 if ts.LastModified > lm { 586 lm = ts.LastModified 587 } 588 } 589 t.mu.RUnlock() 590 return lm 591} 592 593// HasTombstones return true if there are any tombstone entries recorded. 594func (t *TSMReader) HasTombstones() bool { 595 t.mu.RLock() 596 b := t.tombstoner.HasTombstones() 597 t.mu.RUnlock() 598 return b 599} 600 601// TombstoneFiles returns any tombstone files associated with this TSM file. 602func (t *TSMReader) TombstoneFiles() []FileStat { 603 t.mu.RLock() 604 fs := t.tombstoner.TombstoneFiles() 605 t.mu.RUnlock() 606 return fs 607} 608 609// TombstoneRange returns ranges of time that are deleted for the given key. 610func (t *TSMReader) TombstoneRange(key []byte) []TimeRange { 611 t.mu.RLock() 612 tr := t.index.TombstoneRange(key) 613 t.mu.RUnlock() 614 return tr 615} 616 617// Stats returns the FileStat for the TSMReader's underlying file. 618func (t *TSMReader) Stats() FileStat { 619 minTime, maxTime := t.index.TimeRange() 620 minKey, maxKey := t.index.KeyRange() 621 return FileStat{ 622 Path: t.Path(), 623 Size: t.Size(), 624 LastModified: t.LastModified(), 625 MinTime: minTime, 626 MaxTime: maxTime, 627 MinKey: minKey, 628 MaxKey: maxKey, 629 HasTombstone: t.tombstoner.HasTombstones(), 630 } 631} 632 633// BlockIterator returns a BlockIterator for the underlying TSM file. 634func (t *TSMReader) BlockIterator() *BlockIterator { 635 return &BlockIterator{ 636 r: t, 637 n: t.index.KeyCount(), 638 } 639} 640 641type BatchDeleter interface { 642 DeleteRange(keys [][]byte, min, max int64) error 643 Commit() error 644 Rollback() error 645} 646 647type batchDelete struct { 648 r *TSMReader 649} 650 651func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error { 652 if len(keys) == 0 { 653 return nil 654 } 655 656 // If the keys can't exist in this TSM file, skip it. 657 minKey, maxKey := keys[0], keys[len(keys)-1] 658 if !b.r.index.OverlapsKeyRange(minKey, maxKey) { 659 return nil 660 } 661 662 // If the timerange can't exist in this TSM file, skip it. 663 if !b.r.index.OverlapsTimeRange(minTime, maxTime) { 664 return nil 665 } 666 667 if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil { 668 return err 669 } 670 671 return nil 672} 673 674func (b *batchDelete) Commit() error { 675 defer b.r.deleteMu.Unlock() 676 if err := b.r.tombstoner.Flush(); err != nil { 677 return err 678 } 679 680 return b.r.applyTombstones() 681} 682 683func (b *batchDelete) Rollback() error { 684 defer b.r.deleteMu.Unlock() 685 return b.r.tombstoner.Rollback() 686} 687 688// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. 689// Callers must either Commit or Rollback the operation. 690func (r *TSMReader) BatchDelete() BatchDeleter { 691 r.deleteMu.Lock() 692 return &batchDelete{r: r} 693} 694 695type BatchDeleters []BatchDeleter 696 697func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error { 698 errC := make(chan error, len(a)) 699 for _, b := range a { 700 go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b) 701 } 702 703 var err error 704 for i := 0; i < len(a); i++ { 705 dErr := <-errC 706 if dErr != nil { 707 err = dErr 708 } 709 } 710 return err 711} 712 713func (a BatchDeleters) Commit() error { 714 errC := make(chan error, len(a)) 715 for _, b := range a { 716 go func(b BatchDeleter) { errC <- b.Commit() }(b) 717 } 718 719 var err error 720 for i := 0; i < len(a); i++ { 721 dErr := <-errC 722 if dErr != nil { 723 err = dErr 724 } 725 } 726 return err 727} 728 729func (a BatchDeleters) Rollback() error { 730 errC := make(chan error, len(a)) 731 for _, b := range a { 732 go func(b BatchDeleter) { errC <- b.Rollback() }(b) 733 } 734 735 var err error 736 for i := 0; i < len(a); i++ { 737 dErr := <-errC 738 if dErr != nil { 739 err = dErr 740 } 741 } 742 return err 743} 744 745// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This 746// implementation can be used for indexes that may be MMAPed into memory. 747type indirectIndex struct { 748 mu sync.RWMutex 749 750 // indirectIndex works a follows. Assuming we have an index structure in memory as 751 // the diagram below: 752 // 753 // ┌────────────────────────────────────────────────────────────────────┐ 754 // │ Index │ 755 // ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘ 756 // │0│ │62│ │145│ 757 // ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐ 758 // │Key 1 Len│ Key │... │Key 2 Len│ Key 2 │ ... │ Key 3 │ ... │ 759 // │ 2 bytes │ N bytes │ │ 2 bytes │ N bytes │ │ 2 bytes │ │ 760 // └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘ 761 762 // We would build an `offsets` slices where each element pointers to the byte location 763 // for the first key in the index slice. 764 765 // ┌────────────────────────────────────────────────────────────────────┐ 766 // │ Offsets │ 767 // ├────┬────┬────┬─────────────────────────────────────────────────────┘ 768 // │ 0 │ 62 │145 │ 769 // └────┴────┴────┘ 770 771 // Using this offset slice we can find `Key 2` by doing a binary search 772 // over the offsets slice. Instead of comparing the value in the offsets 773 // (e.g. `62`), we use that as an index into the underlying index to 774 // retrieve the key at postion `62` and perform our comparisons with that. 775 776 // When we have identified the correct position in the index for a given 777 // key, we could perform another binary search or a linear scan. This 778 // should be fast as well since each index entry is 28 bytes and all 779 // contiguous in memory. The current implementation uses a linear scan since the 780 // number of block entries is expected to be < 100 per key. 781 782 // b is the underlying index byte slice. This could be a copy on the heap or an MMAP 783 // slice reference 784 b []byte 785 786 // offsets contains the positions in b for each key. It points to the 2 byte length of 787 // key. 788 offsets []byte 789 790 // minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the 791 // file 792 minKey, maxKey []byte 793 794 // minTime, maxTime are the minimum and maximum times contained in the file across all 795 // series. 796 minTime, maxTime int64 797 798 // tombstones contains only the tombstoned keys with subset of time values deleted. An 799 // entry would exist here if a subset of the points for a key were deleted and the file 800 // had not be re-compacted to remove the points on disk. 801 tombstones map[string][]TimeRange 802} 803 804// TimeRange holds a min and max timestamp. 805type TimeRange struct { 806 Min, Max int64 807} 808 809func (t TimeRange) Overlaps(min, max int64) bool { 810 return t.Min <= max && t.Max >= min 811} 812 813// NewIndirectIndex returns a new indirect index. 814func NewIndirectIndex() *indirectIndex { 815 return &indirectIndex{ 816 tombstones: make(map[string][]TimeRange), 817 } 818} 819 820func (d *indirectIndex) offset(i int) int { 821 if i < 0 || i+4 > len(d.offsets) { 822 return -1 823 } 824 return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4])) 825} 826 827func (d *indirectIndex) Seek(key []byte) int { 828 d.mu.RLock() 829 defer d.mu.RUnlock() 830 return d.searchOffset(key) 831} 832 833// searchOffset searches the offsets slice for key and returns the position in 834// offsets where key would exist. 835func (d *indirectIndex) searchOffset(key []byte) int { 836 // We use a binary search across our indirect offsets (pointers to all the keys 837 // in the index slice). 838 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 839 // i is the position in offsets we are at so get offset it points to 840 offset := int32(binary.BigEndian.Uint32(x)) 841 842 // It's pointing to the start of the key which is a 2 byte length 843 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 844 845 // See if it matches 846 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 847 }) 848 849 // See if we might have found the right index 850 if i < len(d.offsets) { 851 return int(i / 4) 852 } 853 854 // The key is not in the index. i is the index where it would be inserted so return 855 // a value outside our offset range. 856 return int(len(d.offsets)) / 4 857} 858 859// search returns the byte position of key in the index. If key is not 860// in the index, len(index) is returned. 861func (d *indirectIndex) search(key []byte) int { 862 if !d.ContainsKey(key) { 863 return len(d.b) 864 } 865 866 // We use a binary search across our indirect offsets (pointers to all the keys 867 // in the index slice). 868 i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool { 869 // i is the position in offsets we are at so get offset it points to 870 offset := int32(binary.BigEndian.Uint32(x)) 871 872 // It's pointing to the start of the key which is a 2 byte length 873 keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2])) 874 875 // See if it matches 876 return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0 877 }) 878 879 // See if we might have found the right index 880 if i < len(d.offsets) { 881 ofs := binary.BigEndian.Uint32(d.offsets[i : i+4]) 882 _, k := readKey(d.b[ofs:]) 883 884 // The search may have returned an i == 0 which could indicated that the value 885 // searched should be inserted at postion 0. Make sure the key in the index 886 // matches the search value. 887 if !bytes.Equal(key, k) { 888 return len(d.b) 889 } 890 891 return int(ofs) 892 } 893 894 // The key is not in the index. i is the index where it would be inserted so return 895 // a value outside our offset range. 896 return len(d.b) 897} 898 899// ContainsKey returns true of key may exist in this index. 900func (d *indirectIndex) ContainsKey(key []byte) bool { 901 return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0 902} 903 904// Entries returns all index entries for a key. 905func (d *indirectIndex) Entries(key []byte) []IndexEntry { 906 return d.ReadEntries(key, nil) 907} 908 909func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) { 910 n, k := readKey(d.b[ofs:]) 911 912 // Read and return all the entries 913 ofs += n 914 var ie indexEntries 915 if entries != nil { 916 ie.entries = *entries 917 } 918 if _, err := readEntries(d.b[ofs:], &ie); err != nil { 919 panic(fmt.Sprintf("error reading entries: %v", err)) 920 } 921 if entries != nil { 922 *entries = ie.entries 923 } 924 return k, ie.entries 925} 926 927// ReadEntries returns all index entries for a key. 928func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { 929 d.mu.RLock() 930 defer d.mu.RUnlock() 931 932 ofs := d.search(key) 933 if ofs < len(d.b) { 934 k, entries := d.readEntriesAt(ofs, entries) 935 // The search may have returned an i == 0 which could indicated that the value 936 // searched should be inserted at position 0. Make sure the key in the index 937 // matches the search value. 938 if !bytes.Equal(key, k) { 939 return nil 940 } 941 942 return entries 943 } 944 945 // The key is not in the index. i is the index where it would be inserted. 946 return nil 947} 948 949// Entry returns the index entry for the specified key and timestamp. If no entry 950// matches the key an timestamp, nil is returned. 951func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry { 952 entries := d.Entries(key) 953 for _, entry := range entries { 954 if entry.Contains(timestamp) { 955 return &entry 956 } 957 } 958 return nil 959} 960 961// Key returns the key in the index at the given position. 962func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) { 963 d.mu.RLock() 964 defer d.mu.RUnlock() 965 966 if idx < 0 || idx*4+4 > len(d.offsets) { 967 return nil, 0, nil 968 } 969 ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]) 970 n, key := readKey(d.b[ofs:]) 971 972 typ := d.b[int(ofs)+n] 973 974 var ie indexEntries 975 if entries != nil { 976 ie.entries = *entries 977 } 978 if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil { 979 return nil, 0, nil 980 } 981 if entries != nil { 982 *entries = ie.entries 983 } 984 985 return key, typ, ie.entries 986} 987 988// KeyAt returns the key in the index at the given position. 989func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { 990 d.mu.RLock() 991 992 if idx < 0 || idx*4+4 > len(d.offsets) { 993 d.mu.RUnlock() 994 return nil, 0 995 } 996 ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])) 997 998 n, key := readKey(d.b[ofs:]) 999 ofs = ofs + int32(n) 1000 typ := d.b[ofs] 1001 d.mu.RUnlock() 1002 return key, typ 1003} 1004 1005// KeyCount returns the count of unique keys in the index. 1006func (d *indirectIndex) KeyCount() int { 1007 d.mu.RLock() 1008 n := len(d.offsets) / 4 1009 d.mu.RUnlock() 1010 return n 1011} 1012 1013// Delete removes the given keys from the index. 1014func (d *indirectIndex) Delete(keys [][]byte) { 1015 if len(keys) == 0 { 1016 return 1017 } 1018 1019 if !bytesutil.IsSorted(keys) { 1020 bytesutil.Sort(keys) 1021 } 1022 1023 // Both keys and offsets are sorted. Walk both in order and skip 1024 // any keys that exist in both. 1025 d.mu.Lock() 1026 start := d.searchOffset(keys[0]) 1027 for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 { 1028 offset := binary.BigEndian.Uint32(d.offsets[i : i+4]) 1029 _, indexKey := readKey(d.b[offset:]) 1030 1031 for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 { 1032 keys = keys[1:] 1033 } 1034 1035 if len(keys) > 0 && bytes.Equal(keys[0], indexKey) { 1036 keys = keys[1:] 1037 copy(d.offsets[i:i+4], nilOffset[:]) 1038 } 1039 } 1040 d.offsets = bytesutil.Pack(d.offsets, 4, 255) 1041 d.mu.Unlock() 1042} 1043 1044// DeleteRange removes the given keys with data between minTime and maxTime from the index. 1045func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) { 1046 // No keys, nothing to do 1047 if len(keys) == 0 { 1048 return 1049 } 1050 1051 if !bytesutil.IsSorted(keys) { 1052 bytesutil.Sort(keys) 1053 } 1054 1055 // If we're deleting the max time range, just use tombstoning to remove the 1056 // key from the offsets slice 1057 if minTime == math.MinInt64 && maxTime == math.MaxInt64 { 1058 d.Delete(keys) 1059 return 1060 } 1061 1062 // Is the range passed in outside of the time range for the file? 1063 min, max := d.TimeRange() 1064 if minTime > max || maxTime < min { 1065 return 1066 } 1067 1068 fullKeys := make([][]byte, 0, len(keys)) 1069 tombstones := map[string][]TimeRange{} 1070 var ie []IndexEntry 1071 1072 for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ { 1073 k, entries := d.readEntriesAt(d.offset(i), &ie) 1074 1075 // Skip any keys that don't exist. These are less than the current key. 1076 for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 { 1077 keys = keys[1:] 1078 } 1079 1080 // No more keys to delete, we're done. 1081 if len(keys) == 0 { 1082 break 1083 } 1084 1085 // If the current key is greater than the index one, continue to the next 1086 // index key. 1087 if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 { 1088 continue 1089 } 1090 1091 // If multiple tombstones are saved for the same key 1092 if len(entries) == 0 { 1093 continue 1094 } 1095 1096 // Is the time range passed outside of the time range we've have stored for this key? 1097 min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime 1098 if minTime > max || maxTime < min { 1099 continue 1100 } 1101 1102 // Does the range passed in cover every value for the key? 1103 if minTime <= min && maxTime >= max { 1104 fullKeys = append(fullKeys, keys[0]) 1105 keys = keys[1:] 1106 continue 1107 } 1108 1109 d.mu.RLock() 1110 existing := d.tombstones[string(k)] 1111 d.mu.RUnlock() 1112 1113 // Append the new tombonstes to the existing ones 1114 newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...) 1115 fn := func(i, j int) bool { 1116 a, b := newTs[i], newTs[j] 1117 if a.Min == b.Min { 1118 return a.Max <= b.Max 1119 } 1120 return a.Min < b.Min 1121 } 1122 1123 // Sort the updated tombstones if necessary 1124 if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) { 1125 sort.Slice(newTs, fn) 1126 } 1127 1128 tombstones[string(k)] = newTs 1129 1130 // We need to see if all the tombstones end up deleting the entire series. This 1131 // could happen if their is one tombstore with min,max time spanning all the block 1132 // time ranges or from multiple smaller tombstones the delete segments. To detect 1133 // this cases, we use a window starting at the first tombstone and grow it be each 1134 // tombstone that is immediately adjacent to the current window or if it overlaps. 1135 // If there are any gaps, we abort. 1136 minTs, maxTs := newTs[0].Min, newTs[0].Max 1137 for j := 1; j < len(newTs); j++ { 1138 prevTs := newTs[j-1] 1139 ts := newTs[j] 1140 1141 // Make sure all the tombstone line up for a continuous range. We don't 1142 // want to have two small deletes on each edges end up causing us to 1143 // remove the full key. 1144 if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) { 1145 minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64) 1146 break 1147 } 1148 1149 if ts.Min < minTs { 1150 minTs = ts.Min 1151 } 1152 if ts.Max > maxTs { 1153 maxTs = ts.Max 1154 } 1155 } 1156 1157 // If we have a fully deleted series, delete it all of it. 1158 if minTs <= min && maxTs >= max { 1159 fullKeys = append(fullKeys, keys[0]) 1160 keys = keys[1:] 1161 continue 1162 } 1163 } 1164 1165 // Delete all the keys that fully deleted in bulk 1166 if len(fullKeys) > 0 { 1167 d.Delete(fullKeys) 1168 } 1169 1170 if len(tombstones) == 0 { 1171 return 1172 } 1173 1174 d.mu.Lock() 1175 for k, v := range tombstones { 1176 d.tombstones[k] = v 1177 } 1178 d.mu.Unlock() 1179} 1180 1181// TombstoneRange returns ranges of time that are deleted for the given key. 1182func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange { 1183 d.mu.RLock() 1184 r := d.tombstones[string(key)] 1185 d.mu.RUnlock() 1186 return r 1187} 1188 1189// Contains return true if the given key exists in the index. 1190func (d *indirectIndex) Contains(key []byte) bool { 1191 return len(d.Entries(key)) > 0 1192} 1193 1194// ContainsValue returns true if key and time might exist in this file. 1195func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool { 1196 entry := d.Entry(key, timestamp) 1197 if entry == nil { 1198 return false 1199 } 1200 1201 d.mu.RLock() 1202 tombstones := d.tombstones[string(key)] 1203 d.mu.RUnlock() 1204 1205 for _, t := range tombstones { 1206 if t.Min <= timestamp && t.Max >= timestamp { 1207 return false 1208 } 1209 } 1210 return true 1211} 1212 1213// Type returns the block type of the values stored for the key. 1214func (d *indirectIndex) Type(key []byte) (byte, error) { 1215 d.mu.RLock() 1216 defer d.mu.RUnlock() 1217 1218 ofs := d.search(key) 1219 if ofs < len(d.b) { 1220 n, _ := readKey(d.b[ofs:]) 1221 ofs += n 1222 return d.b[ofs], nil 1223 } 1224 return 0, fmt.Errorf("key does not exist: %s", key) 1225} 1226 1227// OverlapsTimeRange returns true if the time range of the file intersect min and max. 1228func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool { 1229 return d.minTime <= max && d.maxTime >= min 1230} 1231 1232// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max. 1233func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool { 1234 return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0 1235} 1236 1237// KeyRange returns the min and max keys in the index. 1238func (d *indirectIndex) KeyRange() ([]byte, []byte) { 1239 return d.minKey, d.maxKey 1240} 1241 1242// TimeRange returns the min and max time across all keys in the index. 1243func (d *indirectIndex) TimeRange() (int64, int64) { 1244 return d.minTime, d.maxTime 1245} 1246 1247// MarshalBinary returns a byte slice encoded version of the index. 1248func (d *indirectIndex) MarshalBinary() ([]byte, error) { 1249 d.mu.RLock() 1250 defer d.mu.RUnlock() 1251 1252 return d.b, nil 1253} 1254 1255// UnmarshalBinary populates an index from an encoded byte slice 1256// representation of an index. 1257func (d *indirectIndex) UnmarshalBinary(b []byte) error { 1258 d.mu.Lock() 1259 defer d.mu.Unlock() 1260 1261 // Keep a reference to the actual index bytes 1262 d.b = b 1263 if len(b) == 0 { 1264 return nil 1265 } 1266 1267 //var minKey, maxKey []byte 1268 var minTime, maxTime int64 = math.MaxInt64, 0 1269 1270 // To create our "indirect" index, we need to find the location of all the keys in 1271 // the raw byte slice. The keys are listed once each (in sorted order). Following 1272 // each key is a time ordered list of index entry blocks for that key. The loop below 1273 // basically skips across the slice keeping track of the counter when we are at a key 1274 // field. 1275 var i int32 1276 var offsets []int32 1277 iMax := int32(len(b)) 1278 for i < iMax { 1279 offsets = append(offsets, i) 1280 1281 // Skip to the start of the values 1282 // key length value (2) + type (1) + length of key 1283 if i+2 >= iMax { 1284 return fmt.Errorf("indirectIndex: not enough data for key length value") 1285 } 1286 i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2])) 1287 1288 // count of index entries 1289 if i+indexCountSize >= iMax { 1290 return fmt.Errorf("indirectIndex: not enough data for index entries count") 1291 } 1292 count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize])) 1293 i += indexCountSize 1294 1295 // Find the min time for the block 1296 if i+8 >= iMax { 1297 return fmt.Errorf("indirectIndex: not enough data for min time") 1298 } 1299 minT := int64(binary.BigEndian.Uint64(b[i : i+8])) 1300 if minT < minTime { 1301 minTime = minT 1302 } 1303 1304 i += (count - 1) * indexEntrySize 1305 1306 // Find the max time for the block 1307 if i+16 >= iMax { 1308 return fmt.Errorf("indirectIndex: not enough data for max time") 1309 } 1310 maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16])) 1311 if maxT > maxTime { 1312 maxTime = maxT 1313 } 1314 1315 i += indexEntrySize 1316 } 1317 1318 firstOfs := offsets[0] 1319 _, key := readKey(b[firstOfs:]) 1320 d.minKey = key 1321 1322 lastOfs := offsets[len(offsets)-1] 1323 _, key = readKey(b[lastOfs:]) 1324 d.maxKey = key 1325 1326 d.minTime = minTime 1327 d.maxTime = maxTime 1328 1329 var err error 1330 d.offsets, err = mmap(nil, 0, len(offsets)*4) 1331 if err != nil { 1332 return err 1333 } 1334 for i, v := range offsets { 1335 binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v)) 1336 } 1337 1338 return nil 1339} 1340 1341// Size returns the size of the current index in bytes. 1342func (d *indirectIndex) Size() uint32 { 1343 d.mu.RLock() 1344 defer d.mu.RUnlock() 1345 1346 return uint32(len(d.b)) 1347} 1348 1349func (d *indirectIndex) Close() error { 1350 // Windows doesn't use the anonymous map for the offsets index 1351 if runtime.GOOS == "windows" { 1352 return nil 1353 } 1354 return munmap(d.offsets[:cap(d.offsets)]) 1355} 1356 1357// mmapAccess is mmap based block accessor. It access blocks through an 1358// MMAP file interface. 1359type mmapAccessor struct { 1360 accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed 1361 freeCount uint64 // Counter to determine whether the accessor can free its resources 1362 1363 mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b. 1364 1365 mu sync.RWMutex 1366 b []byte 1367 f *os.File 1368 1369 index *indirectIndex 1370} 1371 1372func (m *mmapAccessor) init() (*indirectIndex, error) { 1373 m.mu.Lock() 1374 defer m.mu.Unlock() 1375 1376 if err := verifyVersion(m.f); err != nil { 1377 return nil, err 1378 } 1379 1380 var err error 1381 1382 if _, err := m.f.Seek(0, 0); err != nil { 1383 return nil, err 1384 } 1385 1386 stat, err := m.f.Stat() 1387 if err != nil { 1388 return nil, err 1389 } 1390 1391 m.b, err = mmap(m.f, 0, int(stat.Size())) 1392 if err != nil { 1393 return nil, err 1394 } 1395 if len(m.b) < 8 { 1396 return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex") 1397 } 1398 1399 // Hint to the kernel that we will be reading the file. It would be better to hint 1400 // that we will be reading the index section, but that's not been 1401 // implemented as yet. 1402 if m.mmapWillNeed { 1403 if err := madviseWillNeed(m.b); err != nil { 1404 return nil, err 1405 } 1406 } 1407 1408 indexOfsPos := len(m.b) - 8 1409 indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8]) 1410 if indexStart >= uint64(indexOfsPos) { 1411 return nil, fmt.Errorf("mmapAccessor: invalid indexStart") 1412 } 1413 1414 m.index = NewIndirectIndex() 1415 if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil { 1416 return nil, err 1417 } 1418 1419 // Allow resources to be freed immediately if requested 1420 m.incAccess() 1421 atomic.StoreUint64(&m.freeCount, 1) 1422 1423 return m.index, nil 1424} 1425 1426func (m *mmapAccessor) free() error { 1427 accessCount := atomic.LoadUint64(&m.accessCount) 1428 freeCount := atomic.LoadUint64(&m.freeCount) 1429 1430 // Already freed everything. 1431 if freeCount == 0 && accessCount == 0 { 1432 return nil 1433 } 1434 1435 // Were there accesses after the last time we tried to free? 1436 // If so, don't free anything and record the access count that we 1437 // see now for the next check. 1438 if accessCount != freeCount { 1439 atomic.StoreUint64(&m.freeCount, accessCount) 1440 return nil 1441 } 1442 1443 // Reset both counters to zero to indicate that we have freed everything. 1444 atomic.StoreUint64(&m.accessCount, 0) 1445 atomic.StoreUint64(&m.freeCount, 0) 1446 1447 m.mu.RLock() 1448 defer m.mu.RUnlock() 1449 1450 return madviseDontNeed(m.b) 1451} 1452 1453func (m *mmapAccessor) incAccess() { 1454 atomic.AddUint64(&m.accessCount, 1) 1455} 1456 1457func (m *mmapAccessor) rename(path string) error { 1458 m.incAccess() 1459 1460 m.mu.Lock() 1461 defer m.mu.Unlock() 1462 1463 err := munmap(m.b) 1464 if err != nil { 1465 return err 1466 } 1467 1468 if err := m.f.Close(); err != nil { 1469 return err 1470 } 1471 1472 if err := renameFile(m.f.Name(), path); err != nil { 1473 return err 1474 } 1475 1476 m.f, err = os.Open(path) 1477 if err != nil { 1478 return err 1479 } 1480 1481 if _, err := m.f.Seek(0, 0); err != nil { 1482 return err 1483 } 1484 1485 stat, err := m.f.Stat() 1486 if err != nil { 1487 return err 1488 } 1489 1490 m.b, err = mmap(m.f, 0, int(stat.Size())) 1491 if err != nil { 1492 return err 1493 } 1494 1495 if m.mmapWillNeed { 1496 return madviseWillNeed(m.b) 1497 } 1498 return nil 1499} 1500 1501func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) { 1502 entry := m.index.Entry(key, timestamp) 1503 if entry == nil { 1504 return nil, nil 1505 } 1506 1507 return m.readBlock(entry, nil) 1508} 1509 1510func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { 1511 m.incAccess() 1512 1513 m.mu.RLock() 1514 defer m.mu.RUnlock() 1515 1516 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1517 return nil, ErrTSMClosed 1518 } 1519 //TODO: Validate checksum 1520 var err error 1521 values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1522 if err != nil { 1523 return nil, err 1524 } 1525 1526 return values, nil 1527} 1528 1529func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) { 1530 m.incAccess() 1531 1532 m.mu.RLock() 1533 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1534 m.mu.RUnlock() 1535 return nil, ErrTSMClosed 1536 } 1537 1538 a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1539 m.mu.RUnlock() 1540 1541 if err != nil { 1542 return nil, err 1543 } 1544 1545 return a, nil 1546} 1547 1548func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) { 1549 m.incAccess() 1550 1551 m.mu.RLock() 1552 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1553 m.mu.RUnlock() 1554 return nil, ErrTSMClosed 1555 } 1556 1557 a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1558 m.mu.RUnlock() 1559 1560 if err != nil { 1561 return nil, err 1562 } 1563 1564 return a, nil 1565} 1566 1567func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) { 1568 m.incAccess() 1569 1570 m.mu.RLock() 1571 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1572 m.mu.RUnlock() 1573 return nil, ErrTSMClosed 1574 } 1575 1576 a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1577 m.mu.RUnlock() 1578 1579 if err != nil { 1580 return nil, err 1581 } 1582 1583 return a, nil 1584} 1585 1586func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) { 1587 m.incAccess() 1588 1589 m.mu.RLock() 1590 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1591 m.mu.RUnlock() 1592 return nil, ErrTSMClosed 1593 } 1594 1595 a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1596 m.mu.RUnlock() 1597 1598 if err != nil { 1599 return nil, err 1600 } 1601 1602 return a, nil 1603} 1604 1605func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) { 1606 m.incAccess() 1607 1608 m.mu.RLock() 1609 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1610 m.mu.RUnlock() 1611 return nil, ErrTSMClosed 1612 } 1613 1614 a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values) 1615 m.mu.RUnlock() 1616 1617 if err != nil { 1618 return nil, err 1619 } 1620 1621 return a, nil 1622} 1623 1624func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) { 1625 m.incAccess() 1626 1627 m.mu.RLock() 1628 if int64(len(m.b)) < entry.Offset+int64(entry.Size) { 1629 m.mu.RUnlock() 1630 return 0, nil, ErrTSMClosed 1631 } 1632 1633 // return the bytes after the 4 byte checksum 1634 crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)] 1635 m.mu.RUnlock() 1636 1637 return crc, block, nil 1638} 1639 1640// readAll returns all values for a key in all blocks. 1641func (m *mmapAccessor) readAll(key []byte) ([]Value, error) { 1642 m.incAccess() 1643 1644 blocks := m.index.Entries(key) 1645 if len(blocks) == 0 { 1646 return nil, nil 1647 } 1648 1649 tombstones := m.index.TombstoneRange(key) 1650 1651 m.mu.RLock() 1652 defer m.mu.RUnlock() 1653 1654 var temp []Value 1655 var err error 1656 var values []Value 1657 for _, block := range blocks { 1658 var skip bool 1659 for _, t := range tombstones { 1660 // Should we skip this block because it contains points that have been deleted 1661 if t.Min <= block.MinTime && t.Max >= block.MaxTime { 1662 skip = true 1663 break 1664 } 1665 } 1666 1667 if skip { 1668 continue 1669 } 1670 //TODO: Validate checksum 1671 temp = temp[:0] 1672 // The +4 is the 4 byte checksum length 1673 temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp) 1674 if err != nil { 1675 return nil, err 1676 } 1677 1678 // Filter out any values that were deleted 1679 for _, t := range tombstones { 1680 temp = Values(temp).Exclude(t.Min, t.Max) 1681 } 1682 1683 values = append(values, temp...) 1684 } 1685 1686 return values, nil 1687} 1688 1689func (m *mmapAccessor) path() string { 1690 m.mu.RLock() 1691 path := m.f.Name() 1692 m.mu.RUnlock() 1693 return path 1694} 1695 1696func (m *mmapAccessor) close() error { 1697 m.mu.Lock() 1698 defer m.mu.Unlock() 1699 1700 if m.b == nil { 1701 return nil 1702 } 1703 1704 err := munmap(m.b) 1705 if err != nil { 1706 return err 1707 } 1708 1709 m.b = nil 1710 return m.f.Close() 1711} 1712 1713type indexEntries struct { 1714 Type byte 1715 entries []IndexEntry 1716} 1717 1718func (a *indexEntries) Len() int { return len(a.entries) } 1719func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] } 1720func (a *indexEntries) Less(i, j int) bool { 1721 return a.entries[i].MinTime < a.entries[j].MinTime 1722} 1723 1724func (a *indexEntries) MarshalBinary() ([]byte, error) { 1725 buf := make([]byte, len(a.entries)*indexEntrySize) 1726 1727 for i, entry := range a.entries { 1728 entry.AppendTo(buf[indexEntrySize*i:]) 1729 } 1730 1731 return buf, nil 1732} 1733 1734func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) { 1735 var buf [indexEntrySize]byte 1736 var n int 1737 1738 for _, entry := range a.entries { 1739 entry.AppendTo(buf[:]) 1740 n, err = w.Write(buf[:]) 1741 total += int64(n) 1742 if err != nil { 1743 return total, err 1744 } 1745 } 1746 1747 return total, nil 1748} 1749 1750func readKey(b []byte) (n int, key []byte) { 1751 // 2 byte size of key 1752 n, size := 2, int(binary.BigEndian.Uint16(b[:2])) 1753 1754 // N byte key 1755 key = b[n : n+size] 1756 1757 n += len(key) 1758 return 1759} 1760 1761func readEntries(b []byte, entries *indexEntries) (n int, err error) { 1762 if len(b) < 1+indexCountSize { 1763 return 0, fmt.Errorf("readEntries: data too short for headers") 1764 } 1765 1766 // 1 byte block type 1767 entries.Type = b[n] 1768 n++ 1769 1770 // 2 byte count of index entries 1771 count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize])) 1772 n += indexCountSize 1773 1774 if cap(entries.entries) < count { 1775 entries.entries = make([]IndexEntry, count) 1776 } else { 1777 entries.entries = entries.entries[:count] 1778 } 1779 1780 b = b[indexCountSize+indexTypeSize:] 1781 for i := 0; i < len(entries.entries); i++ { 1782 if err = entries.entries[i].UnmarshalBinary(b); err != nil { 1783 return 0, fmt.Errorf("readEntries: unmarshal error: %v", err) 1784 } 1785 b = b[indexEntrySize:] 1786 } 1787 1788 n += count * indexEntrySize 1789 1790 return 1791} 1792