1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
7 #ifndef ROCKSDB_LITE
8 #include <vector>
9 
10 #include "db/log_reader.h"
11 #include "db/version_set.h"
12 #include "file/filename.h"
13 #include "logging/logging.h"
14 #include "options/db_options.h"
15 #include "port/port.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/transaction_log.h"
19 #include "rocksdb/types.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class LogFileImpl : public LogFile {
24  public:
LogFileImpl(uint64_t logNum,WalFileType logType,SequenceNumber startSeq,uint64_t sizeBytes)25   LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
26               uint64_t sizeBytes) :
27     logNumber_(logNum),
28     type_(logType),
29     startSequence_(startSeq),
30     sizeFileBytes_(sizeBytes) {
31   }
32 
PathName()33   std::string PathName() const override {
34     if (type_ == kArchivedLogFile) {
35       return ArchivedLogFileName("", logNumber_);
36     }
37     return LogFileName("", logNumber_);
38   }
39 
LogNumber()40   uint64_t LogNumber() const override { return logNumber_; }
41 
Type()42   WalFileType Type() const override { return type_; }
43 
StartSequence()44   SequenceNumber StartSequence() const override { return startSequence_; }
45 
SizeFileBytes()46   uint64_t SizeFileBytes() const override { return sizeFileBytes_; }
47 
48   bool operator < (const LogFile& that) const {
49     return LogNumber() < that.LogNumber();
50   }
51 
52  private:
53   uint64_t logNumber_;
54   WalFileType type_;
55   SequenceNumber startSequence_;
56   uint64_t sizeFileBytes_;
57 
58 };
59 
60 class TransactionLogIteratorImpl : public TransactionLogIterator {
61  public:
62   TransactionLogIteratorImpl(
63       const std::string& dir, const ImmutableDBOptions* options,
64       const TransactionLogIterator::ReadOptions& read_options,
65       const EnvOptions& soptions, const SequenceNumber seqNum,
66       std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
67       const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer);
68 
69   virtual bool Valid() override;
70 
71   virtual void Next() override;
72 
73   virtual Status status() override;
74 
75   virtual BatchResult GetBatch() override;
76 
77  private:
78   const std::string& dir_;
79   const ImmutableDBOptions* options_;
80   const TransactionLogIterator::ReadOptions read_options_;
81   const EnvOptions& soptions_;
82   SequenceNumber starting_sequence_number_;
83   std::unique_ptr<VectorLogPtr> files_;
84   bool started_;
85   bool is_valid_;  // not valid when it starts of.
86   Status current_status_;
87   size_t current_file_index_;
88   std::unique_ptr<WriteBatch> current_batch_;
89   std::unique_ptr<log::Reader> current_log_reader_;
90   std::string scratch_;
91   Status OpenLogFile(const LogFile* log_file,
92                      std::unique_ptr<SequentialFileReader>* file);
93 
94   struct LogReporter : public log::Reader::Reporter {
95     Env* env;
96     Logger* info_log;
CorruptionLogReporter97     virtual void Corruption(size_t bytes, const Status& s) override {
98       ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
99                       s.ToString().c_str());
100     }
InfoLogReporter101     virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); }
102   } reporter_;
103 
104   SequenceNumber
105       current_batch_seq_;  // sequence number at start of current batch
106   SequenceNumber current_last_seq_;  // last sequence in the current batch
107   // Used only to get latest seq. num
108   // TODO(icanadi) can this be just a callback?
109   VersionSet const* const versions_;
110   const bool seq_per_batch_;
111   // Reads from transaction log only if the writebatch record has been written
112   bool RestrictedRead(Slice* record);
113   // Seeks to startingSequenceNumber reading from startFileIndex in files_.
114   // If strict is set,then must get a batch starting with startingSequenceNumber
115   void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false);
116   // Implementation of Next. SeekToStartSequence calls it internally with
117   // internal=true to let it find next entry even if it has to jump gaps because
118   // the iterator may start off from the first available entry but promises to
119   // be continuous after that
120   void NextImpl(bool internal = false);
121   // Check if batch is expected, else return false
122   bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
123   // Update current batch if a continuous batch is found, else return false
124   void UpdateCurrentWriteBatch(const Slice& record);
125   Status OpenLogReader(const LogFile* file);
126   std::shared_ptr<IOTracer> io_tracer_;
127 };
128 }  // namespace ROCKSDB_NAMESPACE
129 #endif  // ROCKSDB_LITE
130