1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"container/list"
11	"fmt"
12	"io"
13	"os"
14	"runtime"
15	"strings"
16	"sync"
17	"sync/atomic"
18	"time"
19
20	"github.com/syndtr/goleveldb/leveldb/errors"
21	"github.com/syndtr/goleveldb/leveldb/iterator"
22	"github.com/syndtr/goleveldb/leveldb/journal"
23	"github.com/syndtr/goleveldb/leveldb/memdb"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/storage"
26	"github.com/syndtr/goleveldb/leveldb/table"
27	"github.com/syndtr/goleveldb/leveldb/util"
28)
29
// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	// seq must stay first so atomic 64-bit operations are aligned on
	// 32-bit platforms.
	seq uint64

	// Stats. Need 64-bit alignment.
	// All of these counters are accessed with sync/atomic.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	inWritePaused          int32 // The indicator whether write operation is paused by compaction
	aliveSnaps, aliveIters int32

	// Compaction statistic, accessed with sync/atomic.
	memComp       uint32 // The cumulative number of memory compaction
	level0Comp    uint32 // The cumulative number of level0 compaction
	nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
	seekComp      uint32 // The cumulative number of seek compaction

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{}
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.
	compActiveLk     sync.RWMutex
	memCompActive    bool
	tableCompActive  bool

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}
95
// openDB finishes opening a database on a recovered session: it replays
// the journals, removes obsolete files (unless read-only) and starts the
// background compaction goroutines before handing the DB to the caller.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode): replay into memory only.
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal opened by recoverJournal before bailing out.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	// Run Close if the caller forgets to; Close clears this finalizer.
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}
168
169// Open opens or creates a DB for the given storage.
170// The DB will be created if not exist, unless ErrorIfMissing is true.
171// Also, if ErrorIfExist is true and the DB exist Open will returns
172// os.ErrExist error.
173//
174// Open will return an error with type of ErrCorrupted if corruption
175// detected in the DB. Use errors.IsCorrupted to test whether an error is
176// due to corruption. Corrupted DB can be recovered with Recover function.
177//
178// The returned DB instance is safe for concurrent use.
179// The DB must be closed after use, by calling Close method.
180func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
181	s, err := newSession(stor, o)
182	if err != nil {
183		return
184	}
185	defer func() {
186		if err != nil {
187			s.close()
188			s.release()
189		}
190	}()
191
192	err = s.recover()
193	if err != nil {
194		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
195			return
196		}
197		err = s.create()
198		if err != nil {
199			return
200		}
201	} else if s.o.GetErrorIfExist() {
202		err = os.ErrExist
203		return
204	}
205
206	return openDB(s)
207}
208
209// OpenFile opens or creates a DB for the given path.
210// The DB will be created if not exist, unless ErrorIfMissing is true.
211// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
212// os.ErrExist error.
213//
214// OpenFile uses standard file-system backed storage implementation as
215// described in the leveldb/storage package.
216//
217// OpenFile will return an error with type of ErrCorrupted if corruption
218// detected in the DB. Use errors.IsCorrupted to test whether an error is
219// due to corruption. Corrupted DB can be recovered with Recover function.
220//
221// The returned DB instance is safe for concurrent use.
222// The DB must be closed after use, by calling Close method.
223func OpenFile(path string, o *opt.Options) (db *DB, err error) {
224	stor, err := storage.OpenFile(path, o.GetReadOnly())
225	if err != nil {
226		return
227	}
228	db, err = Open(stor, o)
229	if err != nil {
230		stor.Close()
231	} else {
232		db.closer = stor
233	}
234	return
235}
236
237// Recover recovers and opens a DB with missing or corrupted manifest files
238// for the given storage. It will ignore any manifest files, valid or not.
239// The DB must already exist or it will returns an error.
240// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
241//
242// The returned DB instance is safe for concurrent use.
243// The DB must be closed after use, by calling Close method.
244func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
245	s, err := newSession(stor, o)
246	if err != nil {
247		return
248	}
249	defer func() {
250		if err != nil {
251			s.close()
252			s.release()
253		}
254	}()
255
256	err = recoverTable(s, o)
257	if err != nil {
258		return
259	}
260	return openDB(s)
261}
262
263// RecoverFile recovers and opens a DB with missing or corrupted manifest files
264// for the given path. It will ignore any manifest files, valid or not.
265// The DB must already exist or it will returns an error.
266// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
267//
268// RecoverFile uses standard file-system backed storage implementation as described
269// in the leveldb/storage package.
270//
271// The returned DB instance is safe for concurrent use.
272// The DB must be closed after use, by calling Close method.
273func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
274	stor, err := storage.OpenFile(path, false)
275	if err != nil {
276		return
277	}
278	db, err = Recover(stor, o)
279	if err != nil {
280		stor.Close()
281	} else {
282		db.closer = stor
283	}
284	return
285}
286
// recoverTable rebuilds the manifest from the table files alone: every table
// found in the storage is scanned, tables with corruption are rebuilt from
// their readable entries (or dropped entirely when StrictRecovery is set),
// and all surviving tables are registered at level 0 of a fresh manifest.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, lets StrictRecovery doing its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies the valid entries produced by iter into a new
	// temporary table file; on failure the temporary file is removed and a
	// zero FileDesc is returned.
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				s.stor.Remove(tmpFd)
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		// Corruption errors are tolerated here; keep whatever was read.
		err = iter.Error()
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable processes one table file: count good/corrupted keys,
	// rebuild the table if it is partially corrupted, then record it.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks as they are encountered during the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			// Track the smallest and largest internal keys seen.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		// In strict recovery any corruption drops the whole table.
		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				closed = true
				reader.Close()
				// Replace the corrupted file with the rebuilt one.
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec, false)
}
482
// recoverJournal replays every journal file at or after the current journal
// number (plus the previous journal, if recorded) into a memdb, flushing the
// memdb whenever it exceeds the write buffer, and commits the resulting
// session record. A journal file is only removed after its contents have
// been committed.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file.
			// ofd is non-zero from the second iteration on; the previous
			// journal's data is flushed and committed before its file is
			// removed.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec, false); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				db.s.stor.Remove(ofd)
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec, false); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		db.s.stor.Remove(ofd)
	}

	return nil
}
645
// recoverJournalRO replays the journals like recoverJournal, but keeps
// everything in a single in-memory memdb and never flushes, commits or
// removes anything from the storage; it is used when the DB is opened in
// read-only mode.
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB; all recovered data is served from memory.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}
744
745func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
746	mk, mv, err := mdb.Find(ikey)
747	if err == nil {
748		ukey, _, kt, kerr := parseInternalKey(mk)
749		if kerr != nil {
750			// Shouldn't have had happen.
751			panic(kerr)
752		}
753		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
754			if kt == keyTypeDel {
755				return true, nil, ErrNotFound
756			}
757			return true, mv, nil
758
759		}
760	} else if err != ErrNotFound {
761		return true, nil, err
762	}
763	return
764}
765
// get looks key up at the given sequence number, consulting in order: the
// optional auxiliary memdb, the effective and frozen memdbs, and finally
// the table files of the current version (plus the auxiliary tables auxt).
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			// Copy the value out of the memdb before returning it.
			return append([]byte{}, mv...), me
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deferred deliberately: the refs (at most two) are held until the
		// function returns so the memdb data stays valid while copied.
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}

	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	return
}
796
797func nilIfNotFound(err error) error {
798	if err == ErrNotFound {
799		return nil
800	}
801	return err
802}
803
// has reports whether key exists at the given sequence number, using the
// same lookup order as get: auxiliary memdb, effective/frozen memdbs, then
// the current version's tables. ErrNotFound is translated into ret == false
// with a nil error.
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deferred deliberately: the refs (at most two) are held until the
		// function returns.
		defer m.decref()

		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	v := db.s.version()
	_, cSched, err := v.get(auxt, ikey, ro, true)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	if err == nil {
		ret = true
	} else if err == ErrNotFound {
		err = nil
	}
	return
}
839
840// Get gets the value for the given key. It returns ErrNotFound if the
841// DB does not contains the key.
842//
843// The returned slice is its own copy, it is safe to modify the contents
844// of the returned slice.
845// It is safe to modify the contents of the argument after Get returns.
846func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
847	err = db.ok()
848	if err != nil {
849		return
850	}
851
852	se := db.acquireSnapshot()
853	defer db.releaseSnapshot(se)
854	return db.get(nil, nil, key, se.seq, ro)
855}
856
857// Has returns true if the DB does contains the given key.
858//
859// It is safe to modify the contents of the argument after Has returns.
860func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
861	err = db.ok()
862	if err != nil {
863		return
864	}
865
866	se := db.acquireSnapshot()
867	defer db.releaseSnapshot(se)
868	return db.has(nil, nil, key, se.seq, ro)
869}
870
// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contains keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// WARNING: Any slice returned by iterator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Value() methods), its content should not be
// modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	if err := db.ok(); err != nil {
		return iterator.NewEmptyIterator(err)
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	// Iterator holds 'version' lock, 'version' is immutable so snapshot
	// can be released after iterator created.
	return db.newIterator(nil, nil, se.seq, slice, ro)
}
902
903// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
904// is a frozen snapshot of a DB state at a particular point in time. The
905// content of snapshot are guaranteed to be consistent.
906//
907// The snapshot must be released after use, by calling Release method.
908func (db *DB) GetSnapshot() (*Snapshot, error) {
909	if err := db.ok(); err != nil {
910		return nil, err
911	}
912
913	return db.newSnapshot(), nil
914}
915
// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	// Every property name must carry the "leveldb." prefix.
	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// Exactly one item (the level) must match; trailing text is an error.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		var totalTables int
		var totalSize, totalRead, totalWrite int64
		var totalDuration time.Duration
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			// Skip levels with neither tables nor compaction activity.
			if len(tables) == 0 && duration == 0 {
				continue
			}
			totalTables += len(tables)
			totalSize += tables.size()
			totalRead += read
			totalWrite += write
			totalDuration += duration
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
		value += "-------+------------+---------------+---------------+---------------+---------------\n"
		value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
			totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
			float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
	case p == "compcount":
		value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}
