1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 7 #ifndef ROCKSDB_LITE 8 #include <vector> 9 10 #include "db/log_reader.h" 11 #include "db/version_set.h" 12 #include "file/filename.h" 13 #include "logging/logging.h" 14 #include "options/db_options.h" 15 #include "port/port.h" 16 #include "rocksdb/env.h" 17 #include "rocksdb/options.h" 18 #include "rocksdb/transaction_log.h" 19 #include "rocksdb/types.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 23 class LogFileImpl : public LogFile { 24 public: LogFileImpl(uint64_t logNum,WalFileType logType,SequenceNumber startSeq,uint64_t sizeBytes)25 LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq, 26 uint64_t sizeBytes) : 27 logNumber_(logNum), 28 type_(logType), 29 startSequence_(startSeq), 30 sizeFileBytes_(sizeBytes) { 31 } 32 PathName()33 std::string PathName() const override { 34 if (type_ == kArchivedLogFile) { 35 return ArchivedLogFileName("", logNumber_); 36 } 37 return LogFileName("", logNumber_); 38 } 39 LogNumber()40 uint64_t LogNumber() const override { return logNumber_; } 41 Type()42 WalFileType Type() const override { return type_; } 43 StartSequence()44 SequenceNumber StartSequence() const override { return startSequence_; } 45 SizeFileBytes()46 uint64_t SizeFileBytes() const override { return sizeFileBytes_; } 47 48 bool operator < (const LogFile& that) const { 49 return LogNumber() < that.LogNumber(); 50 } 51 52 private: 53 uint64_t logNumber_; 54 WalFileType type_; 55 SequenceNumber startSequence_; 56 uint64_t sizeFileBytes_; 57 58 }; 59 60 class TransactionLogIteratorImpl : public TransactionLogIterator { 61 public: 62 TransactionLogIteratorImpl( 63 const std::string& dir, const ImmutableDBOptions* options, 64 const TransactionLogIterator::ReadOptions& read_options, 65 const EnvOptions& soptions, const SequenceNumber seqNum, 66 std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions, 67 const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer); 68 69 virtual bool Valid() override; 70 71 virtual void Next() override; 72 73 virtual Status status() override; 74 75 virtual BatchResult GetBatch() override; 76 77 private: 78 const std::string& dir_; 79 const ImmutableDBOptions* options_; 80 const TransactionLogIterator::ReadOptions read_options_; 81 const EnvOptions& soptions_; 82 SequenceNumber starting_sequence_number_; 83 std::unique_ptr<VectorLogPtr> files_; 84 bool started_; 85 bool is_valid_; // not valid when it starts of. 86 Status current_status_; 87 size_t current_file_index_; 88 std::unique_ptr<WriteBatch> current_batch_; 89 std::unique_ptr<log::Reader> current_log_reader_; 90 std::string scratch_; 91 Status OpenLogFile(const LogFile* log_file, 92 std::unique_ptr<SequentialFileReader>* file); 93 94 struct LogReporter : public log::Reader::Reporter { 95 Env* env; 96 Logger* info_log; CorruptionLogReporter97 virtual void Corruption(size_t bytes, const Status& s) override { 98 ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes, 99 s.ToString().c_str()); 100 } InfoLogReporter101 virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); } 102 } reporter_; 103 104 SequenceNumber 105 current_batch_seq_; // sequence number at start of current batch 106 SequenceNumber current_last_seq_; // last sequence in the current batch 107 // Used only to get latest seq. num 108 // TODO(icanadi) can this be just a callback? 109 VersionSet const* const versions_; 110 const bool seq_per_batch_; 111 // Reads from transaction log only if the writebatch record has been written 112 bool RestrictedRead(Slice* record); 113 // Seeks to startingSequenceNumber reading from startFileIndex in files_. 114 // If strict is set,then must get a batch starting with startingSequenceNumber 115 void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false); 116 // Implementation of Next. SeekToStartSequence calls it internally with 117 // internal=true to let it find next entry even if it has to jump gaps because 118 // the iterator may start off from the first available entry but promises to 119 // be continuous after that 120 void NextImpl(bool internal = false); 121 // Check if batch is expected, else return false 122 bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq); 123 // Update current batch if a continuous batch is found, else return false 124 void UpdateCurrentWriteBatch(const Slice& record); 125 Status OpenLogReader(const LogFile* file); 126 std::shared_ptr<IOTracer> io_tracer_; 127 }; 128 } // namespace ROCKSDB_NAMESPACE 129 #endif // ROCKSDB_LITE 130