package tsm1

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"os"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"

	"github.com/influxdata/influxdb/pkg/bytesutil"
	"github.com/influxdata/influxdb/pkg/file"
	"github.com/influxdata/influxdb/tsdb"
)

// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
var ErrFileInUse = fmt.Errorf("file still in use")

// nilOffset is the value written to the offsets to indicate that position is deleted.  The value is the max
// uint32 which is an invalid position.  We don't use 0 as 0 is actually a valid position.
var nilOffset = []byte{255, 255, 255, 255}

// TSMReader is a reader for a TSM file.
type TSMReader struct {
	// refs is the count of active references to this reader.
	refs   int64
	refsWG sync.WaitGroup

	madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.
	mu              sync.RWMutex

	// accessor provides access and decoding of blocks for the reader.
	accessor blockAccessor

	// index is the index of all blocks.
	index TSMIndex

	// tombstoner ensures tombstoned keys are not visible via the index.
	tombstoner *Tombstoner

	// size is the size of the file on disk.
	size int64

	// lastModified is the last time this file was modified on disk.
	lastModified int64

	// deleteMu limits concurrent deletes.
	deleteMu sync.Mutex
}

// TSMIndex represents the index section of a TSM file.  The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
	// Delete removes the given keys from the index.
	Delete(keys [][]byte)

	// DeleteRange removes the given keys with data between minTime and maxTime from the index.
	DeleteRange(keys [][]byte, minTime, maxTime int64)

	// ContainsKey returns true if the given key may exist in the index.  This func is faster than
	// Contains but may return false positives.
	ContainsKey(key []byte) bool

	// Contains returns true if the given key exists in the index.
	Contains(key []byte) bool

	// ContainsValue returns true if key and time might exist in this file.  This function could
	// return true even though the actual point does not exist.  For example, the key may
	// exist in this file, but not have a point exactly at time t.
	ContainsValue(key []byte, timestamp int64) bool

	// Entries returns all index entries for a key.
	Entries(key []byte) []IndexEntry

	// ReadEntries reads the index entries for key into entries.
	ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry

	// Entry returns the index entry for the specified key and timestamp.  If no entry
	// matches the key and timestamp, nil is returned.
	Entry(key []byte, timestamp int64) *IndexEntry

	// Key returns the key in the index at the given position, using entries to avoid allocations.
	Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)

	// KeyAt returns the key in the index at the given position.
	KeyAt(index int) ([]byte, byte)

	// KeyCount returns the count of unique keys in the index.
	KeyCount() int

	// Seek returns the position in the index of the first key greater than or equal to key.
	Seek(key []byte) int

	// OverlapsTimeRange returns true if the time range of the file intersects min and max.
	OverlapsTimeRange(min, max int64) bool

	// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
	OverlapsKeyRange(min, max []byte) bool

	// Size returns the size of the current index in bytes.
	Size() uint32

	// TimeRange returns the min and max time across all keys in the file.
	TimeRange() (int64, int64)

	// TombstoneRange returns ranges of time that are deleted for the given key.
	TombstoneRange(key []byte) []TimeRange

	// KeyRange returns the min and max keys in the file.
	KeyRange() ([]byte, []byte)

	// Type returns the block type of the values stored for the key.  Returns one of
	// BlockFloat64, BlockInt64, BlockBool, BlockString.  If key does not exist,
	// an error is returned.
	Type(key []byte) (byte, error)

	// UnmarshalBinary populates an index from an encoded byte slice
	// representation of an index.
	UnmarshalBinary(b []byte) error

	// Close closes the index and releases any resources.
	Close() error
}

// BlockIterator allows iterating over each block in a TSM file in order.  It provides
// raw access to the block bytes without decoding them.
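//
// A typical iteration (sketch) looks like:
//
//	iter := r.BlockIterator()
//	for iter.Next() {
//		key, minTime, maxTime, typ, checksum, buf, err := iter.Read()
//		// ... use the raw block ...
//	}
//	if err := iter.Err(); err != nil {
//		// ... handle a delete that occurred during iteration ...
//	}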
type BlockIterator struct {
	r *TSMReader

	// i is the current key index
	i int

	// n is the total number of keys
	n int

	key     []byte
	cache   []IndexEntry
	entries []IndexEntry
	err     error
	typ     byte
}

// PeekNext returns the next key to be iterated, or nil if there is none.
func (b *BlockIterator) PeekNext() []byte {
	if len(b.entries) > 1 {
		return b.key
	} else if b.n-b.i > 1 {
		key, _ := b.r.KeyAt(b.i + 1)
		return key
	}
	return nil
}

// Next returns true if there are more blocks to iterate through.
func (b *BlockIterator) Next() bool {
	if b.err != nil {
		return false
	}

	if b.n-b.i == 0 && len(b.entries) == 0 {
		return false
	}

	if len(b.entries) > 0 {
		b.entries = b.entries[1:]
		if len(b.entries) > 0 {
			return true
		}
	}

	if b.n-b.i > 0 {
		b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
		b.i++

		// If there were deletes on the TSMReader, then our index is now off and we
		// can't proceed.  What we just read may not actually be the next block.
		if b.n != b.r.KeyCount() {
			b.err = fmt.Errorf("delete during iteration")
			return false
		}

		if len(b.entries) > 0 {
			return true
		}
	}

	return false
}

// Read reads information about the next block to be iterated.
func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
	if b.err != nil {
		return nil, 0, 0, 0, 0, nil, b.err
	}
	checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil)
	if err != nil {
		b.err = err
		return nil, 0, 0, 0, 0, nil, err
	}
	return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err
}

// Err returns any errors encountered during iteration.
func (b *BlockIterator) Err() error {
	return b.err
}

type tsmReaderOption func(*TSMReader)

// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILLNEED hint to the kernel.
var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
	return func(r *TSMReader) {
		r.madviseWillNeed = willNeed
	}
}

// NewTSMReader returns a new TSMReader from the given file.
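// Options such as WithMadviseWillNeed may be supplied, e.g. (sketch)
// NewTSMReader(f, WithMadviseWillNeed(true)).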
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
	t := &TSMReader{}
	for _, option := range options {
		option(t)
	}

	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	t.size = stat.Size()
	t.lastModified = stat.ModTime().UnixNano()
	t.accessor = &mmapAccessor{
		f:            f,
		mmapWillNeed: t.madviseWillNeed,
	}

	index, err := t.accessor.init()
	if err != nil {
		return nil, err
	}

	t.index = index
	t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)

	if err := t.applyTombstones(); err != nil {
		return nil, err
	}

	return t, nil
}

// WithObserver sets the observer for the TSM reader.
func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) {
	t.tombstoner.WithObserver(obs)
}

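// applyTombstones replays the reader's tombstone entries against the index so
// that deleted keys and time ranges are not visible through it.  Keys sharing
// the same (min, max) range are applied in batches of up to 4096 to limit the
// number of DeleteRange calls and allocations.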
func (t *TSMReader) applyTombstones() error {
	var cur, prev Tombstone
	batch := make([][]byte, 0, 4096)

	if err := t.tombstoner.Walk(func(ts Tombstone) error {
		cur = ts
		if len(batch) > 0 {
			if prev.Min != cur.Min || prev.Max != cur.Max {
				t.index.DeleteRange(batch, prev.Min, prev.Max)
				batch = batch[:0]
			}
		}

		// Copy the tombstone key and re-use the buffers to avoid allocations
		n := len(batch)
		batch = batch[:n+1]
		if cap(batch[n]) < len(ts.Key) {
			batch[n] = make([]byte, len(ts.Key))
		} else {
			batch[n] = batch[n][:len(ts.Key)]
		}
		copy(batch[n], ts.Key)

		if len(batch) >= 4096 {
			t.index.DeleteRange(batch, prev.Min, prev.Max)
			batch = batch[:0]
		}

		prev = ts
		return nil
	}); err != nil {
		return fmt.Errorf("init: read tombstones: %v", err)
	}

	if len(batch) > 0 {
		t.index.DeleteRange(batch, cur.Min, cur.Max)
	}
	return nil
}

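// Free releases resources held by the underlying accessor where possible, e.g.
// by hinting to the kernel that the mmapped pages are no longer needed.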
func (t *TSMReader) Free() error {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.accessor.free()
}

// Path returns the path of the file the TSMReader was initialized with.
func (t *TSMReader) Path() string {
	t.mu.RLock()
	p := t.accessor.path()
	t.mu.RUnlock()
	return p
}

// Key returns the key, its block type and the underlying entries at the numeric index.
func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	return t.index.Key(index, entries)
}

// KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
	return t.index.KeyAt(idx)
}

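// Seek returns the position in the index of the first key greater than or equal
// to the given key.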
func (t *TSMReader) Seek(key []byte) int {
	return t.index.Seek(key)
}

// ReadAt returns the values corresponding to the given index entry.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.readBlock(entry, vals)
	t.mu.RUnlock()
	return v, err
}

// Read returns the values corresponding to the block at the given key and timestamp.
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.read(key, timestamp)
	t.mu.RUnlock()
	return v, err
}

// ReadAll returns all values for a key in all blocks.
func (t *TSMReader) ReadAll(key []byte) ([]Value, error) {
	t.mu.RLock()
	v, err := t.accessor.readAll(key)
	t.mu.RUnlock()
	return v, err
}

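// ReadBytes returns the checksum and the raw, still-encoded block bytes for the
// given index entry.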
func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
	t.mu.RLock()
	n, v, err := t.accessor.readBytes(e, b)
	t.mu.RUnlock()
	return n, v, err
}

// Type returns the type of values stored at the given key.
func (t *TSMReader) Type(key []byte) (byte, error) {
	return t.index.Type(key)
}

// Close closes the TSMReader.
func (t *TSMReader) Close() error {
	t.refsWG.Wait()

	t.mu.Lock()
	defer t.mu.Unlock()

	if err := t.accessor.close(); err != nil {
		return err
	}

	return t.index.Close()
}

// Ref records a usage of this TSMReader.  If there are active references
// when the reader is closed or removed, the reader will remain open until
// there are no more references.
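//
// A typical usage pattern (sketch) is:
//
//	r.Ref()
//	defer r.Unref()
//	// ... read from r ...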
func (t *TSMReader) Ref() {
	atomic.AddInt64(&t.refs, 1)
	t.refsWG.Add(1)
}

// Unref removes a usage record of this TSMReader.  If the reader was closed
// by another goroutine while there were active references, the file will
// be closed and removed.
func (t *TSMReader) Unref() {
	atomic.AddInt64(&t.refs, -1)
	t.refsWG.Done()
}

// InUse returns whether the TSMReader currently has any active references.
func (t *TSMReader) InUse() bool {
	refs := atomic.LoadInt64(&t.refs)
	return refs > 0
}

// Remove removes any underlying files stored on disk for this reader.
func (t *TSMReader) Remove() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.remove()
}

// Rename renames the underlying file to the new path.
func (t *TSMReader) Rename(path string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.accessor.rename(path)
}

// remove removes any underlying files stored on disk for this reader.
func (t *TSMReader) remove() error {
	path := t.accessor.path()

	if t.InUse() {
		return ErrFileInUse
	}

	if path != "" {
		err := os.RemoveAll(path)
		if err != nil {
			return err
		}
	}

	if err := t.tombstoner.Delete(); err != nil {
		return err
	}
	return nil
}

// Contains returns whether the given key is present in the index.
func (t *TSMReader) Contains(key []byte) bool {
	return t.index.Contains(key)
}

// ContainsValue returns true if key and time might exist in this file.  This function could
// return true even though the actual point does not exist.  For example, the key may
// exist in this file, but not have a point exactly at time t.
func (t *TSMReader) ContainsValue(key []byte, ts int64) bool {
	return t.index.ContainsValue(key, ts)
}

// DeleteRange removes the given points for keys between minTime and maxTime.  The series
// keys passed in must be sorted.
func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	batch := t.BatchDelete()
	if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
		batch.Rollback()
		return err
	}
	return batch.Commit()
}

// Delete deletes blocks indicated by keys.
func (t *TSMReader) Delete(keys [][]byte) error {
	if err := t.tombstoner.Add(keys); err != nil {
		return err
	}

	if err := t.tombstoner.Flush(); err != nil {
		return err
	}

	t.index.Delete(keys)
	return nil
}

// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
	return t.index.OverlapsTimeRange(min, max)
}

// OverlapsKeyRange returns true if the key range of the file intersects min and max.
func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
	return t.index.OverlapsKeyRange(min, max)
}

// TimeRange returns the min and max time across all keys in the file.
func (t *TSMReader) TimeRange() (int64, int64) {
	return t.index.TimeRange()
}

// KeyRange returns the min and max key across all keys in the file.
func (t *TSMReader) KeyRange() ([]byte, []byte) {
	return t.index.KeyRange()
}

// KeyCount returns the count of unique keys in the TSMReader.
func (t *TSMReader) KeyCount() int {
	return t.index.KeyCount()
}

// Entries returns all index entries for key.
func (t *TSMReader) Entries(key []byte) []IndexEntry {
	return t.index.Entries(key)
}

// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	return t.index.ReadEntries(key, entries)
}

// IndexSize returns the size of the index in bytes.
func (t *TSMReader) IndexSize() uint32 {
	return t.index.Size()
}

// Size returns the size of the underlying file in bytes.
func (t *TSMReader) Size() uint32 {
	t.mu.RLock()
	size := t.size
	t.mu.RUnlock()
	return uint32(size)
}

// LastModified returns the last time the underlying file was modified.
func (t *TSMReader) LastModified() int64 {
	t.mu.RLock()
	lm := t.lastModified
	for _, ts := range t.tombstoner.TombstoneFiles() {
		if ts.LastModified > lm {
			lm = ts.LastModified
		}
	}
	t.mu.RUnlock()
	return lm
}

// HasTombstones returns true if there are any tombstone entries recorded.
func (t *TSMReader) HasTombstones() bool {
	t.mu.RLock()
	b := t.tombstoner.HasTombstones()
	t.mu.RUnlock()
	return b
}

// TombstoneFiles returns any tombstone files associated with this TSM file.
func (t *TSMReader) TombstoneFiles() []FileStat {
	t.mu.RLock()
	fs := t.tombstoner.TombstoneFiles()
	t.mu.RUnlock()
	return fs
}

// TombstoneRange returns ranges of time that are deleted for the given key.
func (t *TSMReader) TombstoneRange(key []byte) []TimeRange {
	t.mu.RLock()
	tr := t.index.TombstoneRange(key)
	t.mu.RUnlock()
	return tr
}

// Stats returns the FileStat for the TSMReader's underlying file.
func (t *TSMReader) Stats() FileStat {
	minTime, maxTime := t.index.TimeRange()
	minKey, maxKey := t.index.KeyRange()
	return FileStat{
		Path:         t.Path(),
		Size:         t.Size(),
		LastModified: t.LastModified(),
		MinTime:      minTime,
		MaxTime:      maxTime,
		MinKey:       minKey,
		MaxKey:       maxKey,
		HasTombstone: t.tombstoner.HasTombstones(),
	}
}

// BlockIterator returns a BlockIterator for the underlying TSM file.
func (t *TSMReader) BlockIterator() *BlockIterator {
	return &BlockIterator{
		r: t,
		n: t.index.KeyCount(),
	}
}

type BatchDeleter interface {
	DeleteRange(keys [][]byte, min, max int64) error
	Commit() error
	Rollback() error
}

type batchDelete struct {
	r *TSMReader
}

func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
	if len(keys) == 0 {
		return nil
	}

	// If the keys can't exist in this TSM file, skip it.
	minKey, maxKey := keys[0], keys[len(keys)-1]
	if !b.r.index.OverlapsKeyRange(minKey, maxKey) {
		return nil
	}

	// If the time range can't exist in this TSM file, skip it.
	if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
		return nil
	}

	if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
		return err
	}

	return nil
}

func (b *batchDelete) Commit() error {
	defer b.r.deleteMu.Unlock()
	if err := b.r.tombstoner.Flush(); err != nil {
		return err
	}

	return b.r.applyTombstones()
}

func (b *batchDelete) Rollback() error {
	defer b.r.deleteMu.Unlock()
	return b.r.tombstoner.Rollback()
}

// BatchDelete returns a BatchDeleter.  Only a single goroutine may run a BatchDelete at a time.
// Callers must either Commit or Rollback the operation.
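// For example (sketch):
//
//	batch := r.BatchDelete()
//	if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
//		batch.Rollback()
//		return err
//	}
//	return batch.Commit()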
func (r *TSMReader) BatchDelete() BatchDeleter {
	r.deleteMu.Lock()
	return &batchDelete{r: r}
}

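// BatchDeleters is a collection of BatchDeleter that fans each operation out to
// every deleter concurrently, returning the last error encountered, if any.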
type BatchDeleters []BatchDeleter

func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}

func (a BatchDeleters) Commit() error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.Commit() }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}

func (a BatchDeleters) Rollback() error {
	errC := make(chan error, len(a))
	for _, b := range a {
		go func(b BatchDeleter) { errC <- b.Rollback() }(b)
	}

	var err error
	for i := 0; i < len(a); i++ {
		dErr := <-errC
		if dErr != nil {
			err = dErr
		}
	}
	return err
}

// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index.  This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
	mu sync.RWMutex

	// indirectIndex works as follows.  Assuming we have an index structure in memory as
	// the diagram below:
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │                               Index                                │
	// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
	// │0│                      │62│                       │145│
	// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
	// │Key 1 Len│   Key   │... │Key 2 Len│  Key 2  │ ...  │  Key 3  │ ...  │
	// │ 2 bytes │ N bytes │    │ 2 bytes │ N bytes │      │ 2 bytes │      │
	// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘

	// We build an `offsets` slice where each element points to the byte location
	// of a key in the index slice.

	// ┌────────────────────────────────────────────────────────────────────┐
	// │                              Offsets                               │
	// ├────┬────┬────┬─────────────────────────────────────────────────────┘
	// │ 0  │ 62 │145 │
	// └────┴────┴────┘

	// Using this offsets slice we can find `Key 2` by doing a binary search
	// over the offsets slice.  Instead of comparing the value in the offsets
	// (e.g. `62`), we use that as an index into the underlying index to
	// retrieve the key at position `62` and perform our comparisons with that.

	// When we have identified the correct position in the index for a given
	// key, we could perform another binary search or a linear scan.  This
	// should be fast as well since each index entry is 28 bytes and all
	// contiguous in memory.  The current implementation uses a linear scan since the
	// number of block entries is expected to be < 100 per key.

	// b is the underlying index byte slice.  This could be a copy on the heap or an MMAP
	// slice reference.
	b []byte

	// offsets contains the positions in b for each key.  Each position points to the 2 byte
	// length of the key.
	offsets []byte

	// minKey, maxKey are the minimum and maximum (lexicographically sorted) keys contained
	// in the file.
	minKey, maxKey []byte

	// minTime, maxTime are the minimum and maximum times contained in the file across all
	// series.
	minTime, maxTime int64

	// tombstones contains only the tombstoned keys with a subset of time values deleted.  An
	// entry would exist here if a subset of the points for a key were deleted and the file
	// had not been re-compacted to remove the points on disk.
	tombstones map[string][]TimeRange
}

// TimeRange holds a min and max timestamp.
type TimeRange struct {
	Min, Max int64
}

func (t TimeRange) Overlaps(min, max int64) bool {
	return t.Min <= max && t.Max >= min
}

// NewIndirectIndex returns a new indirect index.
func NewIndirectIndex() *indirectIndex {
	return &indirectIndex{
		tombstones: make(map[string][]TimeRange),
	}
}

func (d *indirectIndex) offset(i int) int {
	if i < 0 || i*4+4 > len(d.offsets) {
		return -1
	}
	return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
}

func (d *indirectIndex) Seek(key []byte) int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.searchOffset(key)
}

// searchOffset searches the offsets slice for key and returns the position in
// offsets where key would exist.
func (d *indirectIndex) searchOffset(key []byte) int {
	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4 byte offset entry we are at, so decode the position it points to
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key which is a 2 byte length
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// See if we might have found the right index
	if i < len(d.offsets) {
		return int(i / 4)
	}

	// The key is not in the index.  i is the index where it would be inserted so return
	// a value outside our offset range.
	return int(len(d.offsets)) / 4
}

// search returns the byte position of key in the index.  If key is not
// in the index, len(index) is returned.
func (d *indirectIndex) search(key []byte) int {
	if !d.ContainsKey(key) {
		return len(d.b)
	}

	// We use a binary search across our indirect offsets (pointers to all the keys
	// in the index slice).
	// TODO(sgc): this should be inlined to `indirectIndex` as it is only used here
	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
		// x is the 4 byte offset entry we are at, so decode the position it points to
		offset := int32(binary.BigEndian.Uint32(x))

		// It's pointing to the start of the key which is a 2 byte length
		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))

		// See if it matches
		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
	})

	// See if we might have found the right index
	if i < len(d.offsets) {
		ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, k := readKey(d.b[ofs:])

		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0.  Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return len(d.b)
		}

		return int(ofs)
	}

	// The key is not in the index.  i is the index where it would be inserted so return
	// a value outside our offset range.
	return len(d.b)
}

// ContainsKey returns true if key may exist in this index.
func (d *indirectIndex) ContainsKey(key []byte) bool {
	return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
}

// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key []byte) []IndexEntry {
	return d.ReadEntries(key, nil)
}

func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) {
	n, k := readKey(d.b[ofs:])

	// Read and return all the entries
	ofs += n
	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[ofs:], &ie); err != nil {
		panic(fmt.Sprintf("error reading entries: %v", err))
	}
	if entries != nil {
		*entries = ie.entries
	}
	return k, ie.entries
}

// ReadEntries returns all index entries for a key, reading into entries to avoid allocations.
func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ofs := d.search(key)
	if ofs < len(d.b) {
		k, entries := d.readEntriesAt(ofs, entries)
		// The search may have returned an i == 0 which could indicate that the value
		// searched should be inserted at position 0.  Make sure the key in the index
		// matches the search value.
		if !bytes.Equal(key, k) {
			return nil
		}

		return entries
	}

	// The key is not in the index.
	return nil
}

// Entry returns the index entry for the specified key and timestamp.  If no entry
// matches the key and timestamp, nil is returned.
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
	entries := d.Entries(key)
	for _, entry := range entries {
		if entry.Contains(timestamp) {
			return &entry
		}
	}
	return nil
}

// Key returns the key in the index at the given position.
func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	if idx < 0 || idx*4+4 > len(d.offsets) {
		return nil, 0, nil
	}
	ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
	n, key := readKey(d.b[ofs:])

	typ := d.b[int(ofs)+n]

	var ie indexEntries
	if entries != nil {
		ie.entries = *entries
	}
	if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil {
		return nil, 0, nil
	}
	if entries != nil {
		*entries = ie.entries
	}

	return key, typ, ie.entries
}

// KeyAt returns the key in the index at the given position.
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
	d.mu.RLock()

	if idx < 0 || idx*4+4 > len(d.offsets) {
		d.mu.RUnlock()
		return nil, 0
	}
	ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))

	n, key := readKey(d.b[ofs:])
	ofs = ofs + int32(n)
	typ := d.b[ofs]
	d.mu.RUnlock()
	return key, typ
}

// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
	d.mu.RLock()
	n := len(d.offsets) / 4
	d.mu.RUnlock()
	return n
}

// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys [][]byte) {
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// Both keys and offsets are sorted.  Walk both in order and remove
	// any keys that exist in both.
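	// Deleted positions are overwritten with nilOffset (0xFFFFFFFF) and then
	// packed out of the offsets slice by bytesutil.Pack below.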
	d.mu.Lock()
	start := d.searchOffset(keys[0])
	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
		_, indexKey := readKey(d.b[offset:])

		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
			keys = keys[1:]
		}

		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
			keys = keys[1:]
			copy(d.offsets[i:i+4], nilOffset)
		}
	}
	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
	d.mu.Unlock()
}

// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
	// No keys, nothing to do
	if len(keys) == 0 {
		return
	}

	if !bytesutil.IsSorted(keys) {
		bytesutil.Sort(keys)
	}

	// If we're deleting the whole time range, just remove the keys from the
	// offsets slice entirely.
	if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
		d.Delete(keys)
		return
	}

	// Is the range passed in outside of the time range for the file?
	min, max := d.TimeRange()
	if minTime > max || maxTime < min {
		return
	}

	fullKeys := make([][]byte, 0, len(keys))
	tombstones := map[string][]TimeRange{}
	var ie []IndexEntry

	for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ {
		k, entries := d.readEntriesAt(d.offset(i), &ie)

		// Skip any keys that don't exist.  These are less than the current key.
		for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
			keys = keys[1:]
		}

		// No more keys to delete, we're done.
		if len(keys) == 0 {
			break
		}

		// If the current key is greater than the index one, continue to the next
		// index key.
		if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {
			continue
		}

		// Skip the key if it has no entries left.
		if len(entries) == 0 {
			continue
		}

		// Is the time range passed in outside of the time range we have stored for this key?
		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
		if minTime > max || maxTime < min {
			continue
		}

		// Does the range passed in cover every value for the key?
		if minTime <= min && maxTime >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}

		d.mu.RLock()
		existing := d.tombstones[string(k)]
		d.mu.RUnlock()

		// Append the new tombstones to the existing ones
		newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...)
		fn := func(i, j int) bool {
			a, b := newTs[i], newTs[j]
			if a.Min == b.Min {
				return a.Max <= b.Max
			}
			return a.Min < b.Min
		}

		// Sort the updated tombstones if necessary
		if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
			sort.Slice(newTs, fn)
		}

		tombstones[string(k)] = newTs

		// We need to see if all the tombstones end up deleting the entire series.  This
		// could happen if there is one tombstone with a min,max time spanning all the block
		// time ranges, or if multiple smaller tombstones delete segments.  To detect these
		// cases, we use a window starting at the first tombstone and grow it by each
		// tombstone that is immediately adjacent to, or overlaps, the current window.
		// If there are any gaps, we abort.
		minTs, maxTs := newTs[0].Min, newTs[0].Max
		for j := 1; j < len(newTs); j++ {
			prevTs := newTs[j-1]
			ts := newTs[j]

			// Make sure all the tombstones line up for a continuous range.  We don't
			// want two small deletes at each edge to end up causing us to
			// remove the full key.
			if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
				minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)
				break
			}

			if ts.Min < minTs {
				minTs = ts.Min
			}
			if ts.Max > maxTs {
				maxTs = ts.Max
			}
		}

		// If we have a fully deleted series, delete all of it.
		if minTs <= min && maxTs >= max {
			fullKeys = append(fullKeys, keys[0])
			keys = keys[1:]
			continue
		}
	}

	// Delete all the keys that are fully deleted in bulk
	if len(fullKeys) > 0 {
		d.Delete(fullKeys)
	}

	if len(tombstones) == 0 {
		return
	}

	d.mu.Lock()
	for k, v := range tombstones {
		d.tombstones[k] = v
	}
	d.mu.Unlock()
}

// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange {
	d.mu.RLock()
	r := d.tombstones[string(key)]
	d.mu.RUnlock()
	return r
}

// Contains returns true if the given key exists in the index.
func (d *indirectIndex) Contains(key []byte) bool {
	return len(d.Entries(key)) > 0
}

// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
	entry := d.Entry(key, timestamp)
	if entry == nil {
		return false
	}

	d.mu.RLock()
	tombstones := d.tombstones[string(key)]
	d.mu.RUnlock()

	for _, t := range tombstones {
		if t.Min <= timestamp && t.Max >= timestamp {
			return false
		}
	}
	return true
}

// Type returns the block type of the values stored for the key.
func (d *indirectIndex) Type(key []byte) (byte, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ofs := d.search(key)
	if ofs < len(d.b) {
		n, _ := readKey(d.b[ofs:])
		ofs += n
		return d.b[ofs], nil
	}
	return 0, fmt.Errorf("key does not exist: %s", key)
}

// OverlapsTimeRange returns true if the time range of the file intersects min and max.
func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool {
	return d.minTime <= max && d.maxTime >= min
}

// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
	return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
}

// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() ([]byte, []byte) {
	return d.minKey, d.maxKey
}

// TimeRange returns the min and max time across all keys in the index.
func (d *indirectIndex) TimeRange() (int64, int64) {
	return d.minTime, d.maxTime
}

// MarshalBinary returns a byte slice encoded version of the index.
func (d *indirectIndex) MarshalBinary() ([]byte, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	return d.b, nil
}

// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
func (d *indirectIndex) UnmarshalBinary(b []byte) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	// Keep a reference to the actual index bytes
	d.b = b
	if len(b) == 0 {
		return nil
	}

	var minTime, maxTime int64 = math.MaxInt64, 0

	// To create our "indirect" index, we need to find the location of all the keys in
	// the raw byte slice.  The keys are listed once each (in sorted order).  Following
	// each key is a time ordered list of index entry blocks for that key.  The loop below
	// skips across the slice, recording the position each time it lands on a key
	// field.
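	// Each key's section in b is laid out as: a 2 byte key length, the key bytes,
	// a 1 byte block type, a 2 byte entry count, and then count index entries of
	// indexEntrySize bytes each, where the first 8 bytes of an entry hold its min
	// time and the next 8 bytes its max time.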
	var i int32
	var offsets []int32
	iMax := int32(len(b))
	for i < iMax {
		offsets = append(offsets, i)

		// Skip to the start of the values
		// key length value (2) + type (1) + length of key
		if i+2 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for key length value")
		}
		i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2]))

		// count of index entries
		if i+indexCountSize >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for index entries count")
		}
		count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize]))
		i += indexCountSize

		// Find the min time for the block
		if i+8 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for min time")
		}
		minT := int64(binary.BigEndian.Uint64(b[i : i+8]))
		if minT < minTime {
			minTime = minT
		}

		i += (count - 1) * indexEntrySize

		// Find the max time for the block
		if i+16 >= iMax {
			return fmt.Errorf("indirectIndex: not enough data for max time")
		}
		maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16]))
		if maxT > maxTime {
			maxTime = maxT
		}

		i += indexEntrySize
	}

	firstOfs := offsets[0]
	_, key := readKey(b[firstOfs:])
	d.minKey = key

	lastOfs := offsets[len(offsets)-1]
	_, key = readKey(b[lastOfs:])
	d.maxKey = key

	d.minTime = minTime
	d.maxTime = maxTime

	var err error
	d.offsets, err = mmap(nil, 0, len(offsets)*4)
	if err != nil {
		return err
	}
	for i, v := range offsets {
		binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
	}

	return nil
}

// Size returns the size of the current index in bytes.
func (d *indirectIndex) Size() uint32 {
	d.mu.RLock()
	defer d.mu.RUnlock()

	return uint32(len(d.b))
}

func (d *indirectIndex) Close() error {
	// Windows doesn't use the anonymous map for the offsets index
	if runtime.GOOS == "windows" {
		return nil
	}
	return munmap(d.offsets[:cap(d.offsets)])
}

// mmapAccessor is an mmap based block accessor.  It accesses blocks through an
// mmapped file interface.
type mmapAccessor struct {
	accessCount uint64 // Counter incremented every time the mmapAccessor is accessed
	freeCount   uint64 // Counter to determine whether the accessor can free its resources

	mmapWillNeed bool // If true, the mmap advise value MADV_WILLNEED will be provided to the kernel for b.

	mu sync.RWMutex
	b  []byte
	f  *os.File

	index *indirectIndex
}

func (m *mmapAccessor) init() (*indirectIndex, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if err := verifyVersion(m.f); err != nil {
		return nil, err
	}

	var err error

	if _, err := m.f.Seek(0, 0); err != nil {
		return nil, err
	}

	stat, err := m.f.Stat()
	if err != nil {
		return nil, err
	}

	m.b, err = mmap(m.f, 0, int(stat.Size()))
	if err != nil {
		return nil, err
	}
	if len(m.b) < 8 {
		return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
	}

	// Hint to the kernel that we will be reading the file.  It would be better to hint
	// that we will be reading the index section, but that has not been
	// implemented yet.
	if m.mmapWillNeed {
		if err := madviseWillNeed(m.b); err != nil {
			return nil, err
		}
	}

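	// The last 8 bytes of the file hold the position where the index starts.
	// Everything between that position and the footer is the index itself.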
	indexOfsPos := len(m.b) - 8
	indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
	if indexStart >= uint64(indexOfsPos) {
		return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
	}

	m.index = NewIndirectIndex()
	if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
		return nil, err
	}

	// Allow resources to be freed immediately if requested
	m.incAccess()
	atomic.StoreUint64(&m.freeCount, 1)

	return m.index, nil
}

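// free hints to the kernel that the mmapped pages are no longer needed, but only
// when there have been no new accesses since the last call.  The access and free
// counters below implement that check without holding any locks on the hot path.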
func (m *mmapAccessor) free() error {
	accessCount := atomic.LoadUint64(&m.accessCount)
	freeCount := atomic.LoadUint64(&m.freeCount)

	// Already freed everything.
	if freeCount == 0 && accessCount == 0 {
		return nil
	}

	// Were there accesses after the last time we tried to free?
	// If so, don't free anything and record the access count that we
	// see now for the next check.
	if accessCount != freeCount {
		atomic.StoreUint64(&m.freeCount, accessCount)
		return nil
	}

	// Reset both counters to zero to indicate that we have freed everything.
	atomic.StoreUint64(&m.accessCount, 0)
	atomic.StoreUint64(&m.freeCount, 0)

	m.mu.RLock()
	defer m.mu.RUnlock()

	return madviseDontNeed(m.b)
}

func (m *mmapAccessor) incAccess() {
	atomic.AddUint64(&m.accessCount, 1)
}

func (m *mmapAccessor) rename(path string) error {
	m.incAccess()

	m.mu.Lock()
	defer m.mu.Unlock()

	err := munmap(m.b)
	if err != nil {
		return err
	}

	if err := m.f.Close(); err != nil {
		return err
	}

	if err := file.RenameFile(m.f.Name(), path); err != nil {
		return err
	}

	m.f, err = os.Open(path)
	if err != nil {
		return err
	}

	if _, err := m.f.Seek(0, 0); err != nil {
		return err
	}

	stat, err := m.f.Stat()
	if err != nil {
		return err
	}

	m.b, err = mmap(m.f, 0, int(stat.Size()))
	if err != nil {
		return err
	}

	if m.mmapWillNeed {
		return madviseWillNeed(m.b)
	}
	return nil
}

func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
	entry := m.index.Entry(key, timestamp)
	if entry == nil {
		return nil, nil
	}

	return m.readBlock(entry, nil)
}

func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
	m.incAccess()

	m.mu.RLock()
	defer m.mu.RUnlock()

	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		return nil, ErrTSMClosed
	}
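	// A block is stored as a 4 byte CRC32 checksum followed by the encoded block
	// data, so decoding starts at entry.Offset+4.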
	//TODO: Validate checksum
	var err error
	values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
	if err != nil {
		return nil, err
	}

	return values, nil
}

func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
	m.incAccess()

	m.mu.RLock()
	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
		m.mu.RUnlock()
		return 0, nil, ErrTSMClosed
	}

	// return the bytes after the 4 byte checksum
	crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)]
	m.mu.RUnlock()

	return crc, block, nil
}

// readAll returns all values for a key in all blocks.
func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
	m.incAccess()

	blocks := m.index.Entries(key)
	if len(blocks) == 0 {
		return nil, nil
	}

	tombstones := m.index.TombstoneRange(key)

	m.mu.RLock()
	defer m.mu.RUnlock()

	var temp []Value
	var err error
	var values []Value
	for _, block := range blocks {
		var skip bool
		for _, t := range tombstones {
			// Should we skip this block because it contains points that have been deleted?
			if t.Min <= block.MinTime && t.Max >= block.MaxTime {
				skip = true
				break
			}
		}

		if skip {
			continue
		}
		//TODO: Validate checksum
		temp = temp[:0]
		// The +4 is the 4 byte checksum length
		temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp)
		if err != nil {
			return nil, err
		}

		// Filter out any values that were deleted
		for _, t := range tombstones {
			temp = Values(temp).Exclude(t.Min, t.Max)
		}

		values = append(values, temp...)
	}

	return values, nil
}

func (m *mmapAccessor) path() string {
	m.mu.RLock()
	path := m.f.Name()
	m.mu.RUnlock()
	return path
}

func (m *mmapAccessor) close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.b == nil {
		return nil
	}

	err := munmap(m.b)
	if err != nil {
		return err
	}

	m.b = nil
	return m.f.Close()
}

type indexEntries struct {
	Type    byte
	entries []IndexEntry
}

func (a *indexEntries) Len() int      { return len(a.entries) }
func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
func (a *indexEntries) Less(i, j int) bool {
	return a.entries[i].MinTime < a.entries[j].MinTime
}

func (a *indexEntries) MarshalBinary() ([]byte, error) {
	buf := make([]byte, len(a.entries)*indexEntrySize)

	for i, entry := range a.entries {
		entry.AppendTo(buf[indexEntrySize*i:])
	}

	return buf, nil
}

func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
	var buf [indexEntrySize]byte
	var n int

	for _, entry := range a.entries {
		entry.AppendTo(buf[:])
		n, err = w.Write(buf[:])
		total += int64(n)
		if err != nil {
			return total, err
		}
	}

	return total, nil
}

func readKey(b []byte) (n int, key []byte) {
	// 2 byte size of key
	n, size := 2, int(binary.BigEndian.Uint16(b[:2]))

	// N byte key
	key = b[n : n+size]

	n += len(key)
	return
}

func readEntries(b []byte, entries *indexEntries) (n int, err error) {
	if len(b) < 1+indexCountSize {
		return 0, fmt.Errorf("readEntries: data too short for headers")
	}

	// 1 byte block type
	entries.Type = b[n]
	n++

	// 2 byte count of index entries
	count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
	n += indexCountSize

	if cap(entries.entries) < count {
		entries.entries = make([]IndexEntry, count)
	} else {
		entries.entries = entries.entries[:count]
	}

	b = b[indexCountSize+indexTypeSize:]
	for i := 0; i < len(entries.entries); i++ {
		if err = entries.entries[i].UnmarshalBinary(b); err != nil {
			return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
		}
		b = b[indexEntrySize:]
	}

	n += count * indexEntrySize

	return
}