1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"container/list"
11	"fmt"
12	"io"
13	"os"
14	"runtime"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19
20	"github.com/syndtr/goleveldb/leveldb/errors"
21	"github.com/syndtr/goleveldb/leveldb/iterator"
22	"github.com/syndtr/goleveldb/leveldb/journal"
23	"github.com/syndtr/goleveldb/leveldb/memdb"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/storage"
26	"github.com/syndtr/goleveldb/leveldb/table"
27	"github.com/syndtr/goleveldb/leveldb/util"
28)
29
// DB is a LevelDB database.
//
// A DB must be closed after use, by calling Close. Its methods are safe
// for concurrent use.
type DB struct {
	// Need 64-bit alignment.
	// seq is read/written atomically; keeping it first guarantees 8-byte
	// alignment on 32-bit platforms.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	aliveSnaps, aliveIters int32 // Live snapshot/iterator counts (atomic).

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex     // Guards mem and frozenMem.
	memPool         chan *memdb.DB   // Recycled memdbs; drained by mpoolDrain.
	mem, frozenMem  *memDB           // Effective memdb and the one being flushed.
	journal         *journal.Writer  // Journal of the effective memdb.
	journalWriter   storage.Writer   // Underlying writer of journal.
	journalFd       storage.FileDesc // File of the current journal.
	frozenJournalFd storage.FileDesc // Journal backing frozenMem.
	frozenSeq       uint64           // Sequence number at the time of freezing.

	// Snapshot.
	snapsMu   sync.Mutex // Guards snapsList.
	snapsList *list.List // Acquired snapshot elements.

	// Write.
	batchPool    sync.Pool       // Recycled batch objects.
	writeMergeC  chan writeMerge // Incoming writes to be merged.
	writeMergedC chan bool       // Reports whether a write was merged.
	writeLockC   chan struct{}   // Capacity-1 channel acting as the writer lock.
	writeAckC    chan error      // Delivers the result of a merged write.
	writeDelay   time.Duration   // Delay accumulated since last report.
	writeDelayN  int             // Number of delays since last report.
	tr           *Transaction    // Open transaction, if any.

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd            // Table-compaction commands.
	tcompPauseC      chan chan<- struct{} // Pause requests for table compaction.
	mcompCmdC        chan cCmd            // Memdb-compaction commands.
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup // Waits for compaction goroutines on Close.
	closeC chan struct{}  // Closed to signal all goroutines to exit.
	closed uint32         // Atomic closed flag.
	closer io.Closer      // Optional storage closer (set by OpenFile/RecoverFile).
}
85
// openDB completes opening a DB on top of an already-recovered session:
// it replays the journal files (read-only or read-write variant), removes
// obsolete files in read-write mode, and starts the background goroutines.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode): replay into memory only.
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		// These two goroutines are waited on via closeW during Close.
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	// Safety net: close the DB if the caller forgets to.
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}
158
159// Open opens or creates a DB for the given storage.
160// The DB will be created if not exist, unless ErrorIfMissing is true.
161// Also, if ErrorIfExist is true and the DB exist Open will returns
162// os.ErrExist error.
163//
164// Open will return an error with type of ErrCorrupted if corruption
165// detected in the DB. Use errors.IsCorrupted to test whether an error is
166// due to corruption. Corrupted DB can be recovered with Recover function.
167//
168// The returned DB instance is safe for concurrent use.
169// The DB must be closed after use, by calling Close method.
170func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
171	s, err := newSession(stor, o)
172	if err != nil {
173		return
174	}
175	defer func() {
176		if err != nil {
177			s.close()
178			s.release()
179		}
180	}()
181
182	err = s.recover()
183	if err != nil {
184		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
185			return
186		}
187		err = s.create()
188		if err != nil {
189			return
190		}
191	} else if s.o.GetErrorIfExist() {
192		err = os.ErrExist
193		return
194	}
195
196	return openDB(s)
197}
198
199// OpenFile opens or creates a DB for the given path.
200// The DB will be created if not exist, unless ErrorIfMissing is true.
201// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
202// os.ErrExist error.
203//
204// OpenFile uses standard file-system backed storage implementation as
205// described in the leveldb/storage package.
206//
207// OpenFile will return an error with type of ErrCorrupted if corruption
208// detected in the DB. Use errors.IsCorrupted to test whether an error is
209// due to corruption. Corrupted DB can be recovered with Recover function.
210//
211// The returned DB instance is safe for concurrent use.
212// The DB must be closed after use, by calling Close method.
213func OpenFile(path string, o *opt.Options) (db *DB, err error) {
214	stor, err := storage.OpenFile(path, o.GetReadOnly())
215	if err != nil {
216		return
217	}
218	db, err = Open(stor, o)
219	if err != nil {
220		stor.Close()
221	} else {
222		db.closer = stor
223	}
224	return
225}
226
227// Recover recovers and opens a DB with missing or corrupted manifest files
228// for the given storage. It will ignore any manifest files, valid or not.
229// The DB must already exist or it will returns an error.
230// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
231//
232// The returned DB instance is safe for concurrent use.
233// The DB must be closed after use, by calling Close method.
234func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
235	s, err := newSession(stor, o)
236	if err != nil {
237		return
238	}
239	defer func() {
240		if err != nil {
241			s.close()
242			s.release()
243		}
244	}()
245
246	err = recoverTable(s, o)
247	if err != nil {
248		return
249	}
250	return openDB(s)
251}
252
253// RecoverFile recovers and opens a DB with missing or corrupted manifest files
254// for the given path. It will ignore any manifest files, valid or not.
255// The DB must already exist or it will returns an error.
256// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
257//
258// RecoverFile uses standard file-system backed storage implementation as described
259// in the leveldb/storage package.
260//
261// The returned DB instance is safe for concurrent use.
262// The DB must be closed after use, by calling Close method.
263func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
264	stor, err := storage.OpenFile(path, false)
265	if err != nil {
266		return
267	}
268	db, err = Recover(stor, o)
269	if err != nil {
270		stor.Close()
271	} else {
272		db.closer = stor
273	}
274	return
275}
276
// recoverTable rebuilds the DB state from the table files alone, ignoring
// any existing manifest. Every table is scanned; depending on
// opt.StrictRecovery a corrupted table is either dropped or rebuilt without
// its corrupted entries. Surviving tables are registered at level 0 and a
// fresh manifest is created and committed.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, lets StrictRecovery doing its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies the valid entries produced by iter into a fresh
	// temporary table file, returning its descriptor and size. On error the
	// temporary file is removed and a zero descriptor is returned.
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				s.stor.Remove(tmpFd)
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		// Corruption in the source iterator is expected here; only other
		// errors abort the rebuild.
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable scans one table file, counting good and corrupted keys
	// and tracking its key range and max sequence number, then decides to
	// drop, rebuild or keep the table before recording it at level 0.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks instead of aborting the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			// Track the smallest and largest internal keys seen.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		// In strict recovery, any corruption drops the whole table.
		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				closed = true
				reader.Close()
				// Replace the corrupted table with the rebuilt one.
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec)
}
472
// recoverJournal replays journal files newer than the committed journal
// (plus the recorded previous journal, if any) into memdbs, flushing each
// filled memdb to level-0 tables, then creates a fresh journal and commits
// the recovered state to the manifest. Obsolete journal files are removed
// only after their contents have been committed.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file.
			// The previous journal is deleted only after its contents have
			// been flushed and committed to the manifest, so a crash here
			// cannot lose data.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				db.s.stor.Remove(ofd)
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		db.s.stor.Remove(ofd)
	}

	return nil
}
635
// recoverJournalRO replays the journal files into a single in-memory memdb
// without writing anything back to storage. It is used when the DB is
// opened in read-only mode; the resulting memdb becomes db.mem.
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		// Single memdb accumulating every replayed batch; never flushed.
		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}
