// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"container/list"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/journal"
	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	// Current sequence number; kept first in the struct so that atomic
	// 64-bit access stays 8-byte aligned on 32-bit platforms.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	inWritePaused          int32 // The indicator whether write operation is paused by compaction
	aliveSnaps, aliveIters int32

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{}
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}

// openDB finishes opening a DB on top of an already-recovered session:
// it replays the journal files (in read-only or read-write mode),
// removes obsolete files (read-write mode only), and starts the
// background compaction goroutines. On success the returned DB has a
// finalizer installed that calls Close.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode).
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		// These two goroutines are waited for in Close via closeW.
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist Open will returns
// os.ErrExist error.
164// 165// Open will return an error with type of ErrCorrupted if corruption 166// detected in the DB. Use errors.IsCorrupted to test whether an error is 167// due to corruption. Corrupted DB can be recovered with Recover function. 168// 169// The returned DB instance is safe for concurrent use. 170// The DB must be closed after use, by calling Close method. 171func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { 172 s, err := newSession(stor, o) 173 if err != nil { 174 return 175 } 176 defer func() { 177 if err != nil { 178 s.close() 179 s.release() 180 } 181 }() 182 183 err = s.recover() 184 if err != nil { 185 if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() { 186 return 187 } 188 err = s.create() 189 if err != nil { 190 return 191 } 192 } else if s.o.GetErrorIfExist() { 193 err = os.ErrExist 194 return 195 } 196 197 return openDB(s) 198} 199 200// OpenFile opens or creates a DB for the given path. 201// The DB will be created if not exist, unless ErrorIfMissing is true. 202// Also, if ErrorIfExist is true and the DB exist OpenFile will returns 203// os.ErrExist error. 204// 205// OpenFile uses standard file-system backed storage implementation as 206// described in the leveldb/storage package. 207// 208// OpenFile will return an error with type of ErrCorrupted if corruption 209// detected in the DB. Use errors.IsCorrupted to test whether an error is 210// due to corruption. Corrupted DB can be recovered with Recover function. 211// 212// The returned DB instance is safe for concurrent use. 213// The DB must be closed after use, by calling Close method. 
214func OpenFile(path string, o *opt.Options) (db *DB, err error) { 215 stor, err := storage.OpenFile(path, o.GetReadOnly()) 216 if err != nil { 217 return 218 } 219 db, err = Open(stor, o) 220 if err != nil { 221 stor.Close() 222 } else { 223 db.closer = stor 224 } 225 return 226} 227 228// Recover recovers and opens a DB with missing or corrupted manifest files 229// for the given storage. It will ignore any manifest files, valid or not. 230// The DB must already exist or it will returns an error. 231// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. 232// 233// The returned DB instance is safe for concurrent use. 234// The DB must be closed after use, by calling Close method. 235func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { 236 s, err := newSession(stor, o) 237 if err != nil { 238 return 239 } 240 defer func() { 241 if err != nil { 242 s.close() 243 s.release() 244 } 245 }() 246 247 err = recoverTable(s, o) 248 if err != nil { 249 return 250 } 251 return openDB(s) 252} 253 254// RecoverFile recovers and opens a DB with missing or corrupted manifest files 255// for the given path. It will ignore any manifest files, valid or not. 256// The DB must already exist or it will returns an error. 257// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. 258// 259// RecoverFile uses standard file-system backed storage implementation as described 260// in the leveldb/storage package. 261// 262// The returned DB instance is safe for concurrent use. 263// The DB must be closed after use, by calling Close method. 
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path, false)
	if err != nil {
		return
	}
	db, err = Recover(stor, o)
	if err != nil {
		stor.Close()
	} else {
		db.closer = stor
	}
	return
}

