1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"container/list"
11	"fmt"
12	"io"
13	"os"
14	"runtime"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19
20	"github.com/syndtr/goleveldb/leveldb/errors"
21	"github.com/syndtr/goleveldb/leveldb/iterator"
22	"github.com/syndtr/goleveldb/leveldb/journal"
23	"github.com/syndtr/goleveldb/leveldb/memdb"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/storage"
26	"github.com/syndtr/goleveldb/leveldb/table"
27	"github.com/syndtr/goleveldb/leveldb/util"
28)
29
// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	// seq is the upper sequence number; it is read/written atomically and
	// therefore must stay first so it is 64-bit aligned on 32-bit platforms.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	inWritePaused          int32 // The indicator whether write operation is paused by compaction
	aliveSnaps, aliveIters int32 // Number of alive snapshots and iterators (atomic)

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex   // Guards mem/frozenMem swaps
	memPool         chan *memdb.DB // Pool of reusable memdbs (capacity 1)
	mem, frozenMem  *memDB         // Effective memdb and the one awaiting flush
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc // Journal file belonging to frozenMem
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List // Alive snapshot elements

	// Write.
	batchPool    sync.Pool       // Recycles batch instances
	writeMergeC  chan writeMerge // Incoming writes offered for merging
	writeMergedC chan bool       // Reports whether an offered write was merged
	writeLockC   chan struct{}   // Semaphore-style writer lock (capacity 1)
	writeAckC    chan error      // Delivers the result of a merged write
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction // Open transaction, if any

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup // Waits for compaction goroutines during Close
	closeC chan struct{}  // Closed to signal shutdown to all goroutines
	closed uint32         // Set atomically once the DB has been closed
	closer io.Closer      // Optional underlying storage closer (set by OpenFile/RecoverFile)
}
86
// openDB finishes opening a DB on top of a recovered session: it replays
// the journal files, removes obsolete files (read-write mode only), and
// starts the background compaction goroutines.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode).
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		// These two goroutines are waited for in Close via closeW.
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	// Close the DB eventually even if the caller forgets to; Close itself
	// clears this finalizer again.
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}
159
160// Open opens or creates a DB for the given storage.
161// The DB will be created if not exist, unless ErrorIfMissing is true.
162// Also, if ErrorIfExist is true and the DB exist Open will returns
163// os.ErrExist error.
164//
165// Open will return an error with type of ErrCorrupted if corruption
166// detected in the DB. Use errors.IsCorrupted to test whether an error is
167// due to corruption. Corrupted DB can be recovered with Recover function.
168//
169// The returned DB instance is safe for concurrent use.
170// The DB must be closed after use, by calling Close method.
171func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
172	s, err := newSession(stor, o)
173	if err != nil {
174		return
175	}
176	defer func() {
177		if err != nil {
178			s.close()
179			s.release()
180		}
181	}()
182
183	err = s.recover()
184	if err != nil {
185		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
186			return
187		}
188		err = s.create()
189		if err != nil {
190			return
191		}
192	} else if s.o.GetErrorIfExist() {
193		err = os.ErrExist
194		return
195	}
196
197	return openDB(s)
198}
199
200// OpenFile opens or creates a DB for the given path.
201// The DB will be created if not exist, unless ErrorIfMissing is true.
202// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
203// os.ErrExist error.
204//
205// OpenFile uses standard file-system backed storage implementation as
206// described in the leveldb/storage package.
207//
208// OpenFile will return an error with type of ErrCorrupted if corruption
209// detected in the DB. Use errors.IsCorrupted to test whether an error is
210// due to corruption. Corrupted DB can be recovered with Recover function.
211//
212// The returned DB instance is safe for concurrent use.
213// The DB must be closed after use, by calling Close method.
214func OpenFile(path string, o *opt.Options) (db *DB, err error) {
215	stor, err := storage.OpenFile(path, o.GetReadOnly())
216	if err != nil {
217		return
218	}
219	db, err = Open(stor, o)
220	if err != nil {
221		stor.Close()
222	} else {
223		db.closer = stor
224	}
225	return
226}
227
228// Recover recovers and opens a DB with missing or corrupted manifest files
229// for the given storage. It will ignore any manifest files, valid or not.
230// The DB must already exist or it will returns an error.
231// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
232//
233// The returned DB instance is safe for concurrent use.
234// The DB must be closed after use, by calling Close method.
235func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
236	s, err := newSession(stor, o)
237	if err != nil {
238		return
239	}
240	defer func() {
241		if err != nil {
242			s.close()
243			s.release()
244		}
245	}()
246
247	err = recoverTable(s, o)
248	if err != nil {
249		return
250	}
251	return openDB(s)
252}
253
254// RecoverFile recovers and opens a DB with missing or corrupted manifest files
255// for the given path. It will ignore any manifest files, valid or not.
256// The DB must already exist or it will returns an error.
257// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
258//
259// RecoverFile uses standard file-system backed storage implementation as described
260// in the leveldb/storage package.
261//
262// The returned DB instance is safe for concurrent use.
263// The DB must be closed after use, by calling Close method.
264func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
265	stor, err := storage.OpenFile(path, false)
266	if err != nil {
267		return
268	}
269	db, err = Recover(stor, o)
270	if err != nil {
271		stor.Close()
272	} else {
273		db.closer = stor
274	}
275	return
276}
277
// recoverTable rebuilds a fresh manifest from the table files found in the
// storage, ignoring any existing manifest. Corrupted tables are rebuilt
// (keeping only the readable keys) or dropped entirely, depending on the
// StrictRecovery option. All recovered tables are registered at level 0 and
// the session sequence number is set to the largest sequence number seen.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, lets StrictRecovery doing its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies all valid entries from iter into a fresh temporary
	// table file, returning its descriptor and size. On error the temporary
	// file is removed and a zero descriptor is returned.
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				s.stor.Remove(tmpFd)
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		// Corruption errors are tolerated here: we keep whatever was
		// readable and let the caller decide what to do with the rest.
		err = iter.Error()
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable scans one table file, counting good/corrupted keys,
	// rebuilding the table if it is partially corrupted (non-strict mode)
	// and recording it into rec when anything could be salvaged.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size.
		// NOTE(review): whence 2 is io.SeekEnd.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks instead of aborting the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			// Track the smallest and largest internal keys seen.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			// Strict recovery: any corruption drops the whole table.
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				// Close before renaming over the original file.
				closed = true
				reader.Close()
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec)
}
473
// recoverJournal replays all journal files at or after the journal recorded
// in the current session, applying their batches to a memdb and flushing
// that memdb into new level-0 tables. On success a fresh journal is created,
// the session is committed, and obsolete journal files are removed.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file.
			// ofd is only set after a journal has been fully replayed, so the
			// previous journal's contents are durable (flushed and committed)
			// before its file is deleted.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				db.s.stor.Remove(ofd)
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		db.s.stor.Remove(ofd)
	}

	return nil
}
636
// recoverJournalRO replays the journal files into a single in-memory memdb
// without writing anything back to the storage; it is used when the DB is
// opened in read-only mode.
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		// All journal entries end up in this single memdb; it is never
		// flushed to tables in read-only mode.
		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}
