1// Copyright 2014 The kv Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package kv
6
7import (
8	"encoding/binary"
9	"fmt"
10	"io"
11	"os"
12	"sync"
13	"time"
14
15	"github.com/cznic/fileutil"
16	"github.com/cznic/internal/buffer"
17	"github.com/cznic/lldb"
18)
19
// magic is the 4 byte signature stored at the very beginning of every kv DB
// file header.
const magic = "\x60\xdbKV"
23
// States of the grace period FSM kept in DB.acidState. The transitions are
// implemented by DB.enter, DB.leave and DB.timeout.
const (
	stDisabled            = iota // stDisabled must be zero
	stIdle                       // no update is open, no timer is running
	stCollecting                 // update open, timer running, operation(s) in progress
	stIdleArmed                  // update open, timer running, no operation in progress
	stCollectingArmed            // as stIdleArmed, but operation(s) are in progress again
	stCollectingTriggered        // timer fired while operation(s) were in progress
	stEndUpdateFailed            // EndUpdate failed in timeout, see DB.lastCommitErr
)

// init verifies that stDisabled is the zero value, ie. that a DB whose
// acidState was never assigned is in the stDisabled state.
func init() {
	if stDisabled == 0 {
		return
	}

	panic("stDisabled != 0")
}
39
// DB represents the database (the KV store).
//
// A DB is obtained from one of the Create* or Open* functions and is released
// by Close.
type DB struct {
	acidNest      int             // Nesting level maintained by enter, leave and the transaction methods
	acidState     int             // Grace period FSM state, one of the st* constants
	acidTimer     *time.Timer     // Grace period timer, started by enter, runs timeout
	alloc         *lldb.Allocator // The machinery. Wraps filer
	bkl           sync.Mutex      // Big Kernel Lock, acquired by enter and released by leave
	closeMu       sync.Mutex      // Close() coordination, also held while timeout runs
	closed        bool            // Set by Close
	filer         lldb.Filer      // Backing storage as returned by opts.acidFiler
	gracePeriod   time.Duration   // WAL grace period, the delay of acidTimer
	isMem         bool            // Memory DB, close() does not sync/close the filer
	lastCommitErr error           // from failed EndUpdate in timeout, reported by leave
	lock          io.Closer       // The DB file lock, copied from opts.lock
	opts          *Options
	root          *lldb.BTree // The KV layer
	wal           *os.File    // WAL if any, copied from opts.wal
}
58
59// CreateFromFiler is like Create but accepts an arbitrary backing storage
60// provided by filer.
61//
62// For the meaning of opts please see documentation of Options.
63func CreateFromFiler(filer lldb.Filer, opts *Options) (db *DB, err error) {
64	opts = opts.clone()
65	opts._ACID = _ACIDFull
66	return create(filer, opts, false)
67}
68
69// Create creates the named DB file mode 0666 (before umask). The file must not
70// already exist. If successful, methods on the returned DB can be used for
71// I/O; the associated file descriptor has mode os.O_RDWR. If there is an
72// error, it will be of type *os.PathError.
73//
74// For the meaning of opts please see documentation of Options.
75func Create(name string, opts *Options) (db *DB, err error) {
76	opts = opts.clone()
77	opts._ACID = _ACIDFull
78	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
79	if err != nil {
80		return
81	}
82
83	return CreateFromFiler(lldb.NewSimpleFileFiler(f), opts)
84}
85
86func create(filer lldb.Filer, opts *Options, isMem bool) (db *DB, err error) {
87	defer func() {
88		if db != nil {
89			db.opts = opts
90		}
91	}()
92	defer func() {
93		lock := opts.lock
94		if err != nil && lock != nil {
95			lock.Close()
96			db = nil
97		}
98	}()
99
100	if err = opts.check(filer.Name(), true, !isMem); err != nil {
101		return
102	}
103
104	b := [16]byte{byte(magic[0]), byte(magic[1]), byte(magic[2]), byte(magic[3]), 0x00} // ver 0x00
105	if n, err := filer.WriteAt(b[:], 0); n != 16 {
106		return nil, &os.PathError{Op: "kv.create.WriteAt", Path: filer.Name(), Err: err}
107	}
108
109	db = &DB{lock: opts.lock}
110
111	filer = lldb.NewInnerFiler(filer, 16)
112	if filer, err = opts.acidFiler(db, filer); err != nil {
113		return nil, err
114	}
115
116	db.filer = filer
117	if err = filer.BeginUpdate(); err != nil {
118		return
119	}
120
121	defer func() {
122		if e := filer.EndUpdate(); e != nil {
123			if err == nil {
124				err = e
125			}
126		}
127	}()
128
129	if db.alloc, err = lldb.NewAllocator(filer, &lldb.Options{}); err != nil {
130		return nil, &os.PathError{Op: "kv.create", Path: filer.Name(), Err: err}
131	}
132
133	db.alloc.Compress = true
134	db.isMem = isMem
135	var h int64
136	if db.root, h, err = lldb.CreateBTree(db.alloc, opts.Compare); err != nil {
137		return
138	}
139
140	if h != 1 {
141		panic("internal error")
142	}
143
144	db.wal = opts.wal
145	return
146}
147
148// CreateMem creates a new instance of an in-memory DB not backed by a disk
149// file. Memory DBs are resource limited as they are completely held in memory
150// and are not automatically persisted.
151//
152// For the meaning of opts please see documentation of Options.
153func CreateMem(opts *Options) (db *DB, err error) {
154	opts = opts.clone()
155	opts._ACID = _ACIDTransactions
156	f := lldb.NewMemFiler()
157	return create(f, opts, true)
158}
159
160// CreateTemp creates a new temporary DB in the directory dir with a basename
161// beginning with prefix and name ending in suffix. If dir is the empty string,
162// CreateTemp uses the default directory for temporary files (see os.TempDir).
163// Multiple programs calling CreateTemp simultaneously will not choose the same
164// file name for the DB. The caller can use Name() to find the pathname of the
165// DB file. It is the caller's responsibility to remove the file when no longer
166// needed.
167//
168// For the meaning of opts please see documentation of Options.
169func CreateTemp(dir, prefix, suffix string, opts *Options) (db *DB, err error) {
170	opts = opts.clone()
171	opts._ACID = _ACIDFull
172	f, err := fileutil.TempFile(dir, prefix, suffix)
173	if err != nil {
174		return
175	}
176
177	return create(lldb.NewSimpleFileFiler(f), opts, false)
178}
179
180// Open opens the named DB file for reading/writing. If successful, methods on
181// the returned DB can be used for I/O; the associated file descriptor has mode
182// os.O_RDWR. If there is an error, it will be of type *os.PathError.
183//
184// Note: While a DB is opened, it is locked and cannot be simultaneously opened
185// again.
186//
187// For the meaning of opts please see documentation of Options.
188func Open(name string, opts *Options) (db *DB, err error) {
189	f, err := os.OpenFile(name, os.O_RDWR, 0666)
190	if err != nil {
191		return nil, err
192	}
193
194	return OpenFromFiler(lldb.NewSimpleFileFiler(f), opts)
195}
196
197// OpenFromFiler is like Open but it accepts an arbitrary backing storage
198// provided by filer.
199func OpenFromFiler(filer lldb.Filer, opts *Options) (db *DB, err error) {
200	opts = opts.clone()
201	opts._ACID = _ACIDFull
202	defer func() {
203		if db != nil {
204			db.opts = opts
205		}
206	}()
207	defer func() {
208		lock := opts.lock
209		if err != nil && lock != nil {
210			lock.Close()
211			db = nil
212		}
213		if err != nil {
214			if db != nil {
215				db.Close()
216				db = nil
217			}
218		}
219	}()
220
221	name := filer.Name()
222	if err = opts.check(name, false, true); err != nil {
223		return
224	}
225
226	sz, err := filer.Size()
227	if err != nil {
228		return
229	}
230
231	if sz%16 != 0 {
232		return nil, &os.PathError{Op: "kv.Open:", Path: name, Err: fmt.Errorf("file size %d(%#x) is not 0 (mod 16)", sz, sz)}
233	}
234
235	var b [16]byte
236	if n, err := filer.ReadAt(b[:], 0); n != 16 || err != nil {
237		return nil, &os.PathError{Op: "kv.Open.ReadAt", Path: name, Err: err}
238	}
239
240	var h header
241	if err = h.rd(b[:]); err != nil {
242		return nil, &os.PathError{Op: "kv.Open:validate header", Path: name, Err: err}
243	}
244
245	db = &DB{lock: opts.lock}
246	if filer, err = opts.acidFiler(db, filer); err != nil {
247		return nil, err
248	}
249
250	db.filer = filer
251	switch h.ver {
252	default:
253		return nil, &os.PathError{Op: "kv.Open", Path: name, Err: fmt.Errorf("unknown/unsupported kv file format version %#x", h.ver)}
254	case 0x00:
255		if _, err = open00(name, db); err != nil {
256			return nil, err
257		}
258	}
259
260	db.root, err = lldb.OpenBTree(db.alloc, opts.Compare, 1)
261	db.wal = opts.wal
262	if opts.VerifyDbAfterOpen {
263		err = verifyAllocator(db.alloc)
264	}
265	return
266}
267
268// Close closes the DB, rendering it unusable for I/O. It returns an error, if
269// any. Failing to call Close before exiting a program can lose the last open
270// or being committed transaction.
271//
272// Successful Close is idempotent.
273func (db *DB) Close() (err error) {
274	db.closeMu.Lock()
275	defer db.closeMu.Unlock()
276	if db.closed {
277		return
278	}
279
280	db.closed = true
281
282	if err = db.enter(); err != nil {
283		return
284	}
285
286	doLeave := true
287	defer func() {
288		db.wal = nil
289		if e := recover(); e != nil {
290			err = fmt.Errorf("%v", e)
291		}
292		if doLeave {
293			db.leave(&err)
294		}
295	}()
296
297	if db.acidTimer != nil {
298		db.acidTimer.Stop()
299	}
300
301	var e error
302	for db.acidNest > 0 {
303		db.acidNest--
304		if e = db.filer.EndUpdate(); err == nil {
305			err = e
306		}
307	}
308
309	doLeave = false
310	if e = db.leave(&err); err == nil {
311		err = e
312	}
313	if db.opts.VerifyDbBeforeClose {
314		if e = verifyAllocator(db.alloc); err == nil {
315			err = e
316		}
317	}
318	if e = db.close(); err == nil {
319		err = e
320	}
321	if lock := db.lock; lock != nil {
322		if e = lock.Close(); err == nil {
323			err = e
324		}
325	}
326	if wal := db.wal; wal != nil {
327		e = wal.Close()
328		db.wal = nil
329		if err == nil {
330			err = e
331		}
332	}
333	return
334}
335
336func (db *DB) close() (err error) {
337	// We are safe to close due to locked db.closeMu, but not safe against
338	// any other goroutine concurrently calling other exported db methods,
339	// causing a race[0] in the db.enter() mechanism. So we must lock
340	// db.bkl.
341	//
342	//  [0]: https://github.com/cznic/kv/issues/17#issuecomment-31960658
343	db.bkl.Lock()
344	defer db.bkl.Unlock()
345
346	if db.isMem { // lldb.MemFiler
347		return
348	}
349
350	err = db.filer.Sync()
351	if e := db.filer.Close(); err == nil {
352		err = e
353	}
354	if db.opts.VerifyDbAfterClose {
355		if e := verifyDbFile(db.Name()); err == nil {
356			err = e
357		}
358	}
359	return
360}
361
362// Name returns the name of the DB file.
363func (db *DB) Name() string {
364	return db.filer.Name()
365}
366
367// Size returns the size of the DB file.
368func (db *DB) Size() (sz int64, err error) {
369	db.bkl.Lock()
370	defer func() {
371		if e := recover(); e != nil {
372			err = fmt.Errorf("%v", e)
373		}
374		db.bkl.Unlock()
375	}()
376
377	return db.filer.Size()
378}
379
380func (db *DB) enter() (err error) {
381	db.bkl.Lock()
382	switch db.acidState {
383	default:
384		panic("internal error")
385	case stDisabled:
386		db.acidNest++
387		if db.acidNest == 1 {
388			if err = db.filer.BeginUpdate(); err != nil {
389				return err
390			}
391		}
392	case stIdle:
393		if err = db.filer.BeginUpdate(); err != nil {
394			return err
395		}
396
397		db.acidNest = 1
398		db.acidTimer = time.AfterFunc(db.gracePeriod, db.timeout)
399		db.acidState = stCollecting
400	case stCollecting:
401		db.acidNest++
402	case stIdleArmed:
403		db.acidNest = 1
404		db.acidState = stCollectingArmed
405	case stCollectingArmed:
406		db.acidNest++
407	case stCollectingTriggered:
408		db.acidNest++
409	case stEndUpdateFailed:
410		return db.leave(&err)
411	}
412
413	return nil
414}
415
416func (db *DB) leave(err *error) error {
417	switch db.acidState {
418	default:
419		panic("internal error")
420	case stDisabled:
421		db.acidNest--
422		if db.acidNest == 0 {
423			if e := db.filer.EndUpdate(); e != nil && *err == nil {
424				*err = e
425			}
426		}
427	case stCollecting:
428		db.acidNest--
429		if db.acidNest == 0 {
430			db.acidState = stIdleArmed
431		}
432	case stCollectingArmed:
433		db.acidNest--
434		if db.acidNest == 0 {
435			db.acidState = stIdleArmed
436		}
437	case stCollectingTriggered:
438		db.acidNest--
439		if db.acidNest == 0 {
440			if e := db.filer.EndUpdate(); e != nil && *err == nil {
441				*err = e
442			}
443			db.acidState = stIdle
444		}
445	case stEndUpdateFailed:
446		db.bkl.Unlock()
447		return fmt.Errorf("Last transaction commit failed: %v", db.lastCommitErr)
448	}
449
450	if *err != nil {
451		db.filer.Rollback() // return the original, input error
452	}
453	db.bkl.Unlock()
454	return *err
455}
456
457func (db *DB) timeout() {
458	db.closeMu.Lock()
459	defer db.closeMu.Unlock()
460	if db.closed {
461		return
462	}
463
464	db.bkl.Lock()
465	defer db.bkl.Unlock()
466
467	switch db.acidState {
468	default:
469		panic("internal error")
470	case stIdle:
471		panic("internal error")
472	case stCollecting:
473		db.acidState = stCollectingTriggered
474	case stIdleArmed:
475		if err := db.filer.EndUpdate(); err != nil { // If EndUpdate fails, no WAL was written (automatic Rollback)
476			db.acidState = stEndUpdateFailed
477			db.lastCommitErr = err
478			return
479		}
480
481		db.acidState = stIdle
482	case stCollectingArmed:
483		db.acidState = stCollectingTriggered
484	case stCollectingTriggered:
485		panic("internal error")
486	}
487}
488
489// BeginTransaction starts a new transaction. Every call to BeginTransaction
490// must be eventually "balanced" by exactly one call to Commit or Rollback (but
491// not both). Calls to BeginTransaction may nest.
492//
493// BeginTransaction is atomic and it is safe for concurrent use by multiple
494// goroutines (if/when that makes sense).
495func (db *DB) BeginTransaction() (err error) {
496	if err = db.enter(); err != nil {
497		return
498	}
499
500	defer func() {
501		if e := recover(); e != nil {
502			err = fmt.Errorf("%v", e)
503		}
504		db.leave(&err)
505	}()
506
507	db.acidNest++
508	return db.filer.BeginUpdate()
509}
510
511// Commit commits the current transaction. If the transaction is the top level
512// one, then all of the changes made within the transaction are atomically made
513// persistent in the DB.  Invocation of an unbalanced Commit is an error.
514//
515// Commit is atomic and it is safe for concurrent use by multiple goroutines
516// (if/when that makes sense).
517func (db *DB) Commit() (err error) {
518	if err = db.enter(); err != nil {
519		return
520	}
521
522	defer func() {
523		if e := recover(); e != nil {
524			err = fmt.Errorf("%v", e)
525		}
526		db.leave(&err)
527	}()
528
529	db.acidNest--
530	return db.filer.EndUpdate()
531}
532
533// Rollback cancels and undoes the innermost transaction level. If the
534// transaction is the top level one, then no of the changes made within the
535// transactions are persisted. Invocation of an unbalanced Rollback is an
536// error.
537//
538// Rollback is atomic and it is safe for concurrent use by multiple goroutines
539// (if/when that makes sense).
540func (db *DB) Rollback() (err error) {
541	if err = db.enter(); err != nil {
542		return
543	}
544
545	defer func() {
546		if e := recover(); e != nil {
547			err = fmt.Errorf("%v", e)
548		}
549		db.leave(&err)
550	}()
551
552	db.acidNest--
553	return db.filer.Rollback()
554}
555
556// Verify attempts to find any structural errors in DB wrt the organization of
557// it as defined by lldb.Allocator. Any problems found are reported to 'log'
558// except non verify related errors like disk read fails etc. If 'log' returns
559// false or the error doesn't allow to (reliably) continue, the verification
560// process is stopped and an error is returned from the Verify function.
561// Passing a nil log works like providing a log function always returning
562// false. Any non-structural errors, like for instance Filer read errors, are
563// NOT reported to 'log', but returned as the Verify's return value, because
564// Verify cannot proceed in such cases. Verify returns nil only if it fully
565// completed verifying DB without detecting any error.
566//
567// It is recommended to limit the number reported problems by returning false
568// from 'log' after reaching some limit. Huge and corrupted DB can produce an
569// overwhelming error report dataset.
570//
571// The verifying process will scan the whole DB at least 3 times (a trade
572// between processing space and time consumed). It doesn't read the content of
573// free blocks above the head/tail info bytes. If the 3rd phase detects lost
574// free space, then a 4th scan (a faster one) is performed to precisely report
575// all of them.
576//
577// Statistics are returned via 'stats' if non nil. The statistics are valid
578// only if Verify succeeded, ie. it didn't reported anything to log and it
579// returned a nil error.
580func (db *DB) Verify(log func(error) bool, stats *lldb.AllocStats) (err error) {
581	bitmapf, err := fileutil.TempFile("", "verifier", ".tmp")
582	if err != nil {
583		return
584	}
585
586	defer func() {
587		tn := bitmapf.Name()
588		bitmapf.Close()
589		os.Remove(tn)
590	}()
591
592	bitmap := lldb.NewSimpleFileFiler(bitmapf)
593
594	if err = db.enter(); err != nil {
595		return
596	}
597
598	defer func() {
599		if e := recover(); e != nil {
600			err = fmt.Errorf("%v", e)
601		}
602		db.leave(&err)
603	}()
604
605	return db.alloc.Verify(bitmap, log, stats)
606}
607
608// Delete deletes key and its associated value from the DB.
609//
610// Delete is atomic and it is safe for concurrent use by multiple goroutines.
611func (db *DB) Delete(key []byte) (err error) {
612	if err = db.enter(); err != nil {
613		return
614	}
615
616	err = db.root.Delete(key)
617	return db.leave(&err)
618}
619
620// Extract is a combination of Get and Delete. If the key exists in the DB, it
621// is returned (like Get) and also deleted from the DB in a more efficient way
622// which doesn't search for the key twice. The returned slice may be a
623// sub-slice of buf if buf was large enough to hold the entire content.
624// Otherwise, a newly allocated slice will be returned. It is valid to pass a
625// nil buf.
626//
627// Extract is atomic and it is safe for concurrent use by multiple goroutines.
628func (db *DB) Extract(buf, key []byte) (value []byte, err error) {
629	if err = db.enter(); err != nil {
630		return
631	}
632
633	value, err = db.root.Extract(buf, key)
634	db.leave(&err)
635	return
636}
637
638// First returns the first KV pair in the DB, if it exists. Otherwise key ==
639// nil and value == nil.
640//
641// First is atomic and it is safe for concurrent use by multiple goroutines.
642func (db *DB) First() (key, value []byte, err error) {
643	db.bkl.Lock()
644	defer db.bkl.Unlock()
645	return db.root.First()
646}
647
648// Get returns the value associated with key, or nil if no such value exists.
649// The returned slice may be a sub-slice of buf if buf was large enough to hold
650// the entire content. Otherwise, a newly allocated slice will be returned. It
651// is valid to pass a nil buf.
652//
653// Get is atomic and it is safe for concurrent use by multiple goroutines.
654func (db *DB) Get(buf, key []byte) (value []byte, err error) {
655	db.bkl.Lock()
656	defer db.bkl.Unlock()
657	return db.root.Get(buf, key)
658}
659
660// Last returns the last KV pair of the DB, if it exists. Otherwise key ==
661// nil and value == nil.
662//
663// Last is atomic and it is safe for concurrent use by multiple goroutines.
664func (db *DB) Last() (key, value []byte, err error) {
665	db.bkl.Lock()
666	defer db.bkl.Unlock()
667	return db.root.Last()
668}
669
670// Put combines Get and Set in a more efficient way where the DB is searched
671// for the key only once. The upd(ater) receives the current (key, old-value),
672// if that exists or (key, nil) otherwise. It can then return a (new-value,
673// true, nil) to create or overwrite the existing value in the KV pair, or
674// (whatever, false, nil) if it decides not to create or not to update the
675// value of the KV pair.
676//
677//	db.Set(k, v)
678//
679// conceptually equals
680//
681//	db.Put(k, func(k, v []byte){ return v, true }([]byte, bool))
682//
683// modulo the differing return values.
684//
685// The returned slice may be a sub-slice of buf if buf was large enough to hold
686// the entire content. Otherwise, a newly allocated slice will be returned. It
687// is valid to pass a nil buf.
688//
689// Put is atomic and it is safe for concurrent use by multiple goroutines.
690func (db *DB) Put(buf, key []byte, upd func(key, old []byte) (new []byte, write bool, err error)) (old []byte, written bool, err error) {
691	if err = db.enter(); err != nil {
692		return
693	}
694
695	old, written, err = db.root.Put(buf, key, upd)
696	db.leave(&err)
697	return
698}
699
700// Seek returns an enumerator positioned on the first key/value pair whose key
701// is 'greater than or equal to' the given key. There may be no such pair, in
702// which case the Next,Prev methods of the returned enumerator will always
703// return io.EOF.
704//
705// Seek is atomic and it is safe for concurrent use by multiple goroutines.
706func (db *DB) Seek(key []byte) (enum *Enumerator, hit bool, err error) {
707	db.bkl.Lock()
708	defer db.bkl.Unlock()
709	enum0, hit, err := db.root.Seek(key)
710	if err != nil {
711		return
712	}
713
714	enum = &Enumerator{
715		db:   db,
716		enum: enum0,
717	}
718	return
719}
720
721// SeekFirst returns an enumerator positioned on the first KV pair in the DB,
722// if any. For an empty DB, err == io.EOF is returned.
723//
724// SeekFirst is atomic and it is safe for concurrent use by multiple
725// goroutines.
726func (db *DB) SeekFirst() (enum *Enumerator, err error) {
727	db.bkl.Lock()
728	defer db.bkl.Unlock()
729	enum0, err := db.root.SeekFirst()
730	if err != nil {
731		return
732	}
733
734	enum = &Enumerator{
735		db:   db,
736		enum: enum0,
737	}
738	return
739}
740
741// SeekLast returns an enumerator positioned on the last KV pair in the DB,
742// if any. For an empty DB, err == io.EOF is returned.
743//
744// SeekLast is atomic and it is safe for concurrent use by multiple
745// goroutines.
746func (db *DB) SeekLast() (enum *Enumerator, err error) {
747	db.bkl.Lock()
748	defer db.bkl.Unlock()
749	enum0, err := db.root.SeekLast()
750	if err != nil {
751		return
752	}
753
754	enum = &Enumerator{
755		db:   db,
756		enum: enum0,
757	}
758	return
759}
760
761// Set sets the value associated with key. Any previous value, if existed, is
762// overwritten by the new one.
763//
764// Set is atomic and it is safe for concurrent use by multiple goroutines.
765func (db *DB) Set(key, value []byte) (err error) {
766	if err = db.enter(); err != nil {
767		return
768	}
769
770	err = db.root.Set(key, value)
771	db.leave(&err)
772	return
773}
774
// Enumerator captures the state of enumerating a DB. It is returned from the
// Seek* methods. The enumerator is aware of any mutations made to the tree in
// the process of enumerating it and automatically resumes the enumeration.
//
// Multiple enumerations, including concurrently executing ones, may be in
// progress simultaneously.
type Enumerator struct {
	db   *DB                   // The enumerated DB, its bkl is held while Next/Prev run
	enum *lldb.BTreeEnumerator // The enumerator of the DB root tree
}
785
786// Next returns the currently enumerated KV pair, if it exists and moves to the
787// next KV in the key collation order. If there is no KV pair to return, err ==
788// io.EOF is returned.
789//
790// Next is atomic and it is safe for concurrent use by multiple goroutines.
791func (e *Enumerator) Next() (key, value []byte, err error) {
792	e.db.bkl.Lock()
793	defer e.db.bkl.Unlock()
794	return e.enum.Next()
795}
796
797// Prev returns the currently enumerated KV pair, if it exists and moves to the
798// previous KV in the key collation order. If there is no KV pair to return,
799// err == io.EOF is returned.
800//
801// Prev is atomic and it is safe for concurrent use by multiple goroutines.
802func (e *Enumerator) Prev() (key, value []byte, err error) {
803	e.db.bkl.Lock()
804	defer e.db.bkl.Unlock()
805	return e.enum.Prev()
806}
807
808// Inc atomically increments the value associated with key by delta and
809// returns the new value. If the value doesn't exists before calling Inc or if
810// the value is not an [8]byte, the value is considered to be zero before peforming Inc.
811//
812// Inc is atomic and it is safe for concurrent use by multiple goroutines.
813func (db *DB) Inc(key []byte, delta int64) (val int64, err error) {
814	if err = db.enter(); err != nil {
815		return
816	}
817
818	defer db.leave(&err)
819
820	pbuf := buffer.Get(8)
821	defer buffer.Put(pbuf)
822	_, _, err = db.root.Put(
823		*pbuf,
824		key,
825		func(key []byte, old []byte) (new []byte, write bool, err error) {
826			write = true
827			if len(old) == 8 {
828				val = int64(binary.BigEndian.Uint64(old))
829			} else {
830				old = make([]byte, 8)
831				val = 0
832			}
833			val += delta
834			binary.BigEndian.PutUint64(old, uint64(val))
835			new = old
836			return
837		},
838	)
839
840	return
841}
842
843// WALName returns the name of the WAL file in use or an empty string for memory
844// or closed databases.
845func (db *DB) WALName() string {
846	if f := db.wal; f != nil {
847		return f.Name()
848	}
849
850	return ""
851}
852