// recoverTable rebuilds a manifest from the table files alone: every
// readable table is scanned (and rewritten when partially corrupted),
// added to level 0 of a fresh session record, and the record is
// committed as a new manifest. With StrictRecovery set, any table with
// corruption is dropped entirely instead of being rebuilt.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, lets StrictRecovery doing its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies every valid internal key from iter into a new
	// temporary table file, returning its descriptor and size. On error
	// the temporary file is removed and a zero FileDesc is returned.
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				s.stor.Remove(tmpFd)
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		// Corruption in the source iterator is tolerated here; the whole
		// point is to salvage whatever entries are readable.
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable scans a single table file, counting good/corrupted
	// keys, rebuilding the file via buildTable when it is salvageable,
	// and registering it into rec on success.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks instead of aborting the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			// Track the smallest and largest internal keys seen, for the
			// level-0 table registration below.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				// Replace the corrupted file with the rebuilt one.
				closed = true
				reader.Close()
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec)
}

// recoverJournal replays every journal file written since the last
// manifest commit into memdbs, flushing them to level-0 tables and
// committing the session record as it goes. On success a new, empty
// journal is installed and obsolete journal files are removed.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file.
			// ofd is only non-zero after a previous journal was fully
			// replayed, so its contents are durable once this commit
			// succeeds and the file can be deleted safely.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				db.s.stor.Remove(ofd)
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		db.s.stor.Remove(ofd)
	}

	return nil
}

// recoverJournalRO replays all relevant journal files into a single
// in-memory memdb without writing anything back to storage; used when
// the DB is opened in read-only mode.
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}

// memGet looks ikey up in mdb. ok reports whether the lookup is
// conclusive: true with a value on a hit, true with ErrNotFound for a
// deletion marker, or true with any other lookup error. ok == false
// means the key is absent here and the caller should consult older
// state (frozen memdb, then tables).
func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
	mk, mv, err := mdb.Find(ikey)
	if err == nil {
		ukey, _, kt, kerr := parseInternalKey(mk)
		if kerr != nil {
			// Shouldn't have happened: memdb keys are always valid
			// internal keys.
			panic(kerr)
		}
		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
			if kt == keyTypeDel {
				return true, nil, ErrNotFound
			}
			return true, mv, nil
		}
	} else if err != ErrNotFound {
		return true, nil, err
	}
	return
}

// get reads the value for key as of sequence number seq, consulting in
// order: the optional auxiliary memdb auxm, the effective and frozen
// memdbs, then the table files (plus the auxiliary tables auxt). The
// returned slice is a copy. It may trigger a table compaction as a
// side effect.
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deferred so both memdbs stay referenced until this function
		// returns; the loop runs at most twice, so this is bounded.
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}

	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	return
}

// nilIfNotFound maps ErrNotFound to nil, leaving other errors intact.
func nilIfNotFound(err error) error {
	if err == ErrNotFound {
		return nil
	}
	return err
}

// has reports whether key exists as of sequence number seq, using the
// same lookup order as get but without copying any value out.
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		// Deferred so both memdbs stay referenced until this function
		// returns; the loop runs at most twice, so this is bounded.
		defer m.decref()

		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	v := db.s.version()
	_, cSched, err := v.get(auxt, ikey, ro, true)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	if err == nil {
		ret = true
	} else if err == ErrNotFound {
		err = nil
	}
	return
}

// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contains the key.
//
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.get(nil, nil, key, se.seq, ro)
}