735
736func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
737	mk, mv, err := mdb.Find(ikey)
738	if err == nil {
739		ukey, _, kt, kerr := parseInternalKey(mk)
740		if kerr != nil {
741			// Shouldn't have had happen.
742			panic(kerr)
743		}
744		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
745			if kt == keyTypeDel {
746				return true, nil, ErrNotFound
747			}
748			return true, mv, nil
749
750		}
751	} else if err != ErrNotFound {
752		return true, nil, err
753	}
754	return
755}
756
757func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
758	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
759
760	if auxm != nil {
761		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
762			return append([]byte{}, mv...), me
763		}
764	}
765
766	em, fm := db.getMems()
767	for _, m := range [...]*memDB{em, fm} {
768		if m == nil {
769			continue
770		}
771		defer m.decref()
772
773		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
774			return append([]byte{}, mv...), me
775		}
776	}
777
778	v := db.s.version()
779	value, cSched, err := v.get(auxt, ikey, ro, false)
780	v.release()
781	if cSched {
782		// Trigger table compaction.
783		db.compTrigger(db.tcompCmdC)
784	}
785	return
786}
787
788func nilIfNotFound(err error) error {
789	if err == ErrNotFound {
790		return nil
791	}
792	return err
793}
794
795func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
796	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
797
798	if auxm != nil {
799		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
800			return me == nil, nilIfNotFound(me)
801		}
802	}
803
804	em, fm := db.getMems()
805	for _, m := range [...]*memDB{em, fm} {
806		if m == nil {
807			continue
808		}
809		defer m.decref()
810
811		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
812			return me == nil, nilIfNotFound(me)
813		}
814	}
815
816	v := db.s.version()
817	_, cSched, err := v.get(auxt, ikey, ro, true)
818	v.release()
819	if cSched {
820		// Trigger table compaction.
821		db.compTrigger(db.tcompCmdC)
822	}
823	if err == nil {
824		ret = true
825	} else if err == ErrNotFound {
826		err = nil
827	}
828	return
829}
830
831// Get gets the value for the given key. It returns ErrNotFound if the
832// DB does not contains the key.
833//
834// The returned slice is its own copy, it is safe to modify the contents
835// of the returned slice.
836// It is safe to modify the contents of the argument after Get returns.
837func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
838	err = db.ok()
839	if err != nil {
840		return
841	}
842
843	se := db.acquireSnapshot()
844	defer db.releaseSnapshot(se)
845	return db.get(nil, nil, key, se.seq, ro)
846}
847
848// Has returns true if the DB does contains the given key.
849//
850// It is safe to modify the contents of the argument after Has returns.
851func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
852	err = db.ok()
853	if err != nil {
854		return
855	}
856
857	se := db.acquireSnapshot()
858	defer db.releaseSnapshot(se)
859	return db.has(nil, nil, key, se.seq, ro)
860}
861
862// NewIterator returns an iterator for the latest snapshot of the
863// underlying DB.
864// The returned iterator is not safe for concurrent use, but it is safe to use
865// multiple iterators concurrently, with each in a dedicated goroutine.
866// It is also safe to use an iterator concurrently with modifying its
867// underlying DB. The resultant key/value pairs are guaranteed to be
868// consistent.
869//
870// Slice allows slicing the iterator to only contains keys in the given
871// range. A nil Range.Start is treated as a key before all keys in the
872// DB. And a nil Range.Limit is treated as a key after all keys in
873// the DB.
874//
875// WARNING: Any slice returned by interator (e.g. slice returned by calling
876// Iterator.Key() or Iterator.Key() methods), its content should not be modified
877// unless noted otherwise.
878//
879// The iterator must be released after use, by calling Release method.
880//
881// Also read Iterator documentation of the leveldb/iterator package.
882func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
883	if err := db.ok(); err != nil {
884		return iterator.NewEmptyIterator(err)
885	}
886
887	se := db.acquireSnapshot()
888	defer db.releaseSnapshot(se)
889	// Iterator holds 'version' lock, 'version' is immutable so snapshot
890	// can be released after iterator created.
891	return db.newIterator(nil, nil, se.seq, slice, ro)
892}
893
894// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
895// is a frozen snapshot of a DB state at a particular point in time. The
896// content of snapshot are guaranteed to be consistent.
897//
898// The snapshot must be released after use, by calling Release method.
899func (db *DB) GetSnapshot() (*Snapshot, error) {
900	if err := db.ok(); err != nil {
901		return nil, err
902	}
903
904	return db.newSnapshot(), nil
905}
906
// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	// Every property name must carry the "leveldb." prefix.
	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	// Hold a version reference so level contents stay stable while read.
	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// Exactly one item (the level number) must parse; any trailing
		// garbage makes n == 2 and the property is rejected.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			if len(tables) == 0 && duration == 0 {
				continue
			}
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}
1004
// DBStats is database statistics.
type DBStats struct {
	// Cumulative count and duration of write delays caused by compaction,
	// plus whether writes are currently paused.
	WriteDelayCount    int32
	WriteDelayDuration time.Duration
	WritePaused        bool

	// Number of currently alive snapshots and iterators.
	AliveSnapshots int32
	AliveIterators int32

	// Cumulative effective bytes written to and read from storage.
	IOWrite uint64
	IORead  uint64

	// Block cache size (bytes) and number of opened table files.
	BlockCacheSize    int
	OpenedTablesCount int

	// Per-level statistics. Levels with no tables and no compaction
	// activity are omitted by Stats, so indexes into these slices do not
	// necessarily correspond to level numbers.
	LevelSizes        []int64
	LevelTablesCounts []int
	LevelRead         []int64
	LevelWrite        []int64
	LevelDurations    []time.Duration
}
1026
1027// Stats populates s with database statistics.
1028func (db *DB) Stats(s *DBStats) error {
1029	err := db.ok()
1030	if err != nil {
1031		return err
1032	}
1033
1034	s.IORead = db.s.stor.reads()
1035	s.IOWrite = db.s.stor.writes()
1036	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
1037	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
1038	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
1039
1040	s.OpenedTablesCount = db.s.tops.cache.Size()
1041	if db.s.tops.bcache != nil {
1042		s.BlockCacheSize = db.s.tops.bcache.Size()
1043	} else {
1044		s.BlockCacheSize = 0
1045	}
1046
1047	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
1048	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
1049
1050	s.LevelDurations = s.LevelDurations[:0]
1051	s.LevelRead = s.LevelRead[:0]
1052	s.LevelWrite = s.LevelWrite[:0]
1053	s.LevelSizes = s.LevelSizes[:0]
1054	s.LevelTablesCounts = s.LevelTablesCounts[:0]
1055
1056	v := db.s.version()
1057	defer v.release()
1058
1059	for level, tables := range v.levels {
1060		duration, read, write := db.compStats.getStat(level)
1061		if len(tables) == 0 && duration == 0 {
1062			continue
1063		}
1064		s.LevelDurations = append(s.LevelDurations, duration)
1065		s.LevelRead = append(s.LevelRead, read)
1066		s.LevelWrite = append(s.LevelWrite, write)
1067		s.LevelSizes = append(s.LevelSizes, tables.size())
1068		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
1069	}
1070
1071	return nil
1072}
1073
1074// SizeOf calculates approximate sizes of the given key ranges.
1075// The length of the returned sizes are equal with the length of the given
1076// ranges. The returned sizes measure storage space usage, so if the user
1077// data compresses by a factor of ten, the returned sizes will be one-tenth
1078// the size of the corresponding user data size.
1079// The results may not include the sizes of recently written data.
1080func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
1081	if err := db.ok(); err != nil {
1082		return nil, err
1083	}
1084
1085	v := db.s.version()
1086	defer v.release()
1087
1088	sizes := make(Sizes, 0, len(ranges))
1089	for _, r := range ranges {
1090		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
1091		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
1092		start, err := v.offsetOf(imin)
1093		if err != nil {
1094			return nil, err
1095		}
1096		limit, err := v.offsetOf(imax)
1097		if err != nil {
1098			return nil, err
1099		}
1100		var size int64
1101		if limit >= start {
1102			size = limit - start
1103		}
1104		sizes = append(sizes, size)
1105	}
1106
1107	return sizes, nil
1108}
1109
// Close closes the DB. This will also releases any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed flips db.closed atomically; only the first caller proceeds.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
		// ErrReadOnly is expected in read-only mode and not a real failure.
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage (if owned); keep the first error seen.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}
1180