package tsm1

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"os"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"

	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/file"
	"github.com/influxdata/influxdb/tsdb"
)

// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
var ErrFileInUse = fmt.Errorf("file still in use")

// nilOffset is the value written to the offsets to indicate that a position is deleted.  The value is the max
// uint32, which is an invalid position.  We don't use 0 because 0 is actually a valid position.
var nilOffset = []byte{255, 255, 255, 255}

// TSMReader is a reader for a TSM file.
type TSMReader struct {
	// refs is the count of active references to this reader.
	refs   int64
	refsWG sync.WaitGroup

	madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.
	mu              sync.RWMutex

	// accessor provides access and decoding of blocks for the reader.
	accessor blockAccessor

	// index is the index of all blocks.
	index TSMIndex

	// tombstoner ensures tombstoned keys are not exposed via the index.
	tombstoner *Tombstoner

	// size is the size of the file on disk.
	size int64

	// lastModified is the last time this file was modified on disk.
	lastModified int64

	// deleteMu limits concurrent deletes.
	deleteMu sync.Mutex
}

// TSMIndex represents the index section of a TSM file.  The index records all
// blocks, their locations, sizes, and min and max times.
type TSMIndex interface {
	// Delete removes the given keys from the index.
	Delete(keys [][]byte)

	// DeleteRange removes the given keys with data between minTime and maxTime from the index.
	DeleteRange(keys [][]byte, minTime, maxTime int64)

	// ContainsKey returns true if the given key may exist in the index.  This func is faster than
	// Contains but may return false positives.
	ContainsKey(key []byte) bool

	// Contains returns true if the given key exists in the index.
	Contains(key []byte) bool

	// ContainsValue returns true if key and time might exist in this file.  This function could
	// return true even though the actual point does not exist.  For example, the key may
	// exist in this file, but not have a point exactly at time t.
	ContainsValue(key []byte, timestamp int64) bool

	// Entries returns all index entries for a key.
	Entries(key []byte) []IndexEntry

	// ReadEntries reads the index entries for key into entries.
	ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry

	// Entry returns the index entry for the specified key and timestamp.  If no entry
	// matches the key and timestamp, nil is returned.
	Entry(key []byte, timestamp int64) *IndexEntry

	// Key returns the key in the index at the given position, using entries to avoid allocations.
	Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)

	// KeyAt returns the key in the index at the given position.
	KeyAt(index int) ([]byte, byte)

	// KeyCount returns the count of unique keys in the index.
	KeyCount() int

	// Seek returns the position in the index of the first key greater than or equal to the given key.
	Seek(key []byte) int

	// OverlapsTimeRange returns true if the time range of the file intersects min and max.
	OverlapsTimeRange(min, max int64) bool

	// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
	OverlapsKeyRange(min, max []byte) bool

	// Size returns the size of the current index in bytes.
	Size() uint32

	// TimeRange returns the min and max time across all keys in the file.
	TimeRange() (int64, int64)

	// TombstoneRange returns ranges of time that are deleted for the given key.
	TombstoneRange(key []byte) []TimeRange

	// KeyRange returns the min and max keys in the file.
	KeyRange() ([]byte, []byte)

	// Type returns the block type of the values stored for the key.  Returns one of
	// BlockFloat64, BlockInt64, BlockBool, BlockString.  If key does not exist,
	// an error is returned.
	Type(key []byte) (byte, error)

	// UnmarshalBinary populates an index from an encoded byte slice
	// representation of an index.
	UnmarshalBinary(b []byte) error

	// Close closes the index and releases any resources.
	Close() error
}
128
129// BlockIterator allows iterating over each block in a TSM file in order.  It provides
130// raw access to the block bytes without decoding them.
131type BlockIterator struct {
132	r *TSMReader
133
134	// i is the current key index
135	i int
136
137	// n is the total number of keys
138	n int
139
140	key     []byte
141	cache   []IndexEntry
142	entries []IndexEntry
143	err     error
144	typ     byte
145}
146
147// PeekNext returns the next key to be iterated or an empty string.
148func (b *BlockIterator) PeekNext() []byte {
149	if len(b.entries) > 1 {
150		return b.key
151	} else if b.n-b.i > 1 {
152		key, _ := b.r.KeyAt(b.i + 1)
153		return key
154	}
155	return nil
156}
157
158// Next returns true if there are more blocks to iterate through.
159func (b *BlockIterator) Next() bool {
160	if b.err != nil {
161		return false
162	}
163
164	if b.n-b.i == 0 && len(b.entries) == 0 {
165		return false
166	}
167
168	if len(b.entries) > 0 {
169		b.entries = b.entries[1:]
170		if len(b.entries) > 0 {
171			return true
172		}
173	}
174
175	if b.n-b.i > 0 {
176		b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
177		b.i++
178
179		// If there were deletes on the TSMReader, then our index is now off and we
180		// can't proceed.  What we just read may not actually the next block.
181		if b.n != b.r.KeyCount() {
182			b.err = fmt.Errorf("delete during iteration")
183			return false
184		}
185
186		if len(b.entries) > 0 {
187			return true
188		}
189	}
190
191	return false
192}
193
194// Read reads information about the next block to be iterated.
195func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
196	if b.err != nil {
197		return nil, 0, 0, 0, 0, nil, b.err
198	}
199	checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil)
200	if err != nil {
201		return nil, 0, 0, 0, 0, nil, err
202	}
203	return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err
204}
205
206// Err returns any errors encounter during iteration.
207func (b *BlockIterator) Err() error {
208	return b.err
209}
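
// The sketch below illustrates how a caller might walk every block in a file
// with a BlockIterator and verify each block's stored checksum.  It assumes a
// *TSMReader named r already exists and that the block checksum is CRC-32
// (IEEE); it is illustrative only, not part of this package's API.
//
//	itr := r.BlockIterator()
//	for itr.Next() {
//		key, minT, maxT, typ, checksum, buf, err := itr.Read()
//		if err != nil {
//			return err
//		}
//		// Compare the raw block bytes against the checksum stored on disk.
//		if crc32.ChecksumIEEE(buf) != checksum {
//			return fmt.Errorf("checksum mismatch for key %q (%d-%d, type %d)", key, minT, maxT, typ)
//		}
//	}
//	if err := itr.Err(); err != nil {
//		return err
//	}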

type tsmReaderOption func(*TSMReader)

// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILLNEED hint to the kernel.
var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
	return func(r *TSMReader) {
		r.madviseWillNeed = willNeed
	}
}

// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
	t := &TSMReader{}
	for _, option := range options {
		option(t)
	}

	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	t.size = stat.Size()
	t.lastModified = stat.ModTime().UnixNano()
	t.accessor = &mmapAccessor{
		f:            f,
		mmapWillNeed: t.madviseWillNeed,
	}

	index, err := t.accessor.init()
	if err != nil {
		return nil, err
	}

	t.index = index
	t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)

	if err := t.applyTombstones(); err != nil {
		return nil, err
	}

	return t, nil
}
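
// A minimal sketch of opening a TSM file and constructing a reader with the
// MADV_WILLNEED hint enabled.  The path is hypothetical and error handling is
// abbreviated; the caller closes the *os.File itself if NewTSMReader fails.
//
//	f, err := os.Open("/var/lib/influxdb/data/db/rp/1/000000001-000000001.tsm")
//	if err != nil {
//		return err
//	}
//	r, err := NewTSMReader(f, WithMadviseWillNeed(true))
//	if err != nil {
//		f.Close()
//		return err
//	}
//	defer r.Close()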

// WithObserver sets the observer for the TSM reader.
func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) {
	t.tombstoner.WithObserver(obs)
}

func (t *TSMReader) applyTombstones() error {
	var cur, prev Tombstone
	batch := make([][]byte, 0, 4096)

	if err := t.tombstoner.Walk(func(ts Tombstone) error {
		cur = ts
		if len(batch) > 0 {
			if prev.Min != cur.Min || prev.Max != cur.Max {
				t.index.DeleteRange(batch, prev.Min, prev.Max)
				batch = batch[:0]
			}
		}

		// Copy the tombstone key and re-use the buffers to avoid allocations.
		n := len(batch)
		batch = batch[:n+1]
		if cap(batch[n]) < len(ts.Key) {
			batch[n] = make([]byte, len(ts.Key))
		} else {
			batch[n] = batch[n][:len(ts.Key)]
		}
		copy(batch[n], ts.Key)

		if len(batch) >= 4096 {
			t.index.DeleteRange(batch, prev.Min, prev.Max)
			batch = batch[:0]
		}

		prev = ts
		return nil
	}); err != nil {
		return fmt.Errorf("init: read tombstones: %v", err)
	}

	if len(batch) > 0 {
		t.index.DeleteRange(batch, cur.Min, cur.Max)
	}
	return nil
}

func (t *TSMReader) Free() error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.free()
}

// Path returns the path of the file the TSMReader was initialized with.
func (t *TSMReader) Path() string {
	t.mu.RLock()
	p := t.accessor.path()
	t.mu.RUnlock()
	return p
}

// Key returns the key, block type, and index entries at the given numeric index.
func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	return t.index.Key(index, entries)
}

// KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
	return t.index.KeyAt(idx)
}

func (t *TSMReader) Seek(key []byte) int {
	return t.index.Seek(key)
}

// ReadAt returns the values corresponding to the given index entry.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.readBlock(entry, vals)
	t.mu.RUnlock()
	return v, err
}

// Read returns the values corresponding to the block at the given key and timestamp.
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.read(key, timestamp)
	t.mu.RUnlock()
	return v, err
}

// ReadAll returns all values for a key in all blocks.
func (t *TSMReader) ReadAll(key []byte) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.readAll(key)
	t.mu.RUnlock()
	return v, err
}

func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
	t.mu.RLock()
	n, v, err := t.accessor.readBytes(e, b)
	t.mu.RUnlock()
	return n, v, err
}

// Type returns the type of values stored at the given key.
func (t *TSMReader) Type(key []byte) (byte, error) {
	return t.index.Type(key)
}

// Close closes the TSMReader.
func (t *TSMReader) Close() error {
	t.refsWG.Wait()

	t.mu.Lock()
	defer t.mu.Unlock()

	if err := t.accessor.close(); err != nil {
		return err
	}

	return t.index.Close()
}

// Ref records a usage of this TSMReader.  If there are active references
// when the reader is closed or removed, the reader will remain open until
// there are no more references.
func (t *TSMReader) Ref() {
	atomic.AddInt64(&t.refs, 1)
	t.refsWG.Add(1)
}

// Unref removes a usage record of this TSMReader.  If the reader was closed
// by another goroutine while there were active references, the file will
// be closed and removed.
func (t *TSMReader) Unref() {
	atomic.AddInt64(&t.refs, -1)
	t.refsWG.Done()
}

// InUse returns whether the TSMReader currently has any active references.
func (t *TSMReader) InUse() bool {
	refs := atomic.LoadInt64(&t.refs)
	return refs > 0
}
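
// A sketch of the reference-counting pattern callers are expected to follow:
// hold a reference for the duration of a read so that a concurrent Close
// blocks, and Remove returns ErrFileInUse, until the reference is released.
// The series key below is hypothetical.
//
//	r.Ref()
//	defer r.Unref()
//	values, err := r.ReadAll([]byte("cpu,host=a#!~#value"))
//	if err != nil {
//		return err
//	}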

// Remove removes any underlying files stored on disk for this reader.
func (t *TSMReader) Remove() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.remove()
}

// Rename renames the underlying file to the new path.
func (t *TSMReader) Rename(path string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.accessor.rename(path)
}

// remove removes any underlying files stored on disk for this reader.
func (t *TSMReader) remove() error {
	path := t.accessor.path()

	if t.InUse() {
		return ErrFileInUse
	}

	if path != "" {
		err := os.RemoveAll(path)
		if err != nil {
			return err
		}
	}

	if err := t.tombstoner.Delete(); err != nil {
		return err
	}
	return nil
}

// Contains returns whether the given key is present in the index.
func (t *TSMReader) Contains(key []byte) bool {
	return t.index.Contains(key)
}

// ContainsValue returns true if key and time might exist in this file.  This function could
// return true even though the actual point does not exist.  For example, the key may
// exist in this file, but not have a point exactly at time t.
func (t *TSMReader) ContainsValue(key []byte, ts int64) bool {
	return t.index.ContainsValue(key, ts)
}

// DeleteRange removes the given points for keys between minTime and maxTime.  The series
// keys passed in must be sorted.
func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	batch := t.BatchDelete()
	if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
		batch.Rollback()
		return err
	}
	return batch.Commit()
}

// Delete deletes blocks indicated by keys.
func (t *TSMReader) Delete(keys [][]byte) error {
	if err := t.tombstoner.Add(keys); err != nil {
		return err
	}

	if err := t.tombstoner.Flush(); err != nil {
		return err
	}

	t.index.Delete(keys)
	return nil
}

// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
	return t.index.OverlapsTimeRange(min, max)
}

// OverlapsKeyRange returns true if the key range of the file intersects min and max.
func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
	return t.index.OverlapsKeyRange(min, max)
}

// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
	return t.index.TimeRange()
}

// KeyRange returns the min and max key across all keys in the file.
func (t *TSMReader) KeyRange() ([]byte, []byte) {
	return t.index.KeyRange()
}

// KeyCount returns the count of unique keys in the TSMReader.
func (t *TSMReader) KeyCount() int {
	return t.index.KeyCount()
}

// Entries returns all index entries for key.
func (t *TSMReader) Entries(key []byte) []IndexEntry {
	return t.index.Entries(key)
}

// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	return t.index.ReadEntries(key, entries)
}

// IndexSize returns the size of the index in bytes.
func (t *TSMReader) IndexSize() uint32 {
	return t.index.Size()
}

// Size returns the size of the underlying file in bytes.
func (t *TSMReader) Size() uint32 {
	t.mu.RLock()
	size := t.size
	t.mu.RUnlock()
	return uint32(size)
}

// LastModified returns the last time the underlying file was modified.
func (t *TSMReader) LastModified() int64 {
	t.mu.RLock()
	lm := t.lastModified
	for _, ts := range t.tombstoner.TombstoneFiles() {
		if ts.LastModified > lm {
			lm = ts.LastModified
		}
	}
	t.mu.RUnlock()
	return lm
}

// HasTombstones returns true if there are any tombstone entries recorded.
func (t *TSMReader) HasTombstones() bool {
	t.mu.RLock()
	b := t.tombstoner.HasTombstones()
	t.mu.RUnlock()
	return b
}

// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
	t.mu.RLock()
	fs := t.tombstoner.TombstoneFiles()
	t.mu.RUnlock()
	return fs
}

// TombstoneRange returns ranges of time that are deleted for the given key.
func (t *TSMReader) TombstoneRange(key []byte) []TimeRange {
	t.mu.RLock()
	tr := t.index.TombstoneRange(key)
	t.mu.RUnlock()
	return tr
}

// Stats returns the FileStat for the TSMReader's underlying file.
func (t *TSMReader) Stats() FileStat {
	minTime, maxTime := t.index.TimeRange()
	minKey, maxKey := t.index.KeyRange()
	return FileStat{
		Path:         t.Path(),
		Size:         t.Size(),
		LastModified: t.LastModified(),
		MinTime:      minTime,
		MaxTime:      maxTime,
		MinKey:       minKey,
		MaxKey:       maxKey,
		HasTombstone: t.tombstoner.HasTombstones(),
	}
}

// BlockIterator returns a BlockIterator for the underlying TSM file.
func (t *TSMReader) BlockIterator() *BlockIterator {
	return &BlockIterator{
		r: t,
		n: t.index.KeyCount(),
	}
}

type BatchDeleter interface {
	DeleteRange(keys [][]byte, min, max int64) error
	Commit() error
	Rollback() error
}

type batchDelete struct {
	r *TSMReader
}

func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	// If the keys can't exist in this TSM file, skip it.
	minKey, maxKey := keys[0], keys[len(keys)-1]
	if !b.r.index.OverlapsKeyRange(minKey, maxKey) {
		return nil
	}

	// If the time range can't exist in this TSM file, skip it.
	if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
		return nil
	}

	if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
		return err
	}

	return nil
}

func (b *batchDelete) Commit() error {
	defer b.r.deleteMu.Unlock()
	if err := b.r.tombstoner.Flush(); err != nil {
		return err
	}

	return b.r.applyTombstones()
}

func (b *batchDelete) Rollback() error {
	defer b.r.deleteMu.Unlock()
	return b.r.tombstoner.Rollback()
}

// BatchDelete returns a BatchDeleter.  Only a single goroutine may run a BatchDelete at a time.
// Callers must either Commit or Rollback the operation.
func (r *TSMReader) BatchDelete() BatchDeleter {
	r.deleteMu.Lock()
	return &batchDelete{r: r}
}
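
// A sketch of deleting a time range from a single reader using the batch API,
// mirroring what TSMReader.DeleteRange does above.  The keys are hypothetical
// and must be sorted before being passed in.
//
//	keys := [][]byte{[]byte("cpu,host=a#!~#value"), []byte("cpu,host=b#!~#value")}
//	batch := r.BatchDelete()
//	if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
//		batch.Rollback()
//		return err
//	}
//	if err := batch.Commit(); err != nil {
//		return err
//	}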

type BatchDeleters []BatchDeleter

func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}

func (a BatchDeleters) Commit() error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.Commit() }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}

func (a BatchDeleters) Rollback() error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.Rollback() }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}
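
// BatchDeleters fans the same operation out to several readers concurrently
// and reports the last error seen.  A sketch of deleting a range across
// multiple open readers; the readers slice and keys are hypothetical.
//
//	var batches BatchDeleters
//	for _, r := range readers {
//		batches = append(batches, r.BatchDelete())
//	}
//	if err := batches.DeleteRange(keys, minTime, maxTime); err != nil {
//		batches.Rollback()
//		return err
//	}
//	if err := batches.Commit(); err != nil {
//		return err
//	}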

// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index.  This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
	mu sync.RWMutex

	// indirectIndex works as follows.  Assume we have an index structure in memory as
	// the diagram below:
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │                               Index                                │
	// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
	// │0│                      │62│                       │145│
	// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
	// │Key 1 Len│   Key   │... │Key 2 Len│  Key 2  │ ...  │  Key 3  │ ...  │
	// │ 2 bytes │ N bytes │    │ 2 bytes │ N bytes │      │ 2 bytes │      │
	// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘

	// We build an `offsets` slice where each element points to the byte location
	// of each key in the index slice.

	// ┌────────────────────────────────────────────────────────────────────┐
	// │                              Offsets                               │
	// ├────┬────┬────┬─────────────────────────────────────────────────────┘
	// │ 0  │ 62 │145 │
	// └────┴────┴────┘

	// Using this offsets slice we can find `Key 2` by doing a binary search
	// over the offsets slice.  Instead of comparing the value in the offsets
	// (e.g. `62`), we use that as an index into the underlying index to
	// retrieve the key at position `62` and perform our comparisons with that.

	// When we have identified the correct position in the index for a given
	// key, we could perform another binary search or a linear scan.  This
	// should be fast as well since each index entry is 28 bytes and all are
	// contiguous in memory.  The current implementation uses a linear scan since the
	// number of block entries is expected to be < 100 per key.
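	//
	// A compact sketch of that binary search, for a search key named target,
	// under the assumption that offsets holds big-endian uint32 positions (as
	// built in UnmarshalBinary) and b is the raw index slice.  It mirrors
	// searchOffset below and is illustrative only:
	//
	//	i := bytesutil.SearchBytesFixed(offsets, 4, func(x []byte) bool {
	//		ofs := binary.BigEndian.Uint32(x)           // position of a key in b
	//		keyLen := binary.BigEndian.Uint16(b[ofs:])  // 2 byte key length
	//		indexKey := b[ofs+2 : ofs+2+uint32(keyLen)] // the key bytes themselves
	//		return bytes.Compare(indexKey, target) >= 0 // first key >= target
	//	})
	//	// i/4 is the position of the first key >= target, or len(offsets)/4 if none.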

	// b is the underlying index byte slice.  This could be a copy on the heap or an MMAP
	// slice reference.
	b []byte

	// offsets contains the positions in b for each key.  Each position points to the 2 byte length of
	// the key.
	offsets []byte

	// minKey, maxKey are the minimum and maximum (lexicographically sorted) keys contained in the
	// file.
	minKey, maxKey []byte

	// minTime, maxTime are the minimum and maximum times contained in the file across all
	// series.
	minTime, maxTime int64

	// tombstones contains only the tombstoned keys with a subset of time values deleted.  An
	// entry exists here if a subset of the points for a key were deleted and the file
	// has not been re-compacted to remove the points on disk.
	tombstones map[string][]TimeRange
}

// TimeRange holds a min and max timestamp.
type TimeRange struct {
	Min, Max int64
}

func (t TimeRange) Overlaps(min, max int64) bool {
	return t.Min <= max && t.Max >= min
}

// NewIndirectIndex returns a new indirect index.
func NewIndirectIndex() *indirectIndex {
	return &indirectIndex{
		tombstones: make(map[string][]TimeRange),
	}
}

func (d *indirectIndex) offset(i int) int {
	if i < 0 || i*4+4 > len(d.offsets) {
		return -1
	}
	return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
}

func (d *indirectIndex) Seek(key []byte) int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.searchOffset(key)
}

// searchOffset searches the offsets slice for key and returns the position in
// offsets where key would exist.
func (d *indirectIndex) searchOffset(key []byte) int {
	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4 byte offset entry we are at, so decode the position it points to.
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key, which is a 2 byte length.
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches.
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// See if we might have found the right index.
	if i < len(d.offsets) {
		return i / 4
	}

	// The key is not in the index.  i is the index where it would be inserted, so return
	// a value outside our offset range.
	return len(d.offsets) / 4
}

// search returns the byte position of key in the index.  If key is not
// in the index, len(index) is returned.
func (d *indirectIndex) search(key []byte) int {
	if !d.ContainsKey(key) {
		return len(d.b)
	}

	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	// TODO(sgc): this should be inlined to `indirectIndex` as it is only used here
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4 byte offset entry we are at, so decode the position it points to.
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key, which is a 2 byte length.
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches.
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// See if we might have found the right index.
	if i < len(d.offsets) {
		ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, k := readKey(d.b[ofs:])

		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0.  Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return len(d.b)
		}

		return int(ofs)
	}

	// The key is not in the index.  i is the index where it would be inserted, so return
	// a value outside our offset range.
	return len(d.b)
}

// ContainsKey returns true if the key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
	return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}

// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key []byte) []IndexEntry {
	return d.ReadEntries(key, nil)
}

func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) {
	n, k := readKey(d.b[ofs:])

	// Read and return all the entries
	ofs += n
	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[ofs:], &ie); err != nil {
		panic(fmt.Sprintf("error reading entries: %v", err))
	}
	if entries != nil {
		*entries = ie.entries
	}
	return k, ie.entries
}

// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ofs := d.search(key)
	if ofs < len(d.b) {
		k, entries := d.readEntriesAt(ofs, entries)
		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0.  Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return nil
		}

		return entries
	}

	// The key is not in the index.  i is the index where it would be inserted.
	return nil
}

// Entry returns the index entry for the specified key and timestamp.  If no entry
// matches the key and timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
	entries := d.Entries(key)
	for _, entry := range entries {
		if entry.Contains(timestamp) {
			return &entry
		}
	}
	return nil
}

// Key returns the key in the index at the given position.
func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	if idx < 0 || idx*4+4 > len(d.offsets) {
		return nil, 0, nil
	}
	ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
	n, key := readKey(d.b[ofs:])

	typ := d.b[int(ofs)+n]

	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil {
		return nil, 0, nil
	}
	if entries != nil {
		*entries = ie.entries
	}

	return key, typ, ie.entries
}

// KeyAt returns the key in the index at the given position.
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
	d.mu.RLock()

	if idx < 0 || idx*4+4 > len(d.offsets) {
		d.mu.RUnlock()
		return nil, 0
	}
	ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))

	n, key := readKey(d.b[ofs:])
	ofs = ofs + int32(n)
	typ := d.b[ofs]
	d.mu.RUnlock()
	return key, typ
}

// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
	d.mu.RLock()
	n := len(d.offsets) / 4
	d.mu.RUnlock()
	return n
}

// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// Both keys and offsets are sorted.  Walk both in order and skip
	// any keys that exist in both.
	d.mu.Lock()
	start := d.searchOffset(keys[0])
	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, indexKey := readKey(d.b[offset:])

		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
			keys = keys[1:]
		}

		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
			keys = keys[1:]
			copy(d.offsets[i:i+4], nilOffset)
		}
	}
	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
	d.mu.Unlock()
}

// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
	// No keys, nothing to do.
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// If we're deleting the max time range, just use tombstoning to remove the
	// key from the offsets slice.
	if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
		d.Delete(keys)
		return
	}

	// Is the range passed in outside of the time range for the file?
	min, max := d.TimeRange()
	if minTime > max || maxTime < min {
		return
	}

	fullKeys := make([][]byte, 0, len(keys))
	tombstones := map[string][]TimeRange{}
	var ie []IndexEntry

	for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ {
		k, entries := d.readEntriesAt(d.offset(i), &ie)

		// Skip any keys that don't exist.  These are less than the current key.
		for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
			keys = keys[1:]
		}

		// No more keys to delete, we're done.
		if len(keys) == 0 {
			break
		}

		// If the current key is greater than the index one, continue to the next
		// index key.
		if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {
			continue
		}

		// If multiple tombstones are saved for the same key
		if len(entries) == 0 {
			continue
		}

		// Is the time range passed in outside of the time range we have stored for this key?
		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
		if minTime > max || maxTime < min {
			continue
		}

		// Does the range passed in cover every value for the key?
		if minTime <= min && maxTime >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}

		d.mu.RLock()
		existing := d.tombstones[string(k)]
		d.mu.RUnlock()

		// Append the new tombstones to the existing ones.
		newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...)
		fn := func(i, j int) bool {
			a, b := newTs[i], newTs[j]
			if a.Min == b.Min {
				return a.Max <= b.Max
			}
			return a.Min < b.Min
		}

		// Sort the updated tombstones if necessary.
		if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
			sort.Slice(newTs, fn)
		}

		tombstones[string(k)] = newTs

		// We need to see if all the tombstones end up deleting the entire series.  This
		// could happen if there is one tombstone with a min and max time spanning all the
		// block time ranges, or if multiple smaller tombstones delete segments.  To detect
		// these cases, we use a window starting at the first tombstone and grow it by each
		// tombstone that is immediately adjacent to, or overlaps, the current window.
		// If there are any gaps, we abort.
		minTs, maxTs := newTs[0].Min, newTs[0].Max
		for j := 1; j < len(newTs); j++ {
			prevTs := newTs[j-1]
			ts := newTs[j]

			// Make sure all the tombstones line up for a continuous range.  We don't
			// want two small deletes at each edge to end up causing us to
			// remove the full key.
			if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
				minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)
				break
			}

			if ts.Min < minTs {
				minTs = ts.Min
			}
			if ts.Max > maxTs {
				maxTs = ts.Max
			}
		}

		// If we have a fully deleted series, delete all of it.
		if minTs <= min && maxTs >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}
	}

	// Delete all the keys that are fully deleted in bulk.
	if len(fullKeys) > 0 {
		d.Delete(fullKeys)
	}

	if len(tombstones) == 0 {
		return
	}

	d.mu.Lock()
	for k, v := range tombstones {
		d.tombstones[k] = v
	}
	d.mu.Unlock()
}

// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange {
	d.mu.RLock()
	r := d.tombstones[string(key)]
	d.mu.RUnlock()
	return r
}

// Contains returns true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
	return len(d.Entries(key)) > 0
}

// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
	entry := d.Entry(key, timestamp)
	if entry == nil {
		return false
	}

	d.mu.RLock()
	tombstones := d.tombstones[string(key)]
	d.mu.RUnlock()

	for _, t := range tombstones {
		if t.Min <= timestamp && t.Max >= timestamp {
			return false
		}
	}
	return true
}

// Type returns the block type of the values stored for the key.
func (d *indirectIndex) Type(key []byte) (byte, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ofs := d.search(key)
	if ofs < len(d.b) {
		n, _ := readKey(d.b[ofs:])
		ofs += n
		return d.b[ofs], nil
	}
	return 0, fmt.Errorf("key does not exist: %s", key)
}

// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool {
	return d.minTime <= max && d.maxTime >= min
}

// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
	return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
}

// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() ([]byte, []byte) {
	return d.minKey, d.maxKey
}

// TimeRange returns the min and max time across all keys in the index.
func (d *indirectIndex) TimeRange() (int64, int64) {
	return d.minTime, d.maxTime
}

// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	return d.b, nil
}

// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Keep a reference to the actual index bytes.
	d.b = b
	if len(b) == 0 {
		return nil
	}

	//var minKey, maxKey []byte
	var minTime, maxTime int64 = math.MaxInt64, 0

	// To create our "indirect" index, we need to find the location of all the keys in
	// the raw byte slice.  The keys are listed once each (in sorted order).  Following
	// each key is a time ordered list of index entry blocks for that key.  The loop below
	// walks the slice, recording the position each time it lands on a key field.
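	//
	// For reference, the per-key layout this loop assumes (matching readKey and
	// readEntries below, with indexEntrySize = 28 and indexCountSize = 2) is:
	//
	//	┌─────────┬─────────┬────────┬─────────┬──────────────────────────────┐
	//	│ Key Len │   Key   │  Type  │  Count  │  Count * Index Entry (28 B)  │
	//	│ 2 bytes │ N bytes │ 1 byte │ 2 bytes │ min time, max time, ofs, size│
	//	└─────────┴─────────┴────────┴─────────┴──────────────────────────────┘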
	var i int32
	var offsets []int32
	iMax := int32(len(b))
	for i < iMax {
		offsets = append(offsets, i)

		// Skip to the start of the values
		// key length value (2) + type (1) + length of key
		if i+2 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for key length value")
		}
		i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2]))

		// count of index entries
		if i+indexCountSize >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for index entries count")
		}
		count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize]))
		i += indexCountSize

		// Find the min time for the block
		if i+8 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for min time")
		}
		minT := int64(binary.BigEndian.Uint64(b[i : i+8]))
		if minT < minTime {
			minTime = minT
		}

		i += (count - 1) * indexEntrySize

		// Find the max time for the block
		if i+16 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for max time")
		}
		maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16]))
		if maxT > maxTime {
			maxTime = maxT
		}

		i += indexEntrySize
	}

	firstOfs := offsets[0]
	_, key := readKey(b[firstOfs:])
	d.minKey = key

	lastOfs := offsets[len(offsets)-1]
	_, key = readKey(b[lastOfs:])
	d.maxKey = key

	d.minTime = minTime
	d.maxTime = maxTime

	var err error
	d.offsets, err = mmap(nil, 0, len(offsets)*4)
	if err != nil {
		return err
	}
	for i, v := range offsets {
		binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
	}

	return nil
}

// Size returns the size of the current index in bytes.
func (d *indirectIndex) Size() uint32 {
	d.mu.RLock()
	defer d.mu.RUnlock()

	return uint32(len(d.b))
}

func (d *indirectIndex) Close() error {
	// Windows doesn't use the anonymous map for the offsets index.
	if runtime.GOOS == "windows" {
		return nil
	}
	return munmap(d.offsets[:cap(d.offsets)])
}

// mmapAccessor is an mmap based block accessor.  It accesses blocks through an
// mmapped file interface.
type mmapAccessor struct {
	accessCount uint64 // Counter incremented every time the mmapAccessor is accessed
	freeCount   uint64 // Counter to determine whether the accessor can free its resources

	mmapWillNeed bool // If true, the mmap advise value MADV_WILLNEED will be provided to the kernel for b.

	mu sync.RWMutex
	b  []byte
	f  *os.File

	index *indirectIndex
}

func (m *mmapAccessor) init() (*indirectIndex, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if err := verifyVersion(m.f); err != nil {
		return nil, err
	}

	var err error

	if _, err := m.f.Seek(0, 0); err != nil {
		return nil, err
	}

	stat, err := m.f.Stat()
	if err != nil {
		return nil, err
	}

	m.b, err = mmap(m.f, 0, int(stat.Size()))
	if err != nil {
		return nil, err
	}
	if len(m.b) < 8 {
		return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
	}

	// Hint to the kernel that we will be reading the file.  It would be better to hint
	// that we will be reading the index section, but that has not been
	// implemented yet.
	if m.mmapWillNeed {
		if err := madviseWillNeed(m.b); err != nil {
			return nil, err
		}
	}

	indexOfsPos := len(m.b) - 8
	indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
	if indexStart >= uint64(indexOfsPos) {
		return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
	}

	m.index = NewIndirectIndex()
	if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
		return nil, err
	}

	// Allow resources to be freed immediately if requested.
	m.incAccess()
	atomic.StoreUint64(&m.freeCount, 1)

	return m.index, nil
}

func (m *mmapAccessor) free() error {
	accessCount := atomic.LoadUint64(&m.accessCount)
	freeCount := atomic.LoadUint64(&m.freeCount)

	// Already freed everything.
	if freeCount == 0 && accessCount == 0 {
		return nil
	}

	// Were there accesses after the last time we tried to free?
	// If so, don't free anything and record the access count that we
	// see now for the next check.
	if accessCount != freeCount {
		atomic.StoreUint64(&m.freeCount, accessCount)
		return nil
	}

	// Reset both counters to zero to indicate that we have freed everything.
	atomic.StoreUint64(&m.accessCount, 0)
	atomic.StoreUint64(&m.freeCount, 0)

	m.mu.RLock()
	defer m.mu.RUnlock()

	return madviseDontNeed(m.b)
}

func (m *mmapAccessor) incAccess() {
	atomic.AddUint64(&m.accessCount, 1)
}

func (m *mmapAccessor) rename(path string) error {
	m.incAccess()

	m.mu.Lock()
	defer m.mu.Unlock()

	err := munmap(m.b)
	if err != nil {
		return err
	}

	if err := m.f.Close(); err != nil {
		return err
	}

	if err := file.RenameFile(m.f.Name(), path); err != nil {
		return err
	}

	m.f, err = os.Open(path)
	if err != nil {
		return err
	}

	if _, err := m.f.Seek(0, 0); err != nil {
		return err
	}

	stat, err := m.f.Stat()
	if err != nil {
		return err
	}

	m.b, err = mmap(m.f, 0, int(stat.Size()))
	if err != nil {
		return err
	}

	if m.mmapWillNeed {
		return madviseWillNeed(m.b)
	}
	return nil
}

func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
	entry := m.index.Entry(key, timestamp)
	if entry == nil {
		return nil, nil
	}

	return m.readBlock(entry, nil)
}

func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
	m.incAccess()

	m.mu.RLock()
	defer m.mu.RUnlock()

	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		return nil, ErrTSMClosed
	}
	//TODO: Validate checksum
	var err error
	values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
	if err != nil {
		return nil, err
	}

	return values, nil
}

func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
	m.incAccess()

	m.mu.RLock()
	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		m.mu.RUnlock()
		return 0, nil, ErrTSMClosed
	}

	// return the bytes after the 4 byte checksum
	crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)]
	m.mu.RUnlock()

	return crc, block, nil
}

// readAll returns all values for a key in all blocks.
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
	m.incAccess()

	blocks := m.index.Entries(key)
	if len(blocks) == 0 {
		return nil, nil
	}

	tombstones := m.index.TombstoneRange(key)

	m.mu.RLock()
	defer m.mu.RUnlock()

	var temp []Value
	var err error
	var values []Value
	for _, block := range blocks {
		var skip bool
		for _, t := range tombstones {
			// Should we skip this block because it contains points that have been deleted?
			if t.Min <= block.MinTime && t.Max >= block.MaxTime {
				skip = true
				break
			}
		}

		if skip {
			continue
		}
		//TODO: Validate checksum
		temp = temp[:0]
		// The +4 is the 4 byte checksum length
		temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp)
		if err != nil {
			return nil, err
		}

		// Filter out any values that were deleted
		for _, t := range tombstones {
			temp = Values(temp).Exclude(t.Min, t.Max)
		}

		values = append(values, temp...)
	}

	return values, nil
}

func (m *mmapAccessor) path() string {
	m.mu.RLock()
	path := m.f.Name()
	m.mu.RUnlock()
	return path
}

func (m *mmapAccessor) close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.b == nil {
		return nil
	}

	err := munmap(m.b)
	if err != nil {
		return err
	}

	m.b = nil
	return m.f.Close()
}

type indexEntries struct {
	Type    byte
	entries []IndexEntry
}

func (a *indexEntries) Len() int      { return len(a.entries) }
func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
func (a *indexEntries) Less(i, j int) bool {
	return a.entries[i].MinTime < a.entries[j].MinTime
}

func (a *indexEntries) MarshalBinary() ([]byte, error) {
	buf := make([]byte, len(a.entries)*indexEntrySize)

	for i, entry := range a.entries {
		entry.AppendTo(buf[indexEntrySize*i:])
	}

	return buf, nil
}

func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
	var buf [indexEntrySize]byte
	var n int

	for _, entry := range a.entries {
		entry.AppendTo(buf[:])
		n, err = w.Write(buf[:])
		total += int64(n)
		if err != nil {
			return total, err
		}
	}

	return total, nil
}

func readKey(b []byte) (n int, key []byte) {
	// 2 byte size of key
	n, size := 2, int(binary.BigEndian.Uint16(b[:2]))

	// N byte key
	key = b[n : n+size]

	n += len(key)
	return
}

func readEntries(b []byte, entries *indexEntries) (n int, err error) {
	if len(b) < 1+indexCountSize {
		return 0, fmt.Errorf("readEntries: data too short for headers")
	}

	// 1 byte block type
	entries.Type = b[n]
	n++

	// 2 byte count of index entries
	count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
	n += indexCountSize

	if cap(entries.entries) < count {
		entries.entries = make([]IndexEntry, count)
	} else {
		entries.entries = entries.entries[:count]
	}

	b = b[indexCountSize+indexTypeSize:]
	for i := 0; i < len(entries.entries); i++ {
		if err = entries.entries[i].UnmarshalBinary(b); err != nil {
			return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
		}
		b = b[indexEntrySize:]
	}

	n += count * indexEntrySize

	return
}