// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"container/list"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/journal"
	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
	// Fields accessed with sync/atomic must come first so they stay 64-bit
	// aligned on 32-bit platforms. Need 64-bit alignment.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	aliveSnaps, aliveIters int32

	// Session.
	s *session

	// MemDB. memMu guards the mem/frozenMem pair and the journal fields
	// below; memPool recycles memdb instances between rotations.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{}
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}

// openDB finishes opening a database on top of an already-recovered session:
// it replays the journals (read-only variant when ReadOnly is set), removes
// obsolete files, and starts the background compaction goroutines. On success
// a finalizer is registered so an abandoned DB is still closed.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode).
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal before bailing out so the file handle created by
			// recoverJournal is not leaked.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exists, Open will return an
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	// Release the session on any failure path below.
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = s.recover()
	if err != nil {
		// Only a missing DB is recoverable here, and only when the caller
		// did not request ErrorIfMissing.
		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
			return
		}
		err = s.create()
		if err != nil {
			return
		}
	} else if s.o.GetErrorIfExist() {
		err = os.ErrExist
		return
	}

	return openDB(s)
}

// OpenFile opens or creates a DB for the given path.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exists, OpenFile will return an
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path, o.GetReadOnly())
	if err != nil {
		return
	}
	db, err = Open(stor, o)
	if err != nil {
		stor.Close()
	} else {
		// The DB owns the storage now; Close will close it.
		db.closer = stor
	}
	return
}

// Recover recovers and opens a DB with missing or corrupted manifest files
// for the given storage. It will ignore any manifest files, valid or not.
// The DB must already exist or it will return an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	// Rebuild a manifest from the table files instead of recovering one.
	err = recoverTable(s, o)
	if err != nil {
		return
	}
	return openDB(s)
}

// RecoverFile recovers and opens a DB with missing or corrupted manifest files
// for the given path. It will ignore any manifest files, valid or not.
// The DB must already exist or it will return an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path, false)
	if err != nil {
		return
	}
	db, err = Recover(stor, o)
	if err != nil {
		stor.Close()
	} else {
		db.closer = stor
	}
	return
}

// recoverTable scans every table file in the storage, registers the usable
// ones at level 0 of a fresh sessionRecord, rebuilds tables that contain
// recoverable corruption (dropping them instead when StrictRecovery is set),
// then creates and commits a brand-new manifest carrying the highest
// sequence number observed.
func recoverTable(s *session, o *opt.Options) error {
	o = dupOptions(o)
	// Mask StrictReader, let StrictRecovery do its job.
	o.Strict &= ^opt.StrictReader

	// Get all tables and sort it by file number.
	fds, err := s.stor.List(storage.TypeTable)
	if err != nil {
		return err
	}
	sortFds(fds)

	var (
		maxSeq                                                            uint64
		recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

		// We will drop corrupted table.
		strict = o.GetStrict(opt.StrictRecovery)
		noSync = o.GetNoSync()

		rec   = &sessionRecord{}
		bpool = util.NewBufferPool(o.GetBlockSize() + 5)
	)
	// buildTable copies the valid entries of iter into a new temporary table
	// file and returns its descriptor and size. On error the temp file is
	// removed and a zero FileDesc is returned.
	buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
		tmpFd = s.newTemp()
		writer, err := s.stor.Create(tmpFd)
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				s.stor.Remove(tmpFd)
				tmpFd = storage.FileDesc{}
			}
		}()

		// Copy entries.
		tw := table.NewWriter(writer, o)
		for iter.Next() {
			key := iter.Key()
			if validInternalKey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		// A corrupted source iterator is tolerated here: the entries copied
		// so far are kept and the table is still finalized.
		err = iter.Error()
		if err != nil && !errors.IsCorrupted(err) {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		if !noSync {
			err = writer.Sync()
			if err != nil {
				return
			}
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable (shadowing the outer function) processes one table file:
	// it counts good/corrupted keys, tracks the key range and max sequence,
	// rebuilds the table if partially corrupted, and records the outcome in
	// rec and the aggregate counters above.
	recoverTable := func(fd storage.FileDesc) error {
		s.logf("table@recovery recovering @%d", fd.Num)
		reader, err := s.stor.Open(fd)
		if err != nil {
			return err
		}
		var closed bool
		defer func() {
			if !closed {
				reader.Close()
			}
		}()

		// Get file size by seeking to the end (whence 2 == io.SeekEnd).
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}

		var (
			tSeq                                     uint64
			tgoodKey, tcorruptedKey, tcorruptedBlock int
			imin, imax                               []byte
		)
		tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
		if err != nil {
			return err
		}
		iter := tr.NewIterator(nil, nil)
		if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
			// Count corrupted blocks instead of aborting the scan.
			itererr.SetErrorCallback(func(err error) {
				if errors.IsCorrupted(err) {
					s.logf("table@recovery block corruption @%d %q", fd.Num, err)
					tcorruptedBlock++
				}
			})
		}

		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, kerr := parseInternalKey(key)
			if kerr != nil {
				tcorruptedKey++
				continue
			}
			tgoodKey++
			if seq > tSeq {
				tSeq = seq
			}
			// Keys arrive in sorted order, so the first good key is the
			// minimum and the last one seen is the maximum.
			if imin == nil {
				imin = append([]byte{}, key...)
			}
			imax = append(imax[:0], key...)
		}
		if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
			iter.Release()
			return err
		}
		iter.Release()

		goodKey += tgoodKey
		corruptedKey += tcorruptedKey
		corruptedBlock += tcorruptedBlock

		if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
			droppedTable++
			s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
			return nil
		}

		if tgoodKey > 0 {
			if tcorruptedKey > 0 || tcorruptedBlock > 0 {
				// Rebuild the table: copy the good entries to a temp file,
				// then rename it over the original.
				s.logf("table@recovery rebuilding @%d", fd.Num)
				iter := tr.NewIterator(nil, nil)
				tmpFd, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				closed = true
				reader.Close()
				if err := s.stor.Rename(tmpFd, fd); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > maxSeq {
				maxSeq = tSeq
			}
			recoveredKey += tgoodKey
			// Add table to level 0.
			rec.addTable(0, fd.Num, size, imin, imax)
			s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
		} else {
			droppedTable++
			s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
		}

		return nil
	}

	// Recover all tables.
	if len(fds) > 0 {
		s.logf("table@recovery F·%d", len(fds))

		// Mark file number as used.
		s.markFileNum(fds[len(fds)-1].Num)

		for _, fd := range fds {
			if err := recoverTable(fd); err != nil {
				return err
			}
		}

		s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
	}

	// Set sequence number.
	rec.setSeqNum(maxSeq)

	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}

	// Commit.
	return s.commit(rec)
}

