1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "db/wal_manager.h"
9 
10 #include <map>
11 #include <string>
12 
13 #include "db/column_family.h"
14 #include "db/db_impl/db_impl.h"
15 #include "db/log_writer.h"
16 #include "db/version_set.h"
17 #include "env/mock_env.h"
18 #include "file/writable_file_writer.h"
19 #include "rocksdb/cache.h"
20 #include "rocksdb/file_system.h"
21 #include "rocksdb/write_batch.h"
22 #include "rocksdb/write_buffer_manager.h"
23 #include "table/mock_table.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "util/string_util.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
30 // TODO(icanadi) mock out VersionSet
31 // TODO(icanadi) move other WalManager-specific tests from db_test here
32 class WalManagerTest : public testing::Test {
33  public:
WalManagerTest()34   WalManagerTest()
35       : env_(new MockEnv(Env::Default())),
36         dbname_(test::PerThreadDBPath("wal_manager_test")),
37         db_options_(),
38         table_cache_(NewLRUCache(50000, 16)),
39         write_buffer_manager_(db_options_.db_write_buffer_size),
40         current_log_number_(0) {
41     DestroyDB(dbname_, Options());
42   }
43 
Init()44   void Init() {
45     ASSERT_OK(env_->CreateDirIfMissing(dbname_));
46     ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
47     db_options_.db_paths.emplace_back(dbname_,
48                                       std::numeric_limits<uint64_t>::max());
49     db_options_.wal_dir = dbname_;
50     db_options_.env = env_.get();
51     db_options_.fs = env_->GetFileSystem();
52     db_options_.clock = env_->GetSystemClock().get();
53 
54     versions_.reset(
55         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
56                        &write_buffer_manager_, &write_controller_,
57                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
58 
59     wal_manager_.reset(
60         new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
61   }
62 
Reopen()63   void Reopen() {
64     wal_manager_.reset(
65         new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
66   }
67 
68   // NOT thread safe
Put(const std::string & key,const std::string & value)69   void Put(const std::string& key, const std::string& value) {
70     assert(current_log_writer_.get() != nullptr);
71     uint64_t seq =  versions_->LastSequence() + 1;
72     WriteBatch batch;
73     ASSERT_OK(batch.Put(key, value));
74     WriteBatchInternal::SetSequence(&batch, seq);
75     ASSERT_OK(
76         current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)));
77     versions_->SetLastAllocatedSequence(seq);
78     versions_->SetLastPublishedSequence(seq);
79     versions_->SetLastSequence(seq);
80   }
81 
82   // NOT thread safe
RollTheLog(bool)83   void RollTheLog(bool /*archived*/) {
84     current_log_number_++;
85     std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
86     const auto& fs = env_->GetFileSystem();
87     std::unique_ptr<WritableFileWriter> file_writer;
88     ASSERT_OK(WritableFileWriter::Create(fs, fname, env_options_, &file_writer,
89                                          nullptr));
90     current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
91   }
92 
CreateArchiveLogs(int num_logs,int entries_per_log)93   void CreateArchiveLogs(int num_logs, int entries_per_log) {
94     for (int i = 1; i <= num_logs; ++i) {
95       RollTheLog(true);
96       for (int k = 0; k < entries_per_log; ++k) {
97         Put(ToString(k), std::string(1024, 'a'));
98       }
99     }
100   }
101 
OpenTransactionLogIter(const SequenceNumber seq)102   std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
103       const SequenceNumber seq) {
104     std::unique_ptr<TransactionLogIterator> iter;
105     Status status = wal_manager_->GetUpdatesSince(
106         seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
107     EXPECT_OK(status);
108     return iter;
109   }
110 
111   std::unique_ptr<MockEnv> env_;
112   std::string dbname_;
113   ImmutableDBOptions db_options_;
114   WriteController write_controller_;
115   EnvOptions env_options_;
116   std::shared_ptr<Cache> table_cache_;
117   WriteBufferManager write_buffer_manager_;
118   std::unique_ptr<VersionSet> versions_;
119   std::unique_ptr<WalManager> wal_manager_;
120 
121   std::unique_ptr<log::Writer> current_log_writer_;
122   uint64_t current_log_number_;
123 };
124 
// Exercises TEST_ReadFirstLine / TEST_ReadFirstRecord against WAL number 1:
// both must report sequence 0 while the file is still empty, and
// TEST_ReadFirstRecord must report 10 (on two consecutive calls) once a batch
// carrying that sequence number has been appended.
TEST_F(WalManagerTest, ReadFirstRecordCache) {
  Init();

  const uint64_t kLogNumber = 1;
  const std::string log_path = dbname_ + "/000001.log";
  std::unique_ptr<FSWritableFile> raw_file;
  ASSERT_OK(env_->GetFileSystem()->NewWritableFile(log_path, FileOptions(),
                                                   &raw_file, nullptr));

  // Phase 1: the log file exists but holds no records yet.
  SequenceNumber first_seq;
  ASSERT_OK(
      wal_manager_->TEST_ReadFirstLine(log_path, kLogNumber, &first_seq));
  ASSERT_EQ(first_seq, 0U);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber,
                                               &first_seq));
  ASSERT_EQ(first_seq, 0U);

  // Phase 2: append one batch stamped with sequence number 10.
  const bool recycle_logs = db_options_.recycle_log_file_num > 0;
  std::unique_ptr<WritableFileWriter> writable(
      new WritableFileWriter(std::move(raw_file), log_path, FileOptions()));
  log::Writer log_writer(std::move(writable), kLogNumber, recycle_logs);

  WriteBatch single_put;
  ASSERT_OK(single_put.Put("foo", "bar"));
  WriteBatchInternal::SetSequence(&single_put, 10);
  ASSERT_OK(log_writer.AddRecord(WriteBatchInternal::Contents(&single_put)));

  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // (waiting for lei to finish with db_test). With it, this test should turn
  // on env_->count_sequential_reads_ and check
  // env_->sequential_read_counter_.Read():
  //   - 0 at this point (sanity check),
  //   - 1 after the first lookup below (it did a read),
  //   - still 1 after the second lookup (no new reads; the value is cached).

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber,
                                               &first_seq));
  ASSERT_EQ(first_seq, 10U);

  ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber,
                                               &first_seq));
  ASSERT_EQ(first_seq, 10U);
}
167 
168 namespace {
GetLogDirSize(std::string dir_path,Env * env)169 uint64_t GetLogDirSize(std::string dir_path, Env* env) {
170   uint64_t dir_size = 0;
171   std::vector<std::string> files;
172   EXPECT_OK(env->GetChildren(dir_path, &files));
173   for (auto& f : files) {
174     uint64_t number;
175     FileType type;
176     if (ParseFileName(f, &number, &type) && type == kWalFile) {
177       std::string const file_path = dir_path + "/" + f;
178       uint64_t file_size;
179       EXPECT_OK(env->GetFileSize(file_path, &file_size));
180       dir_size += file_size;
181     }
182   }
183   return dir_size;
184 }
ListSpecificFiles(Env * env,const std::string & path,const FileType expected_file_type)185 std::vector<std::uint64_t> ListSpecificFiles(
186     Env* env, const std::string& path, const FileType expected_file_type) {
187   std::vector<std::string> files;
188   std::vector<uint64_t> file_numbers;
189   uint64_t number;
190   FileType type;
191   EXPECT_OK(env->GetChildren(path, &files));
192   for (size_t i = 0; i < files.size(); ++i) {
193     if (ParseFileName(files[i], &number, &type)) {
194       if (type == expected_file_type) {
195         file_numbers.push_back(number);
196       }
197     }
198   }
199   return file_numbers;
200 }
201 
CountRecords(TransactionLogIterator * iter)202 int CountRecords(TransactionLogIterator* iter) {
203   int count = 0;
204   SequenceNumber lastSequence = 0;
205   BatchResult res;
206   while (iter->Valid()) {
207     res = iter->GetBatch();
208     EXPECT_TRUE(res.sequence > lastSequence);
209     ++count;
210     lastSequence = res.sequence;
211     EXPECT_OK(iter->status());
212     iter->Next();
213   }
214   EXPECT_OK(iter->status());
215   return count;
216 }
217 }  // namespace
218 
// Scenario:
//   1. Start with a huge size limit and no ttl, create archived logs, and
//      check that all of them are present.
//   2. Lower the size limit, re-open the WalManager, purge, and check that
//      the archive no longer exceeds WAL_size_limit_MB.
//   3. Set a small ttl, advance the mock clock past it, re-open, purge, and
//      check that no archived logs are left.
TEST_F(WalManagerTest, WALArchivalSizeLimit) {
  db_options_.WAL_ttl_seconds = 0;
  db_options_.WAL_size_limit_MB = 1000;
  Init();

  const std::string archive_dir = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  // Step 1: every archived file is there.
  std::vector<std::uint64_t> archived_numbers =
      ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_EQ(archived_numbers.size(), 20U);

  // Step 2: shrink the limit and purge down to it.
  db_options_.WAL_size_limit_MB = 8;
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  const uint64_t archive_bytes = GetLogDirSize(archive_dir, env_.get());
  const uint64_t limit_bytes = db_options_.WAL_size_limit_MB * 1024 * 1024;
  ASSERT_LE(archive_bytes, limit_bytes);

  // Step 3: a 1-second ttl combined with a 2-second fake sleep.
  db_options_.WAL_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  archived_numbers = ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_TRUE(archived_numbers.empty());
}
256 
// Scenario: with a long ttl and no size limit, create archived logs and check
// that some exist. Then switch to a 1-second ttl, advance the mock clock by
// 3 seconds, re-open the WalManager, purge, and check that every archived log
// was removed.
TEST_F(WalManagerTest, WALArchivalTtl) {
  db_options_.WAL_ttl_seconds = 1000;
  Init();

  const std::string archive_dir = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<uint64_t> archived_numbers =
      ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_GT(archived_numbers.size(), 0U);

  // Shorten the ttl and let (fake) time run past it before purging.
  db_options_.WAL_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  archived_numbers = ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_TRUE(archived_numbers.empty());
}
282 
// Writes one record, leaves a log file with zero records in between, writes a
// second record, and expects an iterator opened at sequence 0 to yield both
// records.
TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
  Init();
  const std::string payload(1024, 'a');

  RollTheLog(false);
  Put("key1", payload);

  // Rolling twice in a row leaves the first of the two files without records.
  RollTheLog(false);
  RollTheLog(false);
  Put("key2", payload);

  std::unique_ptr<TransactionLogIterator> log_iter = OpenTransactionLogIter(0);
  ASSERT_EQ(2, CountRecords(log_iter.get()));
}
296 
// With only a single, record-less log file present, an iterator opened at
// sequence 0 must not be positioned on any record.
TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
  Init();
  RollTheLog(false);

  std::unique_ptr<TransactionLogIterator> log_iter = OpenTransactionLogIter(0);
  // Check that an empty iterator is returned.
  ASSERT_FALSE(log_iter->Valid());
}
304 
// Opens an iterator over two 100-record logs, then adds a third log before
// scanning. The existing iterator must yield the original 200 records and end
// with a TryAgain status; a freshly opened iterator must yield all 300 and end
// with an OK status.
TEST_F(WalManagerTest, TransactionLogIteratorNewFileWhileScanning) {
  Init();
  CreateArchiveLogs(2, 100);
  auto iter = OpenTransactionLogIter(0);
  CreateArchiveLogs(1, 100);

  int seen = 0;
  while (iter->Valid()) {
    ++seen;
    iter->Next();
  }
  ASSERT_EQ(seen, 200);
  // A new log file was added after the iterator was created.
  // TryAgain indicates a new iterator is needed to fetch the new data.
  ASSERT_TRUE(iter->status().IsTryAgain());

  iter = OpenTransactionLogIter(0);
  seen = 0;
  while (iter->Valid()) {
    ++seen;
    iter->Next();
  }
  ASSERT_EQ(seen, 300);
  ASSERT_OK(iter->status());
}
327 
328 }  // namespace ROCKSDB_NAMESPACE
329 
main(int argc,char ** argv)330 int main(int argc, char** argv) {
331   ::testing::InitGoogleTest(&argc, argv);
332   return RUN_ALL_TESTS();
333 }
334 
335 #else
336 #include <stdio.h>
337 
// ROCKSDB_LITE build: there is nothing to run, so print a notice to stderr
// and exit with status 0.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as WalManager is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
342 
343 #endif  // !ROCKSDB_LITE
344