1// Copyright 2014 The kv Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package kv 6 7import ( 8 "encoding/binary" 9 "fmt" 10 "io" 11 "os" 12 "sync" 13 "time" 14 15 "github.com/cznic/fileutil" 16 "github.com/cznic/internal/buffer" 17 "github.com/cznic/lldb" 18) 19 20const ( 21 magic = "\x60\xdbKV" 22) 23 24const ( 25 stDisabled = iota // stDisabled must be zero 26 stIdle 27 stCollecting 28 stIdleArmed 29 stCollectingArmed 30 stCollectingTriggered 31 stEndUpdateFailed 32) 33 34func init() { 35 if stDisabled != 0 { 36 panic("stDisabled != 0") 37 } 38} 39 40// DB represents the database (the KV store). 41type DB struct { 42 acidNest int // Grace period nesting level 43 acidState int // Grace period FSM state. 44 acidTimer *time.Timer // Grace period timer 45 alloc *lldb.Allocator // The machinery. Wraps filer 46 bkl sync.Mutex // Big Kernel Lock 47 closeMu sync.Mutex // Close() coordination 48 closed bool // it was 49 filer lldb.Filer // Wraps f 50 gracePeriod time.Duration // WAL grace period 51 isMem bool // No signal capture 52 lastCommitErr error // from failed EndUpdate 53 lock io.Closer // The DB file lock 54 opts *Options 55 root *lldb.BTree // The KV layer 56 wal *os.File // WAL if any 57} 58 59// CreateFromFiler is like Create but accepts an arbitrary backing storage 60// provided by filer. 61// 62// For the meaning of opts please see documentation of Options. 63func CreateFromFiler(filer lldb.Filer, opts *Options) (db *DB, err error) { 64 opts = opts.clone() 65 opts._ACID = _ACIDFull 66 return create(filer, opts, false) 67} 68 69// Create creates the named DB file mode 0666 (before umask). The file must not 70// already exist. If successful, methods on the returned DB can be used for 71// I/O; the associated file descriptor has mode os.O_RDWR. If there is an 72// error, it will be of type *os.PathError. 73// 74// For the meaning of opts please see documentation of Options. 75func Create(name string, opts *Options) (db *DB, err error) { 76 opts = opts.clone() 77 opts._ACID = _ACIDFull 78 f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666) 79 if err != nil { 80 return 81 } 82 83 return CreateFromFiler(lldb.NewSimpleFileFiler(f), opts) 84} 85 86func create(filer lldb.Filer, opts *Options, isMem bool) (db *DB, err error) { 87 defer func() { 88 if db != nil { 89 db.opts = opts 90 } 91 }() 92 defer func() { 93 lock := opts.lock 94 if err != nil && lock != nil { 95 lock.Close() 96 db = nil 97 } 98 }() 99 100 if err = opts.check(filer.Name(), true, !isMem); err != nil { 101 return 102 } 103 104 b := [16]byte{byte(magic[0]), byte(magic[1]), byte(magic[2]), byte(magic[3]), 0x00} // ver 0x00 105 if n, err := filer.WriteAt(b[:], 0); n != 16 { 106 return nil, &os.PathError{Op: "kv.create.WriteAt", Path: filer.Name(), Err: err} 107 } 108 109 db = &DB{lock: opts.lock} 110 111 filer = lldb.NewInnerFiler(filer, 16) 112 if filer, err = opts.acidFiler(db, filer); err != nil { 113 return nil, err 114 } 115 116 db.filer = filer 117 if err = filer.BeginUpdate(); err != nil { 118 return 119 } 120 121 defer func() { 122 if e := filer.EndUpdate(); e != nil { 123 if err == nil { 124 err = e 125 } 126 } 127 }() 128 129 if db.alloc, err = lldb.NewAllocator(filer, &lldb.Options{}); err != nil { 130 return nil, &os.PathError{Op: "kv.create", Path: filer.Name(), Err: err} 131 } 132 133 db.alloc.Compress = true 134 db.isMem = isMem 135 var h int64 136 if db.root, h, err = lldb.CreateBTree(db.alloc, opts.Compare); err != nil { 137 return 138 } 139 140 if h != 1 { 141 panic("internal error") 142 } 143 144 db.wal = opts.wal 145 return 146} 147 148// CreateMem creates a new instance of an in-memory DB not backed by a disk 149// file. Memory DBs are resource limited as they are completely held in memory 150// and are not automatically persisted. 151// 152// For the meaning of opts please see documentation of Options. 153func CreateMem(opts *Options) (db *DB, err error) { 154 opts = opts.clone() 155 opts._ACID = _ACIDTransactions 156 f := lldb.NewMemFiler() 157 return create(f, opts, true) 158} 159 160// CreateTemp creates a new temporary DB in the directory dir with a basename 161// beginning with prefix and name ending in suffix. If dir is the empty string, 162// CreateTemp uses the default directory for temporary files (see os.TempDir). 163// Multiple programs calling CreateTemp simultaneously will not choose the same 164// file name for the DB. The caller can use Name() to find the pathname of the 165// DB file. It is the caller's responsibility to remove the file when no longer 166// needed. 167// 168// For the meaning of opts please see documentation of Options. 169func CreateTemp(dir, prefix, suffix string, opts *Options) (db *DB, err error) { 170 opts = opts.clone() 171 opts._ACID = _ACIDFull 172 f, err := fileutil.TempFile(dir, prefix, suffix) 173 if err != nil { 174 return 175 } 176 177 return create(lldb.NewSimpleFileFiler(f), opts, false) 178} 179 180// Open opens the named DB file for reading/writing. If successful, methods on 181// the returned DB can be used for I/O; the associated file descriptor has mode 182// os.O_RDWR. If there is an error, it will be of type *os.PathError. 183// 184// Note: While a DB is opened, it is locked and cannot be simultaneously opened 185// again. 186// 187// For the meaning of opts please see documentation of Options. 188func Open(name string, opts *Options) (db *DB, err error) { 189 f, err := os.OpenFile(name, os.O_RDWR, 0666) 190 if err != nil { 191 return nil, err 192 } 193 194 return OpenFromFiler(lldb.NewSimpleFileFiler(f), opts) 195} 196 197// OpenFromFiler is like Open but it accepts an arbitrary backing storage 198// provided by filer. 199func OpenFromFiler(filer lldb.Filer, opts *Options) (db *DB, err error) { 200 opts = opts.clone() 201 opts._ACID = _ACIDFull 202 defer func() { 203 if db != nil { 204 db.opts = opts 205 } 206 }() 207 defer func() { 208 lock := opts.lock 209 if err != nil && lock != nil { 210 lock.Close() 211 db = nil 212 } 213 if err != nil { 214 if db != nil { 215 db.Close() 216 db = nil 217 } 218 } 219 }() 220 221 name := filer.Name() 222 if err = opts.check(name, false, true); err != nil { 223 return 224 } 225 226 sz, err := filer.Size() 227 if err != nil { 228 return 229 } 230 231 if sz%16 != 0 { 232 return nil, &os.PathError{Op: "kv.Open:", Path: name, Err: fmt.Errorf("file size %d(%#x) is not 0 (mod 16)", sz, sz)} 233 } 234 235 var b [16]byte 236 if n, err := filer.ReadAt(b[:], 0); n != 16 || err != nil { 237 return nil, &os.PathError{Op: "kv.Open.ReadAt", Path: name, Err: err} 238 } 239 240 var h header 241 if err = h.rd(b[:]); err != nil { 242 return nil, &os.PathError{Op: "kv.Open:validate header", Path: name, Err: err} 243 } 244 245 db = &DB{lock: opts.lock} 246 if filer, err = opts.acidFiler(db, filer); err != nil { 247 return nil, err 248 } 249 250 db.filer = filer 251 switch h.ver { 252 default: 253 return nil, &os.PathError{Op: "kv.Open", Path: name, Err: fmt.Errorf("unknown/unsupported kv file format version %#x", h.ver)} 254 case 0x00: 255 if _, err = open00(name, db); err != nil { 256 return nil, err 257 } 258 } 259 260 db.root, err = lldb.OpenBTree(db.alloc, opts.Compare, 1) 261 db.wal = opts.wal 262 if opts.VerifyDbAfterOpen { 263 err = verifyAllocator(db.alloc) 264 } 265 return 266} 267 268// Close closes the DB, rendering it unusable for I/O. It returns an error, if 269// any. Failing to call Close before exiting a program can lose the last open 270// or being committed transaction. 271// 272// Successful Close is idempotent. 273func (db *DB) Close() (err error) { 274 db.closeMu.Lock() 275 defer db.closeMu.Unlock() 276 if db.closed { 277 return 278 } 279 280 db.closed = true 281 282 if err = db.enter(); err != nil { 283 return 284 } 285 286 doLeave := true 287 defer func() { 288 db.wal = nil 289 if e := recover(); e != nil { 290 err = fmt.Errorf("%v", e) 291 } 292 if doLeave { 293 db.leave(&err) 294 } 295 }() 296 297 if db.acidTimer != nil { 298 db.acidTimer.Stop() 299 } 300 301 var e error 302 for db.acidNest > 0 { 303 db.acidNest-- 304 if e = db.filer.EndUpdate(); err == nil { 305 err = e 306 } 307 } 308 309 doLeave = false 310 if e = db.leave(&err); err == nil { 311 err = e 312 } 313 if db.opts.VerifyDbBeforeClose { 314 if e = verifyAllocator(db.alloc); err == nil { 315 err = e 316 } 317 } 318 if e = db.close(); err == nil { 319 err = e 320 } 321 if lock := db.lock; lock != nil { 322 if e = lock.Close(); err == nil { 323 err = e 324 } 325 } 326 if wal := db.wal; wal != nil { 327 e = wal.Close() 328 db.wal = nil 329 if err == nil { 330 err = e 331 } 332 } 333 return 334} 335 336func (db *DB) close() (err error) { 337 // We are safe to close due to locked db.closeMu, but not safe against 338 // any other goroutine concurrently calling other exported db methods, 339 // causing a race[0] in the db.enter() mechanism. So we must lock 340 // db.bkl. 341 // 342 // [0]: https://github.com/cznic/kv/issues/17#issuecomment-31960658 343 db.bkl.Lock() 344 defer db.bkl.Unlock() 345 346 if db.isMem { // lldb.MemFiler 347 return 348 } 349 350 err = db.filer.Sync() 351 if e := db.filer.Close(); err == nil { 352 err = e 353 } 354 if db.opts.VerifyDbAfterClose { 355 if e := verifyDbFile(db.Name()); err == nil { 356 err = e 357 } 358 } 359 return 360} 361 362// Name returns the name of the DB file. 363func (db *DB) Name() string { 364 return db.filer.Name() 365} 366 367// Size returns the size of the DB file. 368func (db *DB) Size() (sz int64, err error) { 369 db.bkl.Lock() 370 defer func() { 371 if e := recover(); e != nil { 372 err = fmt.Errorf("%v", e) 373 } 374 db.bkl.Unlock() 375 }() 376 377 return db.filer.Size() 378} 379 380func (db *DB) enter() (err error) { 381 db.bkl.Lock() 382 switch db.acidState { 383 default: 384 panic("internal error") 385 case stDisabled: 386 db.acidNest++ 387 if db.acidNest == 1 { 388 if err = db.filer.BeginUpdate(); err != nil { 389 return err 390 } 391 } 392 case stIdle: 393 if err = db.filer.BeginUpdate(); err != nil { 394 return err 395 } 396 397 db.acidNest = 1 398 db.acidTimer = time.AfterFunc(db.gracePeriod, db.timeout) 399 db.acidState = stCollecting 400 case stCollecting: 401 db.acidNest++ 402 case stIdleArmed: 403 db.acidNest = 1 404 db.acidState = stCollectingArmed 405 case stCollectingArmed: 406 db.acidNest++ 407 case stCollectingTriggered: 408 db.acidNest++ 409 case stEndUpdateFailed: 410 return db.leave(&err) 411 } 412 413 return nil 414} 415 416func (db *DB) leave(err *error) error { 417 switch db.acidState { 418 default: 419 panic("internal error") 420 case stDisabled: 421 db.acidNest-- 422 if db.acidNest == 0 { 423 if e := db.filer.EndUpdate(); e != nil && *err == nil { 424 *err = e 425 } 426 } 427 case stCollecting: 428 db.acidNest-- 429 if db.acidNest == 0 { 430 db.acidState = stIdleArmed 431 } 432 case stCollectingArmed: 433 db.acidNest-- 434 if db.acidNest == 0 { 435 db.acidState = stIdleArmed 436 } 437 case stCollectingTriggered: 438 db.acidNest-- 439 if db.acidNest == 0 { 440 if e := db.filer.EndUpdate(); e != nil && *err == nil { 441 *err = e 442 } 443 db.acidState = stIdle 444 } 445 case stEndUpdateFailed: 446 db.bkl.Unlock() 447 return fmt.Errorf("Last transaction commit failed: %v", db.lastCommitErr) 448 } 449 450 if *err != nil { 451 db.filer.Rollback() // return the original, input error 452 } 453 db.bkl.Unlock() 454 return *err 455} 456 457func (db *DB) timeout() { 458 db.closeMu.Lock() 459 defer db.closeMu.Unlock() 460 if db.closed { 461 return 462 } 463 464 db.bkl.Lock() 465 defer db.bkl.Unlock() 466 467 switch db.acidState { 468 default: 469 panic("internal error") 470 case stIdle: 471 panic("internal error") 472 case stCollecting: 473 db.acidState = stCollectingTriggered 474 case stIdleArmed: 475 if err := db.filer.EndUpdate(); err != nil { // If EndUpdate fails, no WAL was written (automatic Rollback) 476 db.acidState = stEndUpdateFailed 477 db.lastCommitErr = err 478 return 479 } 480 481 db.acidState = stIdle 482 case stCollectingArmed: 483 db.acidState = stCollectingTriggered 484 case stCollectingTriggered: 485 panic("internal error") 486 } 487} 488 489// BeginTransaction starts a new transaction. Every call to BeginTransaction 490// must be eventually "balanced" by exactly one call to Commit or Rollback (but 491// not both). Calls to BeginTransaction may nest. 492// 493// BeginTransaction is atomic and it is safe for concurrent use by multiple 494// goroutines (if/when that makes sense). 495func (db *DB) BeginTransaction() (err error) { 496 if err = db.enter(); err != nil { 497 return 498 } 499 500 defer func() { 501 if e := recover(); e != nil { 502 err = fmt.Errorf("%v", e) 503 } 504 db.leave(&err) 505 }() 506 507 db.acidNest++ 508 return db.filer.BeginUpdate() 509} 510 511// Commit commits the current transaction. If the transaction is the top level 512// one, then all of the changes made within the transaction are atomically made 513// persistent in the DB. Invocation of an unbalanced Commit is an error. 514// 515// Commit is atomic and it is safe for concurrent use by multiple goroutines 516// (if/when that makes sense). 517func (db *DB) Commit() (err error) { 518 if err = db.enter(); err != nil { 519 return 520 } 521 522 defer func() { 523 if e := recover(); e != nil { 524 err = fmt.Errorf("%v", e) 525 } 526 db.leave(&err) 527 }() 528 529 db.acidNest-- 530 return db.filer.EndUpdate() 531} 532 533// Rollback cancels and undoes the innermost transaction level. If the 534// transaction is the top level one, then no of the changes made within the 535// transactions are persisted. Invocation of an unbalanced Rollback is an 536// error. 537// 538// Rollback is atomic and it is safe for concurrent use by multiple goroutines 539// (if/when that makes sense). 540func (db *DB) Rollback() (err error) { 541 if err = db.enter(); err != nil { 542 return 543 } 544 545 defer func() { 546 if e := recover(); e != nil { 547 err = fmt.Errorf("%v", e) 548 } 549 db.leave(&err) 550 }() 551 552 db.acidNest-- 553 return db.filer.Rollback() 554} 555 556// Verify attempts to find any structural errors in DB wrt the organization of 557// it as defined by lldb.Allocator. Any problems found are reported to 'log' 558// except non verify related errors like disk read fails etc. If 'log' returns 559// false or the error doesn't allow to (reliably) continue, the verification 560// process is stopped and an error is returned from the Verify function. 561// Passing a nil log works like providing a log function always returning 562// false. Any non-structural errors, like for instance Filer read errors, are 563// NOT reported to 'log', but returned as the Verify's return value, because 564// Verify cannot proceed in such cases. Verify returns nil only if it fully 565// completed verifying DB without detecting any error. 566// 567// It is recommended to limit the number reported problems by returning false 568// from 'log' after reaching some limit. Huge and corrupted DB can produce an 569// overwhelming error report dataset. 570// 571// The verifying process will scan the whole DB at least 3 times (a trade 572// between processing space and time consumed). It doesn't read the content of 573// free blocks above the head/tail info bytes. If the 3rd phase detects lost 574// free space, then a 4th scan (a faster one) is performed to precisely report 575// all of them. 576// 577// Statistics are returned via 'stats' if non nil. The statistics are valid 578// only if Verify succeeded, ie. it didn't reported anything to log and it 579// returned a nil error. 580func (db *DB) Verify(log func(error) bool, stats *lldb.AllocStats) (err error) { 581 bitmapf, err := fileutil.TempFile("", "verifier", ".tmp") 582 if err != nil { 583 return 584 } 585 586 defer func() { 587 tn := bitmapf.Name() 588 bitmapf.Close() 589 os.Remove(tn) 590 }() 591 592 bitmap := lldb.NewSimpleFileFiler(bitmapf) 593 594 if err = db.enter(); err != nil { 595 return 596 } 597 598 defer func() { 599 if e := recover(); e != nil { 600 err = fmt.Errorf("%v", e) 601 } 602 db.leave(&err) 603 }() 604 605 return db.alloc.Verify(bitmap, log, stats) 606} 607 608// Delete deletes key and its associated value from the DB. 609// 610// Delete is atomic and it is safe for concurrent use by multiple goroutines. 611func (db *DB) Delete(key []byte) (err error) { 612 if err = db.enter(); err != nil { 613 return 614 } 615 616 err = db.root.Delete(key) 617 return db.leave(&err) 618} 619 620// Extract is a combination of Get and Delete. If the key exists in the DB, it 621// is returned (like Get) and also deleted from the DB in a more efficient way 622// which doesn't search for the key twice. The returned slice may be a 623// sub-slice of buf if buf was large enough to hold the entire content. 624// Otherwise, a newly allocated slice will be returned. It is valid to pass a 625// nil buf. 626// 627// Extract is atomic and it is safe for concurrent use by multiple goroutines. 628func (db *DB) Extract(buf, key []byte) (value []byte, err error) { 629 if err = db.enter(); err != nil { 630 return 631 } 632 633 value, err = db.root.Extract(buf, key) 634 db.leave(&err) 635 return 636} 637 638// First returns the first KV pair in the DB, if it exists. Otherwise key == 639// nil and value == nil. 640// 641// First is atomic and it is safe for concurrent use by multiple goroutines. 642func (db *DB) First() (key, value []byte, err error) { 643 db.bkl.Lock() 644 defer db.bkl.Unlock() 645 return db.root.First() 646} 647 648// Get returns the value associated with key, or nil if no such value exists. 649// The returned slice may be a sub-slice of buf if buf was large enough to hold 650// the entire content. Otherwise, a newly allocated slice will be returned. It 651// is valid to pass a nil buf. 652// 653// Get is atomic and it is safe for concurrent use by multiple goroutines. 654func (db *DB) Get(buf, key []byte) (value []byte, err error) { 655 db.bkl.Lock() 656 defer db.bkl.Unlock() 657 return db.root.Get(buf, key) 658} 659 660// Last returns the last KV pair of the DB, if it exists. Otherwise key == 661// nil and value == nil. 662// 663// Last is atomic and it is safe for concurrent use by multiple goroutines. 664func (db *DB) Last() (key, value []byte, err error) { 665 db.bkl.Lock() 666 defer db.bkl.Unlock() 667 return db.root.Last() 668} 669 670// Put combines Get and Set in a more efficient way where the DB is searched 671// for the key only once. The upd(ater) receives the current (key, old-value), 672// if that exists or (key, nil) otherwise. It can then return a (new-value, 673// true, nil) to create or overwrite the existing value in the KV pair, or 674// (whatever, false, nil) if it decides not to create or not to update the 675// value of the KV pair. 676// 677// db.Set(k, v) 678// 679// conceptually equals 680// 681// db.Put(k, func(k, v []byte){ return v, true }([]byte, bool)) 682// 683// modulo the differing return values. 684// 685// The returned slice may be a sub-slice of buf if buf was large enough to hold 686// the entire content. Otherwise, a newly allocated slice will be returned. It 687// is valid to pass a nil buf. 688// 689// Put is atomic and it is safe for concurrent use by multiple goroutines. 690func (db *DB) Put(buf, key []byte, upd func(key, old []byte) (new []byte, write bool, err error)) (old []byte, written bool, err error) { 691 if err = db.enter(); err != nil { 692 return 693 } 694 695 old, written, err = db.root.Put(buf, key, upd) 696 db.leave(&err) 697 return 698} 699 700// Seek returns an enumerator positioned on the first key/value pair whose key 701// is 'greater than or equal to' the given key. There may be no such pair, in 702// which case the Next,Prev methods of the returned enumerator will always 703// return io.EOF. 704// 705// Seek is atomic and it is safe for concurrent use by multiple goroutines. 706func (db *DB) Seek(key []byte) (enum *Enumerator, hit bool, err error) { 707 db.bkl.Lock() 708 defer db.bkl.Unlock() 709 enum0, hit, err := db.root.Seek(key) 710 if err != nil { 711 return 712 } 713 714 enum = &Enumerator{ 715 db: db, 716 enum: enum0, 717 } 718 return 719} 720 721// SeekFirst returns an enumerator positioned on the first KV pair in the DB, 722// if any. For an empty DB, err == io.EOF is returned. 723// 724// SeekFirst is atomic and it is safe for concurrent use by multiple 725// goroutines. 726func (db *DB) SeekFirst() (enum *Enumerator, err error) { 727 db.bkl.Lock() 728 defer db.bkl.Unlock() 729 enum0, err := db.root.SeekFirst() 730 if err != nil { 731 return 732 } 733 734 enum = &Enumerator{ 735 db: db, 736 enum: enum0, 737 } 738 return 739} 740 741// SeekLast returns an enumerator positioned on the last KV pair in the DB, 742// if any. For an empty DB, err == io.EOF is returned. 743// 744// SeekLast is atomic and it is safe for concurrent use by multiple 745// goroutines. 746func (db *DB) SeekLast() (enum *Enumerator, err error) { 747 db.bkl.Lock() 748 defer db.bkl.Unlock() 749 enum0, err := db.root.SeekLast() 750 if err != nil { 751 return 752 } 753 754 enum = &Enumerator{ 755 db: db, 756 enum: enum0, 757 } 758 return 759} 760 761// Set sets the value associated with key. Any previous value, if existed, is 762// overwritten by the new one. 763// 764// Set is atomic and it is safe for concurrent use by multiple goroutines. 765func (db *DB) Set(key, value []byte) (err error) { 766 if err = db.enter(); err != nil { 767 return 768 } 769 770 err = db.root.Set(key, value) 771 db.leave(&err) 772 return 773} 774 775// Enumerator captures the state of enumerating a DB. It is returned from the 776// Seek* methods. Multiple enumerations may be in progress simultaneously. The 777// enumerator is aware of any mutations made to the tree in the process of 778// enumerating it and automatically resumes the enumeration. 779// 780// Multiple concurrently executing enumerations may be in progress. 781type Enumerator struct { 782 db *DB 783 enum *lldb.BTreeEnumerator 784} 785 786// Next returns the currently enumerated KV pair, if it exists and moves to the 787// next KV in the key collation order. If there is no KV pair to return, err == 788// io.EOF is returned. 789// 790// Next is atomic and it is safe for concurrent use by multiple goroutines. 791func (e *Enumerator) Next() (key, value []byte, err error) { 792 e.db.bkl.Lock() 793 defer e.db.bkl.Unlock() 794 return e.enum.Next() 795} 796 797// Prev returns the currently enumerated KV pair, if it exists and moves to the 798// previous KV in the key collation order. If there is no KV pair to return, 799// err == io.EOF is returned. 800// 801// Prev is atomic and it is safe for concurrent use by multiple goroutines. 802func (e *Enumerator) Prev() (key, value []byte, err error) { 803 e.db.bkl.Lock() 804 defer e.db.bkl.Unlock() 805 return e.enum.Prev() 806} 807 808// Inc atomically increments the value associated with key by delta and 809// returns the new value. If the value doesn't exists before calling Inc or if 810// the value is not an [8]byte, the value is considered to be zero before peforming Inc. 811// 812// Inc is atomic and it is safe for concurrent use by multiple goroutines. 813func (db *DB) Inc(key []byte, delta int64) (val int64, err error) { 814 if err = db.enter(); err != nil { 815 return 816 } 817 818 defer db.leave(&err) 819 820 pbuf := buffer.Get(8) 821 defer buffer.Put(pbuf) 822 _, _, err = db.root.Put( 823 *pbuf, 824 key, 825 func(key []byte, old []byte) (new []byte, write bool, err error) { 826 write = true 827 if len(old) == 8 { 828 val = int64(binary.BigEndian.Uint64(old)) 829 } else { 830 old = make([]byte, 8) 831 val = 0 832 } 833 val += delta 834 binary.BigEndian.PutUint64(old, uint64(val)) 835 new = old 836 return 837 }, 838 ) 839 840 return 841} 842 843// WALName returns the name of the WAL file in use or an empty string for memory 844// or closed databases. 845func (db *DB) WALName() string { 846 if f := db.wal; f != nil { 847 return f.Name() 848 } 849 850 return "" 851} 852