// recoverJournal replays every journal file at or after the manifest's
// journal number into a memdb, flushing the memdb to level-0 tables as it
// fills, committing the session record between journals, and removing each
// journal once the next one has been durably committed. Finally it opens a
// fresh journal/memdb pair for new writes.
func (db *DB) recoverJournal() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		ofd storage.FileDesc // Obsolete file.
		rec = &sessionRecord{}
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery F·%d", len(fds))

		// Mark file number as used.
		db.s.markFileNum(fds[len(fds)-1].Num)

		var (
			// Options.
			strict      = db.s.o.GetStrict(opt.StrictJournal)
			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
			writeBuffer = db.s.o.GetWriteBuffer()

			jr       *journal.Reader
			mdb      = memdb.New(db.s.icmp, writeBuffer)
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Flush memdb and remove obsolete journal file. The previous
			// journal is only removed after its contents are committed, so
			// a crash here cannot lose data.
			if !ofd.Zero() {
				if mdb.Len() > 0 {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}
				}

				rec.setJournalNum(fd.Num)
				rec.setSeqNum(db.seq)
				if err := db.s.commit(rec); err != nil {
					fr.Close()
					return err
				}
				rec.resetAddedTables()

				db.s.stor.Remove(ofd)
				ofd = storage.FileDesc{}
			}

			// Replay journal to memdb.
			mdb.Reset()
			for {
				r, err := jr.Next()
				if err != nil {
					// io.EOF simply marks the end of this journal file.
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)

				// Flush it if large enough.
				if mdb.Size() >= writeBuffer {
					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
						fr.Close()
						return err
					}

					mdb.Reset()
				}
			}

			fr.Close()
			ofd = fd
		}

		// Flush the last memdb.
		if mdb.Len() > 0 {
			if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
				return err
			}
		}
	}

	// Create a new journal.
	if _, err := db.newMem(0); err != nil {
		return err
	}

	// Commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.seq)
	if err := db.s.commit(rec); err != nil {
		// Close journal on error.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return err
	}

	// Remove the last obsolete journal file.
	if !ofd.Zero() {
		db.s.stor.Remove(ofd)
	}

	return nil
}

