1package tsm1
2
3import (
4	"bytes"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"math"
9	"os"
10	"runtime"
11	"sort"
12	"sync"
13	"sync/atomic"
14
15	"github.com/influxdata/influxdb/pkg/bytesutil"
16	"github.com/influxdata/influxdb/tsdb"
17)
18
19// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
20var ErrFileInUse = fmt.Errorf("file still in use")
21
22// nilOffset is the value written to the offsets to indicate that position is deleted.  The value is the max
23// uint32 which is an invalid position.  We don't use 0 as 0 is actually a valid position.
24var nilOffset = []byte{255, 255, 255, 255}
25
26// TSMReader is a reader for a TSM file.
27type TSMReader struct {
28	// refs is the count of active references to this reader.
29	refs   int64
30	refsWG sync.WaitGroup
31
32	madviseWillNeed bool // Hint to the kernel with MADV_WILLNEED.
33	mu              sync.RWMutex
34
35	// accessor provides access and decoding of blocks for the reader.
36	accessor blockAccessor
37
38	// index is the index of all blocks.
39	index TSMIndex
40
41	// tombstoner ensures tombstoned keys are not available by the index.
42	tombstoner *Tombstoner
43
44	// size is the size of the file on disk.
45	size int64
46
47	// lastModified is the last time this file was modified on disk
48	lastModified int64
49
50	// deleteMu limits concurrent deletes
51	deleteMu sync.Mutex
52}
53
54// TSMIndex represent the index section of a TSM file.  The index records all
55// blocks, their locations, sizes, min and max times.
56type TSMIndex interface {
57	// Delete removes the given keys from the index.
58	Delete(keys [][]byte)
59
60	// DeleteRange removes the given keys with data between minTime and maxTime from the index.
61	DeleteRange(keys [][]byte, minTime, maxTime int64)
62
63	// ContainsKey returns true if the given key may exist in the index.  This func is faster than
64	// Contains but, may return false positives.
65	ContainsKey(key []byte) bool
66
67	// Contains return true if the given key exists in the index.
68	Contains(key []byte) bool
69
70	// ContainsValue returns true if key and time might exist in this file.  This function could
71	// return true even though the actual point does not exists.  For example, the key may
72	// exist in this file, but not have a point exactly at time t.
73	ContainsValue(key []byte, timestamp int64) bool
74
75	// Entries returns all index entries for a key.
76	Entries(key []byte) []IndexEntry
77
78	// ReadEntries reads the index entries for key into entries.
79	ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry
80
81	// Entry returns the index entry for the specified key and timestamp.  If no entry
82	// matches the key and timestamp, nil is returned.
83	Entry(key []byte, timestamp int64) *IndexEntry
84
85	// Key returns the key in the index at the given position, using entries to avoid allocations.
86	Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry)
87
88	// KeyAt returns the key in the index at the given position.
89	KeyAt(index int) ([]byte, byte)
90
91	// KeyCount returns the count of unique keys in the index.
92	KeyCount() int
93
94	// Seek returns the position in the index where key <= value in the index.
95	Seek(key []byte) int
96
97	// OverlapsTimeRange returns true if the time range of the file intersect min and max.
98	OverlapsTimeRange(min, max int64) bool
99
100	// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
101	OverlapsKeyRange(min, max []byte) bool
102
103	// Size returns the size of the current index in bytes.
104	Size() uint32
105
106	// TimeRange returns the min and max time across all keys in the file.
107	TimeRange() (int64, int64)
108
109	// TombstoneRange returns ranges of time that are deleted for the given key.
110	TombstoneRange(key []byte) []TimeRange
111
112	// KeyRange returns the min and max keys in the file.
113	KeyRange() ([]byte, []byte)
114
115	// Type returns the block type of the values stored for the key.  Returns one of
116	// BlockFloat64, BlockInt64, BlockBool, BlockString.  If key does not exist,
117	// an error is returned.
118	Type(key []byte) (byte, error)
119
120	// UnmarshalBinary populates an index from an encoded byte slice
121	// representation of an index.
122	UnmarshalBinary(b []byte) error
123
124	// Close closes the index and releases any resources.
125	Close() error
126}
127
128// BlockIterator allows iterating over each block in a TSM file in order.  It provides
129// raw access to the block bytes without decoding them.
130type BlockIterator struct {
131	r *TSMReader
132
133	// i is the current key index
134	i int
135
136	// n is the total number of keys
137	n int
138
139	key     []byte
140	cache   []IndexEntry
141	entries []IndexEntry
142	err     error
143	typ     byte
144}
145
146// PeekNext returns the next key to be iterated or an empty string.
147func (b *BlockIterator) PeekNext() []byte {
148	if len(b.entries) > 1 {
149		return b.key
150	} else if b.n-b.i > 1 {
151		key, _ := b.r.KeyAt(b.i + 1)
152		return key
153	}
154	return nil
155}
156
157// Next returns true if there are more blocks to iterate through.
158func (b *BlockIterator) Next() bool {
159	if b.err != nil {
160		return false
161	}
162
163	if b.n-b.i == 0 && len(b.entries) == 0 {
164		return false
165	}
166
167	if len(b.entries) > 0 {
168		b.entries = b.entries[1:]
169		if len(b.entries) > 0 {
170			return true
171		}
172	}
173
174	if b.n-b.i > 0 {
175		b.key, b.typ, b.entries = b.r.Key(b.i, &b.cache)
176		b.i++
177
178		// If there were deletes on the TSMReader, then our index is now off and we
179		// can't proceed.  What we just read may not actually the next block.
180		if b.n != b.r.KeyCount() {
181			b.err = fmt.Errorf("delete during iteration")
182			return false
183		}
184
185		if len(b.entries) > 0 {
186			return true
187		}
188	}
189
190	return false
191}
192
193// Read reads information about the next block to be iterated.
194func (b *BlockIterator) Read() (key []byte, minTime int64, maxTime int64, typ byte, checksum uint32, buf []byte, err error) {
195	if b.err != nil {
196		return nil, 0, 0, 0, 0, nil, b.err
197	}
198	checksum, buf, err = b.r.ReadBytes(&b.entries[0], nil)
199	if err != nil {
200		return nil, 0, 0, 0, 0, nil, err
201	}
202	return b.key, b.entries[0].MinTime, b.entries[0].MaxTime, b.typ, checksum, buf, err
203}
204
205// Err returns any errors encounter during iteration.
206func (b *BlockIterator) Err() error {
207	return b.err
208}
209
210// blockAccessor abstracts a method of accessing blocks from a
211// TSM file.
212type blockAccessor interface {
213	init() (*indirectIndex, error)
214	read(key []byte, timestamp int64) ([]Value, error)
215	readAll(key []byte) ([]Value, error)
216	readBlock(entry *IndexEntry, values []Value) ([]Value, error)
217	readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
218	readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
219	readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
220	readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
221	readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
222	readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
223	rename(path string) error
224	path() string
225	close() error
226	free() error
227}
228
229type tsmReaderOption func(*TSMReader)
230
231// WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel.
232var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
233	return func(r *TSMReader) {
234		r.madviseWillNeed = willNeed
235	}
236}
237
238// NewTSMReader returns a new TSMReader from the given file.
239func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
240	t := &TSMReader{}
241	for _, option := range options {
242		option(t)
243	}
244
245	stat, err := f.Stat()
246	if err != nil {
247		return nil, err
248	}
249	t.size = stat.Size()
250	t.lastModified = stat.ModTime().UnixNano()
251	t.accessor = &mmapAccessor{
252		f:            f,
253		mmapWillNeed: t.madviseWillNeed,
254	}
255
256	index, err := t.accessor.init()
257	if err != nil {
258		return nil, err
259	}
260
261	t.index = index
262	t.tombstoner = NewTombstoner(t.Path(), index.ContainsKey)
263
264	if err := t.applyTombstones(); err != nil {
265		return nil, err
266	}
267
268	return t, nil
269}
270
271// WithObserver sets the observer for the TSM reader.
272func (t *TSMReader) WithObserver(obs tsdb.FileStoreObserver) {
273	t.tombstoner.WithObserver(obs)
274}
275
276func (t *TSMReader) applyTombstones() error {
277	var cur, prev Tombstone
278	batch := make([][]byte, 0, 4096)
279
280	if err := t.tombstoner.Walk(func(ts Tombstone) error {
281		cur = ts
282		if len(batch) > 0 {
283			if prev.Min != cur.Min || prev.Max != cur.Max {
284				t.index.DeleteRange(batch, prev.Min, prev.Max)
285				batch = batch[:0]
286			}
287		}
288
289		// Copy the tombstone key and re-use the buffers to avoid allocations
290		n := len(batch)
291		batch = batch[:n+1]
292		if cap(batch[n]) < len(ts.Key) {
293			batch[n] = make([]byte, len(ts.Key))
294		} else {
295			batch[n] = batch[n][:len(ts.Key)]
296		}
297		copy(batch[n], ts.Key)
298
299		if len(batch) >= 4096 {
300			t.index.DeleteRange(batch, prev.Min, prev.Max)
301			batch = batch[:0]
302		}
303
304		prev = ts
305		return nil
306	}); err != nil {
307		return fmt.Errorf("init: read tombstones: %v", err)
308	}
309
310	if len(batch) > 0 {
311		t.index.DeleteRange(batch, cur.Min, cur.Max)
312	}
313	return nil
314}
315
316func (t *TSMReader) Free() error {
317	t.mu.RLock()
318	defer t.mu.RUnlock()
319	return t.accessor.free()
320}
321
322// Path returns the path of the file the TSMReader was initialized with.
323func (t *TSMReader) Path() string {
324	t.mu.RLock()
325	p := t.accessor.path()
326	t.mu.RUnlock()
327	return p
328}
329
330// Key returns the key and the underlying entry at the numeric index.
331func (t *TSMReader) Key(index int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
332	return t.index.Key(index, entries)
333}
334
335// KeyAt returns the key and key type at position idx in the index.
336func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
337	return t.index.KeyAt(idx)
338}
339
340func (t *TSMReader) Seek(key []byte) int {
341	return t.index.Seek(key)
342}
343
344// ReadAt returns the values corresponding to the given index entry.
345func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
346	t.mu.RLock()
347	v, err := t.accessor.readBlock(entry, vals)
348	t.mu.RUnlock()
349	return v, err
350}
351
352// ReadFloatBlockAt returns the float values corresponding to the given index entry.
353func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
354	t.mu.RLock()
355	v, err := t.accessor.readFloatBlock(entry, vals)
356	t.mu.RUnlock()
357	return v, err
358}
359
360// ReadIntegerBlockAt returns the integer values corresponding to the given index entry.
361func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
362	t.mu.RLock()
363	v, err := t.accessor.readIntegerBlock(entry, vals)
364	t.mu.RUnlock()
365	return v, err
366}
367
368// ReadUnsignedBlockAt returns the unsigned integer values corresponding to the given index entry.
369func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) {
370	t.mu.RLock()
371	v, err := t.accessor.readUnsignedBlock(entry, vals)
372	t.mu.RUnlock()
373	return v, err
374}
375
376// ReadStringBlockAt returns the string values corresponding to the given index entry.
377func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
378	t.mu.RLock()
379	v, err := t.accessor.readStringBlock(entry, vals)
380	t.mu.RUnlock()
381	return v, err
382}
383
384// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry.
385func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
386	t.mu.RLock()
387	v, err := t.accessor.readBooleanBlock(entry, vals)
388	t.mu.RUnlock()
389	return v, err
390}
391
392// Read returns the values corresponding to the block at the given key and timestamp.
393func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
394	t.mu.RLock()
395	v, err := t.accessor.read(key, timestamp)
396	t.mu.RUnlock()
397	return v, err
398}
399
400// ReadAll returns all values for a key in all blocks.
401func (t *TSMReader) ReadAll(key []byte) ([]Value, error) {
402	t.mu.RLock()
403	v, err := t.accessor.readAll(key)
404	t.mu.RUnlock()
405	return v, err
406}
407
408func (t *TSMReader) ReadBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
409	t.mu.RLock()
410	n, v, err := t.accessor.readBytes(e, b)
411	t.mu.RUnlock()
412	return n, v, err
413}
414
415// Type returns the type of values stored at the given key.
416func (t *TSMReader) Type(key []byte) (byte, error) {
417	return t.index.Type(key)
418}
419
420// Close closes the TSMReader.
421func (t *TSMReader) Close() error {
422	t.refsWG.Wait()
423
424	t.mu.Lock()
425	defer t.mu.Unlock()
426
427	if err := t.accessor.close(); err != nil {
428		return err
429	}
430
431	return t.index.Close()
432}
433
434// Ref records a usage of this TSMReader.  If there are active references
435// when the reader is closed or removed, the reader will remain open until
436// there are no more references.
437func (t *TSMReader) Ref() {
438	atomic.AddInt64(&t.refs, 1)
439	t.refsWG.Add(1)
440}
441
442// Unref removes a usage record of this TSMReader.  If the Reader was closed
443// by another goroutine while there were active references, the file will
444// be closed and remove
445func (t *TSMReader) Unref() {
446	atomic.AddInt64(&t.refs, -1)
447	t.refsWG.Done()
448}
449
450// InUse returns whether the TSMReader currently has any active references.
451func (t *TSMReader) InUse() bool {
452	refs := atomic.LoadInt64(&t.refs)
453	return refs > 0
454}
455
456// Remove removes any underlying files stored on disk for this reader.
457func (t *TSMReader) Remove() error {
458	t.mu.Lock()
459	defer t.mu.Unlock()
460	return t.remove()
461}
462
463// Rename renames the underlying file to the new path.
464func (t *TSMReader) Rename(path string) error {
465	t.mu.Lock()
466	defer t.mu.Unlock()
467	return t.accessor.rename(path)
468}
469
470// Remove removes any underlying files stored on disk for this reader.
471func (t *TSMReader) remove() error {
472	path := t.accessor.path()
473
474	if t.InUse() {
475		return ErrFileInUse
476	}
477
478	if path != "" {
479		err := os.RemoveAll(path)
480		if err != nil {
481			return err
482		}
483	}
484
485	if err := t.tombstoner.Delete(); err != nil {
486		return err
487	}
488	return nil
489}
490
491// Contains returns whether the given key is present in the index.
492func (t *TSMReader) Contains(key []byte) bool {
493	return t.index.Contains(key)
494}
495
496// ContainsValue returns true if key and time might exists in this file.  This function could
497// return true even though the actual point does not exist.  For example, the key may
498// exist in this file, but not have a point exactly at time t.
499func (t *TSMReader) ContainsValue(key []byte, ts int64) bool {
500	return t.index.ContainsValue(key, ts)
501}
502
503// DeleteRange removes the given points for keys between minTime and maxTime.   The series
504// keys passed in must be sorted.
505func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
506	if len(keys) == 0 {
507		return nil
508	}
509
510	batch := t.BatchDelete()
511	if err := batch.DeleteRange(keys, minTime, maxTime); err != nil {
512		batch.Rollback()
513		return err
514	}
515	return batch.Commit()
516}
517
518// Delete deletes blocks indicated by keys.
519func (t *TSMReader) Delete(keys [][]byte) error {
520	if err := t.tombstoner.Add(keys); err != nil {
521		return err
522	}
523
524	if err := t.tombstoner.Flush(); err != nil {
525		return err
526	}
527
528	t.index.Delete(keys)
529	return nil
530}
531
532// OverlapsTimeRange returns true if the time range of the file intersect min and max.
533func (t *TSMReader) OverlapsTimeRange(min, max int64) bool {
534	return t.index.OverlapsTimeRange(min, max)
535}
536
537// OverlapsKeyRange returns true if the key range of the file intersect min and max.
538func (t *TSMReader) OverlapsKeyRange(min, max []byte) bool {
539	return t.index.OverlapsKeyRange(min, max)
540}
541
542// TimeRange returns the min and max time across all keys in the file.
543func (t *TSMReader) TimeRange() (int64, int64) {
544	return t.index.TimeRange()
545}
546
547// KeyRange returns the min and max key across all keys in the file.
548func (t *TSMReader) KeyRange() ([]byte, []byte) {
549	return t.index.KeyRange()
550}
551
552// KeyCount returns the count of unique keys in the TSMReader.
553func (t *TSMReader) KeyCount() int {
554	return t.index.KeyCount()
555}
556
557// Entries returns all index entries for key.
558func (t *TSMReader) Entries(key []byte) []IndexEntry {
559	return t.index.Entries(key)
560}
561
562// ReadEntries reads the index entries for key into entries.
563func (t *TSMReader) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
564	return t.index.ReadEntries(key, entries)
565}
566
567// IndexSize returns the size of the index in bytes.
568func (t *TSMReader) IndexSize() uint32 {
569	return t.index.Size()
570}
571
572// Size returns the size of the underlying file in bytes.
573func (t *TSMReader) Size() uint32 {
574	t.mu.RLock()
575	size := t.size
576	t.mu.RUnlock()
577	return uint32(size)
578}
579
580// LastModified returns the last time the underlying file was modified.
581func (t *TSMReader) LastModified() int64 {
582	t.mu.RLock()
583	lm := t.lastModified
584	for _, ts := range t.tombstoner.TombstoneFiles() {
585		if ts.LastModified > lm {
586			lm = ts.LastModified
587		}
588	}
589	t.mu.RUnlock()
590	return lm
591}
592
593// HasTombstones return true if there are any tombstone entries recorded.
594func (t *TSMReader) HasTombstones() bool {
595	t.mu.RLock()
596	b := t.tombstoner.HasTombstones()
597	t.mu.RUnlock()
598	return b
599}
600
601// TombstoneFiles returns any tombstone files associated with this TSM file.
602func (t *TSMReader) TombstoneFiles() []FileStat {
603	t.mu.RLock()
604	fs := t.tombstoner.TombstoneFiles()
605	t.mu.RUnlock()
606	return fs
607}
608
609// TombstoneRange returns ranges of time that are deleted for the given key.
610func (t *TSMReader) TombstoneRange(key []byte) []TimeRange {
611	t.mu.RLock()
612	tr := t.index.TombstoneRange(key)
613	t.mu.RUnlock()
614	return tr
615}
616
617// Stats returns the FileStat for the TSMReader's underlying file.
618func (t *TSMReader) Stats() FileStat {
619	minTime, maxTime := t.index.TimeRange()
620	minKey, maxKey := t.index.KeyRange()
621	return FileStat{
622		Path:         t.Path(),
623		Size:         t.Size(),
624		LastModified: t.LastModified(),
625		MinTime:      minTime,
626		MaxTime:      maxTime,
627		MinKey:       minKey,
628		MaxKey:       maxKey,
629		HasTombstone: t.tombstoner.HasTombstones(),
630	}
631}
632
633// BlockIterator returns a BlockIterator for the underlying TSM file.
634func (t *TSMReader) BlockIterator() *BlockIterator {
635	return &BlockIterator{
636		r: t,
637		n: t.index.KeyCount(),
638	}
639}
640
641type BatchDeleter interface {
642	DeleteRange(keys [][]byte, min, max int64) error
643	Commit() error
644	Rollback() error
645}
646
647type batchDelete struct {
648	r *TSMReader
649}
650
651func (b *batchDelete) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
652	if len(keys) == 0 {
653		return nil
654	}
655
656	// If the keys can't exist in this TSM file, skip it.
657	minKey, maxKey := keys[0], keys[len(keys)-1]
658	if !b.r.index.OverlapsKeyRange(minKey, maxKey) {
659		return nil
660	}
661
662	// If the timerange can't exist in this TSM file, skip it.
663	if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
664		return nil
665	}
666
667	if err := b.r.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
668		return err
669	}
670
671	return nil
672}
673
674func (b *batchDelete) Commit() error {
675	defer b.r.deleteMu.Unlock()
676	if err := b.r.tombstoner.Flush(); err != nil {
677		return err
678	}
679
680	return b.r.applyTombstones()
681}
682
683func (b *batchDelete) Rollback() error {
684	defer b.r.deleteMu.Unlock()
685	return b.r.tombstoner.Rollback()
686}
687
688// BatchDelete returns a BatchDeleter.  Only a single goroutine may run a BatchDelete at a time.
689// Callers must either Commit or Rollback the operation.
690func (r *TSMReader) BatchDelete() BatchDeleter {
691	r.deleteMu.Lock()
692	return &batchDelete{r: r}
693}
694
695type BatchDeleters []BatchDeleter
696
697func (a BatchDeleters) DeleteRange(keys [][]byte, min, max int64) error {
698	errC := make(chan error, len(a))
699	for _, b := range a {
700		go func(b BatchDeleter) { errC <- b.DeleteRange(keys, min, max) }(b)
701	}
702
703	var err error
704	for i := 0; i < len(a); i++ {
705		dErr := <-errC
706		if dErr != nil {
707			err = dErr
708		}
709	}
710	return err
711}
712
713func (a BatchDeleters) Commit() error {
714	errC := make(chan error, len(a))
715	for _, b := range a {
716		go func(b BatchDeleter) { errC <- b.Commit() }(b)
717	}
718
719	var err error
720	for i := 0; i < len(a); i++ {
721		dErr := <-errC
722		if dErr != nil {
723			err = dErr
724		}
725	}
726	return err
727}
728
729func (a BatchDeleters) Rollback() error {
730	errC := make(chan error, len(a))
731	for _, b := range a {
732		go func(b BatchDeleter) { errC <- b.Rollback() }(b)
733	}
734
735	var err error
736	for i := 0; i < len(a); i++ {
737		dErr := <-errC
738		if dErr != nil {
739			err = dErr
740		}
741	}
742	return err
743}
744
745// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index.  This
746// implementation can be used for indexes that may be MMAPed into memory.
747type indirectIndex struct {
748	mu sync.RWMutex
749
750	// indirectIndex works a follows.  Assuming we have an index structure in memory as
751	// the diagram below:
752	//
753	// ┌────────────────────────────────────────────────────────────────────┐
754	// │                               Index                                │
755	// ├─┬──────────────────────┬──┬───────────────────────┬───┬────────────┘
756	// │0│                      │62│                       │145│
757	// ├─┴───────┬─────────┬────┼──┴──────┬─────────┬──────┼───┴─────┬──────┐
758	// │Key 1 Len│   Key   │... │Key 2 Len│  Key 2  │ ...  │  Key 3  │ ...  │
759	// │ 2 bytes │ N bytes │    │ 2 bytes │ N bytes │      │ 2 bytes │      │
760	// └─────────┴─────────┴────┴─────────┴─────────┴──────┴─────────┴──────┘
761
762	// We would build an `offsets` slices where each element pointers to the byte location
763	// for the first key in the index slice.
764
765	// ┌────────────────────────────────────────────────────────────────────┐
766	// │                              Offsets                               │
767	// ├────┬────┬────┬─────────────────────────────────────────────────────┘
768	// │ 0  │ 62 │145 │
769	// └────┴────┴────┘
770
771	// Using this offset slice we can find `Key 2` by doing a binary search
772	// over the offsets slice.  Instead of comparing the value in the offsets
773	// (e.g. `62`), we use that as an index into the underlying index to
774	// retrieve the key at postion `62` and perform our comparisons with that.
775
776	// When we have identified the correct position in the index for a given
777	// key, we could perform another binary search or a linear scan.  This
778	// should be fast as well since each index entry is 28 bytes and all
779	// contiguous in memory.  The current implementation uses a linear scan since the
780	// number of block entries is expected to be < 100 per key.
781
782	// b is the underlying index byte slice.  This could be a copy on the heap or an MMAP
783	// slice reference
784	b []byte
785
786	// offsets contains the positions in b for each key.  It points to the 2 byte length of
787	// key.
788	offsets []byte
789
790	// minKey, maxKey are the minium and maximum (lexicographically sorted) contained in the
791	// file
792	minKey, maxKey []byte
793
794	// minTime, maxTime are the minimum and maximum times contained in the file across all
795	// series.
796	minTime, maxTime int64
797
798	// tombstones contains only the tombstoned keys with subset of time values deleted.  An
799	// entry would exist here if a subset of the points for a key were deleted and the file
800	// had not be re-compacted to remove the points on disk.
801	tombstones map[string][]TimeRange
802}
803
804// TimeRange holds a min and max timestamp.
805type TimeRange struct {
806	Min, Max int64
807}
808
809func (t TimeRange) Overlaps(min, max int64) bool {
810	return t.Min <= max && t.Max >= min
811}
812
813// NewIndirectIndex returns a new indirect index.
814func NewIndirectIndex() *indirectIndex {
815	return &indirectIndex{
816		tombstones: make(map[string][]TimeRange),
817	}
818}
819
820func (d *indirectIndex) offset(i int) int {
821	if i < 0 || i+4 > len(d.offsets) {
822		return -1
823	}
824	return int(binary.BigEndian.Uint32(d.offsets[i*4 : i*4+4]))
825}
826
827func (d *indirectIndex) Seek(key []byte) int {
828	d.mu.RLock()
829	defer d.mu.RUnlock()
830	return d.searchOffset(key)
831}
832
833// searchOffset searches the offsets slice for key and returns the position in
834// offsets where key would exist.
835func (d *indirectIndex) searchOffset(key []byte) int {
836	// We use a binary search across our indirect offsets (pointers to all the keys
837	// in the index slice).
838	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
839		// i is the position in offsets we are at so get offset it points to
840		offset := int32(binary.BigEndian.Uint32(x))
841
842		// It's pointing to the start of the key which is a 2 byte length
843		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
844
845		// See if it matches
846		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
847	})
848
849	// See if we might have found the right index
850	if i < len(d.offsets) {
851		return int(i / 4)
852	}
853
854	// The key is not in the index.  i is the index where it would be inserted so return
855	// a value outside our offset range.
856	return int(len(d.offsets)) / 4
857}
858
859// search returns the byte position of key in the index.  If key is not
860// in the index, len(index) is returned.
861func (d *indirectIndex) search(key []byte) int {
862	if !d.ContainsKey(key) {
863		return len(d.b)
864	}
865
866	// We use a binary search across our indirect offsets (pointers to all the keys
867	// in the index slice).
868	i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
869		// i is the position in offsets we are at so get offset it points to
870		offset := int32(binary.BigEndian.Uint32(x))
871
872		// It's pointing to the start of the key which is a 2 byte length
873		keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
874
875		// See if it matches
876		return bytes.Compare(d.b[offset+2:offset+2+keyLen], key) >= 0
877	})
878
879	// See if we might have found the right index
880	if i < len(d.offsets) {
881		ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
882		_, k := readKey(d.b[ofs:])
883
884		// The search may have returned an i == 0 which could indicated that the value
885		// searched should be inserted at postion 0.  Make sure the key in the index
886		// matches the search value.
887		if !bytes.Equal(key, k) {
888			return len(d.b)
889		}
890
891		return int(ofs)
892	}
893
894	// The key is not in the index.  i is the index where it would be inserted so return
895	// a value outside our offset range.
896	return len(d.b)
897}
898
899// ContainsKey returns true of key may exist in this index.
900func (d *indirectIndex) ContainsKey(key []byte) bool {
901	return bytes.Compare(key, d.minKey) >= 0 && bytes.Compare(key, d.maxKey) <= 0
902}
903
904// Entries returns all index entries for a key.
905func (d *indirectIndex) Entries(key []byte) []IndexEntry {
906	return d.ReadEntries(key, nil)
907}
908
909func (d *indirectIndex) readEntriesAt(ofs int, entries *[]IndexEntry) ([]byte, []IndexEntry) {
910	n, k := readKey(d.b[ofs:])
911
912	// Read and return all the entries
913	ofs += n
914	var ie indexEntries
915	if entries != nil {
916		ie.entries = *entries
917	}
918	if _, err := readEntries(d.b[ofs:], &ie); err != nil {
919		panic(fmt.Sprintf("error reading entries: %v", err))
920	}
921	if entries != nil {
922		*entries = ie.entries
923	}
924	return k, ie.entries
925}
926
927// ReadEntries returns all index entries for a key.
928func (d *indirectIndex) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry {
929	d.mu.RLock()
930	defer d.mu.RUnlock()
931
932	ofs := d.search(key)
933	if ofs < len(d.b) {
934		k, entries := d.readEntriesAt(ofs, entries)
935		// The search may have returned an i == 0 which could indicated that the value
936		// searched should be inserted at position 0.  Make sure the key in the index
937		// matches the search value.
938		if !bytes.Equal(key, k) {
939			return nil
940		}
941
942		return entries
943	}
944
945	// The key is not in the index.  i is the index where it would be inserted.
946	return nil
947}
948
949// Entry returns the index entry for the specified key and timestamp.  If no entry
950// matches the key an timestamp, nil is returned.
951func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
952	entries := d.Entries(key)
953	for _, entry := range entries {
954		if entry.Contains(timestamp) {
955			return &entry
956		}
957	}
958	return nil
959}
960
961// Key returns the key in the index at the given position.
962func (d *indirectIndex) Key(idx int, entries *[]IndexEntry) ([]byte, byte, []IndexEntry) {
963	d.mu.RLock()
964	defer d.mu.RUnlock()
965
966	if idx < 0 || idx*4+4 > len(d.offsets) {
967		return nil, 0, nil
968	}
969	ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
970	n, key := readKey(d.b[ofs:])
971
972	typ := d.b[int(ofs)+n]
973
974	var ie indexEntries
975	if entries != nil {
976		ie.entries = *entries
977	}
978	if _, err := readEntries(d.b[int(ofs)+n:], &ie); err != nil {
979		return nil, 0, nil
980	}
981	if entries != nil {
982		*entries = ie.entries
983	}
984
985	return key, typ, ie.entries
986}
987
988// KeyAt returns the key in the index at the given position.
989func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
990	d.mu.RLock()
991
992	if idx < 0 || idx*4+4 > len(d.offsets) {
993		d.mu.RUnlock()
994		return nil, 0
995	}
996	ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
997
998	n, key := readKey(d.b[ofs:])
999	ofs = ofs + int32(n)
1000	typ := d.b[ofs]
1001	d.mu.RUnlock()
1002	return key, typ
1003}
1004
1005// KeyCount returns the count of unique keys in the index.
1006func (d *indirectIndex) KeyCount() int {
1007	d.mu.RLock()
1008	n := len(d.offsets) / 4
1009	d.mu.RUnlock()
1010	return n
1011}
1012
1013// Delete removes the given keys from the index.
1014func (d *indirectIndex) Delete(keys [][]byte) {
1015	if len(keys) == 0 {
1016		return
1017	}
1018
1019	if !bytesutil.IsSorted(keys) {
1020		bytesutil.Sort(keys)
1021	}
1022
1023	// Both keys and offsets are sorted.  Walk both in order and skip
1024	// any keys that exist in both.
1025	d.mu.Lock()
1026	start := d.searchOffset(keys[0])
1027	for i := start * 4; i+4 <= len(d.offsets) && len(keys) > 0; i += 4 {
1028		offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
1029		_, indexKey := readKey(d.b[offset:])
1030
1031		for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
1032			keys = keys[1:]
1033		}
1034
1035		if len(keys) > 0 && bytes.Equal(keys[0], indexKey) {
1036			keys = keys[1:]
1037			copy(d.offsets[i:i+4], nilOffset[:])
1038		}
1039	}
1040	d.offsets = bytesutil.Pack(d.offsets, 4, 255)
1041	d.mu.Unlock()
1042}
1043
1044// DeleteRange removes the given keys with data between minTime and maxTime from the index.
1045func (d *indirectIndex) DeleteRange(keys [][]byte, minTime, maxTime int64) {
1046	// No keys, nothing to do
1047	if len(keys) == 0 {
1048		return
1049	}
1050
1051	if !bytesutil.IsSorted(keys) {
1052		bytesutil.Sort(keys)
1053	}
1054
1055	// If we're deleting the max time range, just use tombstoning to remove the
1056	// key from the offsets slice
1057	if minTime == math.MinInt64 && maxTime == math.MaxInt64 {
1058		d.Delete(keys)
1059		return
1060	}
1061
1062	// Is the range passed in outside of the time range for the file?
1063	min, max := d.TimeRange()
1064	if minTime > max || maxTime < min {
1065		return
1066	}
1067
1068	fullKeys := make([][]byte, 0, len(keys))
1069	tombstones := map[string][]TimeRange{}
1070	var ie []IndexEntry
1071
1072	for i := 0; len(keys) > 0 && i < d.KeyCount(); i++ {
1073		k, entries := d.readEntriesAt(d.offset(i), &ie)
1074
1075		// Skip any keys that don't exist.  These are less than the current key.
1076		for len(keys) > 0 && bytes.Compare(keys[0], k) < 0 {
1077			keys = keys[1:]
1078		}
1079
1080		// No more keys to delete, we're done.
1081		if len(keys) == 0 {
1082			break
1083		}
1084
1085		// If the current key is greater than the index one, continue to the next
1086		// index key.
1087		if len(keys) > 0 && bytes.Compare(keys[0], k) > 0 {
1088			continue
1089		}
1090
1091		// If multiple tombstones are saved for the same key
1092		if len(entries) == 0 {
1093			continue
1094		}
1095
1096		// Is the time range passed outside of the time range we've have stored for this key?
1097		min, max := entries[0].MinTime, entries[len(entries)-1].MaxTime
1098		if minTime > max || maxTime < min {
1099			continue
1100		}
1101
1102		// Does the range passed in cover every value for the key?
1103		if minTime <= min && maxTime >= max {
1104			fullKeys = append(fullKeys, keys[0])
1105			keys = keys[1:]
1106			continue
1107		}
1108
1109		d.mu.RLock()
1110		existing := d.tombstones[string(k)]
1111		d.mu.RUnlock()
1112
1113		// Append the new tombonstes to the existing ones
1114		newTs := append(existing, append(tombstones[string(k)], TimeRange{minTime, maxTime})...)
1115		fn := func(i, j int) bool {
1116			a, b := newTs[i], newTs[j]
1117			if a.Min == b.Min {
1118				return a.Max <= b.Max
1119			}
1120			return a.Min < b.Min
1121		}
1122
1123		// Sort the updated tombstones if necessary
1124		if len(newTs) > 1 && !sort.SliceIsSorted(newTs, fn) {
1125			sort.Slice(newTs, fn)
1126		}
1127
1128		tombstones[string(k)] = newTs
1129
1130		// We need to see if all the tombstones end up deleting the entire series.  This
1131		// could happen if their is one tombstore with min,max time spanning all the block
1132		// time ranges or from multiple smaller tombstones the delete segments.  To detect
1133		// this cases, we use a window starting at the first tombstone and grow it be each
1134		// tombstone that is immediately adjacent to the current window or if it overlaps.
1135		// If there are any gaps, we abort.
1136		minTs, maxTs := newTs[0].Min, newTs[0].Max
1137		for j := 1; j < len(newTs); j++ {
1138			prevTs := newTs[j-1]
1139			ts := newTs[j]
1140
1141			// Make sure all the tombstone line up for a continuous range.  We don't
1142			// want to have two small deletes on each edges end up causing us to
1143			// remove the full key.
1144			if prevTs.Max != ts.Min-1 && !prevTs.Overlaps(ts.Min, ts.Max) {
1145				minTs, maxTs = int64(math.MaxInt64), int64(math.MinInt64)
1146				break
1147			}
1148
1149			if ts.Min < minTs {
1150				minTs = ts.Min
1151			}
1152			if ts.Max > maxTs {
1153				maxTs = ts.Max
1154			}
1155		}
1156
1157		// If we have a fully deleted series, delete it all of it.
1158		if minTs <= min && maxTs >= max {
1159			fullKeys = append(fullKeys, keys[0])
1160			keys = keys[1:]
1161			continue
1162		}
1163	}
1164
1165	// Delete all the keys that fully deleted in bulk
1166	if len(fullKeys) > 0 {
1167		d.Delete(fullKeys)
1168	}
1169
1170	if len(tombstones) == 0 {
1171		return
1172	}
1173
1174	d.mu.Lock()
1175	for k, v := range tombstones {
1176		d.tombstones[k] = v
1177	}
1178	d.mu.Unlock()
1179}
1180
1181// TombstoneRange returns ranges of time that are deleted for the given key.
1182func (d *indirectIndex) TombstoneRange(key []byte) []TimeRange {
1183	d.mu.RLock()
1184	r := d.tombstones[string(key)]
1185	d.mu.RUnlock()
1186	return r
1187}
1188
1189// Contains return true if the given key exists in the index.
1190func (d *indirectIndex) Contains(key []byte) bool {
1191	return len(d.Entries(key)) > 0
1192}
1193
1194// ContainsValue returns true if key and time might exist in this file.
1195func (d *indirectIndex) ContainsValue(key []byte, timestamp int64) bool {
1196	entry := d.Entry(key, timestamp)
1197	if entry == nil {
1198		return false
1199	}
1200
1201	d.mu.RLock()
1202	tombstones := d.tombstones[string(key)]
1203	d.mu.RUnlock()
1204
1205	for _, t := range tombstones {
1206		if t.Min <= timestamp && t.Max >= timestamp {
1207			return false
1208		}
1209	}
1210	return true
1211}
1212
1213// Type returns the block type of the values stored for the key.
1214func (d *indirectIndex) Type(key []byte) (byte, error) {
1215	d.mu.RLock()
1216	defer d.mu.RUnlock()
1217
1218	ofs := d.search(key)
1219	if ofs < len(d.b) {
1220		n, _ := readKey(d.b[ofs:])
1221		ofs += n
1222		return d.b[ofs], nil
1223	}
1224	return 0, fmt.Errorf("key does not exist: %s", key)
1225}
1226
1227// OverlapsTimeRange returns true if the time range of the file intersect min and max.
1228func (d *indirectIndex) OverlapsTimeRange(min, max int64) bool {
1229	return d.minTime <= max && d.maxTime >= min
1230}
1231
1232// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
1233func (d *indirectIndex) OverlapsKeyRange(min, max []byte) bool {
1234	return bytes.Compare(d.minKey, max) <= 0 && bytes.Compare(d.maxKey, min) >= 0
1235}
1236
1237// KeyRange returns the min and max keys in the index.
1238func (d *indirectIndex) KeyRange() ([]byte, []byte) {
1239	return d.minKey, d.maxKey
1240}
1241
1242// TimeRange returns the min and max time across all keys in the index.
1243func (d *indirectIndex) TimeRange() (int64, int64) {
1244	return d.minTime, d.maxTime
1245}
1246
1247// MarshalBinary returns a byte slice encoded version of the index.
1248func (d *indirectIndex) MarshalBinary() ([]byte, error) {
1249	d.mu.RLock()
1250	defer d.mu.RUnlock()
1251
1252	return d.b, nil
1253}
1254
1255// UnmarshalBinary populates an index from an encoded byte slice
1256// representation of an index.
1257func (d *indirectIndex) UnmarshalBinary(b []byte) error {
1258	d.mu.Lock()
1259	defer d.mu.Unlock()
1260
1261	// Keep a reference to the actual index bytes
1262	d.b = b
1263	if len(b) == 0 {
1264		return nil
1265	}
1266
1267	//var minKey, maxKey []byte
1268	var minTime, maxTime int64 = math.MaxInt64, 0
1269
1270	// To create our "indirect" index, we need to find the location of all the keys in
1271	// the raw byte slice.  The keys are listed once each (in sorted order).  Following
1272	// each key is a time ordered list of index entry blocks for that key.  The loop below
1273	// basically skips across the slice keeping track of the counter when we are at a key
1274	// field.
1275	var i int32
1276	var offsets []int32
1277	iMax := int32(len(b))
1278	for i < iMax {
1279		offsets = append(offsets, i)
1280
1281		// Skip to the start of the values
1282		// key length value (2) + type (1) + length of key
1283		if i+2 >= iMax {
1284			return fmt.Errorf("indirectIndex: not enough data for key length value")
1285		}
1286		i += 3 + int32(binary.BigEndian.Uint16(b[i:i+2]))
1287
1288		// count of index entries
1289		if i+indexCountSize >= iMax {
1290			return fmt.Errorf("indirectIndex: not enough data for index entries count")
1291		}
1292		count := int32(binary.BigEndian.Uint16(b[i : i+indexCountSize]))
1293		i += indexCountSize
1294
1295		// Find the min time for the block
1296		if i+8 >= iMax {
1297			return fmt.Errorf("indirectIndex: not enough data for min time")
1298		}
1299		minT := int64(binary.BigEndian.Uint64(b[i : i+8]))
1300		if minT < minTime {
1301			minTime = minT
1302		}
1303
1304		i += (count - 1) * indexEntrySize
1305
1306		// Find the max time for the block
1307		if i+16 >= iMax {
1308			return fmt.Errorf("indirectIndex: not enough data for max time")
1309		}
1310		maxT := int64(binary.BigEndian.Uint64(b[i+8 : i+16]))
1311		if maxT > maxTime {
1312			maxTime = maxT
1313		}
1314
1315		i += indexEntrySize
1316	}
1317
1318	firstOfs := offsets[0]
1319	_, key := readKey(b[firstOfs:])
1320	d.minKey = key
1321
1322	lastOfs := offsets[len(offsets)-1]
1323	_, key = readKey(b[lastOfs:])
1324	d.maxKey = key
1325
1326	d.minTime = minTime
1327	d.maxTime = maxTime
1328
1329	var err error
1330	d.offsets, err = mmap(nil, 0, len(offsets)*4)
1331	if err != nil {
1332		return err
1333	}
1334	for i, v := range offsets {
1335		binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
1336	}
1337
1338	return nil
1339}
1340
1341// Size returns the size of the current index in bytes.
1342func (d *indirectIndex) Size() uint32 {
1343	d.mu.RLock()
1344	defer d.mu.RUnlock()
1345
1346	return uint32(len(d.b))
1347}
1348
1349func (d *indirectIndex) Close() error {
1350	// Windows doesn't use the anonymous map for the offsets index
1351	if runtime.GOOS == "windows" {
1352		return nil
1353	}
1354	return munmap(d.offsets[:cap(d.offsets)])
1355}
1356
1357// mmapAccess is mmap based block accessor.  It access blocks through an
1358// MMAP file interface.
1359type mmapAccessor struct {
1360	accessCount uint64 // Counter incremented everytime the mmapAccessor is accessed
1361	freeCount   uint64 // Counter to determine whether the accessor can free its resources
1362
1363	mmapWillNeed bool // If true then mmap advise value MADV_WILLNEED will be provided the kernel for b.
1364
1365	mu sync.RWMutex
1366	b  []byte
1367	f  *os.File
1368
1369	index *indirectIndex
1370}
1371
1372func (m *mmapAccessor) init() (*indirectIndex, error) {
1373	m.mu.Lock()
1374	defer m.mu.Unlock()
1375
1376	if err := verifyVersion(m.f); err != nil {
1377		return nil, err
1378	}
1379
1380	var err error
1381
1382	if _, err := m.f.Seek(0, 0); err != nil {
1383		return nil, err
1384	}
1385
1386	stat, err := m.f.Stat()
1387	if err != nil {
1388		return nil, err
1389	}
1390
1391	m.b, err = mmap(m.f, 0, int(stat.Size()))
1392	if err != nil {
1393		return nil, err
1394	}
1395	if len(m.b) < 8 {
1396		return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
1397	}
1398
1399	// Hint to the kernel that we will be reading the file.  It would be better to hint
1400	// that we will be reading the index section, but that's not been
1401	// implemented as yet.
1402	if m.mmapWillNeed {
1403		if err := madviseWillNeed(m.b); err != nil {
1404			return nil, err
1405		}
1406	}
1407
1408	indexOfsPos := len(m.b) - 8
1409	indexStart := binary.BigEndian.Uint64(m.b[indexOfsPos : indexOfsPos+8])
1410	if indexStart >= uint64(indexOfsPos) {
1411		return nil, fmt.Errorf("mmapAccessor: invalid indexStart")
1412	}
1413
1414	m.index = NewIndirectIndex()
1415	if err := m.index.UnmarshalBinary(m.b[indexStart:indexOfsPos]); err != nil {
1416		return nil, err
1417	}
1418
1419	// Allow resources to be freed immediately if requested
1420	m.incAccess()
1421	atomic.StoreUint64(&m.freeCount, 1)
1422
1423	return m.index, nil
1424}
1425
1426func (m *mmapAccessor) free() error {
1427	accessCount := atomic.LoadUint64(&m.accessCount)
1428	freeCount := atomic.LoadUint64(&m.freeCount)
1429
1430	// Already freed everything.
1431	if freeCount == 0 && accessCount == 0 {
1432		return nil
1433	}
1434
1435	// Were there accesses after the last time we tried to free?
1436	// If so, don't free anything and record the access count that we
1437	// see now for the next check.
1438	if accessCount != freeCount {
1439		atomic.StoreUint64(&m.freeCount, accessCount)
1440		return nil
1441	}
1442
1443	// Reset both counters to zero to indicate that we have freed everything.
1444	atomic.StoreUint64(&m.accessCount, 0)
1445	atomic.StoreUint64(&m.freeCount, 0)
1446
1447	m.mu.RLock()
1448	defer m.mu.RUnlock()
1449
1450	return madviseDontNeed(m.b)
1451}
1452
1453func (m *mmapAccessor) incAccess() {
1454	atomic.AddUint64(&m.accessCount, 1)
1455}
1456
1457func (m *mmapAccessor) rename(path string) error {
1458	m.incAccess()
1459
1460	m.mu.Lock()
1461	defer m.mu.Unlock()
1462
1463	err := munmap(m.b)
1464	if err != nil {
1465		return err
1466	}
1467
1468	if err := m.f.Close(); err != nil {
1469		return err
1470	}
1471
1472	if err := renameFile(m.f.Name(), path); err != nil {
1473		return err
1474	}
1475
1476	m.f, err = os.Open(path)
1477	if err != nil {
1478		return err
1479	}
1480
1481	if _, err := m.f.Seek(0, 0); err != nil {
1482		return err
1483	}
1484
1485	stat, err := m.f.Stat()
1486	if err != nil {
1487		return err
1488	}
1489
1490	m.b, err = mmap(m.f, 0, int(stat.Size()))
1491	if err != nil {
1492		return err
1493	}
1494
1495	if m.mmapWillNeed {
1496		return madviseWillNeed(m.b)
1497	}
1498	return nil
1499}
1500
1501func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
1502	entry := m.index.Entry(key, timestamp)
1503	if entry == nil {
1504		return nil, nil
1505	}
1506
1507	return m.readBlock(entry, nil)
1508}
1509
1510func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
1511	m.incAccess()
1512
1513	m.mu.RLock()
1514	defer m.mu.RUnlock()
1515
1516	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1517		return nil, ErrTSMClosed
1518	}
1519	//TODO: Validate checksum
1520	var err error
1521	values, err = DecodeBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1522	if err != nil {
1523		return nil, err
1524	}
1525
1526	return values, nil
1527}
1528
1529func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
1530	m.incAccess()
1531
1532	m.mu.RLock()
1533	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1534		m.mu.RUnlock()
1535		return nil, ErrTSMClosed
1536	}
1537
1538	a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1539	m.mu.RUnlock()
1540
1541	if err != nil {
1542		return nil, err
1543	}
1544
1545	return a, nil
1546}
1547
1548func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
1549	m.incAccess()
1550
1551	m.mu.RLock()
1552	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1553		m.mu.RUnlock()
1554		return nil, ErrTSMClosed
1555	}
1556
1557	a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1558	m.mu.RUnlock()
1559
1560	if err != nil {
1561		return nil, err
1562	}
1563
1564	return a, nil
1565}
1566
1567func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
1568	m.incAccess()
1569
1570	m.mu.RLock()
1571	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1572		m.mu.RUnlock()
1573		return nil, ErrTSMClosed
1574	}
1575
1576	a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1577	m.mu.RUnlock()
1578
1579	if err != nil {
1580		return nil, err
1581	}
1582
1583	return a, nil
1584}
1585
1586func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
1587	m.incAccess()
1588
1589	m.mu.RLock()
1590	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1591		m.mu.RUnlock()
1592		return nil, ErrTSMClosed
1593	}
1594
1595	a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1596	m.mu.RUnlock()
1597
1598	if err != nil {
1599		return nil, err
1600	}
1601
1602	return a, nil
1603}
1604
1605func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
1606	m.incAccess()
1607
1608	m.mu.RLock()
1609	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1610		m.mu.RUnlock()
1611		return nil, ErrTSMClosed
1612	}
1613
1614	a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
1615	m.mu.RUnlock()
1616
1617	if err != nil {
1618		return nil, err
1619	}
1620
1621	return a, nil
1622}
1623
1624func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
1625	m.incAccess()
1626
1627	m.mu.RLock()
1628	if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
1629		m.mu.RUnlock()
1630		return 0, nil, ErrTSMClosed
1631	}
1632
1633	// return the bytes after the 4 byte checksum
1634	crc, block := binary.BigEndian.Uint32(m.b[entry.Offset:entry.Offset+4]), m.b[entry.Offset+4:entry.Offset+int64(entry.Size)]
1635	m.mu.RUnlock()
1636
1637	return crc, block, nil
1638}
1639
1640// readAll returns all values for a key in all blocks.
1641func (m *mmapAccessor) readAll(key []byte) ([]Value, error) {
1642	m.incAccess()
1643
1644	blocks := m.index.Entries(key)
1645	if len(blocks) == 0 {
1646		return nil, nil
1647	}
1648
1649	tombstones := m.index.TombstoneRange(key)
1650
1651	m.mu.RLock()
1652	defer m.mu.RUnlock()
1653
1654	var temp []Value
1655	var err error
1656	var values []Value
1657	for _, block := range blocks {
1658		var skip bool
1659		for _, t := range tombstones {
1660			// Should we skip this block because it contains points that have been deleted
1661			if t.Min <= block.MinTime && t.Max >= block.MaxTime {
1662				skip = true
1663				break
1664			}
1665		}
1666
1667		if skip {
1668			continue
1669		}
1670		//TODO: Validate checksum
1671		temp = temp[:0]
1672		// The +4 is the 4 byte checksum length
1673		temp, err = DecodeBlock(m.b[block.Offset+4:block.Offset+int64(block.Size)], temp)
1674		if err != nil {
1675			return nil, err
1676		}
1677
1678		// Filter out any values that were deleted
1679		for _, t := range tombstones {
1680			temp = Values(temp).Exclude(t.Min, t.Max)
1681		}
1682
1683		values = append(values, temp...)
1684	}
1685
1686	return values, nil
1687}
1688
1689func (m *mmapAccessor) path() string {
1690	m.mu.RLock()
1691	path := m.f.Name()
1692	m.mu.RUnlock()
1693	return path
1694}
1695
1696func (m *mmapAccessor) close() error {
1697	m.mu.Lock()
1698	defer m.mu.Unlock()
1699
1700	if m.b == nil {
1701		return nil
1702	}
1703
1704	err := munmap(m.b)
1705	if err != nil {
1706		return err
1707	}
1708
1709	m.b = nil
1710	return m.f.Close()
1711}
1712
1713type indexEntries struct {
1714	Type    byte
1715	entries []IndexEntry
1716}
1717
1718func (a *indexEntries) Len() int      { return len(a.entries) }
1719func (a *indexEntries) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
1720func (a *indexEntries) Less(i, j int) bool {
1721	return a.entries[i].MinTime < a.entries[j].MinTime
1722}
1723
1724func (a *indexEntries) MarshalBinary() ([]byte, error) {
1725	buf := make([]byte, len(a.entries)*indexEntrySize)
1726
1727	for i, entry := range a.entries {
1728		entry.AppendTo(buf[indexEntrySize*i:])
1729	}
1730
1731	return buf, nil
1732}
1733
1734func (a *indexEntries) WriteTo(w io.Writer) (total int64, err error) {
1735	var buf [indexEntrySize]byte
1736	var n int
1737
1738	for _, entry := range a.entries {
1739		entry.AppendTo(buf[:])
1740		n, err = w.Write(buf[:])
1741		total += int64(n)
1742		if err != nil {
1743			return total, err
1744		}
1745	}
1746
1747	return total, nil
1748}
1749
1750func readKey(b []byte) (n int, key []byte) {
1751	// 2 byte size of key
1752	n, size := 2, int(binary.BigEndian.Uint16(b[:2]))
1753
1754	// N byte key
1755	key = b[n : n+size]
1756
1757	n += len(key)
1758	return
1759}
1760
1761func readEntries(b []byte, entries *indexEntries) (n int, err error) {
1762	if len(b) < 1+indexCountSize {
1763		return 0, fmt.Errorf("readEntries: data too short for headers")
1764	}
1765
1766	// 1 byte block type
1767	entries.Type = b[n]
1768	n++
1769
1770	// 2 byte count of index entries
1771	count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
1772	n += indexCountSize
1773
1774	if cap(entries.entries) < count {
1775		entries.entries = make([]IndexEntry, count)
1776	} else {
1777		entries.entries = entries.entries[:count]
1778	}
1779
1780	b = b[indexCountSize+indexTypeSize:]
1781	for i := 0; i < len(entries.entries); i++ {
1782		if err = entries.entries[i].UnmarshalBinary(b); err != nil {
1783			return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
1784		}
1785		b = b[indexEntrySize:]
1786	}
1787
1788	n += count * indexEntrySize
1789
1790	return
1791}
1792