1package tsm1
2
3/*
A TSM file is composed of four sections: header, blocks, index and the footer.
5
6┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
7│ Header │               Blocks               │    Index    │    Footer    │
│5 bytes │              N bytes               │   N bytes   │   8 bytes    │
9└────────┴────────────────────────────────────┴─────────────┴──────────────┘
10
11Header is composed of a magic number to identify the file type and a version
12number.
13
14┌───────────────────┐
15│      Header       │
16├─────────┬─────────┤
17│  Magic  │ Version │
18│ 4 bytes │ 1 byte  │
19└─────────┴─────────┘
20
21Blocks are sequences of pairs of CRC32 and data.  The block data is opaque to the
22file.  The CRC32 is used for block level error detection.  The length of the blocks
23is stored in the index.
24
25┌───────────────────────────────────────────────────────────┐
26│                          Blocks                           │
27├───────────────────┬───────────────────┬───────────────────┤
28│      Block 1      │      Block 2      │      Block N      │
29├─────────┬─────────┼─────────┬─────────┼─────────┬─────────┤
30│  CRC    │  Data   │  CRC    │  Data   │  CRC    │  Data   │
31│ 4 bytes │ N bytes │ 4 bytes │ N bytes │ 4 bytes │ N bytes │
32└─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘
33
Following the blocks is the index for the blocks in the file.  The index is
composed of a sequence of index entries ordered lexicographically by key and
then by time.  Each index entry starts with a key length and key, followed by
the block type and a count of the number of blocks in the file for that key.
Each block entry is composed of the min and max time for the block, the offset
into the file where the block is located and the size of the block.
40
41The index structure can provide efficient access to all blocks as well as the
ability to determine the cost associated with accessing a given key.  Given a key
43and timestamp, we can determine whether a file contains the block for that
44timestamp as well as where that block resides and how much data to read to
45retrieve the block.  If we know we need to read all or multiple blocks in a
46file, we can use the size to determine how much to read in a given IO.
47
48┌────────────────────────────────────────────────────────────────────────────┐
49│                                   Index                                    │
50├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
51│ Key Len │   Key   │ Type │ Count │Min Time │Max Time │ Offset │  Size  │...│
52│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │   │
53└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
54
55The last section is the footer that stores the offset of the start of the index.
56
57┌─────────┐
58│ Footer  │
59├─────────┤
60│Index Ofs│
61│ 8 bytes │
62└─────────┘
63*/
64
65import (
66	"bufio"
67	"bytes"
68	"encoding/binary"
69	"fmt"
70	"hash/crc32"
71	"io"
72	"os"
73	"sort"
74	"strings"
75	"time"
76)
77
const (
	// MagicNumber is written as the first 4 bytes of a data file to
	// identify the file as a tsm1 formatted file.
	MagicNumber uint32 = 0x16D116D1

	// Version indicates the version of the TSM file format.
	Version byte = 1

	// indexEntrySize is the size in bytes of an encoded index entry:
	// min time (8) + max time (8) + offset (8) + size (4).
	indexEntrySize = 28

	// indexCountSize is the size in bytes used to store the count of index
	// entries for a key.
	indexCountSize = 2

	// indexTypeSize is the size in bytes used to store the type of block encoded.
	indexTypeSize = 1

	// maxIndexEntries is the max number of blocks for a given key that can
	// exist in a single file.  It is the largest value that fits in
	// indexCountSize bytes.
	maxIndexEntries = (1 << (indexCountSize * 8)) - 1

	// maxKeyLength is the max length of a key in an index entry
	// (measurement + tags).  It is the largest value that fits in the
	// 2-byte key length prefix.
	maxKeyLength = (1 << (2 * 8)) - 1

	// fsyncEvery is the threshold amount of data written before we periodically
	// fsync a TSM file.  This helps avoid long pauses due to very large fsyncs
	// at the end of writing a TSM file.
	fsyncEvery = 25 * 1024 * 1024
)
105
var (
	// ErrNoValues is returned when TSMWriter.WriteIndex is called and there are no values to write.
	ErrNoValues = fmt.Errorf("no values written")

	// ErrTSMClosed is returned when performing an operation against a closed TSM file.
	ErrTSMClosed = fmt.Errorf("tsm file closed")

	// ErrMaxKeyLengthExceeded is returned when attempting to write a key that is too long.
	ErrMaxKeyLengthExceeded = fmt.Errorf("max key length exceeded")

	// ErrMaxBlocksExceeded is returned when attempting to write a block past the allowed number.
	ErrMaxBlocksExceeded = fmt.Errorf("max blocks exceeded")
)
119
// TSMWriter writes TSM formatted key and values.
type TSMWriter interface {
	// Write writes a new block for key containing values.  Writes append
	// blocks in the order that the Write function is called.  The caller is
	// responsible for ensuring keys and blocks are sorted appropriately.
	// Values are encoded as a full block.  The caller is responsible for
	// ensuring a fixed number of values are encoded in each block as well as
	// ensuring the Values are sorted. The first and last timestamp values are
	// used as the minimum and maximum values for the index entry.
	Write(key []byte, values Values) error

	// WriteBlock writes a new block for key containing the bytes in block.  WriteBlock appends
	// blocks in the order that the WriteBlock function is called.  The caller is
	// responsible for ensuring keys and blocks are sorted appropriately, and that the
	// block and index information is correct for the block.  The minTime and maxTime
	// timestamp values are used as the minimum and maximum values for the index entry.
	WriteBlock(key []byte, minTime, maxTime int64, block []byte) error

	// WriteIndex finishes the TSM write streams and writes the index.
	WriteIndex() error

	// Flush flushes all pending changes to the underlying file resources.
	Flush() error

	// Close closes any underlying file resources.
	Close() error

	// Size returns the current size in bytes of the file.
	Size() uint32

	// Remove removes any temporary storage used by the writer.
	Remove() error
}
152
// IndexWriter writes a TSMIndex.
type IndexWriter interface {
	// Add records a new block entry for a key in the index.
	Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32)

	// Entries returns all index entries for a key.
	Entries(key []byte) []IndexEntry

	// KeyCount returns the count of unique keys in the index.
	KeyCount() int

	// Size returns the size of the current index in bytes.
	Size() uint32

	// MarshalBinary returns a byte slice encoded version of the index.
	MarshalBinary() ([]byte, error)

	// WriteTo writes the index contents to a writer.
	WriteTo(w io.Writer) (int64, error)

	// Close flushes any buffered index data and releases the resources held
	// by the index.
	Close() error

	// Remove removes the index from any temporary storage.
	Remove() error
}
177
// IndexEntry is the index information for a given block in a TSM file.
// Its binary form is indexEntrySize bytes long (see AppendTo and
// UnmarshalBinary).
type IndexEntry struct {
	// The min and max time of all points stored in the block.
	MinTime, MaxTime int64

	// The absolute position in the file where this block is located.
	Offset int64

	// The size in bytes of the block in the file.
	Size uint32
}
189
190// UnmarshalBinary decodes an IndexEntry from a byte slice.
191func (e *IndexEntry) UnmarshalBinary(b []byte) error {
192	if len(b) < indexEntrySize {
193		return fmt.Errorf("unmarshalBinary: short buf: %v < %v", len(b), indexEntrySize)
194	}
195	e.MinTime = int64(binary.BigEndian.Uint64(b[:8]))
196	e.MaxTime = int64(binary.BigEndian.Uint64(b[8:16]))
197	e.Offset = int64(binary.BigEndian.Uint64(b[16:24]))
198	e.Size = binary.BigEndian.Uint32(b[24:28])
199	return nil
200}
201
202// AppendTo writes a binary-encoded version of IndexEntry to b, allocating
203// and returning a new slice, if necessary.
204func (e *IndexEntry) AppendTo(b []byte) []byte {
205	if len(b) < indexEntrySize {
206		if cap(b) < indexEntrySize {
207			b = make([]byte, indexEntrySize)
208		} else {
209			b = b[:indexEntrySize]
210		}
211	}
212
213	binary.BigEndian.PutUint64(b[:8], uint64(e.MinTime))
214	binary.BigEndian.PutUint64(b[8:16], uint64(e.MaxTime))
215	binary.BigEndian.PutUint64(b[16:24], uint64(e.Offset))
216	binary.BigEndian.PutUint32(b[24:28], uint32(e.Size))
217
218	return b
219}
220
221// Contains returns true if this IndexEntry may contain values for the given time.
222// The min and max times are inclusive.
223func (e *IndexEntry) Contains(t int64) bool {
224	return e.MinTime <= t && e.MaxTime >= t
225}
226
227// OverlapsTimeRange returns true if the given time ranges are completely within the entry's time bounds.
228func (e *IndexEntry) OverlapsTimeRange(min, max int64) bool {
229	return e.MinTime <= max && e.MaxTime >= min
230}
231
232// String returns a string representation of the entry.
233func (e *IndexEntry) String() string {
234	return fmt.Sprintf("min=%s max=%s ofs=%d siz=%d",
235		time.Unix(0, e.MinTime).UTC(), time.Unix(0, e.MaxTime).UTC(), e.Offset, e.Size)
236}
237
238// NewIndexWriter returns a new IndexWriter.
239func NewIndexWriter() IndexWriter {
240	buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
241	return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
242}
243
// NewDiskIndexWriter returns a new IndexWriter that accumulates the encoded
// index in the file f, written through a 1MB buffered writer.
func NewDiskIndexWriter(f *os.File) IndexWriter {
	return &directIndex{fd: f, w: bufio.NewWriterSize(f, 1024*1024)}
}
248
// syncer is the subset of file behavior needed to name a destination and
// periodically fsync it while writing.
type syncer interface {
	Name() string
	Sync() error
}
253
// directIndex is a simple index implementation for a TSM file.  Only the key
// currently being added and its block entries are held as structured data;
// when a new key arrives the previous key is encoded and written through w,
// which targets either an in-memory buffer (buf) or a file on disk (fd).
type directIndex struct {
	// keyCount is the number of unique keys added so far.
	keyCount int
	// size is the running size in bytes of the index, maintained by Add.
	size uint32

	// The bytes written count of when we last fsync'd
	lastSync uint32
	// fd is the backing file for a disk based index; nil for an in-memory index.
	fd *os.File
	// buf is the backing buffer for an in-memory index; nil for a disk based index.
	buf *bytes.Buffer

	// f is the destination that is periodically fsync'd while the in-memory
	// index is copied out by WriteTo.  It is set by tsmWriter.WriteIndex and
	// may be nil.
	f syncer

	// w buffers encoded index data on its way to buf or fd.
	w *bufio.Writer

	// key is the key currently being accumulated; it is nil between keys.
	key []byte
	// indexEntries holds the block type and block entries for key.
	indexEntries *indexEntries
}
272
273func (d *directIndex) Add(key []byte, blockType byte, minTime, maxTime int64, offset int64, size uint32) {
274	// Is this the first block being added?
275	if len(d.key) == 0 {
276		// size of the key stored in the index
277		d.size += uint32(2 + len(key))
278		// size of the count of entries stored in the index
279		d.size += indexCountSize
280
281		d.key = key
282		if d.indexEntries == nil {
283			d.indexEntries = &indexEntries{}
284		}
285		d.indexEntries.Type = blockType
286		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
287			MinTime: minTime,
288			MaxTime: maxTime,
289			Offset:  offset,
290			Size:    size,
291		})
292
293		// size of the encoded index entry
294		d.size += indexEntrySize
295		d.keyCount++
296		return
297	}
298
299	// See if were still adding to the same series key.
300	cmp := bytes.Compare(d.key, key)
301	if cmp == 0 {
302		// The last block is still this key
303		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
304			MinTime: minTime,
305			MaxTime: maxTime,
306			Offset:  offset,
307			Size:    size,
308		})
309
310		// size of the encoded index entry
311		d.size += indexEntrySize
312
313	} else if cmp < 0 {
314		d.flush(d.w)
315		// We have a new key that is greater than the last one so we need to add
316		// a new index block section.
317
318		// size of the key stored in the index
319		d.size += uint32(2 + len(key))
320		// size of the count of entries stored in the index
321		d.size += indexCountSize
322
323		d.key = key
324		d.indexEntries.Type = blockType
325		d.indexEntries.entries = append(d.indexEntries.entries, IndexEntry{
326			MinTime: minTime,
327			MaxTime: maxTime,
328			Offset:  offset,
329			Size:    size,
330		})
331
332		// size of the encoded index entry
333		d.size += indexEntrySize
334		d.keyCount++
335	} else {
336		// Keys can't be added out of order.
337		panic(fmt.Sprintf("keys must be added in sorted order: %s < %s", string(key), string(d.key)))
338	}
339}
340
341func (d *directIndex) entries(key []byte) []IndexEntry {
342	if len(d.key) == 0 {
343		return nil
344	}
345
346	if bytes.Equal(d.key, key) {
347		return d.indexEntries.entries
348	}
349
350	return nil
351}
352
// Entries returns the block entries accumulated for key.  It only reports
// entries for the key currently being added; it returns nil for any other key.
func (d *directIndex) Entries(key []byte) []IndexEntry {
	return d.entries(key)
}
356
357func (d *directIndex) Entry(key []byte, t int64) *IndexEntry {
358	entries := d.entries(key)
359	for _, entry := range entries {
360		if entry.Contains(t) {
361			return &entry
362		}
363	}
364	return nil
365}
366
// KeyCount returns the number of unique keys that have been added to the index.
func (d *directIndex) KeyCount() int {
	return d.keyCount
}
370
371// copyBuffer is the actual implementation of Copy and CopyBuffer.
372// if buf is nil, one is allocated.  This is copied from the Go stdlib
373// in order to remove the fast path WriteTo calls which circumvent any
374// IO throttling as well as to add periodic fsyncs to avoid long stalls.
375func copyBuffer(f syncer, dst io.Writer, src io.Reader, buf []byte) (written int64, err error) {
376	if buf == nil {
377		buf = make([]byte, 32*1024)
378	}
379	var lastSync int64
380	for {
381		nr, er := src.Read(buf)
382		if nr > 0 {
383			nw, ew := dst.Write(buf[0:nr])
384			if nw > 0 {
385				written += int64(nw)
386			}
387
388			if written-lastSync > fsyncEvery {
389				if err := f.Sync(); err != nil {
390					return 0, err
391				}
392				lastSync = written
393			}
394			if ew != nil {
395				err = ew
396				break
397			}
398			if nr != nw {
399				err = io.ErrShortWrite
400				break
401			}
402		}
403		if er != nil {
404			if er != io.EOF {
405				err = er
406			}
407			break
408		}
409	}
410	return written, err
411}
412
413func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
414	if _, err := d.flush(d.w); err != nil {
415		return 0, err
416	}
417
418	if err := d.w.Flush(); err != nil {
419		return 0, err
420	}
421
422	if d.fd == nil {
423		return copyBuffer(d.f, w, d.buf, nil)
424	}
425
426	if _, err := d.fd.Seek(0, io.SeekStart); err != nil {
427		return 0, err
428	}
429
430	return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
431}
432
// flush encodes the key currently being accumulated, together with its block
// type, entry count and entries, and writes it to w.  It returns the number
// of bytes written.  It is a no-op when no key is pending.
//
// On success the pending key and entries are reset so the next Add starts a
// new key section.  On a write error the pending state is left as it was and
// the returned count includes the bytes written before the failure.
func (d *directIndex) flush(w io.Writer) (int64, error) {
	var (
		n   int
		err error
		buf [5]byte // key length (2) + block type (1) + entry count (2)
		N   int64   // total bytes written so far
	)

	if len(d.key) == 0 {
		return 0, nil
	}
	// For each key, individual entries are sorted by time
	key := d.key
	entries := d.indexEntries

	// The entry count is stored in indexCountSize bytes, so it can't exceed
	// maxIndexEntries.
	if entries.Len() > maxIndexEntries {
		return N, fmt.Errorf("key '%s' exceeds max index entries: %d > %d", key, entries.Len(), maxIndexEntries)
	}

	if !sort.IsSorted(entries) {
		sort.Sort(entries)
	}

	binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
	buf[2] = entries.Type
	binary.BigEndian.PutUint16(buf[3:5], uint16(entries.Len()))

	// Append the key length and key
	if n, err = w.Write(buf[0:2]); err != nil {
		return int64(n) + N, fmt.Errorf("write: writer key length error: %v", err)
	}
	N += int64(n)

	if n, err = w.Write(key); err != nil {
		return int64(n) + N, fmt.Errorf("write: writer key error: %v", err)
	}
	N += int64(n)

	// Append the block type and count
	if n, err = w.Write(buf[2:5]); err != nil {
		return int64(n) + N, fmt.Errorf("write: writer block type and count error: %v", err)
	}
	N += int64(n)

	// Append each index entry for all blocks for this key
	var n64 int64
	if n64, err = entries.WriteTo(w); err != nil {
		return n64 + N, fmt.Errorf("write: writer entries error: %v", err)
	}
	N += n64

	// Reset the pending state, keeping the entries slice's capacity for reuse.
	d.key = nil
	d.indexEntries.Type = 0
	d.indexEntries.entries = d.indexEntries.entries[:0]

	// If this is a disk based index and we've written more than the fsync threshold,
	// fsync the data to avoid long pauses later on.
	// NOTE(review): callers in this file pass the buffered writer d.w as w, so
	// bytes still held in that buffer are not covered by this Sync.
	if d.fd != nil && d.size-d.lastSync > fsyncEvery {
		if err := d.fd.Sync(); err != nil {
			return N, err
		}
		d.lastSync = d.size
	}

	return N, nil

}
500
501func (d *directIndex) MarshalBinary() ([]byte, error) {
502	var b bytes.Buffer
503	if _, err := d.WriteTo(&b); err != nil {
504		return nil, err
505	}
506	return b.Bytes(), nil
507}
508
// Size returns the running size of the index in bytes, as accumulated by Add.
func (d *directIndex) Size() uint32 {
	return d.size
}
512
513func (d *directIndex) Close() error {
514	// Flush anything remaining in the index
515	if err := d.w.Flush(); err != nil {
516		return err
517	}
518
519	if d.fd == nil {
520		return nil
521	}
522
523	if err := d.fd.Close(); err != nil {
524		return err
525	}
526	return os.Remove(d.fd.Name())
527}
528
// Remove removes the index from any temporary storage.  For a disk based
// index the backing file is closed and deleted; for an in-memory index this
// is a no-op.  Unlike Close, buffered data is not flushed first.
func (d *directIndex) Remove() error {
	if d.fd == nil {
		return nil
	}

	// Close the file handle to prevent leaking.  We ignore the error because
	// we just want to cleanup and remove the file.
	_ = d.fd.Close()

	return os.Remove(d.fd.Name())
}
541
// tsmWriter writes keys and values in the TSM format
type tsmWriter struct {
	// wrapped is the destination handed to the constructor.  It is probed
	// for optional Sync, Close and Name methods.
	wrapped io.Writer
	// w buffers writes to wrapped.
	w *bufio.Writer
	// index accumulates the index entries for the blocks written so far.
	index IndexWriter
	// n is the number of bytes written for the header and blocks.  It is
	// used as the file offset of the next block and of the index.
	n int64

	// The bytes written count of when we last fsync'd
	lastSync int64
}
552
553// NewTSMWriter returns a new TSMWriter writing to w.
554func NewTSMWriter(w io.Writer) (TSMWriter, error) {
555	index := NewIndexWriter()
556	return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
557}
558
559// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
560// based buffer for the TSM index if possible.
561func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
562	var index IndexWriter
563	// Make sure is a File so we can write the temp index alongside it.
564	if fw, ok := w.(syncer); ok {
565		f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
566		if err != nil {
567			return nil, err
568		}
569		index = NewDiskIndexWriter(f)
570	} else {
571		// w is not a file, just use an inmem index
572		index = NewIndexWriter()
573	}
574
575	return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
576}
577
578func (t *tsmWriter) writeHeader() error {
579	var buf [5]byte
580	binary.BigEndian.PutUint32(buf[0:4], MagicNumber)
581	buf[4] = Version
582
583	n, err := t.w.Write(buf[:])
584	if err != nil {
585		return err
586	}
587	t.n = int64(n)
588	return nil
589}
590
591// Write writes a new block containing key and values.
592func (t *tsmWriter) Write(key []byte, values Values) error {
593	if len(key) > maxKeyLength {
594		return ErrMaxKeyLengthExceeded
595	}
596
597	// Nothing to write
598	if len(values) == 0 {
599		return nil
600	}
601
602	// Write header only after we have some data to write.
603	if t.n == 0 {
604		if err := t.writeHeader(); err != nil {
605			return err
606		}
607	}
608
609	block, err := values.Encode(nil)
610	if err != nil {
611		return err
612	}
613
614	blockType, err := BlockType(block)
615	if err != nil {
616		return err
617	}
618
619	var checksum [crc32.Size]byte
620	binary.BigEndian.PutUint32(checksum[:], crc32.ChecksumIEEE(block))
621
622	_, err = t.w.Write(checksum[:])
623	if err != nil {
624		return err
625	}
626
627	n, err := t.w.Write(block)
628	if err != nil {
629		return err
630	}
631	n += len(checksum)
632
633	// Record this block in index
634	t.index.Add(key, blockType, values[0].UnixNano(), values[len(values)-1].UnixNano(), t.n, uint32(n))
635
636	// Increment file position pointer
637	t.n += int64(n)
638
639	if len(t.index.Entries(key)) >= maxIndexEntries {
640		return ErrMaxBlocksExceeded
641	}
642
643	return nil
644}
645
// WriteBlock writes block for the given key and time range to the TSM file.  If the write
// exceeds max entries for a given key, ErrMaxBlocksExceeded is returned.  This indicates
// that the index is now full for this key and no future writes to this key will succeed.
//
// ErrMaxKeyLengthExceeded is returned if key is too long.  An empty block is a no-op.
// The file header is written lazily, just before the first block.
func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte) error {
	if len(key) > maxKeyLength {
		return ErrMaxKeyLengthExceeded
	}

	// Nothing to write
	if len(block) == 0 {
		return nil
	}

	blockType, err := BlockType(block)
	if err != nil {
		return err
	}

	// Write header only after we have some data to write.
	if t.n == 0 {
		if err := t.writeHeader(); err != nil {
			return err
		}
	}

	// Each block is preceded by the CRC32 (IEEE) checksum of its data.
	var checksum [crc32.Size]byte
	binary.BigEndian.PutUint32(checksum[:], crc32.ChecksumIEEE(block))

	_, err = t.w.Write(checksum[:])
	if err != nil {
		return err
	}

	n, err := t.w.Write(block)
	if err != nil {
		return err
	}
	// The size recorded in the index covers the checksum as well as the data.
	n += len(checksum)

	// Record this block in index
	t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n))

	// Increment file position pointer (checksum + block len)
	t.n += int64(n)

	// fsync the file periodically to avoid long pauses with very big files.
	if t.n-t.lastSync > fsyncEvery {
		if err := t.sync(); err != nil {
			return err
		}
		t.lastSync = t.n
	}

	// The block above has already been written and indexed; the error tells
	// the caller that this key can't take any more blocks in this file.
	if len(t.index.Entries(key)) >= maxIndexEntries {
		return ErrMaxBlocksExceeded
	}

	return nil
}
705
706// WriteIndex writes the index section of the file.  If there are no index entries to write,
707// this returns ErrNoValues.
708func (t *tsmWriter) WriteIndex() error {
709	indexPos := t.n
710
711	if t.index.KeyCount() == 0 {
712		return ErrNoValues
713	}
714
715	// Set the destination file on the index so we can periodically
716	// fsync while writing the index.
717	if f, ok := t.wrapped.(syncer); ok {
718		t.index.(*directIndex).f = f
719	}
720
721	// Write the index
722	if _, err := t.index.WriteTo(t.w); err != nil {
723		return err
724	}
725
726	var buf [8]byte
727	binary.BigEndian.PutUint64(buf[:], uint64(indexPos))
728
729	// Write the index index position
730	_, err := t.w.Write(buf[:])
731	return err
732}
733
734func (t *tsmWriter) Flush() error {
735	if err := t.w.Flush(); err != nil {
736		return err
737	}
738
739	return t.sync()
740}
741
742func (t *tsmWriter) sync() error {
743	// sync is a minimal interface to make sure we can sync the wrapped
744	// value. we use a minimal interface to be as robust as possible for
745	// syncing these files.
746	type sync interface {
747		Sync() error
748	}
749
750	if f, ok := t.wrapped.(sync); ok {
751		if err := f.Sync(); err != nil {
752			return err
753		}
754	}
755	return nil
756}
757
758func (t *tsmWriter) Close() error {
759	if err := t.Flush(); err != nil {
760		return err
761	}
762
763	if err := t.index.Close(); err != nil {
764		return err
765	}
766
767	if c, ok := t.wrapped.(io.Closer); ok {
768		return c.Close()
769	}
770	return nil
771}
772
// Remove removes any temporary storage used by the writer.  The index's
// temporary storage is removed first; if that fails the error is returned
// and the wrapped writer is left untouched.  When the wrapped writer can be
// closed and has a name, it is closed and the file with that name is deleted.
func (t *tsmWriter) Remove() error {
	if err := t.index.Remove(); err != nil {
		return err
	}

	// nameCloser is the most permissive interface we can close the wrapped
	// value with.
	type nameCloser interface {
		io.Closer
		Name() string
	}

	if f, ok := t.wrapped.(nameCloser); ok {
		// Close the file handle to prevent leaking.  We ignore the error because
		// we just want to cleanup and remove the file.
		_ = f.Close()

		return os.Remove(f.Name())
	}
	return nil
}
795
// Size returns the current size in bytes of the file: the header and block
// bytes written so far plus the running size of the index.  The byte count is
// converted to uint32 before the addition.
func (t *tsmWriter) Size() uint32 {
	return uint32(t.n) + t.index.Size()
}
799
800// verifyVersion verifies that the reader's bytes are a TSM byte
801// stream of the correct version (1)
802func verifyVersion(r io.ReadSeeker) error {
803	_, err := r.Seek(0, 0)
804	if err != nil {
805		return fmt.Errorf("init: failed to seek: %v", err)
806	}
807	var b [4]byte
808	_, err = io.ReadFull(r, b[:])
809	if err != nil {
810		return fmt.Errorf("init: error reading magic number of file: %v", err)
811	}
812	if binary.BigEndian.Uint32(b[:]) != MagicNumber {
813		return fmt.Errorf("can only read from tsm file")
814	}
815	_, err = io.ReadFull(r, b[:1])
816	if err != nil {
817		return fmt.Errorf("init: error reading version: %v", err)
818	}
819	if b[0] != Version {
820		return fmt.Errorf("init: file is version %b. expected %b", b[0], Version)
821	}
822
823	return nil
824}
825