// recoverJournalRO replays the journals into a single in-memory memdb
// without flushing tables, committing, or deleting any file — the
// read-only counterpart of recoverJournal. The resulting memdb becomes
// the live memDB.
func (db *DB) recoverJournalRO() error {
	// Get all journals and sort it by file number.
	rawFds, err := db.s.stor.List(storage.TypeJournal)
	if err != nil {
		return err
	}
	sortFds(rawFds)

	// Journals that will be recovered.
	var fds []storage.FileDesc
	for _, fd := range rawFds {
		if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
			fds = append(fds, fd)
		}
	}

	var (
		// Options.
		strict      = db.s.o.GetStrict(opt.StrictJournal)
		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
		writeBuffer = db.s.o.GetWriteBuffer()

		mdb = memdb.New(db.s.icmp, writeBuffer)
	)

	// Recover journals.
	if len(fds) > 0 {
		db.logf("journal@recovery RO·Mode F·%d", len(fds))

		var (
			jr       *journal.Reader
			buf      = &util.Buffer{}
			batchSeq uint64
			batchLen int
		)

		for _, fd := range fds {
			db.logf("journal@recovery recovering @%d", fd.Num)

			fr, err := db.s.stor.Open(fd)
			if err != nil {
				return err
			}

			// Create or reset journal reader instance.
			if jr == nil {
				jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
			} else {
				jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
			}

			// Replay journal to memdb.
			for {
				r, err := jr.Next()
				if err != nil {
					if err == io.EOF {
						break
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				buf.Reset()
				if _, err := buf.ReadFrom(r); err != nil {
					if err == io.ErrUnexpectedEOF {
						// This is error returned due to corruption, with strict == false.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}
				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
				if err != nil {
					if !strict && errors.IsCorrupted(err) {
						db.s.logf("journal error: %v (skipped)", err)
						// We won't apply sequence number as it might be corrupted.
						continue
					}

					fr.Close()
					return errors.SetFd(err, fd)
				}

				// Save sequence number.
				db.seq = batchSeq + uint64(batchLen)
			}

			fr.Close()
		}
	}

	// Set memDB.
	db.mem = &memDB{db: db, DB: mdb, ref: 1}

	return nil
}

// memGet looks up ikey in mdb. ok reports whether the lookup is conclusive:
// true with a value, true with ErrNotFound for a deletion marker, or true
// with any non-ErrNotFound error. ok == false means the caller should keep
// searching older data.
func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
	mk, mv, err := mdb.Find(ikey)
	if err == nil {
		ukey, _, kt, kerr := parseInternalKey(mk)
		if kerr != nil {
			// Shouldn't ever happen: memdb keys are always internal keys.
			panic(kerr)
		}
		if icmp.uCompare(ukey, ikey.ukey()) == 0 {
			// A deletion marker is conclusive: the key does not exist.
			if kt == keyTypeDel {
				return true, nil, ErrNotFound
			}
			return true, mv, nil

		}
	} else if err != ErrNotFound {
		return true, nil, err
	}
	return
}

// get returns the value of key visible at sequence number seq, consulting the
// optional auxiliary memdb/tables first, then the live and frozen memdbs,
// then the table files. A copy of the value is returned.
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		defer m.decref()

		if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return append([]byte{}, mv...), me
		}
	}

	v := db.s.version()
	value, cSched, err := v.get(auxt, ikey, ro, false)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	return
}

// nilIfNotFound maps ErrNotFound to nil, leaving other errors unchanged.
func nilIfNotFound(err error) error {
	if err == ErrNotFound {
		return nil
	}
	return err
}

// has reports whether key exists at sequence number seq, using the same
// lookup order as get but without copying the value.
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
	ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

	if auxm != nil {
		if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	em, fm := db.getMems()
	for _, m := range [...]*memDB{em, fm} {
		if m == nil {
			continue
		}
		defer m.decref()

		if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
			return me == nil, nilIfNotFound(me)
		}
	}

	v := db.s.version()
	_, cSched, err := v.get(auxt, ikey, ro, true)
	v.release()
	if cSched {
		// Trigger table compaction.
		db.compTrigger(db.tcompCmdC)
	}
	if err == nil {
		ret = true
	} else if err == ErrNotFound {
		err = nil
	}
	return
}

// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contain the key.
//
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.get(nil, nil, key, se.seq, ro)
}

// Has returns true if the DB does contain the given key.
//
// It is safe to modify the contents of the argument after Has returns.
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	return db.has(nil, nil, key, se.seq, ro)
}

// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contain keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	if err := db.ok(); err != nil {
		return iterator.NewEmptyIterator(err)
	}

	se := db.acquireSnapshot()
	defer db.releaseSnapshot(se)
	// Iterator holds 'version' lock, 'version' is immutable so snapshot
	// can be released after iterator created.
	return db.newIterator(nil, nil, se.seq, slice, ro)
}

// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
// is a frozen snapshot of a DB state at a particular point in time. The
// content of snapshot are guaranteed to be consistent.
//
// The snapshot must be released after use, by calling Release method.
func (db *DB) GetSnapshot() (*Snapshot, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	return db.newSnapshot(), nil
}

// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// n == 1 means exactly a level number and nothing after it.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			if len(tables) == 0 && duration == 0 {
				continue
			}
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		value = fmt.Sprintf("DelayN:%d Delay:%s", writeDelayN, writeDelay)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}

// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user
// data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size.
// The results may not include the sizes of recently written data.
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	v := db.s.version()
	defer v.release()

	sizes := make(Sizes, 0, len(ranges))
	for _, r := range ranges {
		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
		start, err := v.offsetOf(imin)
		if err != nil {
			return nil, err
		}
		limit, err := v.offsetOf(imax)
		if err != nil {
			return nil, err
		}
		var size int64
		// An inverted range (limit before start) yields zero, not negative.
		if limit >= start {
			size = limit - start
		}
		sizes = append(sizes, size)
	}

	return sizes, nil
}

// Close closes the DB. This will also release any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	// setClosed flips the closed flag exactly once; later calls return
	// ErrClosed without re-running shutdown.
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error.
	var err error
	select {
	case err = <-db.compErrC:
		// ErrReadOnly is expected in read-only mode, not a real failure.
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage last if this DB owns it (OpenFile /
	// RecoverFile set closer); keep the first error encountered.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}