734
735func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
736	mk, mv, err := mdb.Find(ikey)
737	if err == nil {
738		ukey, _, kt, kerr := parseInternalKey(mk)
739		if kerr != nil {
740			// Shouldn't have had happen.
741			panic(kerr)
742		}
743		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
744			if kt == keyTypeDel {
745				return true, nil, ErrNotFound
746			}
747			return true, mv, nil
748
749		}
750	} else if err != ErrNotFound {
751		return true, nil, err
752	}
753	return
754}
755
756func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
757	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
758
759	if auxm != nil {
760		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
761			return append([]byte{}, mv...), me
762		}
763	}
764
765	em, fm := db.getMems()
766	for _, m := range [...]*memDB{em, fm} {
767		if m == nil {
768			continue
769		}
770		defer m.decref()
771
772		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
773			return append([]byte{}, mv...), me
774		}
775	}
776
777	v := db.s.version()
778	value, cSched, err := v.get(auxt, ikey, ro, false)
779	v.release()
780	if cSched {
781		// Trigger table compaction.
782		db.compTrigger(db.tcompCmdC)
783	}
784	return
785}
786
787func nilIfNotFound(err error) error {
788	if err == ErrNotFound {
789		return nil
790	}
791	return err
792}
793
794func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
795	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
796
797	if auxm != nil {
798		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
799			return me == nil, nilIfNotFound(me)
800		}
801	}
802
803	em, fm := db.getMems()
804	for _, m := range [...]*memDB{em, fm} {
805		if m == nil {
806			continue
807		}
808		defer m.decref()
809
810		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
811			return me == nil, nilIfNotFound(me)
812		}
813	}
814
815	v := db.s.version()
816	_, cSched, err := v.get(auxt, ikey, ro, true)
817	v.release()
818	if cSched {
819		// Trigger table compaction.
820		db.compTrigger(db.tcompCmdC)
821	}
822	if err == nil {
823		ret = true
824	} else if err == ErrNotFound {
825		err = nil
826	}
827	return
828}
829
830// Get gets the value for the given key. It returns ErrNotFound if the
831// DB does not contains the key.
832//
833// The returned slice is its own copy, it is safe to modify the contents
834// of the returned slice.
835// It is safe to modify the contents of the argument after Get returns.
836func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
837	err = db.ok()
838	if err != nil {
839		return
840	}
841
842	se := db.acquireSnapshot()
843	defer db.releaseSnapshot(se)
844	return db.get(nil, nil, key, se.seq, ro)
845}
846
847// Has returns true if the DB does contains the given key.
848//
849// It is safe to modify the contents of the argument after Has returns.
850func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
851	err = db.ok()
852	if err != nil {
853		return
854	}
855
856	se := db.acquireSnapshot()
857	defer db.releaseSnapshot(se)
858	return db.has(nil, nil, key, se.seq, ro)
859}
860
861// NewIterator returns an iterator for the latest snapshot of the
862// underlying DB.
863// The returned iterator is not safe for concurrent use, but it is safe to use
864// multiple iterators concurrently, with each in a dedicated goroutine.
865// It is also safe to use an iterator concurrently with modifying its
866// underlying DB. The resultant key/value pairs are guaranteed to be
867// consistent.
868//
869// Slice allows slicing the iterator to only contains keys in the given
870// range. A nil Range.Start is treated as a key before all keys in the
871// DB. And a nil Range.Limit is treated as a key after all keys in
872// the DB.
873//
874// The iterator must be released after use, by calling Release method.
875//
876// Also read Iterator documentation of the leveldb/iterator package.
877func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
878	if err := db.ok(); err != nil {
879		return iterator.NewEmptyIterator(err)
880	}
881
882	se := db.acquireSnapshot()
883	defer db.releaseSnapshot(se)
884	// Iterator holds 'version' lock, 'version' is immutable so snapshot
885	// can be released after iterator created.
886	return db.newIterator(nil, nil, se.seq, slice, ro)
887}
888
889// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
890// is a frozen snapshot of a DB state at a particular point in time. The
891// content of snapshot are guaranteed to be consistent.
892//
893// The snapshot must be released after use, by calling Release method.
894func (db *DB) GetSnapshot() (*Snapshot, error) {
895	if err := db.ok(); err != nil {
896		return nil, err
897	}
898
899	return db.newSnapshot(), nil
900}
901
// GetProperty returns value of the given property name. It returns
// ErrNotFound if the property name is not recognized.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	// Strip the "leveldb." prefix; the remainder selects the property.
	p := name[len(prefix):]

	// Pin the current version so level/table information stays stable
	// while the reply is being formatted.
	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// Exactly one item (the level) must be parsed; any trailing text
		// fills 'rest', making n != 1, and is rejected.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			// Skip levels that have never held tables nor compacted.
			if len(tables) == 0 && duration == 0 {
				continue
			}
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		value = fmt.Sprintf("DelayN:%d Delay:%s", writeDelayN, writeDelay)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}
998
999// SizeOf calculates approximate sizes of the given key ranges.
1000// The length of the returned sizes are equal with the length of the given
1001// ranges. The returned sizes measure storage space usage, so if the user
1002// data compresses by a factor of ten, the returned sizes will be one-tenth
1003// the size of the corresponding user data size.
1004// The results may not include the sizes of recently written data.
1005func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
1006	if err := db.ok(); err != nil {
1007		return nil, err
1008	}
1009
1010	v := db.s.version()
1011	defer v.release()
1012
1013	sizes := make(Sizes, 0, len(ranges))
1014	for _, r := range ranges {
1015		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
1016		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
1017		start, err := v.offsetOf(imin)
1018		if err != nil {
1019			return nil, err
1020		}
1021		limit, err := v.offsetOf(imax)
1022		if err != nil {
1023			return nil, err
1024		}
1025		var size int64
1026		if limit >= start {
1027			size = limit - start
1028		}
1029		sizes = append(sizes, size)
1030	}
1031
1032	return sizes, nil
1033}
1034
// Close closes the DB. This will also releases any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// Only the first caller flips the closed flag and performs shutdown;
	// subsequent calls get ErrClosed.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
		// ErrReadOnly is the expected state in read-only mode, not a failure.
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage if this DB owns it (OpenFile/RecoverFile),
	// preserving the first error encountered.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}
1105