// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"container/list"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/journal"
	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	seq uint64

	// Stats. Need 64-bit alignment.
	cWriteDelay            int64 // The cumulative duration of write delays
	cWriteDelayN           int32 // The cumulative number of write delays
	inWritePaused          int32 // The indicator whether write operation is paused by compaction
	aliveSnaps, aliveIters int32

	// Compaction statistic
	memComp       uint32 // The cumulative number of memory compaction
	level0Comp    uint32 // The cumulative number of level0 compaction
	nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
	seekComp      uint32 // The cumulative number of seek compaction

	// Session.
	s *session

	// MemDB.
	memMu           sync.RWMutex
	memPool         chan *memdb.DB
	mem, frozenMem  *memDB
	journal         *journal.Writer
	journalWriter   storage.Writer
	journalFd       storage.FileDesc
	frozenJournalFd storage.FileDesc
	frozenSeq       uint64

	// Snapshot.
	snapsMu   sync.Mutex
	snapsList *list.List

	// Write.
	batchPool    sync.Pool
	writeMergeC  chan writeMerge
	writeMergedC chan bool
	writeLockC   chan struct{} // Capacity-1 channel used as the writer lock.
	writeAckC    chan error
	writeDelay   time.Duration
	writeDelayN  int
	tr           *Transaction

	// Compaction.
	compCommitLk     sync.Mutex
	tcompCmdC        chan cCmd
	tcompPauseC      chan chan<- struct{}
	mcompCmdC        chan cCmd
	compErrC         chan error
	compPerErrC      chan error
	compErrSetC      chan error
	compWriteLocking bool
	compStats        cStats
	memdbMaxLevel    int // For testing.
	compActiveLk     sync.RWMutex
	memCompActive    bool
	tableCompActive  bool

	// Close.
	closeW sync.WaitGroup
	closeC chan struct{}
	closed uint32
	closer io.Closer
}

// openDB finishes opening a DB on top of an already-recovered session:
// it replays the journals (read-only or read-write flavor), cleans up
// obsolete files, and starts the background goroutines.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeqNum,
		// MemDB
		memPool: make(chan *memdb.DB, 1),
		// Snapshot
		snapsList: list.New(),
		// Write
		batchPool:    sync.Pool{New: newBatch},
		writeMergeC:  make(chan writeMerge),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		// Compaction
		tcompCmdC:   make(chan cCmd),
		tcompPauseC: make(chan chan<- struct{}),
		mcompCmdC:   make(chan cCmd),
		compErrC:    make(chan error),
		compPerErrC: make(chan error),
		compErrSetC: make(chan error),
		// Close
		closeC: make(chan struct{}),
	}

	// Read-only mode.
	readOnly := s.o.GetReadOnly()

	if readOnly {
		// Recover journals (read-only mode).
		if err := db.recoverJournalRO(); err != nil {
			return nil, err
		}
	} else {
		// Recover journals.
		if err := db.recoverJournal(); err != nil {
			return nil, err
		}

		// Remove any obsolete files.
		if err := db.checkAndCleanFiles(); err != nil {
			// Close journal.
			if db.journal != nil {
				db.journal.Close()
				db.journalWriter.Close()
			}
			return nil, err
		}

	}

	// Doesn't need to be included in the wait group.
	go db.compactionError()
	go db.mpoolDrain()

	if readOnly {
		db.SetReadOnly()
	} else {
		db.closeW.Add(2)
		go db.tCompaction()
		go db.mCompaction()
		// go db.jWriter()
	}

	s.logf("db@open done T·%v", time.Since(start))

	// The finalizer closes the DB if the user forgets to; Close clears it.
	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist Open will returns
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(stor, o)
	if err != nil {
		return
	}
	// Undo session setup if anything below fails.
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = s.recover()
	if err != nil {
		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
			return
		}
		err = s.create()
		if err != nil {
			return
		}
	} else if s.o.GetErrorIfExist() {
		err = os.ErrExist
		return
	}

	return openDB(s)
}

