1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package table
8
9import (
10	"encoding/binary"
11	"fmt"
12	"io"
13	"sort"
14	"strings"
15	"sync"
16
17	"github.com/golang/snappy"
18
19	"github.com/syndtr/goleveldb/leveldb/cache"
20	"github.com/syndtr/goleveldb/leveldb/comparer"
21	"github.com/syndtr/goleveldb/leveldb/errors"
22	"github.com/syndtr/goleveldb/leveldb/filter"
23	"github.com/syndtr/goleveldb/leveldb/iterator"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/storage"
26	"github.com/syndtr/goleveldb/leveldb/util"
27)
28
// Reader errors.
//
// ErrNotFound is the same value as errors.ErrNotFound, re-exported under
// this package's name. ErrReaderReleased is the error stored in a Reader by
// Reader.Release. ErrIterReleased is the error a block iterator records when
// one of its positioning methods is called after Release.
var (
	ErrNotFound       = errors.ErrNotFound
	ErrReaderReleased = errors.New("leveldb/table: reader released")
	ErrIterReleased   = errors.New("leveldb/table: iterator released")
)
35
// ErrCorrupted carries the details of a corruption detected while reading a
// table: the position and size of the affected region, the kind of region
// and a human readable reason. Values of this type are wrapped in
// errors.ErrCorrupted before being handed to callers.
type ErrCorrupted struct {
	Pos    int64
	Size   int64
	Kind   string
	Reason string
}

// Error implements the error interface. The message names the kind, the
// position and the reason; Size is not part of the message.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/table: corruption on %s (pos=%d): %s"
	return fmt.Sprintf(format, e.Kind, e.Pos, e.Reason)
}
48
// max returns the larger of x and y.
func max(x, y int) int {
	if y > x {
		return y
	}
	return x
}
55
// block is an in-memory table block.
//
// bpool is the buffer pool data is handed back to by Release, bh is the
// handle the block was read from and data is the block content. restartsLen
// is the restart point count decoded from the last four bytes of data, and
// restartsOffset is the position in data where the restart array begins,
// which is also where the entries end.
type block struct {
	bpool          *util.BufferPool
	bh             blockHandle
	data           []byte
	restartsLen    int
	restartsOffset int
}
63
64func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
65	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
66		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
67		offset++                                    // shared always zero, since this is a restart point
68		v1, n1 := binary.Uvarint(b.data[offset:])   // key length
69		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
70		m := offset + n1 + n2
71		return cmp.Compare(b.data[m:m+int(v1)], key) > 0
72	}) + rstart - 1
73	if index < rstart {
74		// The smallest key is greater-than key sought.
75		index = rstart
76	}
77	offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
78	return
79}
80
81func (b *block) restartIndex(rstart, rlimit, offset int) int {
82	return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
83		return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
84	}) + rstart - 1
85}
86
87func (b *block) restartOffset(index int) int {
88	return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
89}
90
91func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
92	if offset >= b.restartsOffset {
93		if offset != b.restartsOffset {
94			err = &ErrCorrupted{Reason: "entries offset not aligned"}
95		}
96		return
97	}
98	v0, n0 := binary.Uvarint(b.data[offset:])       // Shared prefix length
99	v1, n1 := binary.Uvarint(b.data[offset+n0:])    // Key length
100	v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
101	m := n0 + n1 + n2
102	n = m + int(v1) + int(v2)
103	if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
104		err = &ErrCorrupted{Reason: "entries corrupted"}
105		return
106	}
107	key = b.data[offset+m : offset+m+int(v1)]
108	value = b.data[offset+m+int(v1) : offset+n]
109	nShared = int(v0)
110	return
111}
112
113func (b *block) Release() {
114	b.bpool.Put(b.data)
115	b.bpool = nil
116	b.data = nil
117}
118
// dir is the state of a blockIter.
type dir int

