1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
7 #ifndef ROCKSDB_LITE
8 #include <vector>
9 
10 #include "db/log_reader.h"
11 #include "db/version_set.h"
12 #include "file/filename.h"
13 #include "options/db_options.h"
14 #include "port/port.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/options.h"
17 #include "rocksdb/transaction_log.h"
18 #include "rocksdb/types.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 
22 class LogFileImpl : public LogFile {
23  public:
LogFileImpl(uint64_t logNum,WalFileType logType,SequenceNumber startSeq,uint64_t sizeBytes)24   LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
25               uint64_t sizeBytes) :
26     logNumber_(logNum),
27     type_(logType),
28     startSequence_(startSeq),
29     sizeFileBytes_(sizeBytes) {
30   }
31 
PathName()32   std::string PathName() const override {
33     if (type_ == kArchivedLogFile) {
34       return ArchivedLogFileName("", logNumber_);
35     }
36     return LogFileName("", logNumber_);
37   }
38 
LogNumber()39   uint64_t LogNumber() const override { return logNumber_; }
40 
Type()41   WalFileType Type() const override { return type_; }
42 
StartSequence()43   SequenceNumber StartSequence() const override { return startSequence_; }
44 
SizeFileBytes()45   uint64_t SizeFileBytes() const override { return sizeFileBytes_; }
46 
47   bool operator < (const LogFile& that) const {
48     return LogNumber() < that.LogNumber();
49   }
50 
51  private:
52   uint64_t logNumber_;
53   WalFileType type_;
54   SequenceNumber startSequence_;
55   uint64_t sizeFileBytes_;
56 
57 };
58 
59 class TransactionLogIteratorImpl : public TransactionLogIterator {
60  public:
61   TransactionLogIteratorImpl(
62       const std::string& dir, const ImmutableDBOptions* options,
63       const TransactionLogIterator::ReadOptions& read_options,
64       const EnvOptions& soptions, const SequenceNumber seqNum,
65       std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
66       const bool seq_per_batch);
67 
68   virtual bool Valid() override;
69 
70   virtual void Next() override;
71 
72   virtual Status status() override;
73 
74   virtual BatchResult GetBatch() override;
75 
76  private:
77   const std::string& dir_;
78   const ImmutableDBOptions* options_;
79   const TransactionLogIterator::ReadOptions read_options_;
80   const EnvOptions& soptions_;
81   SequenceNumber starting_sequence_number_;
82   std::unique_ptr<VectorLogPtr> files_;
83   bool started_;
84   bool is_valid_;  // not valid when it starts of.
85   Status current_status_;
86   size_t current_file_index_;
87   std::unique_ptr<WriteBatch> current_batch_;
88   std::unique_ptr<log::Reader> current_log_reader_;
89   std::string scratch_;
90   Status OpenLogFile(const LogFile* log_file,
91                      std::unique_ptr<SequentialFileReader>* file);
92 
93   struct LogReporter : public log::Reader::Reporter {
94     Env* env;
95     Logger* info_log;
CorruptionLogReporter96     virtual void Corruption(size_t bytes, const Status& s) override {
97       ROCKS_LOG_ERROR(info_log, "dropping %" ROCKSDB_PRIszt " bytes; %s", bytes,
98                       s.ToString().c_str());
99     }
InfoLogReporter100     virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); }
101   } reporter_;
102 
103   SequenceNumber
104       current_batch_seq_;  // sequence number at start of current batch
105   SequenceNumber current_last_seq_;  // last sequence in the current batch
106   // Used only to get latest seq. num
107   // TODO(icanadi) can this be just a callback?
108   VersionSet const* const versions_;
109   const bool seq_per_batch_;
110   // Reads from transaction log only if the writebatch record has been written
111   bool RestrictedRead(Slice* record);
112   // Seeks to startingSequenceNumber reading from startFileIndex in files_.
113   // If strict is set,then must get a batch starting with startingSequenceNumber
114   void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false);
115   // Implementation of Next. SeekToStartSequence calls it internally with
116   // internal=true to let it find next entry even if it has to jump gaps because
117   // the iterator may start off from the first available entry but promises to
118   // be continuous after that
119   void NextImpl(bool internal = false);
120   // Check if batch is expected, else return false
121   bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq);
122   // Update current batch if a continuous batch is found, else return false
123   void UpdateCurrentWriteBatch(const Slice& record);
124   Status OpenLogReader(const LogFile* file);
125 };
126 }  // namespace ROCKSDB_NAMESPACE
127 #endif  // ROCKSDB_LITE
128