// OpenFile opens or creates a DB for the given path.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB.
Use errors.IsCorrupted to test whether an error is 219// due to corruption. Corrupted DB can be recovered with Recover function. 220// 221// The returned DB instance is safe for concurrent use. 222// The DB must be closed after use, by calling Close method. 223func OpenFile(path string, o *opt.Options) (db *DB, err error) { 224 stor, err := storage.OpenFile(path, o.GetReadOnly()) 225 if err != nil { 226 return 227 } 228 db, err = Open(stor, o) 229 if err != nil { 230 stor.Close() 231 } else { 232 db.closer = stor 233 } 234 return 235} 236 237// Recover recovers and opens a DB with missing or corrupted manifest files 238// for the given storage. It will ignore any manifest files, valid or not. 239// The DB must already exist or it will returns an error. 240// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. 241// 242// The returned DB instance is safe for concurrent use. 243// The DB must be closed after use, by calling Close method. 244func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { 245 s, err := newSession(stor, o) 246 if err != nil { 247 return 248 } 249 defer func() { 250 if err != nil { 251 s.close() 252 s.release() 253 } 254 }() 255 256 err = recoverTable(s, o) 257 if err != nil { 258 return 259 } 260 return openDB(s) 261} 262 263// RecoverFile recovers and opens a DB with missing or corrupted manifest files 264// for the given path. It will ignore any manifest files, valid or not. 265// The DB must already exist or it will returns an error. 266// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. 267// 268// RecoverFile uses standard file-system backed storage implementation as described 269// in the leveldb/storage package. 270// 271// The returned DB instance is safe for concurrent use. 272// The DB must be closed after use, by calling Close method. 
273func RecoverFile(path string, o *opt.Options) (db *DB, err error) { 274 stor, err := storage.OpenFile(path, false) 275 if err != nil { 276 return 277 } 278 db, err = Recover(stor, o) 279 if err != nil { 280 stor.Close() 281 } else { 282 db.closer = stor 283 } 284 return 285} 286 287func recoverTable(s *session, o *opt.Options) error { 288 o = dupOptions(o) 289 // Mask StrictReader, lets StrictRecovery doing its job. 290 o.Strict &= ^opt.StrictReader 291 292 // Get all tables and sort it by file number. 293 fds, err := s.stor.List(storage.TypeTable) 294 if err != nil { 295 return err 296 } 297 sortFds(fds) 298 299 var ( 300 maxSeq uint64 301 recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int 302 303 // We will drop corrupted table. 304 strict = o.GetStrict(opt.StrictRecovery) 305 noSync = o.GetNoSync() 306 307 rec = &sessionRecord{} 308 bpool = util.NewBufferPool(o.GetBlockSize() + 5) 309 ) 310 buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) { 311 tmpFd = s.newTemp() 312 writer, err := s.stor.Create(tmpFd) 313 if err != nil { 314 return 315 } 316 defer func() { 317 writer.Close() 318 if err != nil { 319 s.stor.Remove(tmpFd) 320 tmpFd = storage.FileDesc{} 321 } 322 }() 323 324 // Copy entries. 
325 tw := table.NewWriter(writer, o) 326 for iter.Next() { 327 key := iter.Key() 328 if validInternalKey(key) { 329 err = tw.Append(key, iter.Value()) 330 if err != nil { 331 return 332 } 333 } 334 } 335 err = iter.Error() 336 if err != nil && !errors.IsCorrupted(err) { 337 return 338 } 339 err = tw.Close() 340 if err != nil { 341 return 342 } 343 if !noSync { 344 err = writer.Sync() 345 if err != nil { 346 return 347 } 348 } 349 size = int64(tw.BytesLen()) 350 return 351 } 352 recoverTable := func(fd storage.FileDesc) error { 353 s.logf("table@recovery recovering @%d", fd.Num) 354 reader, err := s.stor.Open(fd) 355 if err != nil { 356 return err 357 } 358 var closed bool 359 defer func() { 360 if !closed { 361 reader.Close() 362 } 363 }() 364 365 // Get file size. 366 size, err := reader.Seek(0, 2) 367 if err != nil { 368 return err 369 } 370 371 var ( 372 tSeq uint64 373 tgoodKey, tcorruptedKey, tcorruptedBlock int 374 imin, imax []byte 375 ) 376 tr, err := table.NewReader(reader, size, fd, nil, bpool, o) 377 if err != nil { 378 return err 379 } 380 iter := tr.NewIterator(nil, nil) 381 if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok { 382 itererr.SetErrorCallback(func(err error) { 383 if errors.IsCorrupted(err) { 384 s.logf("table@recovery block corruption @%d %q", fd.Num, err) 385 tcorruptedBlock++ 386 } 387 }) 388 } 389 390 // Scan the table. 391 for iter.Next() { 392 key := iter.Key() 393 _, seq, _, kerr := parseInternalKey(key) 394 if kerr != nil { 395 tcorruptedKey++ 396 continue 397 } 398 tgoodKey++ 399 if seq > tSeq { 400 tSeq = seq 401 } 402 if imin == nil { 403 imin = append([]byte{}, key...) 404 } 405 imax = append(imax[:0], key...) 
406 } 407 if err := iter.Error(); err != nil && !errors.IsCorrupted(err) { 408 iter.Release() 409 return err 410 } 411 iter.Release() 412 413 goodKey += tgoodKey 414 corruptedKey += tcorruptedKey 415 corruptedBlock += tcorruptedBlock 416 417 if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) { 418 droppedTable++ 419 s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq) 420 return nil 421 } 422 423 if tgoodKey > 0 { 424 if tcorruptedKey > 0 || tcorruptedBlock > 0 { 425 // Rebuild the table. 426 s.logf("table@recovery rebuilding @%d", fd.Num) 427 iter := tr.NewIterator(nil, nil) 428 tmpFd, newSize, err := buildTable(iter) 429 iter.Release() 430 if err != nil { 431 return err 432 } 433 closed = true 434 reader.Close() 435 if err := s.stor.Rename(tmpFd, fd); err != nil { 436 return err 437 } 438 size = newSize 439 } 440 if tSeq > maxSeq { 441 maxSeq = tSeq 442 } 443 recoveredKey += tgoodKey 444 // Add table to level 0. 445 rec.addTable(0, fd.Num, size, imin, imax) 446 s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq) 447 } else { 448 droppedTable++ 449 s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size) 450 } 451 452 return nil 453 } 454 455 // Recover all tables. 456 if len(fds) > 0 { 457 s.logf("table@recovery F·%d", len(fds)) 458 459 // Mark file number as used. 460 s.markFileNum(fds[len(fds)-1].Num) 461 462 for _, fd := range fds { 463 if err := recoverTable(fd); err != nil { 464 return err 465 } 466 } 467 468 s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq) 469 } 470 471 // Set sequence number. 472 rec.setSeqNum(maxSeq) 473 474 // Create new manifest. 475 if err := s.create(); err != nil { 476 return err 477 } 478 479 // Commit. 
480 return s.commit(rec, false) 481} 482 483func (db *DB) recoverJournal() error { 484 // Get all journals and sort it by file number. 485 rawFds, err := db.s.stor.List(storage.TypeJournal) 486 if err != nil { 487 return err 488 } 489 sortFds(rawFds) 490 491 // Journals that will be recovered. 492 var fds []storage.FileDesc 493 for _, fd := range rawFds { 494 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum { 495 fds = append(fds, fd) 496 } 497 } 498 499 var ( 500 ofd storage.FileDesc // Obsolete file. 501 rec = &sessionRecord{} 502 ) 503 504 // Recover journals. 505 if len(fds) > 0 { 506 db.logf("journal@recovery F·%d", len(fds)) 507 508 // Mark file number as used. 509 db.s.markFileNum(fds[len(fds)-1].Num) 510 511 var ( 512 // Options. 513 strict = db.s.o.GetStrict(opt.StrictJournal) 514 checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) 515 writeBuffer = db.s.o.GetWriteBuffer() 516 517 jr *journal.Reader 518 mdb = memdb.New(db.s.icmp, writeBuffer) 519 buf = &util.Buffer{} 520 batchSeq uint64 521 batchLen int 522 ) 523 524 for _, fd := range fds { 525 db.logf("journal@recovery recovering @%d", fd.Num) 526 527 fr, err := db.s.stor.Open(fd) 528 if err != nil { 529 return err 530 } 531 532 // Create or reset journal reader instance. 533 if jr == nil { 534 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) 535 } else { 536 jr.Reset(fr, dropper{db.s, fd}, strict, checksum) 537 } 538 539 // Flush memdb and remove obsolete journal file. 540 if !ofd.Zero() { 541 if mdb.Len() > 0 { 542 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { 543 fr.Close() 544 return err 545 } 546 } 547 548 rec.setJournalNum(fd.Num) 549 rec.setSeqNum(db.seq) 550 if err := db.s.commit(rec, false); err != nil { 551 fr.Close() 552 return err 553 } 554 rec.resetAddedTables() 555 556 db.s.stor.Remove(ofd) 557 ofd = storage.FileDesc{} 558 } 559 560 // Replay journal to memdb. 
561 mdb.Reset() 562 for { 563 r, err := jr.Next() 564 if err != nil { 565 if err == io.EOF { 566 break 567 } 568 569 fr.Close() 570 return errors.SetFd(err, fd) 571 } 572 573 buf.Reset() 574 if _, err := buf.ReadFrom(r); err != nil { 575 if err == io.ErrUnexpectedEOF { 576 // This is error returned due to corruption, with strict == false. 577 continue 578 } 579 580 fr.Close() 581 return errors.SetFd(err, fd) 582 } 583 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) 584 if err != nil { 585 if !strict && errors.IsCorrupted(err) { 586 db.s.logf("journal error: %v (skipped)", err) 587 // We won't apply sequence number as it might be corrupted. 588 continue 589 } 590 591 fr.Close() 592 return errors.SetFd(err, fd) 593 } 594 595 // Save sequence number. 596 db.seq = batchSeq + uint64(batchLen) 597 598 // Flush it if large enough. 599 if mdb.Size() >= writeBuffer { 600 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { 601 fr.Close() 602 return err 603 } 604 605 mdb.Reset() 606 } 607 } 608 609 fr.Close() 610 ofd = fd 611 } 612 613 // Flush the last memdb. 614 if mdb.Len() > 0 { 615 if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { 616 return err 617 } 618 } 619 } 620 621 // Create a new journal. 622 if _, err := db.newMem(0); err != nil { 623 return err 624 } 625 626 // Commit. 627 rec.setJournalNum(db.journalFd.Num) 628 rec.setSeqNum(db.seq) 629 if err := db.s.commit(rec, false); err != nil { 630 // Close journal on error. 631 if db.journal != nil { 632 db.journal.Close() 633 db.journalWriter.Close() 634 } 635 return err 636 } 637 638 // Remove the last obsolete journal file. 639 if !ofd.Zero() { 640 db.s.stor.Remove(ofd) 641 } 642 643 return nil 644} 645 646func (db *DB) recoverJournalRO() error { 647 // Get all journals and sort it by file number. 648 rawFds, err := db.s.stor.List(storage.TypeJournal) 649 if err != nil { 650 return err 651 } 652 sortFds(rawFds) 653 654 // Journals that will be recovered. 
655 var fds []storage.FileDesc 656 for _, fd := range rawFds { 657 if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum { 658 fds = append(fds, fd) 659 } 660 } 661 662 var ( 663 // Options. 664 strict = db.s.o.GetStrict(opt.StrictJournal) 665 checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) 666 writeBuffer = db.s.o.GetWriteBuffer() 667 668 mdb = memdb.New(db.s.icmp, writeBuffer) 669 ) 670 671 // Recover journals. 672 if len(fds) > 0 { 673 db.logf("journal@recovery RO·Mode F·%d", len(fds)) 674 675 var ( 676 jr *journal.Reader 677 buf = &util.Buffer{} 678 batchSeq uint64 679 batchLen int 680 ) 681 682 for _, fd := range fds { 683 db.logf("journal@recovery recovering @%d", fd.Num) 684 685 fr, err := db.s.stor.Open(fd) 686 if err != nil { 687 return err 688 } 689 690 // Create or reset journal reader instance. 691 if jr == nil { 692 jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) 693 } else { 694 jr.Reset(fr, dropper{db.s, fd}, strict, checksum) 695 } 696 697 // Replay journal to memdb. 698 for { 699 r, err := jr.Next() 700 if err != nil { 701 if err == io.EOF { 702 break 703 } 704 705 fr.Close() 706 return errors.SetFd(err, fd) 707 } 708 709 buf.Reset() 710 if _, err := buf.ReadFrom(r); err != nil { 711 if err == io.ErrUnexpectedEOF { 712 // This is error returned due to corruption, with strict == false. 713 continue 714 } 715 716 fr.Close() 717 return errors.SetFd(err, fd) 718 } 719 batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) 720 if err != nil { 721 if !strict && errors.IsCorrupted(err) { 722 db.s.logf("journal error: %v (skipped)", err) 723 // We won't apply sequence number as it might be corrupted. 724 continue 725 } 726 727 fr.Close() 728 return errors.SetFd(err, fd) 729 } 730 731 // Save sequence number. 732 db.seq = batchSeq + uint64(batchLen) 733 } 734 735 fr.Close() 736 } 737 } 738 739 // Set memDB. 
740 db.mem = &memDB{db: db, DB: mdb, ref: 1} 741 742 return nil 743} 744 745func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) { 746 mk, mv, err := mdb.Find(ikey) 747 if err == nil { 748 ukey, _, kt, kerr := parseInternalKey(mk) 749 if kerr != nil { 750 // Shouldn't have had happen. 751 panic(kerr) 752 } 753 if icmp.uCompare(ukey, ikey.ukey()) == 0 { 754 if kt == keyTypeDel { 755 return true, nil, ErrNotFound 756 } 757 return true, mv, nil 758 759 } 760 } else if err != ErrNotFound { 761 return true, nil, err 762 } 763 return 764} 765 766func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { 767 ikey := makeInternalKey(nil, key, seq, keyTypeSeek) 768 769 if auxm != nil { 770 if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok { 771 return append([]byte{}, mv...), me 772 } 773 } 774 775 em, fm := db.getMems() 776 for _, m := range [...]*memDB{em, fm} { 777 if m == nil { 778 continue 779 } 780 defer m.decref() 781 782 if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok { 783 return append([]byte{}, mv...), me 784 } 785 } 786 787 v := db.s.version() 788 value, cSched, err := v.get(auxt, ikey, ro, false) 789 v.release() 790 if cSched { 791 // Trigger table compaction. 
792 db.compTrigger(db.tcompCmdC) 793 } 794 return 795} 796 797func nilIfNotFound(err error) error { 798 if err == ErrNotFound { 799 return nil 800 } 801 return err 802} 803 804func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) { 805 ikey := makeInternalKey(nil, key, seq, keyTypeSeek) 806 807 if auxm != nil { 808 if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok { 809 return me == nil, nilIfNotFound(me) 810 } 811 } 812 813 em, fm := db.getMems() 814 for _, m := range [...]*memDB{em, fm} { 815 if m == nil { 816 continue 817 } 818 defer m.decref() 819 820 if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok { 821 return me == nil, nilIfNotFound(me) 822 } 823 } 824 825 v := db.s.version() 826 _, cSched, err := v.get(auxt, ikey, ro, true) 827 v.release() 828 if cSched { 829 // Trigger table compaction. 830 db.compTrigger(db.tcompCmdC) 831 } 832 if err == nil { 833 ret = true 834 } else if err == ErrNotFound { 835 err = nil 836 } 837 return 838} 839 840// Get gets the value for the given key. It returns ErrNotFound if the 841// DB does not contains the key. 842// 843// The returned slice is its own copy, it is safe to modify the contents 844// of the returned slice. 845// It is safe to modify the contents of the argument after Get returns. 846func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { 847 err = db.ok() 848 if err != nil { 849 return 850 } 851 852 se := db.acquireSnapshot() 853 defer db.releaseSnapshot(se) 854 return db.get(nil, nil, key, se.seq, ro) 855} 856 857// Has returns true if the DB does contains the given key. 858// 859// It is safe to modify the contents of the argument after Has returns. 
860func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { 861 err = db.ok() 862 if err != nil { 863 return 864 } 865 866 se := db.acquireSnapshot() 867 defer db.releaseSnapshot(se) 868 return db.has(nil, nil, key, se.seq, ro) 869} 870 871// NewIterator returns an iterator for the latest snapshot of the 872// underlying DB. 873// The returned iterator is not safe for concurrent use, but it is safe to use 874// multiple iterators concurrently, with each in a dedicated goroutine. 875// It is also safe to use an iterator concurrently with modifying its 876// underlying DB. The resultant key/value pairs are guaranteed to be 877// consistent. 878// 879// Slice allows slicing the iterator to only contains keys in the given 880// range. A nil Range.Start is treated as a key before all keys in the 881// DB. And a nil Range.Limit is treated as a key after all keys in 882// the DB. 883// 884// WARNING: Any slice returned by interator (e.g. slice returned by calling 885// Iterator.Key() or Iterator.Key() methods), its content should not be modified 886// unless noted otherwise. 887// 888// The iterator must be released after use, by calling Release method. 889// 890// Also read Iterator documentation of the leveldb/iterator package. 891func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 892 if err := db.ok(); err != nil { 893 return iterator.NewEmptyIterator(err) 894 } 895 896 se := db.acquireSnapshot() 897 defer db.releaseSnapshot(se) 898 // Iterator holds 'version' lock, 'version' is immutable so snapshot 899 // can be released after iterator created. 900 return db.newIterator(nil, nil, se.seq, slice, ro) 901} 902 903// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot 904// is a frozen snapshot of a DB state at a particular point in time. The 905// content of snapshot are guaranteed to be consistent. 906// 907// The snapshot must be released after use, by calling Release method. 
func (db *DB) GetSnapshot() (*Snapshot, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	return db.newSnapshot(), nil
}

// GetProperty returns value of the given property name.
//
// Property names:
//	leveldb.num-files-at-level{n}
//		Returns the number of files at level 'n'.
//	leveldb.stats
//		Returns statistics of the underlying DB.
//	leveldb.compcount
//		Returns the cumulative compaction counts (mem, level0, non-level0, seek).
//	leveldb.iostats
//		Returns statistics of effective disk read and write.
//	leveldb.writedelay
//		Returns cumulative write delay caused by compaction.
//	leveldb.sstables
//		Returns sstables list for each level.
//	leveldb.blockpool
//		Returns block pool stats.
//	leveldb.cachedblock
//		Returns size of cached block.
//	leveldb.openedtables
//		Returns number of opened tables.
//	leveldb.alivesnaps
//		Returns number of alive snapshots.
//	leveldb.aliveiters
//		Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
	err = db.ok()
	if err != nil {
		return
	}

	const prefix = "leveldb."
	if !strings.HasPrefix(name, prefix) {
		return "", ErrNotFound
	}
	p := name[len(prefix):]

	v := db.s.version()
	defer v.release()

	numFilesPrefix := "num-files-at-level"
	switch {
	case strings.HasPrefix(p, numFilesPrefix):
		var level uint
		var rest string
		// Exactly one item (the level) must parse; trailing garbage makes n == 2.
		n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
		if n != 1 {
			err = ErrNotFound
		} else {
			value = fmt.Sprint(v.tLen(int(level)))
		}
	case p == "stats":
		value = "Compactions\n" +
			" Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
			"-------+------------+---------------+---------------+---------------+---------------\n"
		var totalTables int
		var totalSize, totalRead, totalWrite int64
		var totalDuration time.Duration
		for level, tables := range v.levels {
			duration, read, write := db.compStats.getStat(level)
			if len(tables) == 0 && duration == 0 {
				continue
			}
			totalTables += len(tables)
			totalSize += tables.size()
			totalRead += read
			totalWrite += write
			totalDuration += duration
			value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
				level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
				float64(read)/1048576.0, float64(write)/1048576.0)
		}
		value += "-------+------------+---------------+---------------+---------------+---------------\n"
		value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
			totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
			float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
	case p == "compcount":
		value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
	case p == "iostats":
		value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
			float64(db.s.stor.reads())/1048576.0,
			float64(db.s.stor.writes())/1048576.0)
	case p == "writedelay":
		writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
		paused := atomic.LoadInt32(&db.inWritePaused) == 1
		value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
	case p == "sstables":
		for level, tables := range v.levels {
			value += fmt.Sprintf("--- level %d ---\n", level)
			for _, t := range tables {
				value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
			}
		}
	case p == "blockpool":
		value = fmt.Sprintf("%v", db.s.tops.bpool)
	case p == "cachedblock":
		if db.s.tops.bcache != nil {
			value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
		} else {
			value = "<nil>"
		}
	case p == "openedtables":
		value = fmt.Sprintf("%d", db.s.tops.cache.Size())
	case p == "alivesnaps":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
	case p == "aliveiters":
		value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
	default:
		err = ErrNotFound
	}

	return
}

// DBStats is database statistics.
type DBStats struct {
	WriteDelayCount    int32
	WriteDelayDuration time.Duration
	WritePaused        bool

	AliveSnapshots int32
	AliveIterators int32

	IOWrite uint64
	IORead  uint64

	BlockCacheSize    int
	OpenedTablesCount int

	LevelSizes        Sizes
	LevelTablesCounts []int
	LevelRead         Sizes
	LevelWrite        Sizes
	LevelDurations    []time.Duration

	MemComp       uint32
	Level0Comp    uint32
	NonLevel0Comp uint32
	SeekComp      uint32

	MemCompactionActive   bool
	TableCompactionActive bool
}

// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
	err := db.ok()
	if err != nil {
		return err
	}

	s.IORead = db.s.stor.reads()
	s.IOWrite = db.s.stor.writes()
	s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
	s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
	s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1

	s.OpenedTablesCount = db.s.tops.cache.Size()
	if db.s.tops.bcache != nil {
		s.BlockCacheSize = db.s.tops.bcache.Size()
	} else {
		s.BlockCacheSize = 0
	}

	s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
	s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)

	// Reuse the caller's per-level slices, truncating before refilling.
	s.LevelDurations = s.LevelDurations[:0]
	s.LevelRead = s.LevelRead[:0]
	s.LevelWrite = s.LevelWrite[:0]
	s.LevelSizes = s.LevelSizes[:0]
	s.LevelTablesCounts = s.LevelTablesCounts[:0]

	// Read both activity flags under a single read-lock.
	func() {
		db.compActiveLk.RLock()
		defer db.compActiveLk.RUnlock()
		s.MemCompactionActive = db.memCompActive
		s.TableCompactionActive = db.tableCompActive
	}()

	v := db.s.version()
	defer v.release()

	for level, tables := range v.levels {
		duration, read, write := db.compStats.getStat(level)

		s.LevelDurations = append(s.LevelDurations, duration)
		s.LevelRead = append(s.LevelRead, read)
		s.LevelWrite = append(s.LevelWrite, write)
		s.LevelSizes = append(s.LevelSizes, tables.size())
		s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
	}
	s.MemComp = atomic.LoadUint32(&db.memComp)
	s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
	s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
	s.SeekComp = atomic.LoadUint32(&db.seekComp)
	return nil
}

// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given
// ranges.
// The returned sizes measure storage space usage, so if the user
// data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size.
// The results may not include the sizes of recently written data.
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
	if err := db.ok(); err != nil {
		return nil, err
	}

	v := db.s.version()
	defer v.release()

	sizes := make(Sizes, 0, len(ranges))
	for _, r := range ranges {
		imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
		imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
		start, err := v.offsetOf(imin)
		if err != nil {
			return nil, err
		}
		limit, err := v.offsetOf(imax)
		if err != nil {
			return nil, err
		}
		var size int64
		// Clamp to zero for inverted (Limit before Start) ranges.
		if limit >= start {
			size = limit - start
		}
		sizes = append(sizes, size)
	}

	return sizes, nil
}

// Close closes the DB. This will also releases any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
	if !db.setClosed() {
		return ErrClosed
	}

	start := time.Now()
	db.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(db, nil)

	// Get compaction error, without blocking if none is pending.
	var err error
	select {
	case err = <-db.compErrC:
		if err == ErrReadOnly {
			err = nil
		}
	default:
	}

	// Signal all goroutines.
	close(db.closeC)

	// Discard open transaction.
	if db.tr != nil {
		db.tr.Discard()
	}

	// Acquire writer lock.
	db.writeLockC <- struct{}{}

	// Wait for all goroutines to exit.
	db.closeW.Wait()

	// Closes journal.
	if db.journal != nil {
		db.journal.Close()
		db.journalWriter.Close()
		db.journal = nil
		db.journalWriter = nil
	}

	if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
	}

	// Close session.
	db.s.close()
	db.logf("db@close done T·%v", time.Since(start))
	db.s.release()

	// Close the underlying storage if this DB owns it (see OpenFile),
	// preserving the first error encountered.
	if db.closer != nil {
		if err1 := db.closer.Close(); err == nil {
			err = err1
		}
		db.closer = nil
	}

	// Clear memdbs.
	db.clearMems()

	return err
}