// Iterator states, numbered from -1. Key and Value only return data for
// states greater than dirEOI, i.e. dirBackward and dirForward.
//
// dirReleased marks an iterator that has been released. dirSOI is the state
// in which Prev returns false and Next starts from the beginning; dirEOI is
// the state in which Next returns false and Prev starts from the end.
// dirBackward and dirForward record the direction of the last successful
// move.
const (
	dirReleased dir = iota - 1
	dirSOI
	dirEOI
	dirBackward
	dirForward
)
128
// blockIter iterates over the entries of one block, optionally restricted
// to a sub-range of it.
//
// tr is the owning Reader and block the block being iterated. blockReleaser
// and releaser, when set, are released by Release. key and value hold the
// current entry and offset is the block offset just past it.
//
// prevNode and prevKeys are the cache built by Prev: prevNode starts with
// the offset of the restart point the cache was built from, followed by one
// (offset into prevKeys, value offset, value length) triple per cached
// entry; prevKeys stores the cached keys back to back.
//
// offsetStart is the restart offset decoding begins at, offsetRealStart the
// offset of the first entry that is actually exposed (entries in between
// are decoded but skipped), and offsetLimit the offset at which iteration
// ends.
type blockIter struct {
	tr            *Reader
	block         *block
	blockReleaser util.Releaser
	releaser      util.Releaser
	key, value    []byte
	offset        int
	// Previous offset, only filled by Next.
	prevOffset   int
	prevNode     []int
	prevKeys     []byte
	restartIndex int
	// Iterator direction.
	dir dir
	// Restart index slice range.
	riStart int
	riLimit int
	// Offset slice range.
	offsetStart     int
	offsetRealStart int
	offsetLimit     int
	// Error.
	err error
}
153
154func (i *blockIter) sErr(err error) {
155	i.err = err
156	i.key = nil
157	i.value = nil
158	i.prevNode = nil
159	i.prevKeys = nil
160}
161
162func (i *blockIter) reset() {
163	if i.dir == dirBackward {
164		i.prevNode = i.prevNode[:0]
165		i.prevKeys = i.prevKeys[:0]
166	}
167	i.restartIndex = i.riStart
168	i.offset = i.offsetStart
169	i.dir = dirSOI
170	i.key = i.key[:0]
171	i.value = nil
172}
173
174func (i *blockIter) isFirst() bool {
175	switch i.dir {
176	case dirForward:
177		return i.prevOffset == i.offsetRealStart
178	case dirBackward:
179		return len(i.prevNode) == 1 && i.restartIndex == i.riStart
180	}
181	return false
182}
183
184func (i *blockIter) isLast() bool {
185	switch i.dir {
186	case dirForward, dirBackward:
187		return i.offset == i.offsetLimit
188	}
189	return false
190}
191
192func (i *blockIter) First() bool {
193	if i.err != nil {
194		return false
195	} else if i.dir == dirReleased {
196		i.err = ErrIterReleased
197		return false
198	}
199
200	if i.dir == dirBackward {
201		i.prevNode = i.prevNode[:0]
202		i.prevKeys = i.prevKeys[:0]
203	}
204	i.dir = dirSOI
205	return i.Next()
206}
207
208func (i *blockIter) Last() bool {
209	if i.err != nil {
210		return false
211	} else if i.dir == dirReleased {
212		i.err = ErrIterReleased
213		return false
214	}
215
216	if i.dir == dirBackward {
217		i.prevNode = i.prevNode[:0]
218		i.prevKeys = i.prevKeys[:0]
219	}
220	i.dir = dirEOI
221	return i.Prev()
222}
223
224func (i *blockIter) Seek(key []byte) bool {
225	if i.err != nil {
226		return false
227	} else if i.dir == dirReleased {
228		i.err = ErrIterReleased
229		return false
230	}
231
232	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
233	if err != nil {
234		i.sErr(err)
235		return false
236	}
237	i.restartIndex = ri
238	i.offset = max(i.offsetStart, offset)
239	if i.dir == dirSOI || i.dir == dirEOI {
240		i.dir = dirForward
241	}
242	for i.Next() {
243		if i.tr.cmp.Compare(i.key, key) >= 0 {
244			return true
245		}
246	}
247	return false
248}
249
250func (i *blockIter) Next() bool {
251	if i.dir == dirEOI || i.err != nil {
252		return false
253	} else if i.dir == dirReleased {
254		i.err = ErrIterReleased
255		return false
256	}
257
258	if i.dir == dirSOI {
259		i.restartIndex = i.riStart
260		i.offset = i.offsetStart
261	} else if i.dir == dirBackward {
262		i.prevNode = i.prevNode[:0]
263		i.prevKeys = i.prevKeys[:0]
264	}
265	for i.offset < i.offsetRealStart {
266		key, value, nShared, n, err := i.block.entry(i.offset)
267		if err != nil {
268			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
269			return false
270		}
271		if n == 0 {
272			i.dir = dirEOI
273			return false
274		}
275		i.key = append(i.key[:nShared], key...)
276		i.value = value
277		i.offset += n
278	}
279	if i.offset >= i.offsetLimit {
280		i.dir = dirEOI
281		if i.offset != i.offsetLimit {
282			i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
283		}
284		return false
285	}
286	key, value, nShared, n, err := i.block.entry(i.offset)
287	if err != nil {
288		i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
289		return false
290	}
291	if n == 0 {
292		i.dir = dirEOI
293		return false
294	}
295	i.key = append(i.key[:nShared], key...)
296	i.value = value
297	i.prevOffset = i.offset
298	i.offset += n
299	i.dir = dirForward
300	return true
301}
302
// Prev moves the iterator to the preceding entry and reports whether one
// exists.
//
// Entries can only be decoded forward, so stepping back works one restart
// range at a time: the entries from a restart point up to the current
// position are decoded once, the last of them becomes the current entry and
// the earlier ones are cached in prevNode/prevKeys. Subsequent calls pop
// entries from that cache until only the restart offset is left, at which
// point the previous restart range is decoded.
func (i *blockIter) Prev() bool {
	if i.dir == dirSOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	var ri int
	if i.dir == dirForward {
		// Change direction.
		i.offset = i.prevOffset
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
		i.dir = dirBackward
	} else if i.dir == dirEOI {
		// At the end of iterator.
		i.restartIndex = i.riLimit
		i.offset = i.offsetLimit
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.riLimit - 1
		i.dir = dirBackward
	} else if len(i.prevNode) == 1 {
		// This is the end of a restart range.
		i.offset = i.prevNode[0]
		i.prevNode = i.prevNode[:0]
		if i.restartIndex == i.riStart {
			i.dir = dirSOI
			return false
		}
		i.restartIndex--
		ri = i.restartIndex
	} else {
		// In the middle of restart range, get from cache.
		n := len(i.prevNode) - 3
		node := i.prevNode[n:]
		i.prevNode = i.prevNode[:n]
		// Get the key.
		ko := node[0]
		i.key = append(i.key[:0], i.prevKeys[ko:]...)
		i.prevKeys = i.prevKeys[:ko]
		// Get the value.
		vo := node[1]
		vl := vo + node[2]
		i.value = i.block.data[vo:vl]
		i.offset = vl
		return true
	}
	// Build entries cache.
	i.key = i.key[:0]
	i.value = nil
	offset := i.block.restartOffset(ri)
	if offset == i.offset {
		// The target offset is itself a restart point, so the entry before
		// it belongs to the previous restart range.
		ri--
		if ri < 0 {
			i.dir = dirSOI
			return false
		}
		offset = i.block.restartOffset(ri)
	}
	// The first cache element is the offset this restart range starts at.
	i.prevNode = append(i.prevNode, offset)
	for {
		key, value, nShared, n, err := i.block.entry(offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if offset >= i.offsetRealStart {
			// i.key and i.value still describe the previously decoded entry
			// here; it is pushed to the cache before being replaced below.
			if i.value != nil {
				// Appends 3 variables:
				// 1. Previous keys offset
				// 2. Value offset in the data block
				// 3. Value length
				i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
				i.prevKeys = append(i.prevKeys, i.key...)
			}
			i.value = value
		}
		i.key = append(i.key[:nShared], key...)
		offset += n
		// Stop if target offset reached.
		if offset >= i.offset {
			if offset != i.offset {
				i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
				return false
			}

			break
		}
	}
	i.restartIndex = ri
	i.offset = offset
	return true
}
403
404func (i *blockIter) Key() []byte {
405	if i.err != nil || i.dir <= dirEOI {
406		return nil
407	}
408	return i.key
409}
410
411func (i *blockIter) Value() []byte {
412	if i.err != nil || i.dir <= dirEOI {
413		return nil
414	}
415	return i.value
416}
417
418func (i *blockIter) Release() {
419	if i.dir != dirReleased {
420		i.tr = nil
421		i.block = nil
422		i.prevNode = nil
423		i.prevKeys = nil
424		i.key = nil
425		i.value = nil
426		i.dir = dirReleased
427		if i.blockReleaser != nil {
428			i.blockReleaser.Release()
429			i.blockReleaser = nil
430		}
431		if i.releaser != nil {
432			i.releaser.Release()
433			i.releaser = nil
434		}
435	}
436}
437
438func (i *blockIter) SetReleaser(releaser util.Releaser) {
439	if i.dir == dirReleased {
440		panic(util.ErrReleased)
441	}
442	if i.releaser != nil && releaser != nil {
443		panic(util.ErrHasReleaser)
444	}
445	i.releaser = releaser
446}
447
448func (i *blockIter) Valid() bool {
449	return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)
450}
451
// Error returns the error recorded on the iterator, or nil if there is
// none.
func (i *blockIter) Error() error {
	return i.err
}
455
// filterBlock is an in-memory filter block.
//
// bpool is the buffer pool data is handed back to by Release and data is
// the block content. oOffset is the position in data where the array of
// 4-byte filter offsets begins, baseLg is taken from the last byte of data
// and is the shift applied to a data block offset to obtain a filter index,
// and filtersNum is the number of entries in the offsets array.
type filterBlock struct {
	bpool      *util.BufferPool
	data       []byte
	oOffset    int
	baseLg     uint
	filtersNum int
}
463
464func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
465	i := int(offset >> b.baseLg)
466	if i < b.filtersNum {
467		o := b.data[b.oOffset+i*4:]
468		n := int(binary.LittleEndian.Uint32(o))
469		m := int(binary.LittleEndian.Uint32(o[4:]))
470		if n < m && m <= b.oOffset {
471			return filter.Contains(b.data[n:m], key)
472		} else if n == m {
473			return false
474		}
475	}
476	return true
477}
478
479func (b *filterBlock) Release() {
480	b.bpool.Put(b.data)
481	b.bpool = nil
482	b.data = nil
483}
484
// indexIter is a block iterator over an index block whose Get method opens
// the data block the current index entry points to.
//
// tr is the Reader used to open data blocks, slice is the key range the
// table iterator was created with (nil for no restriction) and fillCache is
// passed on when data blocks are read.
type indexIter struct {
	*blockIter
	tr    *Reader
	slice *util.Range
	// Options
	fillCache bool
}
492
493func (i *indexIter) Get() iterator.Iterator {
494	value := i.Value()
495	if value == nil {
496		return nil
497	}
498	dataBH, n := decodeBlockHandle(value)
499	if n == 0 {
500		return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
501	}
502
503	var slice *util.Range
504	if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
505		slice = i.slice
506	}
507	return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
508}
509
// Reader is a table reader.
//
// mu is read-locked by the lookup and iterator methods and write-locked by
// Release. err, once set, is returned by those methods instead of doing any
// work; Release sets it to ErrReaderReleased. cache, when non-nil, is used
// to look up and store blocks; when it is nil, NewReader keeps the index
// block and the filter block in indexBlock and filterBlock instead. dataEnd
// is the offset OffsetOf reports for keys past the last index entry.
type Reader struct {
	mu     sync.RWMutex
	fd     storage.FileDesc
	reader io.ReaderAt
	cache  *cache.NamespaceGetter
	err    error
	bpool  *util.BufferPool
	// Options
	o              *opt.Options
	cmp            comparer.Comparer
	filter         filter.Filter
	verifyChecksum bool

	dataEnd                   int64
	metaBH, indexBH, filterBH blockHandle
	indexBlock                *block
	filterBlock               *filterBlock
}
529
530func (r *Reader) blockKind(bh blockHandle) string {
531	switch bh.offset {
532	case r.metaBH.offset:
533		return "meta-block"
534	case r.indexBH.offset:
535		return "index-block"
536	case r.filterBH.offset:
537		if r.filterBH.length > 0 {
538			return "filter-block"
539		}
540	}
541	return "data-block"
542}
543
544func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
545	return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
546}
547
548func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
549	return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
550}
551
552func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
553	if cerr, ok := err.(*ErrCorrupted); ok {
554		cerr.Pos = int64(bh.offset)
555		cerr.Size = int64(bh.length)
556		cerr.Kind = r.blockKind(bh)
557		return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
558	}
559	return err
560}
561
562func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
563	data := r.bpool.Get(int(bh.length + blockTrailerLen))
564	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
565		return nil, err
566	}
567
568	if verifyChecksum {
569		n := bh.length + 1
570		checksum0 := binary.LittleEndian.Uint32(data[n:])
571		checksum1 := util.NewCRC(data[:n]).Value()
572		if checksum0 != checksum1 {
573			r.bpool.Put(data)
574			return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
575		}
576	}
577
578	switch data[bh.length] {
579	case blockTypeNoCompression:
580		data = data[:bh.length]
581	case blockTypeSnappyCompression:
582		decLen, err := snappy.DecodedLen(data[:bh.length])
583		if err != nil {
584			r.bpool.Put(data)
585			return nil, r.newErrCorruptedBH(bh, err.Error())
586		}
587		decData := r.bpool.Get(decLen)
588		decData, err = snappy.Decode(decData, data[:bh.length])
589		r.bpool.Put(data)
590		if err != nil {
591			r.bpool.Put(decData)
592			return nil, r.newErrCorruptedBH(bh, err.Error())
593		}
594		data = decData
595	default:
596		r.bpool.Put(data)
597		return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
598	}
599	return data, nil
600}
601
602func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
603	data, err := r.readRawBlock(bh, verifyChecksum)
604	if err != nil {
605		return nil, err
606	}
607	restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
608	b := &block{
609		bpool:          r.bpool,
610		bh:             bh,
611		data:           data,
612		restartsLen:    restartsLen,
613		restartsOffset: len(data) - (restartsLen+1)*4,
614	}
615	return b, nil
616}
617
618func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
619	if r.cache != nil {
620		var (
621			err error
622			ch  *cache.Handle
623		)
624		if fillCache {
625			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
626				var b *block
627				b, err = r.readBlock(bh, verifyChecksum)
628				if err != nil {
629					return 0, nil
630				}
631				return cap(b.data), b
632			})
633		} else {
634			ch = r.cache.Get(bh.offset, nil)
635		}
636		if ch != nil {
637			b, ok := ch.Value().(*block)
638			if !ok {
639				ch.Release()
640				return nil, nil, errors.New("leveldb/table: inconsistent block type")
641			}
642			return b, ch, err
643		} else if err != nil {
644			return nil, nil, err
645		}
646	}
647
648	b, err := r.readBlock(bh, verifyChecksum)
649	return b, b, err
650}
651
652func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
653	data, err := r.readRawBlock(bh, true)
654	if err != nil {
655		return nil, err
656	}
657	n := len(data)
658	if n < 5 {
659		return nil, r.newErrCorruptedBH(bh, "too short")
660	}
661	m := n - 5
662	oOffset := int(binary.LittleEndian.Uint32(data[m:]))
663	if oOffset > m {
664		return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
665	}
666	b := &filterBlock{
667		bpool:      r.bpool,
668		data:       data,
669		oOffset:    oOffset,
670		baseLg:     uint(data[n-1]),
671		filtersNum: (m - oOffset) / 4,
672	}
673	return b, nil
674}
675
676func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
677	if r.cache != nil {
678		var (
679			err error
680			ch  *cache.Handle
681		)
682		if fillCache {
683			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
684				var b *filterBlock
685				b, err = r.readFilterBlock(bh)
686				if err != nil {
687					return 0, nil
688				}
689				return cap(b.data), b
690			})
691		} else {
692			ch = r.cache.Get(bh.offset, nil)
693		}
694		if ch != nil {
695			b, ok := ch.Value().(*filterBlock)
696			if !ok {
697				ch.Release()
698				return nil, nil, errors.New("leveldb/table: inconsistent block type")
699			}
700			return b, ch, err
701		} else if err != nil {
702			return nil, nil, err
703		}
704	}
705
706	b, err := r.readFilterBlock(bh)
707	return b, b, err
708}
709
710func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
711	if r.indexBlock == nil {
712		return r.readBlockCached(r.indexBH, true, fillCache)
713	}
714	return r.indexBlock, util.NoopReleaser{}, nil
715}
716
717func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
718	if r.filterBlock == nil {
719		return r.readFilterBlockCached(r.filterBH, fillCache)
720	}
721	return r.filterBlock, util.NoopReleaser{}, nil
722}
723
// newBlockIter creates an iterator over block b. bReleaser, if not nil, is
// released together with the iterator.
//
// Without a slice the iterator covers the whole block. With a slice, the
// iterator itself is used to locate the range boundaries: it seeks to
// slice.Start and slice.Limit and records the offsets and restart indexes
// found, then resets itself to the start-of-iteration state. When inclLimit
// is true, the entry found by seeking to slice.Limit is kept inside the
// range by advancing one entry further before recording the limit.
//
// If the resulting start offset lies past the limit offset, the iterator is
// returned carrying an "invalid slice range" error.
func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
	bi := &blockIter{
		tr:            r,
		block:         b,
		blockReleaser: bReleaser,
		// Valid key should never be nil.
		key:             make([]byte, 0),
		dir:             dirSOI,
		riStart:         0,
		riLimit:         b.restartsLen,
		offsetStart:     0,
		offsetRealStart: 0,
		offsetLimit:     b.restartsOffset,
	}
	if slice != nil {
		if slice.Start != nil {
			if bi.Seek(slice.Start) {
				// After a successful Seek, prevOffset is the offset of the
				// entry found; decoding has to start at the restart point
				// at or before it.
				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
				bi.offsetStart = b.restartOffset(bi.riStart)
				bi.offsetRealStart = bi.prevOffset
			} else {
				// No key at or after Start: the range is empty and begins
				// at the end of the entries.
				bi.riStart = b.restartsLen
				bi.offsetStart = b.restartsOffset
				bi.offsetRealStart = b.restartsOffset
			}
		}
		if slice.Limit != nil {
			// If nothing is found, the limit stays at the end of the block.
			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
				bi.offsetLimit = bi.prevOffset
				bi.riLimit = bi.restartIndex + 1
			}
		}
		bi.reset()
		if bi.offsetStart > bi.offsetLimit {
			bi.sErr(errors.New("leveldb/table: invalid slice range"))
		}
	}
	return bi
}
763
764func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
765	b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
766	if err != nil {
767		return iterator.NewEmptyIterator(err)
768	}
769	return r.newBlockIter(b, rel, slice, false)
770}
771
772func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
773	r.mu.RLock()
774	defer r.mu.RUnlock()
775
776	if r.err != nil {
777		return iterator.NewEmptyIterator(r.err)
778	}
779
780	return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
781}
782
783// NewIterator creates an iterator from the table.
784//
785// Slice allows slicing the iterator to only contains keys in the given
786// range. A nil Range.Start is treated as a key before all keys in the
787// table. And a nil Range.Limit is treated as a key after all keys in
788// the table.
789//
790// WARNING: Any slice returned by interator (e.g. slice returned by calling
791// Iterator.Key() or Iterator.Key() methods), its content should not be modified
792// unless noted otherwise.
793//
794// The returned iterator is not safe for concurrent use and should be released
795// after use.
796//
797// Also read Iterator documentation of the leveldb/iterator package.
798func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
799	r.mu.RLock()
800	defer r.mu.RUnlock()
801
802	if r.err != nil {
803		return iterator.NewEmptyIterator(r.err)
804	}
805
806	fillCache := !ro.GetDontFillCache()
807	indexBlock, rel, err := r.getIndexBlock(fillCache)
808	if err != nil {
809		return iterator.NewEmptyIterator(err)
810	}
811	index := &indexIter{
812		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
813		tr:        r,
814		slice:     slice,
815		fillCache: !ro.GetDontFillCache(),
816	}
817	return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
818}
819
// find looks up the first entry whose key is greater than or equal to key.
//
// It seeks the index block for the data block that may hold the key,
// optionally consults the filter block (filtered), and then seeks inside
// the data block. If the data block has no such entry, the first entry of
// the next data block is used. ErrNotFound is returned when no entry
// exists. With noValue set only the key is returned.
//
// The returned value is copied when the reader uses a buffer pool;
// otherwise it is the slice handed out by the data block iterator.
//
// find takes the reader's read lock for its whole duration.
func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	indexBlock, rel, err := r.getIndexBlock(true)
	if err != nil {
		return
	}
	defer rel.Release()

	// The index block is released through rel above, so the iterator is not
	// given a block releaser of its own.
	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()

	if !index.Seek(key) {
		if err = index.Error(); err == nil {
			err = ErrNotFound
		}
		return
	}

	dataBH, n := decodeBlockHandle(index.Value())
	if n == 0 {
		// NOTE(review): r.err is assigned while only the read lock is held.
		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
		return nil, nil, r.err
	}

	// The filter should only used for exact match.
	if filtered && r.filter != nil {
		filterBlock, frel, ferr := r.getFilterBlock(true)
		if ferr == nil {
			if !filterBlock.contains(r.filter, dataBH.offset, key) {
				frel.Release()
				return nil, nil, ErrNotFound
			}
			frel.Release()
		} else if !errors.IsCorrupted(ferr) {
			// A corrupted filter block is ignored and the lookup goes on
			// without it; any other error aborts the lookup.
			return nil, nil, ferr
		}
	}

	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
	if !data.Seek(key) {
		data.Release()
		if err = data.Error(); err != nil {
			return
		}

		// The nearest greater-than key is the first key of the next block.
		if !index.Next() {
			if err = index.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}

		dataBH, n = decodeBlockHandle(index.Value())
		if n == 0 {
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return nil, nil, r.err
		}

		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
		if !data.Next() {
			data.Release()
			if err = data.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}
	}

	// Key doesn't use block buffer, no need to copy the buffer.
	rkey = data.Key()
	if !noValue {
		if r.bpool == nil {
			value = data.Value()
		} else {
			// Value does use block buffer, and since the buffer will be
			// recycled, it need to be copied.
			value = append([]byte{}, data.Value()...)
		}
	}
	data.Release()
	return
}
910
911// Find finds key/value pair whose key is greater than or equal to the
912// given key. It returns ErrNotFound if the table doesn't contain
913// such pair.
914// If filtered is true then the nearest 'block' will be checked against
915// 'filter data' (if present) and will immediately return ErrNotFound if
916// 'filter data' indicates that such pair doesn't exist.
917//
918// The caller may modify the contents of the returned slice as it is its
919// own copy.
920// It is safe to modify the contents of the argument after Find returns.
921func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
922	return r.find(key, filtered, ro, false)
923}
924
925// FindKey finds key that is greater than or equal to the given key.
926// It returns ErrNotFound if the table doesn't contain such key.
927// If filtered is true then the nearest 'block' will be checked against
928// 'filter data' (if present) and will immediately return ErrNotFound if
929// 'filter data' indicates that such key doesn't exist.
930//
931// The caller may modify the contents of the returned slice as it is its
932// own copy.
933// It is safe to modify the contents of the argument after Find returns.
934func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
935	rkey, _, err = r.find(key, filtered, ro, true)
936	return
937}
938
939// Get gets the value for the given key. It returns errors.ErrNotFound
940// if the table does not contain the key.
941//
942// The caller may modify the contents of the returned slice as it is its
943// own copy.
944// It is safe to modify the contents of the argument after Find returns.
945func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
946	r.mu.RLock()
947	defer r.mu.RUnlock()
948
949	if r.err != nil {
950		err = r.err
951		return
952	}
953
954	rkey, value, err := r.find(key, false, ro, false)
955	if err == nil && r.cmp.Compare(rkey, key) != 0 {
956		value = nil
957		err = ErrNotFound
958	}
959	return
960}
961
962// OffsetOf returns approximate offset for the given key.
963//
964// It is safe to modify the contents of the argument after Get returns.
965func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
966	r.mu.RLock()
967	defer r.mu.RUnlock()
968
969	if r.err != nil {
970		err = r.err
971		return
972	}
973
974	indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
975	if err != nil {
976		return
977	}
978	defer rel.Release()
979
980	index := r.newBlockIter(indexBlock, nil, nil, true)
981	defer index.Release()
982	if index.Seek(key) {
983		dataBH, n := decodeBlockHandle(index.Value())
984		if n == 0 {
985			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
986			return
987		}
988		offset = int64(dataBH.offset)
989		return
990	}
991	err = index.Error()
992	if err == nil {
993		offset = r.dataEnd
994	}
995	return
996}
997
998// Release implements util.Releaser.
999// It also close the file if it is an io.Closer.
1000func (r *Reader) Release() {
1001	r.mu.Lock()
1002	defer r.mu.Unlock()
1003
1004	if closer, ok := r.reader.(io.Closer); ok {
1005		closer.Close()
1006	}
1007	if r.indexBlock != nil {
1008		r.indexBlock.Release()
1009		r.indexBlock = nil
1010	}
1011	if r.filterBlock != nil {
1012		r.filterBlock.Release()
1013		r.filterBlock = nil
1014	}
1015	r.reader = nil
1016	r.cache = nil
1017	r.bpool = nil
1018	r.err = ErrReaderReleased
1019}
1020
// NewReader creates a new initialized table reader for the file.
// The cache and bpool are optional and can be nil.
//
// size is the length of the table; the footer is read from its last
// footerLen bytes. A nil f and read errors that are not corruption are
// returned as errors. Corruption found while opening the table (table too
// small, bad magic number, bad block handles, corrupted metaindex or index
// block) is not returned: the reader is returned with the corruption stored
// as its error.
//
// The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
	if f == nil {
		return nil, errors.New("leveldb/table: nil file")
	}

	r := &Reader{
		fd:             fd,
		reader:         f,
		cache:          cache,
		bpool:          bpool,
		o:              o,
		cmp:            o.GetComparer(),
		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
	}

	if size < footerLen {
		r.err = r.newErrCorrupted(0, size, "table", "too small")
		return r, nil
	}

	footerPos := size - footerLen
	var footer [footerLen]byte
	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
		return nil, err
	}
	// The magic string occupies the last bytes of the footer.
	if string(footer[footerLen-len(magic):footerLen]) != magic {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
		return r, nil
	}

	var n int
	// Decode the metaindex block handle.
	r.metaBH, n = decodeBlockHandle(footer[:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
		return r, nil
	}

	// Decode the index block handle.
	r.indexBH, n = decodeBlockHandle(footer[n:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
		return r, nil
	}

	// Read metaindex block.
	metaBlock, err := r.readBlock(r.metaBH, true)
	if err != nil {
		if errors.IsCorrupted(err) {
			r.err = err
			return r, nil
		}
		return nil, err
	}

	// Set data end.
	r.dataEnd = int64(r.metaBH.offset)

	// Read metaindex.
	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
	for metaIter.Next() {
		key := string(metaIter.Key())
		if !strings.HasPrefix(key, "filter.") {
			continue
		}
		// The filter name follows the "filter." prefix. Use the configured
		// filter if its name matches, otherwise the first alternative
		// filter with that name.
		fn := key[7:]
		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
			r.filter = f0
		} else {
			for _, f0 := range o.GetAltFilters() {
				if f0.Name() == fn {
					r.filter = f0
					break
				}
			}
		}
		if r.filter != nil {
			filterBH, n := decodeBlockHandle(metaIter.Value())
			if n == 0 {
				continue
			}
			r.filterBH = filterBH
			// Update data end.
			r.dataEnd = int64(filterBH.offset)
			break
		}
	}
	metaIter.Release()
	metaBlock.Release()

	// Cache index and filter block locally, since we don't have global cache.
	if cache == nil {
		r.indexBlock, err = r.readBlock(r.indexBH, true)
		if err != nil {
			if errors.IsCorrupted(err) {
				r.err = err
				return r, nil
			}
			return nil, err
		}
		if r.filter != nil {
			r.filterBlock, err = r.readFilterBlock(r.filterBH)
			if err != nil {
				if !errors.IsCorrupted(err) {
					return nil, err
				}

				// Don't use filter then.
				r.filter = nil
			}
		}
	}

	return r, nil
}
1140