// Has returns true if the DB does contains the given key.
//
// It is safe to modify the contents of the argument after Has returns.
851func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { 852 err = db.ok() 853 if err != nil { 854 return 855 } 856 857 se := db.acquireSnapshot() 858 defer db.releaseSnapshot(se) 859 return db.has(nil, nil, key, se.seq, ro) 860} 861 862// NewIterator returns an iterator for the latest snapshot of the 863// underlying DB. 864// The returned iterator is not safe for concurrent use, but it is safe to use 865// multiple iterators concurrently, with each in a dedicated goroutine. 866// It is also safe to use an iterator concurrently with modifying its 867// underlying DB. The resultant key/value pairs are guaranteed to be 868// consistent. 869// 870// Slice allows slicing the iterator to only contains keys in the given 871// range. A nil Range.Start is treated as a key before all keys in the 872// DB. And a nil Range.Limit is treated as a key after all keys in 873// the DB. 874// 875// WARNING: Any slice returned by interator (e.g. slice returned by calling 876// Iterator.Key() or Iterator.Key() methods), its content should not be modified 877// unless noted otherwise. 878// 879// The iterator must be released after use, by calling Release method. 880// 881// Also read Iterator documentation of the leveldb/iterator package. 882func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 883 if err := db.ok(); err != nil { 884 return iterator.NewEmptyIterator(err) 885 } 886 887 se := db.acquireSnapshot() 888 defer db.releaseSnapshot(se) 889 // Iterator holds 'version' lock, 'version' is immutable so snapshot 890 // can be released after iterator created. 891 return db.newIterator(nil, nil, se.seq, slice, ro) 892} 893 894// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot 895// is a frozen snapshot of a DB state at a particular point in time. The 896// content of snapshot are guaranteed to be consistent. 897// 898// The snapshot must be released after use, by calling Release method. 
func (db *DB) GetSnapshot() (*Snapshot, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	return db.newSnapshot(), nil
}

// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	// Property name with the "leveldb." prefix stripped.
	p := name[len(prefix):]

	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// Accept only a bare level number after the prefix; any trailing
		// text makes Sscanf match more than one item.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			if len(tables) == 0 && duration == 0 {
				continue
			}
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}

// DBStats is database statistics.
type DBStats struct {
	WriteDelayCount    int32         // Cumulative number of write delays.
	WriteDelayDuration time.Duration // Cumulative duration of write delays.
	WritePaused        bool          // Whether writes are currently paused by compaction.

	AliveSnapshots int32
	AliveIterators int32

	IOWrite uint64
	IORead  uint64

	BlockCacheSize    int
	OpenedTablesCount int

	// Per-level statistics; levels with no tables and no compaction
	// activity are omitted, so all five slices share the same length.
	LevelSizes        []int64
	LevelTablesCounts []int
	LevelRead         []int64
	LevelWrite        []int64
	LevelDurations    []time.Duration
}

// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
	err := db.ok()
	if err != nil {
		return err
	}

	s.IORead = db.s.stor.reads()
	s.IOWrite = db.s.stor.writes()
	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1

	s.OpenedTablesCount = db.s.tops.cache.Size()
	if db.s.tops.bcache != nil {
		s.BlockCacheSize = db.s.tops.bcache.Size()
	} else {
		s.BlockCacheSize = 0
	}

	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)

	// Reuse the caller's slices, truncated, to avoid reallocation.
	s.LevelDurations = s.LevelDurations[:0]
	s.LevelRead = s.LevelRead[:0]
	s.LevelWrite = s.LevelWrite[:0]
	s.LevelSizes = s.LevelSizes[:0]
	s.LevelTablesCounts = s.LevelTablesCounts[:0]

	v := db.s.version()
	defer v.release()

	for level, tables := range v.levels {
		duration, read, write := db.compStats.getStat(level)
		if len(tables) == 0 && duration == 0 {
			continue
		}
		s.LevelDurations = append(s.LevelDurations, duration)
		s.LevelRead = append(s.LevelRead, read)
		s.LevelWrite = append(s.LevelWrite, write)
		s.LevelSizes = append(s.LevelSizes, tables.size())
		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
	}

	return nil
}

// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user
// data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size.
// The results may not include the sizes of recently written data.
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	v := db.s.version()
	defer v.release()

	sizes := make(Sizes, 0, len(ranges))
	for _, r := range ranges {
		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
		start, err := v.offsetOf(imin)
		if err != nil {
			return nil, err
		}
		limit, err := v.offsetOf(imax)
		if err != nil {
			return nil, err
		}
		var size int64
		if limit >= start {
			size = limit - start
		}
		sizes = append(sizes, size)
	}

	return sizes, nil
}

// Close closes the DB. This will also releases any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed is atomic; only the first caller proceeds.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage last, but keep the first error seen.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}