1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/wal_manager.h"
9
10 #include <map>
11 #include <string>
12
13 #include "db/column_family.h"
14 #include "db/db_impl/db_impl.h"
15 #include "db/log_writer.h"
16 #include "db/version_set.h"
17 #include "env/mock_env.h"
18 #include "file/writable_file_writer.h"
19 #include "rocksdb/cache.h"
20 #include "rocksdb/file_system.h"
21 #include "rocksdb/write_batch.h"
22 #include "rocksdb/write_buffer_manager.h"
23 #include "table/mock_table.h"
24 #include "test_util/testharness.h"
25 #include "test_util/testutil.h"
26 #include "util/string_util.h"
27
28 namespace ROCKSDB_NAMESPACE {
29
30 // TODO(icanadi) mock out VersionSet
31 // TODO(icanadi) move other WalManager-specific tests from db_test here
32 class WalManagerTest : public testing::Test {
33 public:
WalManagerTest()34 WalManagerTest()
35 : env_(new MockEnv(Env::Default())),
36 dbname_(test::PerThreadDBPath("wal_manager_test")),
37 db_options_(),
38 table_cache_(NewLRUCache(50000, 16)),
39 write_buffer_manager_(db_options_.db_write_buffer_size),
40 current_log_number_(0) {
41 DestroyDB(dbname_, Options());
42 }
43
Init()44 void Init() {
45 ASSERT_OK(env_->CreateDirIfMissing(dbname_));
46 ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
47 db_options_.db_paths.emplace_back(dbname_,
48 std::numeric_limits<uint64_t>::max());
49 db_options_.wal_dir = dbname_;
50 db_options_.env = env_.get();
51 db_options_.fs = env_->GetFileSystem();
52 db_options_.clock = env_->GetSystemClock().get();
53
54 versions_.reset(
55 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
56 &write_buffer_manager_, &write_controller_,
57 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
58
59 wal_manager_.reset(
60 new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
61 }
62
Reopen()63 void Reopen() {
64 wal_manager_.reset(
65 new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));
66 }
67
68 // NOT thread safe
Put(const std::string & key,const std::string & value)69 void Put(const std::string& key, const std::string& value) {
70 assert(current_log_writer_.get() != nullptr);
71 uint64_t seq = versions_->LastSequence() + 1;
72 WriteBatch batch;
73 ASSERT_OK(batch.Put(key, value));
74 WriteBatchInternal::SetSequence(&batch, seq);
75 ASSERT_OK(
76 current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)));
77 versions_->SetLastAllocatedSequence(seq);
78 versions_->SetLastPublishedSequence(seq);
79 versions_->SetLastSequence(seq);
80 }
81
82 // NOT thread safe
RollTheLog(bool)83 void RollTheLog(bool /*archived*/) {
84 current_log_number_++;
85 std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
86 const auto& fs = env_->GetFileSystem();
87 std::unique_ptr<WritableFileWriter> file_writer;
88 ASSERT_OK(WritableFileWriter::Create(fs, fname, env_options_, &file_writer,
89 nullptr));
90 current_log_writer_.reset(new log::Writer(std::move(file_writer), 0, false));
91 }
92
CreateArchiveLogs(int num_logs,int entries_per_log)93 void CreateArchiveLogs(int num_logs, int entries_per_log) {
94 for (int i = 1; i <= num_logs; ++i) {
95 RollTheLog(true);
96 for (int k = 0; k < entries_per_log; ++k) {
97 Put(ToString(k), std::string(1024, 'a'));
98 }
99 }
100 }
101
OpenTransactionLogIter(const SequenceNumber seq)102 std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
103 const SequenceNumber seq) {
104 std::unique_ptr<TransactionLogIterator> iter;
105 Status status = wal_manager_->GetUpdatesSince(
106 seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
107 EXPECT_OK(status);
108 return iter;
109 }
110
111 std::unique_ptr<MockEnv> env_;
112 std::string dbname_;
113 ImmutableDBOptions db_options_;
114 WriteController write_controller_;
115 EnvOptions env_options_;
116 std::shared_ptr<Cache> table_cache_;
117 WriteBufferManager write_buffer_manager_;
118 std::unique_ptr<VersionSet> versions_;
119 std::unique_ptr<WalManager> wal_manager_;
120
121 std::unique_ptr<log::Writer> current_log_writer_;
122 uint64_t current_log_number_;
123 };
124
// Exercises TEST_ReadFirstLine()/TEST_ReadFirstRecord() on a live log file:
// an empty file reports sequence 0, and after a batch with sequence 10 is
// appended, two consecutive reads both report 10.
TEST_F(WalManagerTest, ReadFirstRecordCache) {
  Init();
  const uint64_t kLogNumber = 1;
  const std::string log_path = dbname_ + "/000001.log";
  std::unique_ptr<FSWritableFile> wal_file;
  ASSERT_OK(env_->GetFileSystem()->NewWritableFile(log_path, FileOptions(),
                                                   &wal_file, nullptr));

  // Nothing has been written yet, so both entry points must report 0.
  SequenceNumber first_seq;
  ASSERT_OK(
      wal_manager_->TEST_ReadFirstLine(log_path, kLogNumber, &first_seq));
  ASSERT_EQ(first_seq, 0U);

  ASSERT_OK(
      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber, &first_seq));
  ASSERT_EQ(first_seq, 0U);

  // Append a single batch stamped with sequence number 10.
  std::unique_ptr<WritableFileWriter> log_file_writer(
      new WritableFileWriter(std::move(wal_file), log_path, FileOptions()));
  const bool recycle_log_files = db_options_.recycle_log_file_num > 0;
  log::Writer log_writer(std::move(log_file_writer), kLogNumber,
                         recycle_log_files);
  WriteBatch write_batch;
  ASSERT_OK(write_batch.Put("foo", "bar"));
  WriteBatchInternal::SetSequence(&write_batch, 10);
  ASSERT_OK(log_writer.AddRecord(WriteBatchInternal::Contents(&write_batch)));

  // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
  // (waiting for lei to finish with db_test). Once available, enable
  // env_->count_sequential_reads_ and check env_->sequential_read_counter_:
  // 0 before the first read below, 1 after it, and still 1 after the second
  // read because the value is expected to be cached.

  // First read after the write.
  ASSERT_OK(
      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber, &first_seq));
  ASSERT_EQ(first_seq, 10U);

  // Second read must return the same sequence number.
  ASSERT_OK(
      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, kLogNumber, &first_seq));
  ASSERT_EQ(first_seq, 10U);
}
167
168 namespace {
GetLogDirSize(std::string dir_path,Env * env)169 uint64_t GetLogDirSize(std::string dir_path, Env* env) {
170 uint64_t dir_size = 0;
171 std::vector<std::string> files;
172 EXPECT_OK(env->GetChildren(dir_path, &files));
173 for (auto& f : files) {
174 uint64_t number;
175 FileType type;
176 if (ParseFileName(f, &number, &type) && type == kWalFile) {
177 std::string const file_path = dir_path + "/" + f;
178 uint64_t file_size;
179 EXPECT_OK(env->GetFileSize(file_path, &file_size));
180 dir_size += file_size;
181 }
182 }
183 return dir_size;
184 }
ListSpecificFiles(Env * env,const std::string & path,const FileType expected_file_type)185 std::vector<std::uint64_t> ListSpecificFiles(
186 Env* env, const std::string& path, const FileType expected_file_type) {
187 std::vector<std::string> files;
188 std::vector<uint64_t> file_numbers;
189 uint64_t number;
190 FileType type;
191 EXPECT_OK(env->GetChildren(path, &files));
192 for (size_t i = 0; i < files.size(); ++i) {
193 if (ParseFileName(files[i], &number, &type)) {
194 if (type == expected_file_type) {
195 file_numbers.push_back(number);
196 }
197 }
198 }
199 return file_numbers;
200 }
201
CountRecords(TransactionLogIterator * iter)202 int CountRecords(TransactionLogIterator* iter) {
203 int count = 0;
204 SequenceNumber lastSequence = 0;
205 BatchResult res;
206 while (iter->Valid()) {
207 res = iter->GetBatch();
208 EXPECT_TRUE(res.sequence > lastSequence);
209 ++count;
210 lastSequence = res.sequence;
211 EXPECT_OK(iter->status());
212 iter->Next();
213 }
214 EXPECT_OK(iter->status());
215 return count;
216 }
217 } // namespace
218
// Verifies that PurgeObsoleteWALFiles() honours WAL_size_limit_MB and, once a
// TTL is set and has elapsed, removes every archived log.
TEST_F(WalManagerTest, WALArchivalSizeLimit) {
  db_options_.WAL_ttl_seconds = 0;
  db_options_.WAL_size_limit_MB = 1000;
  Init();

  // TEST : Create WalManager with huge size limit and no ttl.
  //        Create some archived files and assert that all of them exist.
  //        Lower the size limit. Re-open WalManager.
  //        Assert that archive is not greater than WAL_size_limit_MB after
  //          PurgeObsoleteWALFiles()
  //        Set a small ttl and let fake time pass it. Re-open WalManager.
  //        Assert that there are no archived logs left after
  //          PurgeObsoleteWALFiles()

  std::string archive_dir = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  std::vector<std::uint64_t> log_files =
      ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_EQ(log_files.size(), 20U);

  db_options_.WAL_size_limit_MB = 8;
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  // ASSERT_LE prints both operands on failure, unlike ASSERT_TRUE(a <= b).
  const uint64_t archive_size = GetLogDirSize(archive_dir, env_.get());
  const uint64_t size_limit_bytes = db_options_.WAL_size_limit_MB * 1024 * 1024;
  ASSERT_LE(archive_size, size_limit_bytes);

  // Advance the mock clock by 2s so the 1s ttl has expired for every file.
  db_options_.WAL_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(2 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  log_files = ListSpecificFiles(env_.get(), archive_dir, kWalFile);
  ASSERT_TRUE(log_files.empty());
}
256
// Verifies TTL-based purging: archived logs exist while the TTL is large and
// are all removed by PurgeObsoleteWALFiles() once a 1s TTL has elapsed.
TEST_F(WalManagerTest, WALArchivalTtl) {
  db_options_.WAL_ttl_seconds = 1000;
  Init();

  // Scenario:
  //   1. Start with a long ttl and no size limit.
  //   2. Write archived log files and check that some are present.
  //   3. Shrink the ttl, advance fake time past it and re-open WalManager.
  //   4. After PurgeObsoleteWALFiles() no archived log may remain.

  const std::string archive_path = ArchivalDirectory(dbname_);
  CreateArchiveLogs(20, 5000);

  const std::vector<uint64_t> logs_before_purge =
      ListSpecificFiles(env_.get(), archive_path, kWalFile);
  ASSERT_GT(logs_before_purge.size(), 0U);

  db_options_.WAL_ttl_seconds = 1;
  env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
  Reopen();
  wal_manager_->PurgeObsoleteWALFiles();

  const std::vector<uint64_t> logs_after_purge =
      ListSpecificFiles(env_.get(), archive_path, kWalFile);
  ASSERT_TRUE(logs_after_purge.empty());
}
282
// Verifies that the transaction log iterator skips a log file that contains
// no records and still returns the records from the files around it.
TEST_F(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
  Init();
  RollTheLog(false);
  Put("key1", std::string(1024, 'a'));
  // Create a zero record WAL file: the first roll opens a log that never
  // receives a record, the second opens the log that gets "key2".
  RollTheLog(false);
  RollTheLog(false);

  Put("key2", std::string(1024, 'a'));

  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter() reports errors with a non-fatal EXPECT_OK only,
  // so stop here rather than hand an unset iterator to CountRecords().
  ASSERT_TRUE(iter != nullptr);
  ASSERT_EQ(2, CountRecords(iter.get()));
}
296
// Verifies that an iterator opened over a single log file with no records
// starts out invalid.
TEST_F(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
  Init();
  RollTheLog(false);
  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter() reports errors with a non-fatal EXPECT_OK only,
  // so stop here instead of dereferencing an iterator that was never set.
  ASSERT_TRUE(iter != nullptr);
  // Check that an empty iterator is returned
  ASSERT_FALSE(iter->Valid());
}
304
// Verifies that an iterator opened before a new log file appears returns only
// the records that existed at open time and ends with TryAgain, while a
// freshly opened iterator sees all records and ends with OK.
TEST_F(WalManagerTest, TransactionLogIteratorNewFileWhileScanning) {
  Init();
  CreateArchiveLogs(2, 100);
  auto iter = OpenTransactionLogIter(0);
  // OpenTransactionLogIter() reports errors with a non-fatal EXPECT_OK only,
  // so bail out before dereferencing an iterator that was never set.
  ASSERT_TRUE(iter != nullptr);
  CreateArchiveLogs(1, 100);
  int i = 0;
  for (; iter->Valid(); iter->Next()) {
    i++;
  }
  ASSERT_EQ(i, 200);
  // A new log file was added after the iterator was created.
  // TryAgain indicates a new iterator is needed to fetch the new data
  ASSERT_TRUE(iter->status().IsTryAgain());

  iter = OpenTransactionLogIter(0);
  ASSERT_TRUE(iter != nullptr);
  i = 0;
  for (; iter->Valid(); iter->Next()) {
    i++;
  }
  ASSERT_EQ(i, 300);
  // ASSERT_OK matches the rest of the file and prints the Status on failure.
  ASSERT_OK(iter->status());
}
327
328 } // namespace ROCKSDB_NAMESPACE
329
main(int argc,char ** argv)330 int main(int argc, char** argv) {
331 ::testing::InitGoogleTest(&argc, argv);
332 return RUN_ALL_TESTS();
333 }
334
335 #else
336 #include <stdio.h>
337
// ROCKSDB_LITE entry point: there is nothing to run, so print a notice to
// stderr and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as WalManager is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
342
343 #endif // !ROCKSDB_LITE
344