1027
// DBStats is database statistics.
type DBStats struct {
	// Cumulative write-delay statistics.
	WriteDelayCount    int32
	WriteDelayDuration time.Duration
	WritePaused        bool

	// Numbers of currently alive snapshots and iterators.
	AliveSnapshots int32
	AliveIterators int32

	// Cumulative I/O as reported by the storage layer.
	IOWrite uint64
	IORead  uint64

	BlockCacheSize    int
	OpenedTablesCount int

	// Per-level statistics, one element per level.
	LevelSizes        Sizes
	LevelTablesCounts []int
	LevelRead         Sizes
	LevelWrite        Sizes
	LevelDurations    []time.Duration

	// Cumulative compaction counters.
	MemComp       uint32
	Level0Comp    uint32
	NonLevel0Comp uint32
	SeekComp      uint32

	// Whether a memory/table compaction is currently running.
	MemCompactionActive   bool
	TableCompactionActive bool
}
1057
// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
	err := db.ok()
	if err != nil {
		return err
	}

	s.IORead = db.s.stor.reads()
	s.IOWrite = db.s.stor.writes()
	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1

	s.OpenedTablesCount = db.s.tops.cache.Size()
	if db.s.tops.bcache != nil {
		s.BlockCacheSize = db.s.tops.bcache.Size()
	} else {
		s.BlockCacheSize = 0
	}

	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)

	// Reset the per-level slices, keeping their capacity, before refilling.
	s.LevelDurations = s.LevelDurations[:0]
	s.LevelRead = s.LevelRead[:0]
	s.LevelWrite = s.LevelWrite[:0]
	s.LevelSizes = s.LevelSizes[:0]
	s.LevelTablesCounts = s.LevelTablesCounts[:0]

	// Read the activity flags under the lock, released immediately after.
	func() {
		db.compActiveLk.RLock()
		defer db.compActiveLk.RUnlock()
		s.MemCompactionActive = db.memCompActive
		s.TableCompactionActive = db.tableCompActive
	}()

	v := db.s.version()
	defer v.release()

	for level, tables := range v.levels {
		duration, read, write := db.compStats.getStat(level)

		s.LevelDurations = append(s.LevelDurations, duration)
		s.LevelRead = append(s.LevelRead, read)
		s.LevelWrite = append(s.LevelWrite, write)
		s.LevelSizes = append(s.LevelSizes, tables.size())
		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
	}
	s.MemComp = atomic.LoadUint32(&db.memComp)
	s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
	s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
	s.SeekComp = atomic.LoadUint32(&db.seekComp)
	return nil
}
1112
1113// SizeOf calculates approximate sizes of the given key ranges.
1114// The length of the returned sizes are equal with the length of the given
1115// ranges. The returned sizes measure storage space usage, so if the user
1116// data compresses by a factor of ten, the returned sizes will be one-tenth
1117// the size of the corresponding user data size.
1118// The results may not include the sizes of recently written data.
1119func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
1120	if err := db.ok(); err != nil {
1121		return nil, err
1122	}
1123
1124	v := db.s.version()
1125	defer v.release()
1126
1127	sizes := make(Sizes, 0, len(ranges))
1128	for _, r := range ranges {
1129		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
1130		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
1131		start, err := v.offsetOf(imin)
1132		if err != nil {
1133			return nil, err
1134		}
1135		limit, err := v.offsetOf(imax)
1136		if err != nil {
1137			return nil, err
1138		}
1139		var size int64
1140		if limit >= start {
1141			size = limit - start
1142		}
1143		sizes = append(sizes, size)
1144	}
1145
1146	return sizes, nil
1147}
1148
// Close closes the DB. This will also release any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed is the idempotency gate; only the first call proceeds.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error, if any; ErrReadOnly is expected and ignored.
	var err error
	select {
	case err = <-db.compErrC:
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage if this DB owns it (set by OpenFile /
	// RecoverFile); preserve the earlier error, if any.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}
1219