1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "errors" 11 "math/rand" 12 "runtime" 13 "sync" 14 "sync/atomic" 15 16 "github.com/syndtr/goleveldb/leveldb/iterator" 17 "github.com/syndtr/goleveldb/leveldb/opt" 18 "github.com/syndtr/goleveldb/leveldb/util" 19) 20 21var ( 22 errInvalidInternalKey = errors.New("leveldb: Iterator: invalid internal key") 23) 24 25type memdbReleaser struct { 26 once sync.Once 27 m *memDB 28} 29 30func (mr *memdbReleaser) Release() { 31 mr.once.Do(func() { 32 mr.m.decref() 33 }) 34} 35 36func (db *DB) newRawIterator(auxm *memDB, auxt tFiles, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 37 strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader) 38 em, fm := db.getMems() 39 v := db.s.version() 40 41 tableIts := v.getIterators(slice, ro) 42 n := len(tableIts) + len(auxt) + 3 43 its := make([]iterator.Iterator, 0, n) 44 45 if auxm != nil { 46 ami := auxm.NewIterator(slice) 47 ami.SetReleaser(&memdbReleaser{m: auxm}) 48 its = append(its, ami) 49 } 50 for _, t := range auxt { 51 its = append(its, v.s.tops.newIterator(t, slice, ro)) 52 } 53 54 emi := em.NewIterator(slice) 55 emi.SetReleaser(&memdbReleaser{m: em}) 56 its = append(its, emi) 57 if fm != nil { 58 fmi := fm.NewIterator(slice) 59 fmi.SetReleaser(&memdbReleaser{m: fm}) 60 its = append(its, fmi) 61 } 62 its = append(its, tableIts...) 63 mi := iterator.NewMergedIterator(its, db.s.icmp, strict) 64 mi.SetReleaser(&versionReleaser{v: v}) 65 return mi 66} 67 68func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter { 69 var islice *util.Range 70 if slice != nil { 71 islice = &util.Range{} 72 if slice.Start != nil { 73 islice.Start = makeInternalKey(nil, slice.Start, keyMaxSeq, keyTypeSeek) 74 } 75 if slice.Limit != nil { 76 islice.Limit = makeInternalKey(nil, slice.Limit, keyMaxSeq, keyTypeSeek) 77 } 78 } 79 rawIter := db.newRawIterator(auxm, auxt, islice, ro) 80 iter := &dbIter{ 81 db: db, 82 icmp: db.s.icmp, 83 iter: rawIter, 84 seq: seq, 85 strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), 86 key: make([]byte, 0), 87 value: make([]byte, 0), 88 } 89 atomic.AddInt32(&db.aliveIters, 1) 90 runtime.SetFinalizer(iter, (*dbIter).Release) 91 return iter 92} 93 94func (db *DB) iterSamplingRate() int { 95 return rand.Intn(2 * db.s.o.GetIteratorSamplingRate()) 96} 97 98type dir int 99 100const ( 101 dirReleased dir = iota - 1 102 dirSOI 103 dirEOI 104 dirBackward 105 dirForward 106) 107 108// dbIter represent an interator states over a database session. 109type dbIter struct { 110 db *DB 111 icmp *iComparer 112 iter iterator.Iterator 113 seq uint64 114 strict bool 115 116 smaplingGap int 117 dir dir 118 key []byte 119 value []byte 120 err error 121 releaser util.Releaser 122} 123 124func (i *dbIter) sampleSeek() { 125 ikey := i.iter.Key() 126 i.smaplingGap -= len(ikey) + len(i.iter.Value()) 127 for i.smaplingGap < 0 { 128 i.smaplingGap += i.db.iterSamplingRate() 129 i.db.sampleSeek(ikey) 130 } 131} 132 133func (i *dbIter) setErr(err error) { 134 i.err = err 135 i.key = nil 136 i.value = nil 137} 138 139func (i *dbIter) iterErr() { 140 if err := i.iter.Error(); err != nil { 141 i.setErr(err) 142 } 143} 144 145func (i *dbIter) Valid() bool { 146 return i.err == nil && i.dir > dirEOI 147} 148 149func (i *dbIter) First() bool { 150 if i.err != nil { 151 return false 152 } else if i.dir == dirReleased { 153 i.err = ErrIterReleased 154 return false 155 } 156 157 if i.iter.First() { 158 i.dir = dirSOI 159 return i.next() 160 } 161 i.dir = dirEOI 162 i.iterErr() 163 return false 164} 165 166func (i *dbIter) Last() bool { 167 if i.err != nil { 168 return false 169 } else if i.dir == dirReleased { 170 i.err = ErrIterReleased 171 return false 172 } 173 174 if i.iter.Last() { 175 return i.prev() 176 } 177 i.dir = dirSOI 178 i.iterErr() 179 return false 180} 181 182func (i *dbIter) Seek(key []byte) bool { 183 if i.err != nil { 184 return false 185 } else if i.dir == dirReleased { 186 i.err = ErrIterReleased 187 return false 188 } 189 190 ikey := makeInternalKey(nil, key, i.seq, keyTypeSeek) 191 if i.iter.Seek(ikey) { 192 i.dir = dirSOI 193 return i.next() 194 } 195 i.dir = dirEOI 196 i.iterErr() 197 return false 198} 199 200func (i *dbIter) next() bool { 201 for { 202 if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil { 203 i.sampleSeek() 204 if seq <= i.seq { 205 switch kt { 206 case keyTypeDel: 207 // Skip deleted key. 208 i.key = append(i.key[:0], ukey...) 209 i.dir = dirForward 210 case keyTypeVal: 211 if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 { 212 i.key = append(i.key[:0], ukey...) 213 i.value = append(i.value[:0], i.iter.Value()...) 214 i.dir = dirForward 215 return true 216 } 217 } 218 } 219 } else if i.strict { 220 i.setErr(kerr) 221 break 222 } 223 if !i.iter.Next() { 224 i.dir = dirEOI 225 i.iterErr() 226 break 227 } 228 } 229 return false 230} 231 232func (i *dbIter) Next() bool { 233 if i.dir == dirEOI || i.err != nil { 234 return false 235 } else if i.dir == dirReleased { 236 i.err = ErrIterReleased 237 return false 238 } 239 240 if !i.iter.Next() || (i.dir == dirBackward && !i.iter.Next()) { 241 i.dir = dirEOI 242 i.iterErr() 243 return false 244 } 245 return i.next() 246} 247 248func (i *dbIter) prev() bool { 249 i.dir = dirBackward 250 del := true 251 if i.iter.Valid() { 252 for { 253 if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil { 254 i.sampleSeek() 255 if seq <= i.seq { 256 if !del && i.icmp.uCompare(ukey, i.key) < 0 { 257 return true 258 } 259 del = (kt == keyTypeDel) 260 if !del { 261 i.key = append(i.key[:0], ukey...) 262 i.value = append(i.value[:0], i.iter.Value()...) 263 } 264 } 265 } else if i.strict { 266 i.setErr(kerr) 267 return false 268 } 269 if !i.iter.Prev() { 270 break 271 } 272 } 273 } 274 if del { 275 i.dir = dirSOI 276 i.iterErr() 277 return false 278 } 279 return true 280} 281 282func (i *dbIter) Prev() bool { 283 if i.dir == dirSOI || i.err != nil { 284 return false 285 } else if i.dir == dirReleased { 286 i.err = ErrIterReleased 287 return false 288 } 289 290 switch i.dir { 291 case dirEOI: 292 return i.Last() 293 case dirForward: 294 for i.iter.Prev() { 295 if ukey, _, _, kerr := parseInternalKey(i.iter.Key()); kerr == nil { 296 i.sampleSeek() 297 if i.icmp.uCompare(ukey, i.key) < 0 { 298 goto cont 299 } 300 } else if i.strict { 301 i.setErr(kerr) 302 return false 303 } 304 } 305 i.dir = dirSOI 306 i.iterErr() 307 return false 308 } 309 310cont: 311 return i.prev() 312} 313 314func (i *dbIter) Key() []byte { 315 if i.err != nil || i.dir <= dirEOI { 316 return nil 317 } 318 return i.key 319} 320 321func (i *dbIter) Value() []byte { 322 if i.err != nil || i.dir <= dirEOI { 323 return nil 324 } 325 return i.value 326} 327 328func (i *dbIter) Release() { 329 if i.dir != dirReleased { 330 // Clear the finalizer. 331 runtime.SetFinalizer(i, nil) 332 333 if i.releaser != nil { 334 i.releaser.Release() 335 i.releaser = nil 336 } 337 338 i.dir = dirReleased 339 i.key = nil 340 i.value = nil 341 i.iter.Release() 342 i.iter = nil 343 atomic.AddInt32(&i.db.aliveIters, -1) 344 i.db = nil 345 } 346} 347 348func (i *dbIter) SetReleaser(releaser util.Releaser) { 349 if i.dir == dirReleased { 350 panic(util.ErrReleased) 351 } 352 if i.releaser != nil && releaser != nil { 353 panic(util.ErrHasReleaser) 354 } 355 i.releaser = releaser 356} 357 358func (i *dbIter) Error() error { 359 return i.err 360} 361