1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"errors"
11	"math/rand"
12	"runtime"
13	"sync"
14	"sync/atomic"
15
16	"github.com/syndtr/goleveldb/leveldb/iterator"
17	"github.com/syndtr/goleveldb/leveldb/opt"
18	"github.com/syndtr/goleveldb/leveldb/util"
19)
20
21var (
22	errInvalidInternalKey = errors.New("leveldb: Iterator: invalid internal key")
23)
24
25type memdbReleaser struct {
26	once sync.Once
27	m    *memDB
28}
29
30func (mr *memdbReleaser) Release() {
31	mr.once.Do(func() {
32		mr.m.decref()
33	})
34}
35
36func (db *DB) newRawIterator(auxm *memDB, auxt tFiles, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
37	strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader)
38	em, fm := db.getMems()
39	v := db.s.version()
40
41	tableIts := v.getIterators(slice, ro)
42	n := len(tableIts) + len(auxt) + 3
43	its := make([]iterator.Iterator, 0, n)
44
45	if auxm != nil {
46		ami := auxm.NewIterator(slice)
47		ami.SetReleaser(&memdbReleaser{m: auxm})
48		its = append(its, ami)
49	}
50	for _, t := range auxt {
51		its = append(its, v.s.tops.newIterator(t, slice, ro))
52	}
53
54	emi := em.NewIterator(slice)
55	emi.SetReleaser(&memdbReleaser{m: em})
56	its = append(its, emi)
57	if fm != nil {
58		fmi := fm.NewIterator(slice)
59		fmi.SetReleaser(&memdbReleaser{m: fm})
60		its = append(its, fmi)
61	}
62	its = append(its, tableIts...)
63	mi := iterator.NewMergedIterator(its, db.s.icmp, strict)
64	mi.SetReleaser(&versionReleaser{v: v})
65	return mi
66}
67
68func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
69	var islice *util.Range
70	if slice != nil {
71		islice = &util.Range{}
72		if slice.Start != nil {
73			islice.Start = makeInternalKey(nil, slice.Start, keyMaxSeq, keyTypeSeek)
74		}
75		if slice.Limit != nil {
76			islice.Limit = makeInternalKey(nil, slice.Limit, keyMaxSeq, keyTypeSeek)
77		}
78	}
79	rawIter := db.newRawIterator(auxm, auxt, islice, ro)
80	iter := &dbIter{
81		db:     db,
82		icmp:   db.s.icmp,
83		iter:   rawIter,
84		seq:    seq,
85		strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
86		key:    make([]byte, 0),
87		value:  make([]byte, 0),
88	}
89	atomic.AddInt32(&db.aliveIters, 1)
90	runtime.SetFinalizer(iter, (*dbIter).Release)
91	return iter
92}
93
94func (db *DB) iterSamplingRate() int {
95	return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
96}
97
98type dir int
99
100const (
101	dirReleased dir = iota - 1
102	dirSOI
103	dirEOI
104	dirBackward
105	dirForward
106)
107
108// dbIter represent an interator states over a database session.
109type dbIter struct {
110	db     *DB
111	icmp   *iComparer
112	iter   iterator.Iterator
113	seq    uint64
114	strict bool
115
116	smaplingGap int
117	dir         dir
118	key         []byte
119	value       []byte
120	err         error
121	releaser    util.Releaser
122}
123
124func (i *dbIter) sampleSeek() {
125	ikey := i.iter.Key()
126	i.smaplingGap -= len(ikey) + len(i.iter.Value())
127	for i.smaplingGap < 0 {
128		i.smaplingGap += i.db.iterSamplingRate()
129		i.db.sampleSeek(ikey)
130	}
131}
132
133func (i *dbIter) setErr(err error) {
134	i.err = err
135	i.key = nil
136	i.value = nil
137}
138
139func (i *dbIter) iterErr() {
140	if err := i.iter.Error(); err != nil {
141		i.setErr(err)
142	}
143}
144
145func (i *dbIter) Valid() bool {
146	return i.err == nil && i.dir > dirEOI
147}
148
149func (i *dbIter) First() bool {
150	if i.err != nil {
151		return false
152	} else if i.dir == dirReleased {
153		i.err = ErrIterReleased
154		return false
155	}
156
157	if i.iter.First() {
158		i.dir = dirSOI
159		return i.next()
160	}
161	i.dir = dirEOI
162	i.iterErr()
163	return false
164}
165
166func (i *dbIter) Last() bool {
167	if i.err != nil {
168		return false
169	} else if i.dir == dirReleased {
170		i.err = ErrIterReleased
171		return false
172	}
173
174	if i.iter.Last() {
175		return i.prev()
176	}
177	i.dir = dirSOI
178	i.iterErr()
179	return false
180}
181
182func (i *dbIter) Seek(key []byte) bool {
183	if i.err != nil {
184		return false
185	} else if i.dir == dirReleased {
186		i.err = ErrIterReleased
187		return false
188	}
189
190	ikey := makeInternalKey(nil, key, i.seq, keyTypeSeek)
191	if i.iter.Seek(ikey) {
192		i.dir = dirSOI
193		return i.next()
194	}
195	i.dir = dirEOI
196	i.iterErr()
197	return false
198}
199
200func (i *dbIter) next() bool {
201	for {
202		if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
203			i.sampleSeek()
204			if seq <= i.seq {
205				switch kt {
206				case keyTypeDel:
207					// Skip deleted key.
208					i.key = append(i.key[:0], ukey...)
209					i.dir = dirForward
210				case keyTypeVal:
211					if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 {
212						i.key = append(i.key[:0], ukey...)
213						i.value = append(i.value[:0], i.iter.Value()...)
214						i.dir = dirForward
215						return true
216					}
217				}
218			}
219		} else if i.strict {
220			i.setErr(kerr)
221			break
222		}
223		if !i.iter.Next() {
224			i.dir = dirEOI
225			i.iterErr()
226			break
227		}
228	}
229	return false
230}
231
232func (i *dbIter) Next() bool {
233	if i.dir == dirEOI || i.err != nil {
234		return false
235	} else if i.dir == dirReleased {
236		i.err = ErrIterReleased
237		return false
238	}
239
240	if !i.iter.Next() || (i.dir == dirBackward && !i.iter.Next()) {
241		i.dir = dirEOI
242		i.iterErr()
243		return false
244	}
245	return i.next()
246}
247
248func (i *dbIter) prev() bool {
249	i.dir = dirBackward
250	del := true
251	if i.iter.Valid() {
252		for {
253			if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
254				i.sampleSeek()
255				if seq <= i.seq {
256					if !del && i.icmp.uCompare(ukey, i.key) < 0 {
257						return true
258					}
259					del = (kt == keyTypeDel)
260					if !del {
261						i.key = append(i.key[:0], ukey...)
262						i.value = append(i.value[:0], i.iter.Value()...)
263					}
264				}
265			} else if i.strict {
266				i.setErr(kerr)
267				return false
268			}
269			if !i.iter.Prev() {
270				break
271			}
272		}
273	}
274	if del {
275		i.dir = dirSOI
276		i.iterErr()
277		return false
278	}
279	return true
280}
281
282func (i *dbIter) Prev() bool {
283	if i.dir == dirSOI || i.err != nil {
284		return false
285	} else if i.dir == dirReleased {
286		i.err = ErrIterReleased
287		return false
288	}
289
290	switch i.dir {
291	case dirEOI:
292		return i.Last()
293	case dirForward:
294		for i.iter.Prev() {
295			if ukey, _, _, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
296				i.sampleSeek()
297				if i.icmp.uCompare(ukey, i.key) < 0 {
298					goto cont
299				}
300			} else if i.strict {
301				i.setErr(kerr)
302				return false
303			}
304		}
305		i.dir = dirSOI
306		i.iterErr()
307		return false
308	}
309
310cont:
311	return i.prev()
312}
313
314func (i *dbIter) Key() []byte {
315	if i.err != nil || i.dir <= dirEOI {
316		return nil
317	}
318	return i.key
319}
320
321func (i *dbIter) Value() []byte {
322	if i.err != nil || i.dir <= dirEOI {
323		return nil
324	}
325	return i.value
326}
327
328func (i *dbIter) Release() {
329	if i.dir != dirReleased {
330		// Clear the finalizer.
331		runtime.SetFinalizer(i, nil)
332
333		if i.releaser != nil {
334			i.releaser.Release()
335			i.releaser = nil
336		}
337
338		i.dir = dirReleased
339		i.key = nil
340		i.value = nil
341		i.iter.Release()
342		i.iter = nil
343		atomic.AddInt32(&i.db.aliveIters, -1)
344		i.db = nil
345	}
346}
347
348func (i *dbIter) SetReleaser(releaser util.Releaser) {
349	if i.dir == dirReleased {
350		panic(util.ErrReleased)
351	}
352	if i.releaser != nil && releaser != nil {
353		panic(util.ErrHasReleaser)
354	}
355	i.releaser = releaser
356}
357
358func (i *dbIter) Error() error {
359	return i.err
360}
361