1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_set.h"
11
12 #include "db/db_impl/db_impl.h"
13 #include "db/log_writer.h"
14 #include "rocksdb/file_system.h"
15 #include "table/block_based/block_based_table_factory.h"
16 #include "table/mock_table.h"
17 #include "test_util/testharness.h"
18 #include "test_util/testutil.h"
19 #include "util/string_util.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 class GenerateLevelFilesBriefTest : public testing::Test {
24 public:
25 std::vector<FileMetaData*> files_;
26 LevelFilesBrief file_level_;
27 Arena arena_;
28
GenerateLevelFilesBriefTest()29 GenerateLevelFilesBriefTest() { }
30
~GenerateLevelFilesBriefTest()31 ~GenerateLevelFilesBriefTest() override {
32 for (size_t i = 0; i < files_.size(); i++) {
33 delete files_[i];
34 }
35 }
36
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)37 void Add(const char* smallest, const char* largest,
38 SequenceNumber smallest_seq = 100,
39 SequenceNumber largest_seq = 100) {
40 FileMetaData* f = new FileMetaData(
41 files_.size() + 1, 0, 0,
42 InternalKey(smallest, smallest_seq, kTypeValue),
43 InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
44 largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
45 kUnknownOldestAncesterTime, kUnknownFileCreationTime,
46 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
47 files_.push_back(f);
48 }
49
Compare()50 int Compare() {
51 int diff = 0;
52 for (size_t i = 0; i < files_.size(); i++) {
53 if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
54 diff++;
55 }
56 }
57 return diff;
58 }
59 };
60
// With no input files the generated brief must be empty as well.
TEST_F(GenerateLevelFilesBriefTest, Empty) {
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(file_level_.num_files, 0U);
  ASSERT_EQ(Compare(), 0);
}
66
// A single input file must show up as the only entry of the brief.
TEST_F(GenerateLevelFilesBriefTest, Single) {
  Add("p", "q");

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(file_level_.num_files, 1U);
  ASSERT_EQ(Compare(), 0);
}
73
// Several input files must be mirrored in the brief, position by position.
TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  // {smallest, largest} user keys of the files to register, in order.
  const char* const kRanges[][2] = {
      {"150", "200"}, {"200", "250"}, {"300", "350"}, {"400", "450"}};
  for (const auto& range : kRanges) {
    Add(range[0], range[1]);
  }

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(file_level_.num_files, 4U);
  ASSERT_EQ(Compare(), 0);
}
83
84 class CountingLogger : public Logger {
85 public:
CountingLogger()86 CountingLogger() : log_count(0) {}
87 using Logger::Logv;
Logv(const char *,va_list)88 void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
89 int log_count;
90 };
91
GetOptionsWithNumLevels(int num_levels,std::shared_ptr<CountingLogger> logger)92 Options GetOptionsWithNumLevels(int num_levels,
93 std::shared_ptr<CountingLogger> logger) {
94 Options opt;
95 opt.num_levels = num_levels;
96 opt.info_log = logger;
97 return opt;
98 }
99
100 class VersionStorageInfoTestBase : public testing::Test {
101 public:
102 const Comparator* ucmp_;
103 InternalKeyComparator icmp_;
104 std::shared_ptr<CountingLogger> logger_;
105 Options options_;
106 ImmutableOptions ioptions_;
107 MutableCFOptions mutable_cf_options_;
108 VersionStorageInfo vstorage_;
109
GetInternalKey(const char * ukey,SequenceNumber smallest_seq=100)110 InternalKey GetInternalKey(const char* ukey,
111 SequenceNumber smallest_seq = 100) {
112 return InternalKey(ukey, smallest_seq, kTypeValue);
113 }
114
VersionStorageInfoTestBase(const Comparator * ucmp)115 explicit VersionStorageInfoTestBase(const Comparator* ucmp)
116 : ucmp_(ucmp),
117 icmp_(ucmp_),
118 logger_(new CountingLogger()),
119 options_(GetOptionsWithNumLevels(6, logger_)),
120 ioptions_(options_),
121 mutable_cf_options_(options_),
122 vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
123 /*src_vstorage=*/nullptr,
124 /*_force_consistency_checks=*/false) {}
125
~VersionStorageInfoTestBase()126 ~VersionStorageInfoTestBase() override {
127 for (int i = 0; i < vstorage_.num_levels(); ++i) {
128 for (auto* f : vstorage_.LevelFiles(i)) {
129 if (--f->refs == 0) {
130 delete f;
131 }
132 }
133 }
134 }
135
Add(int level,uint32_t file_number,const char * smallest,const char * largest,uint64_t file_size=0)136 void Add(int level, uint32_t file_number, const char* smallest,
137 const char* largest, uint64_t file_size = 0) {
138 assert(level < vstorage_.num_levels());
139 FileMetaData* f = new FileMetaData(
140 file_number, 0, file_size, GetInternalKey(smallest, 0),
141 GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
142 /* marked_for_compact */ false, kInvalidBlobFileNumber,
143 kUnknownOldestAncesterTime, kUnknownFileCreationTime,
144 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
145 f->compensated_file_size = file_size;
146 vstorage_.AddFile(level, f);
147 }
148
Add(int level,uint32_t file_number,const InternalKey & smallest,const InternalKey & largest,uint64_t file_size=0)149 void Add(int level, uint32_t file_number, const InternalKey& smallest,
150 const InternalKey& largest, uint64_t file_size = 0) {
151 assert(level < vstorage_.num_levels());
152 FileMetaData* f = new FileMetaData(
153 file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
154 /* largest_seq */ 0, /* marked_for_compact */ false,
155 kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
156 kUnknownFileCreationTime, kUnknownFileChecksum,
157 kUnknownFileChecksumFuncName);
158 f->compensated_file_size = file_size;
159 vstorage_.AddFile(level, f);
160 }
161
GetOverlappingFiles(int level,const InternalKey & begin,const InternalKey & end)162 std::string GetOverlappingFiles(int level, const InternalKey& begin,
163 const InternalKey& end) {
164 std::vector<FileMetaData*> inputs;
165 vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
166
167 std::string result;
168 for (size_t i = 0; i < inputs.size(); ++i) {
169 if (i > 0) {
170 result += ",";
171 }
172 AppendNumberTo(&result, inputs[i]->fd.GetNumber());
173 }
174 return result;
175 }
176 };
177
178 class VersionStorageInfoTest : public VersionStorageInfoTestBase {
179 public:
VersionStorageInfoTest()180 VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {}
181
~VersionStorageInfoTest()182 ~VersionStorageInfoTest() override {}
183 };
184
// Static level sizing: with base 10 and multiplier 5 the expected targets for
// L1..L4 are 10, 50, 250 and 1250, and nothing is logged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10;
  ioptions_.level_compaction_dynamic_level_bytes = false;
  Add(4, 100U, "1", "2");
  Add(5, 101U, "1", "2");

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  // expected_max_bytes[i] is the target for level i + 1.
  const uint64_t expected_max_bytes[] = {10U, 50U, 250U, 1250U};
  for (int level = 1; level <= 4; ++level) {
    ASSERT_EQ(vstorage_.MaxBytesForLevel(level), expected_max_bytes[level - 1])
        << "level " << level;
  }

  ASSERT_EQ(logger_->log_count, 0);
}
200
// Dynamic level sizing: files are added step by step and after each
// recalculation the base level, per-level targets and number of log calls
// are checked.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 1000;
  ioptions_.level_compaction_dynamic_level_bytes = true;

  // Step 1: one 500-byte file in L5; the base level is expected to be L5.
  Add(5, 1U, "1", "2", 500U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(logger_->log_count, 0);
  ASSERT_EQ(5, vstorage_.base_level());

  // Step 2: another 550 bytes in L5; the base level is expected to become L4.
  Add(5, 2U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(logger_->log_count, 0);
  ASSERT_EQ(4, vstorage_.base_level());
  ASSERT_EQ(1000U, vstorage_.MaxBytesForLevel(4));

  // Step 3: 550 bytes in L4; expectations are the same as after step 2.
  Add(4, 3U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(logger_->log_count, 0);
  ASSERT_EQ(4, vstorage_.base_level());
  ASSERT_EQ(1000U, vstorage_.MaxBytesForLevel(4));

  // Step 4: two files in L3; the base level is expected to become L3 and
  // exactly one message is expected to be logged.
  Add(3, 4U, "3", "4", 250U);
  Add(3, 5U, "5", "7", 300U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(logger_->log_count, 1);
  ASSERT_EQ(3, vstorage_.base_level());
  ASSERT_EQ(1000U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(1005U, vstorage_.MaxBytesForLevel(4));

  // Step 5: two small files in L1; the base level is expected to become L1.
  // The log counter is reset first so that only this recalculation counts.
  Add(1, 6U, "3", "4", 5U);
  Add(1, 7U, "8", "9", 5U);
  logger_->log_count = 0;
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(logger_->log_count, 1);
  ASSERT_EQ(1, vstorage_.base_level());
  ASSERT_EQ(1000U, vstorage_.MaxBytesForLevel(1));
  ASSERT_EQ(1005U, vstorage_.MaxBytesForLevel(2));
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
}
242
// Dynamic level sizing with data in every level: with base 100 and
// multiplier 2 the expected targets for L1..L4 are 100, 200, 400 and 800.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  mutable_cf_options_.max_bytes_for_level_base = 100;
  ioptions_.level_compaction_dynamic_level_bytes = true;

  // One file per level; file number is level + 1.
  const uint64_t kFileSizes[] = {50U, 50U, 500U, 500U, 1700U, 500U};
  for (int level = 0; level < 6; ++level) {
    Add(level, static_cast<uint32_t>(level + 1), "1", "2", kFileSizes[level]);
  }

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(100U, vstorage_.MaxBytesForLevel(1));
  ASSERT_EQ(200U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(400U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(800U, vstorage_.MaxBytesForLevel(4));
  ASSERT_EQ(1, vstorage_.base_level());
  ASSERT_EQ(logger_->log_count, 0);
}
262
// Dynamic level sizing with multi-gigabyte levels: checks the expected
// targets for L2..L5 and that L2 becomes the base level.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  // Decimal gigabyte (10^9 bytes), kept in 64 bits so the products below do
  // not wrap.
  const uint64_t kOneGB = 1000U * 1000U * 1000U;

  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  ioptions_.level_compaction_dynamic_level_bytes = true;

  Add(0, 1U, "1", "2", 50U);
  Add(3, 4U, "1", "2", 32U * kOneGB);
  Add(4, 5U, "1", "2", 500U * kOneGB);
  Add(5, 6U, "1", "2", 3000U * kOneGB);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(10U * kOneGB, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(30U * kOneGB, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(300U * kOneGB, vstorage_.MaxBytesForLevel(4));
  ASSERT_EQ(3000U * kOneGB, vstorage_.MaxBytesForLevel(5));
  ASSERT_EQ(2, vstorage_.base_level());
  ASSERT_EQ(logger_->log_count, 0);
}
281
// Dynamic level sizing with three 10,000-byte L0 files, a level base of
// 40,000 and an L0 compaction trigger of 2. In this configuration the test
// expects the configured multiplier to be kept unchanged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier should stay at the configured value of 5.
  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  // Level sizes should be exactly 40,000, 51,450 and 257,250.
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
307
// Dynamic level sizing with three 10,000-byte L0 files and a level base of
// 10,000: the test expects a reduced multiplier of about 3.5 and level
// targets of 30,000, ~105,000 and ~367,500.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  ioptions_.level_compaction_dynamic_level_bytes = true;

  // Level 0: three files of 10,000 bytes each.
  for (uint32_t file_number = 11U; file_number <= 13U; ++file_number) {
    Add(0, file_number, "1", "2", 10000U);
  }

  // Levels 5 down to 2.
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(logger_->log_count, 0);
  ASSERT_EQ(vstorage_.base_level(), 2);

  // Multiplier within (3.4, 3.6).
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);

  // L2 is exact; L3 and L4 are checked against a window.
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 30000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
}
336
// Dynamic level sizing with six 5,000-byte L0 files and a level base of
// 10,000: the test expects a reduced multiplier of about 3.5 and level
// targets of 30,000, ~105,000 and ~367,500.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  ioptions_.level_compaction_dynamic_level_bytes = true;

  // Level 0: six files of 5,000 bytes each.
  for (uint32_t file_number = 11U; file_number <= 16U; ++file_number) {
    Add(0, file_number, "1", "2", 5000U);
  }

  // Levels 5 down to 2.
  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(logger_->log_count, 0);
  ASSERT_EQ(vstorage_.base_level(), 2);

  // Multiplier within (3.4, 3.6).
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);

  // L2 is exact; L3 and L4 are checked against a window.
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 30000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
}
368
// Every file above the last level overlaps the single last-level file, so
// the estimate is expected to equal that file's size (10).
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  // Perfect overlap with last level
  Add(1, 1U, "4", "7", 1U);
  // Partial overlaps with last level
  Add(2, 2U, "3", "5", 1U);
  Add(2, 3U, "6", "8", 1U);
  // Contains range of last level
  Add(3, 4U, "1", "9", 1U);
  // Inside range of last level
  Add(4, 5U, "4", "5", 1U);
  Add(4, 6U, "6", "7", 1U);
  // The last level itself
  Add(5, 7U, "4", "7", 10U);

  ASSERT_EQ(vstorage_.EstimateLiveDataSize(), 10U);
}
380
// Mixed case: three of the seven one-byte files are expected to be ignored
// because a lower level covers them, leaving an estimate of 4.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  // Level 0 is not ordered
  Add(0, 1U, "9", "9", 1U);
  // Ignored because of [5,6] in l1
  Add(0, 2U, "5", "6", 1U);
  // Both ignored because of [2,3] in l2
  Add(1, 3U, "1", "2", 1U);
  Add(1, 4U, "3", "4", 1U);
  Add(1, 5U, "5", "6", 1U);
  Add(2, 6U, "2", "3", 1U);
  Add(3, 7U, "7", "8", 1U);

  ASSERT_EQ(vstorage_.EstimateLiveDataSize(), 4U);
}
391
// Checks which L1 files GetOverlappingInputs() reports for a number of key
// ranges, including ranges that end at a range-deletion sentinel key.
TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  // Files 1 and 2 overlap at the range deletion tombstone sentinel.
  Add(1, 1U, {"a", 0, kTypeValue},
      {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  // Files 3 and 4 overlap at the same user key.
  Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  // Files 5 and 6 do not overlap.
  Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // Ranges touching files 1 and 2.
  ASSERT_EQ("1,2",
            GetOverlappingFiles(1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
  ASSERT_EQ("1",
            GetOverlappingFiles(1, {"a", 0, kTypeValue},
                                {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("2", GetOverlappingFiles(1, {"b", kMaxSequenceNumber, kTypeValue},
                                     {"c", 0, kTypeValue}));

  // Ranges touching files 3 and 4.
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
  ASSERT_EQ("3",
            GetOverlappingFiles(1, {"d", 0, kTypeValue},
                                {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("3,4", GetOverlappingFiles(1, {"e", kMaxSequenceNumber, kTypeValue},
                                       {"f", 0, kTypeValue}));
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));

  // Ranges touching the two disjoint files.
  ASSERT_EQ("5",
            GetOverlappingFiles(1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
  ASSERT_EQ("6",
            GetOverlappingFiles(1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
}
424
// Looks up files by number: known files must report their (level, position)
// and non-null metadata; an unknown number must report an invalid location
// and null metadata.
TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) {
  using FileLocation = VersionStorageInfo::FileLocation;

  Add(0, 11U, "1", "2", 5000U);
  Add(0, 12U, "1", "2", 5000U);
  Add(2, 7U, "1", "2", 8000U);

  // First file of level 0.
  ASSERT_EQ(vstorage_.GetFileLocation(11U), FileLocation(0, 0));
  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(11U), nullptr);

  // Second file of level 0.
  ASSERT_EQ(vstorage_.GetFileLocation(12U), FileLocation(0, 1));
  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(12U), nullptr);

  // Only file of level 2.
  ASSERT_EQ(vstorage_.GetFileLocation(7U), FileLocation(2, 0));
  ASSERT_NE(vstorage_.GetFileMetaDataByNumber(7U), nullptr);

  // File number that was never added.
  ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid());
  ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr);
}
446
447 class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase {
448 public:
VersionStorageInfoTimestampTest()449 VersionStorageInfoTimestampTest()
450 : VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {}
~VersionStorageInfoTimestampTest()451 ~VersionStorageInfoTimestampTest() override {}
Timestamp(uint64_t ts) const452 std::string Timestamp(uint64_t ts) const {
453 std::string ret;
454 PutFixed64(&ret, ts);
455 return ret;
456 }
PackUserKeyAndTimestamp(const Slice & ukey,uint64_t ts) const457 std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const {
458 std::string ret;
459 ret.assign(ukey.data(), ukey.size());
460 PutFixed64(&ret, ts);
461 return ret;
462 }
463 };
464
// Checks GetOverlappingInputs() when user keys carry a timestamp suffix.
TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) {
  // Builds the internal key for (user key, timestamp) at sequence number 0.
  const auto key_with_ts = [this](const char* ukey, uint64_t ts) {
    return InternalKey(PackUserKeyAndTimestamp(ukey, ts), /*s=*/0, kTypeValue);
  };

  Add(/*level=*/1, /*file_number=*/1,
      /*smallest=*/key_with_ts("a", /*ts=*/9),
      /*largest=*/key_with_ts("a", /*ts=*/8),
      /*file_size=*/100);
  Add(/*level=*/1, /*file_number=*/2,
      /*smallest=*/key_with_ts("a", /*ts=*/5),
      /*largest=*/key_with_ts("b", /*ts=*/10),
      /*file_size=*/100);
  Add(/*level=*/1, /*file_number=*/3,
      /*smallest=*/key_with_ts("c", /*ts=*/12),
      /*largest=*/key_with_ts("d", /*ts=*/1),
      /*file_size=*/100);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // A query on user key "a" is expected to return both files containing "a".
  ASSERT_EQ("1,2", GetOverlappingFiles(/*level=*/1,
                                       key_with_ts("a", /*ts=*/12),
                                       key_with_ts("a", /*ts=*/11)));
  // A query on user key "c" is expected to return only file 3.
  ASSERT_EQ("3", GetOverlappingFiles(/*level=*/1,
                                     key_with_ts("c", /*ts=*/15),
                                     key_with_ts("c", /*ts=*/2)));
}
495
496 class FindLevelFileTest : public testing::Test {
497 public:
498 LevelFilesBrief file_level_;
499 bool disjoint_sorted_files_;
500 Arena arena_;
501
FindLevelFileTest()502 FindLevelFileTest() : disjoint_sorted_files_(true) { }
503
~FindLevelFileTest()504 ~FindLevelFileTest() override {}
505
LevelFileInit(size_t num=0)506 void LevelFileInit(size_t num = 0) {
507 char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
508 file_level_.files = new (mem)FdWithKeyRange[num];
509 file_level_.num_files = 0;
510 }
511
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)512 void Add(const char* smallest, const char* largest,
513 SequenceNumber smallest_seq = 100,
514 SequenceNumber largest_seq = 100) {
515 InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
516 InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
517
518 Slice smallest_slice = smallest_key.Encode();
519 Slice largest_slice = largest_key.Encode();
520
521 char* mem = arena_.AllocateAligned(
522 smallest_slice.size() + largest_slice.size());
523 memcpy(mem, smallest_slice.data(), smallest_slice.size());
524 memcpy(mem + smallest_slice.size(), largest_slice.data(),
525 largest_slice.size());
526
527 // add to file_level_
528 size_t num = file_level_.num_files;
529 auto& file = file_level_.files[num];
530 file.fd = FileDescriptor(num + 1, 0, 0);
531 file.smallest_key = Slice(mem, smallest_slice.size());
532 file.largest_key = Slice(mem + smallest_slice.size(),
533 largest_slice.size());
534 file_level_.num_files++;
535 }
536
Find(const char * key)537 int Find(const char* key) {
538 InternalKey target(key, 100, kTypeValue);
539 InternalKeyComparator cmp(BytewiseComparator());
540 return FindFile(cmp, file_level_, target.Encode());
541 }
542
Overlaps(const char * smallest,const char * largest)543 bool Overlaps(const char* smallest, const char* largest) {
544 InternalKeyComparator cmp(BytewiseComparator());
545 Slice s(smallest != nullptr ? smallest : "");
546 Slice l(largest != nullptr ? largest : "");
547 return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
548 (smallest != nullptr ? &s : nullptr),
549 (largest != nullptr ? &l : nullptr));
550 }
551 };
552
// An empty level: every lookup lands at index 0 and nothing overlaps.
TEST_F(FindLevelFileTest, LevelEmpty) {
  LevelFileInit(0);

  ASSERT_EQ(0, Find("foo"));

  ASSERT_FALSE(Overlaps("a", "z"));
  ASSERT_FALSE(Overlaps(nullptr, "z"));
  ASSERT_FALSE(Overlaps("a", nullptr));
  ASSERT_FALSE(Overlaps(nullptr, nullptr));
}
562
// A level holding the single file ["p", "q"].
TEST_F(FindLevelFileTest, LevelSingle) {
  LevelFileInit(1);
  Add("p", "q");

  // Keys up to and including "q" map to index 0, later keys to index 1.
  ASSERT_EQ(0, Find("a"));
  ASSERT_EQ(0, Find("p"));
  ASSERT_EQ(0, Find("p1"));
  ASSERT_EQ(0, Find("q"));
  ASSERT_EQ(1, Find("q1"));
  ASSERT_EQ(1, Find("z"));

  // Ranges entirely before or after the file.
  ASSERT_FALSE(Overlaps("a", "b"));
  ASSERT_FALSE(Overlaps("z1", "z2"));

  // Ranges touching or intersecting the file.
  ASSERT_TRUE(Overlaps("a", "p"));
  ASSERT_TRUE(Overlaps("a", "q"));
  ASSERT_TRUE(Overlaps("a", "z"));
  ASSERT_TRUE(Overlaps("p", "p1"));
  ASSERT_TRUE(Overlaps("p", "q"));
  ASSERT_TRUE(Overlaps("p", "z"));
  ASSERT_TRUE(Overlaps("p1", "p2"));
  ASSERT_TRUE(Overlaps("p1", "z"));
  ASSERT_TRUE(Overlaps("q", "q"));
  ASSERT_TRUE(Overlaps("q", "q1"));

  // Ranges with one or both bounds left open.
  ASSERT_FALSE(Overlaps(nullptr, "j"));
  ASSERT_FALSE(Overlaps("r", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "p"));
  ASSERT_TRUE(Overlaps(nullptr, "p1"));
  ASSERT_TRUE(Overlaps("q", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
}
594
// A level holding four sorted files: [150,200], [200,250], [300,350] and
// [400,450].
TEST_F(FindLevelFileTest, LevelMultiple) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  // Lookup key -> expected FindFile() index.
  const struct {
    const char* key;
    int expected_index;
  } kFindCases[] = {
      {"100", 0}, {"150", 0}, {"151", 0}, {"199", 0}, {"200", 0}, {"201", 1},
      {"249", 1}, {"250", 1}, {"251", 2}, {"299", 2}, {"300", 2}, {"349", 2},
      {"350", 2}, {"351", 3}, {"400", 3}, {"450", 3}, {"451", 4},
  };
  for (const auto& find_case : kFindCases) {
    ASSERT_EQ(find_case.expected_index, Find(find_case.key))
        << "key " << find_case.key;
  }

  // Ranges that fall before, between or after the files.
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("251", "299"));
  ASSERT_FALSE(Overlaps("451", "500"));
  ASSERT_FALSE(Overlaps("351", "399"));

  // Ranges that touch or intersect at least one file.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
}
634
// Same four files as LevelMultiple, queried with open-ended ranges (a null
// bound on one or both sides).
TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  // Open-ended ranges that stop before the first file or start after the
  // last one.
  ASSERT_FALSE(Overlaps(nullptr, "149"));
  ASSERT_FALSE(Overlaps("451", nullptr));

  // Fully open range.
  ASSERT_TRUE(Overlaps(nullptr, nullptr));

  // Open lower bound.
  ASSERT_TRUE(Overlaps(nullptr, "150"));
  ASSERT_TRUE(Overlaps(nullptr, "199"));
  ASSERT_TRUE(Overlaps(nullptr, "200"));
  ASSERT_TRUE(Overlaps(nullptr, "201"));
  ASSERT_TRUE(Overlaps(nullptr, "400"));
  ASSERT_TRUE(Overlaps(nullptr, "800"));

  // Open upper bound.
  ASSERT_TRUE(Overlaps("100", nullptr));
  ASSERT_TRUE(Overlaps("200", nullptr));
  ASSERT_TRUE(Overlaps("449", nullptr));
  ASSERT_TRUE(Overlaps("450", nullptr));
}
656
// A file whose smallest and largest keys share the user key "200" but carry
// different sequence numbers (5000 and 3000).
TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  LevelFileInit(1);
  Add("200", "200", 5000, 3000);

  ASSERT_FALSE(Overlaps("199", "199"));
  ASSERT_FALSE(Overlaps("201", "300"));

  ASSERT_TRUE(Overlaps("200", "200"));
  ASSERT_TRUE(Overlaps("190", "200"));
  ASSERT_TRUE(Overlaps("200", "210"));
}
667
// Two files whose ranges overlap each other; the fixture is told that the
// files are not disjoint/sorted before querying.
TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  LevelFileInit(2);

  Add("150", "600");
  Add("400", "500");
  disjoint_sorted_files_ = false;

  // Ranges entirely outside [150, 600].
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("601", "700"));

  // Ranges touching or intersecting at least one of the files.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
  ASSERT_TRUE(Overlaps("450", "700"));
  ASSERT_TRUE(Overlaps("600", "700"));
}
687
688 class VersionSetTestBase {
689 public:
690 const static std::string kColumnFamilyName1;
691 const static std::string kColumnFamilyName2;
692 const static std::string kColumnFamilyName3;
693 int num_initial_edits_;
694
VersionSetTestBase(const std::string & name)695 explicit VersionSetTestBase(const std::string& name)
696 : env_(nullptr),
697 dbname_(test::PerThreadDBPath(name)),
698 options_(),
699 db_options_(options_),
700 cf_options_(options_),
701 immutable_cf_options_(db_options_, cf_options_),
702 mutable_cf_options_(cf_options_),
703 table_cache_(NewLRUCache(50000, 16)),
704 write_buffer_manager_(db_options_.db_write_buffer_size),
705 shutting_down_(false),
706 mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
707 const char* test_env_uri = getenv("TEST_ENV_URI");
708 if (test_env_uri) {
709 Status s = Env::LoadEnv(test_env_uri, &env_, &env_guard_);
710 EXPECT_OK(s);
711 } else if (getenv("MEM_ENV")) {
712 env_guard_.reset(NewMemEnv(Env::Default()));
713 env_ = env_guard_.get();
714 } else {
715 env_ = Env::Default();
716 }
717 EXPECT_NE(nullptr, env_);
718
719 fs_ = env_->GetFileSystem();
720 EXPECT_OK(fs_->CreateDirIfMissing(dbname_, IOOptions(), nullptr));
721
722 options_.env = env_;
723 db_options_.env = env_;
724 db_options_.fs = fs_;
725 immutable_cf_options_.env = env_;
726 immutable_cf_options_.fs = fs_;
727 immutable_cf_options_.clock = env_->GetSystemClock().get();
728
729 versions_.reset(
730 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
731 &write_buffer_manager_, &write_controller_,
732 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
733 reactive_versions_ = std::make_shared<ReactiveVersionSet>(
734 dbname_, &db_options_, env_options_, table_cache_.get(),
735 &write_buffer_manager_, &write_controller_, nullptr);
736 db_options_.db_paths.emplace_back(dbname_,
737 std::numeric_limits<uint64_t>::max());
738 }
739
// Destroys the test database unless the KEEP_DB environment variable is set,
// in which case its location is printed instead.
virtual ~VersionSetTestBase() {
  if (getenv("KEEP_DB") == nullptr) {
    Options destroy_options;
    destroy_options.env = env_;
    EXPECT_OK(DestroyDB(dbname_, destroy_options));
  } else {
    fprintf(stdout, "DB is still at %s\n", dbname_.c_str());
  }
}
749
750 protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)751 virtual void PrepareManifest(
752 std::vector<ColumnFamilyDescriptor>* column_families,
753 SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) {
754 assert(column_families != nullptr);
755 assert(last_seqno != nullptr);
756 assert(log_writer != nullptr);
757 VersionEdit new_db;
758 if (db_options_.write_dbid_to_manifest) {
759 DBOptions tmp_db_options;
760 tmp_db_options.env = env_;
761 std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
762 std::string db_id;
763 impl->GetDbIdentityFromIdentityFile(&db_id);
764 new_db.SetDBId(db_id);
765 }
766 new_db.SetLogNumber(0);
767 new_db.SetNextFile(2);
768 new_db.SetLastSequence(0);
769
770 const std::vector<std::string> cf_names = {
771 kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
772 kColumnFamilyName3};
773 const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
774 autovector<VersionEdit> new_cfs;
775 uint64_t last_seq = 1;
776 uint32_t cf_id = 1;
777 for (int i = 1; i != kInitialNumOfCfs; ++i) {
778 VersionEdit new_cf;
779 new_cf.AddColumnFamily(cf_names[i]);
780 new_cf.SetColumnFamily(cf_id++);
781 new_cf.SetLogNumber(0);
782 new_cf.SetNextFile(2);
783 new_cf.SetLastSequence(last_seq++);
784 new_cfs.emplace_back(new_cf);
785 }
786 *last_seqno = last_seq;
787 num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
788 std::unique_ptr<WritableFileWriter> file_writer;
789 const std::string manifest = DescriptorFileName(dbname_, 1);
790 const auto& fs = env_->GetFileSystem();
791 Status s = WritableFileWriter::Create(
792 fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
793 nullptr);
794 ASSERT_OK(s);
795 {
796 log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
797 std::string record;
798 new_db.EncodeTo(&record);
799 s = (*log_writer)->AddRecord(record);
800 for (const auto& e : new_cfs) {
801 record.clear();
802 e.EncodeTo(&record);
803 s = (*log_writer)->AddRecord(record);
804 ASSERT_OK(s);
805 }
806 }
807 ASSERT_OK(s);
808
809 cf_options_.table_factory = mock_table_factory_;
810 for (const auto& cf_name : cf_names) {
811 column_families->emplace_back(cf_name, cf_options_);
812 }
813 }
814
815 // Create DB with 3 column families.
NewDB()816 void NewDB() {
817 SequenceNumber last_seqno;
818 std::unique_ptr<log::Writer> log_writer;
819 SetIdentityFile(env_, dbname_);
820 PrepareManifest(&column_families_, &last_seqno, &log_writer);
821 log_writer.reset();
822 // Make "CURRENT" file point to the new manifest file.
823 Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
824 ASSERT_OK(s);
825
826 EXPECT_OK(versions_->Recover(column_families_, false));
827 EXPECT_EQ(column_families_.size(),
828 versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
829 }
830
ReopenDB()831 void ReopenDB() {
832 versions_.reset(
833 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
834 &write_buffer_manager_, &write_controller_,
835 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
836 EXPECT_OK(versions_->Recover(column_families_, false));
837 }
838
VerifyManifest(std::string * manifest_path) const839 void VerifyManifest(std::string* manifest_path) const {
840 assert(manifest_path != nullptr);
841 uint64_t manifest_file_number = 0;
842 Status s = versions_->GetCurrentManifestPath(
843 dbname_, fs_.get(), manifest_path, &manifest_file_number);
844 ASSERT_OK(s);
845 ASSERT_EQ(1, manifest_file_number);
846 }
847
LogAndApplyToDefaultCF(VersionEdit & edit)848 Status LogAndApplyToDefaultCF(VersionEdit& edit) {
849 mutex_.Lock();
850 Status s =
851 versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
852 mutable_cf_options_, &edit, &mutex_);
853 mutex_.Unlock();
854 return s;
855 }
856
LogAndApplyToDefaultCF(const autovector<std::unique_ptr<VersionEdit>> & edits)857 Status LogAndApplyToDefaultCF(
858 const autovector<std::unique_ptr<VersionEdit>>& edits) {
859 autovector<VersionEdit*> vedits;
860 for (auto& e : edits) {
861 vedits.push_back(e.get());
862 }
863 mutex_.Lock();
864 Status s =
865 versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
866 mutable_cf_options_, vedits, &mutex_);
867 mutex_.Unlock();
868 return s;
869 }
870
CreateNewManifest()871 void CreateNewManifest() {
872 constexpr FSDirectory* db_directory = nullptr;
873 constexpr bool new_descriptor_log = true;
874 mutex_.Lock();
875 VersionEdit dummy;
876 ASSERT_OK(versions_->LogAndApply(
877 versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
878 &dummy, &mutex_, db_directory, new_descriptor_log));
879 mutex_.Unlock();
880 }
881
CreateColumnFamily(const std::string & cf_name,const ColumnFamilyOptions & cf_options)882 ColumnFamilyData* CreateColumnFamily(const std::string& cf_name,
883 const ColumnFamilyOptions& cf_options) {
884 VersionEdit new_cf;
885 new_cf.AddColumnFamily(cf_name);
886 uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
887 new_cf.SetColumnFamily(new_id);
888 new_cf.SetLogNumber(0);
889 new_cf.SetComparatorName(cf_options.comparator->Name());
890 Status s;
891 mutex_.Lock();
892 s = versions_->LogAndApply(/*column_family_data=*/nullptr,
893 MutableCFOptions(cf_options), &new_cf, &mutex_,
894 /*db_directory=*/nullptr,
895 /*new_descriptor_log=*/false, &cf_options);
896 mutex_.Unlock();
897 EXPECT_OK(s);
898 ColumnFamilyData* cfd =
899 versions_->GetColumnFamilySet()->GetColumnFamily(cf_name);
900 EXPECT_NE(nullptr, cfd);
901 return cfd;
902 }
903
904 Env* mem_env_;
905 Env* env_;
906 std::shared_ptr<Env> env_guard_;
907 std::shared_ptr<FileSystem> fs_;
908 const std::string dbname_;
909 EnvOptions env_options_;
910 Options options_;
911 ImmutableDBOptions db_options_;
912 ColumnFamilyOptions cf_options_;
913 ImmutableOptions immutable_cf_options_;
914 MutableCFOptions mutable_cf_options_;
915 std::shared_ptr<Cache> table_cache_;
916 WriteController write_controller_;
917 WriteBufferManager write_buffer_manager_;
918 std::shared_ptr<VersionSet> versions_;
919 std::shared_ptr<ReactiveVersionSet> reactive_versions_;
920 InstrumentedMutex mutex_;
921 std::atomic<bool> shutting_down_;
922 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
923 std::vector<ColumnFamilyDescriptor> column_families_;
924 };
925
926 const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
927 const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
928 const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
929
930 class VersionSetTest : public VersionSetTestBase, public testing::Test {
931 public:
VersionSetTest()932 VersionSetTest() : VersionSetTestBase("version_set_test") {}
933 };
934
// Submits kGroupSize edit lists that all target the default column family in
// a single LogAndApply call and checks that the
// "VersionSet::ProcessManifestWrites:SameColumnFamily" sync point fires
// kGroupSize - 1 times, each time for column family id 0.
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  NewDB();
  const int kGroupSize = 5;

  // Populate `edits` completely before taking addresses of its elements.
  autovector<VersionEdit> edits;
  for (int i = 0; i != kGroupSize; ++i) {
    edits.emplace_back(VersionEdit());
  }
  autovector<ColumnFamilyData*> cfds;
  autovector<const MutableCFOptions*> all_mutable_cf_options;
  autovector<autovector<VersionEdit*>> edit_lists;
  for (int i = 0; i != kGroupSize; ++i) {
    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
    all_mutable_cf_options.emplace_back(&mutable_cf_options_);
    autovector<VersionEdit*> edit_list;
    edit_list.emplace_back(&edits[i]);
    edit_lists.emplace_back(edit_list);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
        uint32_t* cf_id = static_cast<uint32_t*>(arg);
        EXPECT_EQ(0u, *cf_id);
        ++count;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  mutex_.Lock();
  Status s =
      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  mutex_.Unlock();

  // The callback captures `count` by reference; unregister it before the
  // test returns so it cannot outlive the local it refers to.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  EXPECT_OK(s);
  EXPECT_EQ(kGroupSize - 1, count);
}
970
// Adds two blob files to the current version (one with garbage, one without),
// forces a new manifest, and counts how often the blob file addition and
// garbage encoders run while the state is re-persisted.
TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
  // Initialize the database and add a couple of blob files, one with some
  // garbage in it, and one without any garbage.
  NewDB();

  assert(versions_);
  assert(versions_->GetColumnFamilySet());

  ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  Version* const version = cfd->current();
  assert(version);

  VersionStorageInfo* const storage_info = version->storage_info();
  assert(storage_info);

  {
    // Blob file with non-zero garbage.
    constexpr uint64_t blob_file_number = 123;
    constexpr uint64_t total_blob_count = 456;
    constexpr uint64_t total_blob_bytes = 77777777;
    constexpr char checksum_method[] = "SHA1";
    constexpr char checksum_value[] =
        "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";

    auto shared_meta = SharedBlobFileMetaData::Create(
        blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
        checksum_value);

    constexpr uint64_t garbage_blob_count = 89;
    constexpr uint64_t garbage_blob_bytes = 1000000;

    auto meta = BlobFileMetaData::Create(
        std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
        garbage_blob_count, garbage_blob_bytes);

    storage_info->AddBlobFile(std::move(meta));
  }

  {
    // Blob file with no garbage at all.
    constexpr uint64_t blob_file_number = 234;
    constexpr uint64_t total_blob_count = 555;
    constexpr uint64_t total_blob_bytes = 66666;
    constexpr char checksum_method[] = "CRC32";
    constexpr char checksum_value[] = "3d87ff57";

    auto shared_meta = SharedBlobFileMetaData::Create(
        blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
        checksum_value);

    constexpr uint64_t garbage_blob_count = 0;
    constexpr uint64_t garbage_blob_bytes = 0;

    auto meta = BlobFileMetaData::Create(
        std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
        garbage_blob_count, garbage_blob_bytes);

    storage_info->AddBlobFile(std::move(meta));
  }

  // Force the creation of a new manifest file and make sure metadata for
  // the blob files is re-persisted.
  // Start from a clean SyncPoint state so only the callbacks below can fire.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  size_t addition_encoded = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobFileAddition::EncodeTo::CustomFields",
      [&](void* /* arg */) { ++addition_encoded; });

  size_t garbage_encoded = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobFileGarbage::EncodeTo::CustomFields",
      [&](void* /* arg */) { ++garbage_encoded; });
  SyncPoint::GetInstance()->EnableProcessing();

  CreateNewManifest();

  // Unregister the callbacks before the fatal assertions below: they capture
  // the local counters by reference, and a failing ASSERT_* returns
  // immediately.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Both files are encoded as additions; only the first one has garbage.
  ASSERT_EQ(addition_encoded, 2);
  ASSERT_EQ(garbage_encoded, 1);
}
1052
// Checks live blob file reporting: first from a single Version, then from the
// whole VersionSet once a second version sharing one blob file exists.
TEST_F(VersionSetTest, AddLiveBlobFiles) {
  // Initialize the database and add a blob file.
  NewDB();

  assert(versions_);
  assert(versions_->GetColumnFamilySet());

  ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  // Neither blob file carries any garbage.
  constexpr uint64_t garbage_blob_count = 0;
  constexpr uint64_t garbage_blob_bytes = 0;

  // ---- First version: one blob file. ----
  Version* const first_version = cfd->current();
  assert(first_version);

  VersionStorageInfo* const first_storage_info = first_version->storage_info();
  assert(first_storage_info);

  constexpr uint64_t first_blob_file_number = 234;
  constexpr uint64_t first_total_blob_count = 555;
  constexpr uint64_t first_total_blob_bytes = 66666;
  constexpr char first_checksum_method[] = "CRC32";
  constexpr char first_checksum_value[] = "3d87ff57";

  auto first_shared_meta = SharedBlobFileMetaData::Create(
      first_blob_file_number, first_total_blob_count, first_total_blob_bytes,
      first_checksum_method, first_checksum_value);

  auto first_meta = BlobFileMetaData::Create(
      std::move(first_shared_meta), BlobFileMetaData::LinkedSsts(),
      garbage_blob_count, garbage_blob_bytes);

  // Passed by copy on purpose: `first_meta` is added to the second version
  // as well further down.
  first_storage_info->AddBlobFile(first_meta);

  // Take a reference so the first version survives the creation of the
  // second one.
  first_version->Ref();

  // Ask the first version alone for its live files.
  std::vector<uint64_t> version_table_files;
  std::vector<uint64_t> version_blob_files;

  first_version->AddLiveFiles(&version_table_files, &version_blob_files);

  ASSERT_EQ(version_blob_files.size(), 1);
  ASSERT_EQ(version_blob_files[0], first_blob_file_number);

  // ---- Second version: the first blob file plus a new one. ----
  versions_->TEST_CreateAndAppendVersion(cfd);

  Version* const second_version = cfd->current();
  assert(second_version);
  assert(second_version != first_version);

  VersionStorageInfo* const second_storage_info =
      second_version->storage_info();
  assert(second_storage_info);

  constexpr uint64_t second_blob_file_number = 456;
  constexpr uint64_t second_total_blob_count = 100;
  constexpr uint64_t second_total_blob_bytes = 2000000;
  constexpr char second_checksum_method[] = "CRC32B";
  constexpr char second_checksum_value[] = "6dbdf23a";

  auto second_shared_meta = SharedBlobFileMetaData::Create(
      second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
      second_checksum_method, second_checksum_value);

  auto second_meta = BlobFileMetaData::Create(
      std::move(second_shared_meta), BlobFileMetaData::LinkedSsts(),
      garbage_blob_count, garbage_blob_bytes);

  second_storage_info->AddBlobFile(std::move(first_meta));
  second_storage_info->AddBlobFile(std::move(second_meta));

  // Ask the version set for all live files. The first blob file belongs to
  // both versions, so it is expected to be listed twice.
  std::vector<uint64_t> all_table_files;
  std::vector<uint64_t> all_blob_files;

  versions_->AddLiveFiles(&all_table_files, &all_blob_files);

  ASSERT_EQ(all_blob_files.size(), 3);
  ASSERT_EQ(all_blob_files[0], first_blob_file_number);
  ASSERT_EQ(all_blob_files[1], first_blob_file_number);
  ASSERT_EQ(all_blob_files[2], second_blob_file_number);

  // Drop the extra reference taken on the first version.
  first_version->Unref();
}
1144
// Adds a blob file whose garbage equals its total contents and checks when
// GetObsoleteFiles() reports it, depending on `min_pending_output`.
TEST_F(VersionSetTest, ObsoleteBlobFile) {
  // Initialize the database and add a blob file that is entirely garbage
  // and thus can immediately be marked obsolete.
  NewDB();

  constexpr uint64_t blob_file_number = 234;
  constexpr uint64_t total_blob_count = 555;
  constexpr uint64_t total_blob_bytes = 66666;
  constexpr char checksum_method[] = "CRC32";
  constexpr char checksum_value[] = "3d87ff57";

  VersionEdit edit;
  edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
                   checksum_method, checksum_value);
  // The garbage count/bytes equal the totals: the whole file is garbage.
  edit.AddBlobFileGarbage(blob_file_number, total_blob_count, total_blob_bytes);

  ASSERT_OK(LogAndApplyToDefaultCF(edit));

  // Returns the obsolete blob files reported for the given pending-output
  // threshold; table and manifest results are collected and discarded.
  const auto obsolete_blob_files = [this](uint64_t min_pending_output) {
    std::vector<ObsoleteFileInfo> table_files;
    std::vector<ObsoleteBlobFileInfo> blob_files;
    std::vector<std::string> manifest_files;
    versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
                                min_pending_output);
    return blob_files;
  };

  // Make sure blob files from the pending number range are not returned
  // as obsolete.
  {
    const auto blob_files = obsolete_blob_files(blob_file_number);
    ASSERT_TRUE(blob_files.empty());
  }

  // Make sure the blob file is returned as obsolete if it's not in the pending
  // range.
  {
    const auto blob_files = obsolete_blob_files(blob_file_number + 1);
    ASSERT_EQ(blob_files.size(), 1);
    ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number);
  }

  // Make sure it's not returned a second time.
  {
    const auto blob_files = obsolete_blob_files(blob_file_number + 1);
    ASSERT_TRUE(blob_files.empty());
  }
}
1213
// Applies a batch consisting only of WAL additions and one WAL deletion, and
// checks through the "NewVersion" sync point that no Version is produced.
TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
  NewDB();

  constexpr uint64_t kNumWals = 5;

  autovector<std::unique_ptr<VersionEdit>> edits;
  // Add some WALs; each WAL's size equals its log number.
  for (uint64_t wal_number = 1; wal_number <= kNumWals; ++wal_number) {
    std::unique_ptr<VersionEdit> addition(new VersionEdit);
    addition->AddWal(wal_number, WalMetadata(wal_number));
    edits.emplace_back(std::move(addition));
  }
  // Delete the first half of the WALs.
  {
    std::unique_ptr<VersionEdit> deletion(new VersionEdit);
    deletion->DeleteWalsBefore(kNumWals / 2 + 1);
    edits.emplace_back(std::move(deletion));
  }

  // Record every Version pointer handed to the sync point.
  autovector<Version*> versions;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:NewVersion", [&](void* arg) {
        versions.push_back(static_cast<Version*>(arg));
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(LogAndApplyToDefaultCF(edits));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Since the edits are all WAL edits, no version should be created: the
  // sync point fires once and carries a null Version.
  ASSERT_EQ(versions.size(), 1);
  ASSERT_EQ(versions[0], nullptr);
}
1245
1246 // Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit.
// Applies a batch of WAL edits followed by one non-WAL edit (SetDBId) and
// checks through the "NewVersion" sync point that a Version is produced.
TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
  NewDB();

  const std::string kDBId = "db_db";
  constexpr uint64_t kNumWals = 5;

  autovector<std::unique_ptr<VersionEdit>> edits;
  // Add some WALs.
  for (uint64_t i = 1; i <= kNumWals; i++) {
    edits.emplace_back(new VersionEdit);
    // WAL's size equals its log number.
    edits.back()->AddWal(i, WalMetadata(i));
  }
  // Delete the first half of the WALs.
  edits.emplace_back(new VersionEdit);
  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
  // Add a non-WAL edit.
  edits.emplace_back(new VersionEdit);
  edits.back()->SetDBId(kDBId);

  // Record every Version pointer handed to the sync point.
  autovector<Version*> versions;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:NewVersion",
      [&](void* arg) { versions.push_back(static_cast<Version*>(arg)); });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(LogAndApplyToDefaultCF(edits));

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Since the batch contains a non-WAL edit, a new version should be created.
  ASSERT_EQ(versions.size(), 1);
  ASSERT_NE(versions[0], nullptr);
}
1281
// Follows one WAL through creation, several syncs with growing size, and
// close, checking the tracked WAL set after each step and after recovery.
TEST_F(VersionSetTest, WalAddition) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  // A WAL is just created.
  {
    VersionEdit create_edit;
    create_edit.AddWal(kLogNumber);

    ASSERT_OK(LogAndApplyToDefaultCF(create_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto wal_it = wals.find(kLogNumber);
    ASSERT_TRUE(wal_it != wals.end());
    ASSERT_FALSE(wal_it->second.HasSyncedSize());
  }

  // The WAL is synced for several times before closing. Halving the delta
  // makes each synced size larger than the previous one.
  {
    uint64_t size_delta = 100;
    while (size_delta > 0) {
      const uint64_t synced_size = kSizeInBytes - size_delta;
      VersionEdit sync_edit;
      sync_edit.AddWal(kLogNumber, WalMetadata(synced_size));

      ASSERT_OK(LogAndApplyToDefaultCF(sync_edit));

      const auto& wals = versions_->GetWalSet().GetWals();
      ASSERT_EQ(wals.size(), 1);
      const auto wal_it = wals.find(kLogNumber);
      ASSERT_TRUE(wal_it != wals.end());
      ASSERT_TRUE(wal_it->second.HasSyncedSize());
      ASSERT_EQ(wal_it->second.GetSyncedSizeInBytes(), synced_size);

      size_delta /= 2;
    }
  }

  // The WAL is closed.
  {
    VersionEdit close_edit;
    close_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes));

    ASSERT_OK(LogAndApplyToDefaultCF(close_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto wal_it = wals.find(kLogNumber);
    ASSERT_TRUE(wal_it != wals.end());
    ASSERT_TRUE(wal_it->second.HasSyncedSize());
    ASSERT_EQ(wal_it->second.GetSyncedSizeInBytes(), kSizeInBytes);
  }

  // Recover a new VersionSet.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, /*read_only=*/false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto wal_it = wals.find(kLogNumber);
    ASSERT_TRUE(wal_it != wals.end());
    ASSERT_TRUE(wal_it->second.HasSyncedSize());
    ASSERT_EQ(wal_it->second.GetSyncedSizeInBytes(), kSizeInBytes);
  }
}
1348
// A WAL is synced once and then followed by a newer WAL without a final
// size update; the last synced size must be kept, including after recovery.
TEST_F(VersionSetTest, WalCloseWithoutSync) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr WalNumber kNextLogNumber = kLogNumber + 1;
  constexpr uint64_t kSizeInBytes = 111;
  constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2;

  // A WAL is just created.
  {
    VersionEdit create_edit;
    create_edit.AddWal(kLogNumber);

    ASSERT_OK(LogAndApplyToDefaultCF(create_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto wal_it = wals.find(kLogNumber);
    ASSERT_TRUE(wal_it != wals.end());
    ASSERT_FALSE(wal_it->second.HasSyncedSize());
  }

  // The WAL is synced before closing.
  {
    VersionEdit sync_edit;
    sync_edit.AddWal(kLogNumber, WalMetadata(kSyncedSizeInBytes));

    ASSERT_OK(LogAndApplyToDefaultCF(sync_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto wal_it = wals.find(kLogNumber);
    ASSERT_TRUE(wal_it != wals.end());
    ASSERT_TRUE(wal_it->second.HasSyncedSize());
    ASSERT_EQ(wal_it->second.GetSyncedSizeInBytes(), kSyncedSizeInBytes);
  }

  // A new WAL with larger log number is created,
  // implicitly marking the current WAL closed.
  {
    VersionEdit next_wal_edit;
    next_wal_edit.AddWal(kNextLogNumber);
    ASSERT_OK(LogAndApplyToDefaultCF(next_wal_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 2);

    const auto old_it = wals.find(kLogNumber);
    ASSERT_TRUE(old_it != wals.end());
    ASSERT_TRUE(old_it->second.HasSyncedSize());
    ASSERT_EQ(old_it->second.GetSyncedSizeInBytes(), kSyncedSizeInBytes);

    const auto new_it = wals.find(kNextLogNumber);
    ASSERT_TRUE(new_it != wals.end());
    ASSERT_FALSE(new_it->second.HasSyncedSize());
  }

  // Recover a new VersionSet.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 2);
    const auto old_it = wals.find(kLogNumber);
    ASSERT_TRUE(old_it != wals.end());
    ASSERT_TRUE(old_it->second.HasSyncedSize());
    ASSERT_EQ(old_it->second.GetSyncedSizeInBytes(), kSyncedSizeInBytes);
  }
}
1414
// Adds a closed and a non-closed WAL, deletes the closed one, and checks
// that only the non-closed WAL remains in memory, after recovery, and in a
// newly written MANIFEST.
TEST_F(VersionSetTest, WalDeletion) {
  NewDB();

  constexpr WalNumber kClosedLogNumber = 10;
  constexpr WalNumber kNonClosedLogNumber = 20;
  constexpr uint64_t kSizeInBytes = 111;

  // Add a non-closed and a closed WAL.
  {
    VersionEdit add_edit;
    add_edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes));
    add_edit.AddWal(kNonClosedLogNumber);

    ASSERT_OK(LogAndApplyToDefaultCF(add_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 2);
    const auto open_it = wals.find(kNonClosedLogNumber);
    ASSERT_TRUE(open_it != wals.end());
    const auto closed_it = wals.find(kClosedLogNumber);
    ASSERT_TRUE(closed_it != wals.end());
    ASSERT_FALSE(open_it->second.HasSyncedSize());
    ASSERT_TRUE(closed_it->second.HasSyncedSize());
    ASSERT_EQ(closed_it->second.GetSyncedSizeInBytes(), kSizeInBytes);
  }

  // Delete the closed WAL.
  {
    VersionEdit delete_edit;
    delete_edit.DeleteWalsBefore(kNonClosedLogNumber);

    ASSERT_OK(LogAndApplyToDefaultCF(delete_edit));

    const auto& wals = versions_->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto open_it = wals.find(kNonClosedLogNumber);
    ASSERT_TRUE(open_it != wals.end());
    ASSERT_FALSE(open_it->second.HasSyncedSize());
  }

  // Recover a new VersionSet, only the non-closed WAL should show up.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto open_it = wals.find(kNonClosedLogNumber);
    ASSERT_TRUE(open_it != wals.end());
    ASSERT_FALSE(open_it->second.HasSyncedSize());
  }

  // Force the creation of a new MANIFEST file,
  // only the non-closed WAL should be written to the new MANIFEST.
  {
    // Collects every WAL addition passed to the SaveWal sync point.
    std::vector<WalAddition> saved_additions;
    SyncPoint::GetInstance()->SetCallBack(
        "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) {
          VersionEdit* saved_edit = static_cast<VersionEdit*>(arg);
          ASSERT_TRUE(saved_edit->IsWalAddition());
          for (const auto& addition : saved_edit->GetWalAdditions()) {
            saved_additions.push_back(addition);
          }
        });
    SyncPoint::GetInstance()->EnableProcessing();

    CreateNewManifest();

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();

    ASSERT_EQ(saved_additions.size(), 1);
    ASSERT_EQ(saved_additions[0].GetLogNumber(), kNonClosedLogNumber);
    ASSERT_FALSE(saved_additions[0].GetMetadata().HasSyncedSize());
  }

  // Recover from the new MANIFEST, only the non-closed WAL should show up.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto open_it = wals.find(kNonClosedLogNumber);
    ASSERT_TRUE(open_it != wals.end());
    ASSERT_FALSE(open_it->second.HasSyncedSize());
  }
}
1502
// Applying the same WAL-creation edit twice must fail the second time with a
// Corruption status naming the duplicated WAL.
TEST_F(VersionSetTest, WalCreateTwice) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;

  VersionEdit create_edit;
  create_edit.AddWal(kLogNumber);

  // First application succeeds.
  ASSERT_OK(LogAndApplyToDefaultCF(create_edit));

  // Second application of the very same edit is rejected.
  Status second_status = LogAndApplyToDefaultCF(create_edit);
  ASSERT_TRUE(second_status.IsCorruption());
  const std::string message = second_status.ToString();
  ASSERT_TRUE(message.find("WAL 10 is created more than once") !=
              std::string::npos)
      << second_status.ToString();
}
1519
// Re-creating a WAL that was already created and closed must be rejected
// with a Corruption status.
TEST_F(VersionSetTest, WalCreateAfterClose) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  {
    // Add a closed WAL: creation and the sized record in one edit.
    VersionEdit closed_wal_edit;
    closed_wal_edit.AddWal(kLogNumber);
    closed_wal_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes));

    ASSERT_OK(LogAndApplyToDefaultCF(closed_wal_edit));
  }

  {
    // Create the same WAL again.
    VersionEdit recreate_edit;
    recreate_edit.AddWal(kLogNumber);

    Status recreate_status = LogAndApplyToDefaultCF(recreate_edit);
    ASSERT_TRUE(recreate_status.IsCorruption());
    const std::string message = recreate_status.ToString();
    ASSERT_TRUE(message.find("WAL 10 is created more than once") !=
                std::string::npos)
        << recreate_status.ToString();
  }
}
1548
// Recording a smaller synced size for an already known WAL must be rejected
// with a Corruption status.
TEST_F(VersionSetTest, AddWalWithSmallerSize) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  {
    // Add a closed WAL.
    VersionEdit initial_edit;
    initial_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes));

    ASSERT_OK(LogAndApplyToDefaultCF(initial_edit));
  }

  {
    // Add the same WAL with smaller synced size.
    VersionEdit shrink_edit;
    shrink_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes / 2));

    Status shrink_status = LogAndApplyToDefaultCF(shrink_edit);
    ASSERT_TRUE(shrink_status.IsCorruption());
    const std::string message = shrink_status.ToString();
    ASSERT_TRUE(
        message.find(
            "WAL 10 must not have smaller synced size than previous one") !=
        std::string::npos)
        << shrink_status.ToString();
  }
}
1579
// DeleteWalsBefore() with a number that matches no tracked WAL: the WAL below
// that number is removed and the WAL above it is kept.
TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
  NewDB();

  constexpr WalNumber kLogNumber0 = 10;
  constexpr WalNumber kLogNumber1 = 20;
  // Lies strictly between the two WAL numbers above.
  constexpr WalNumber kNonExistingNumber = 15;
  constexpr uint64_t kSizeInBytes = 111;

  {
    // Add closed WALs.
    const WalMetadata closed_wal(kSizeInBytes);
    VersionEdit add_edit;
    add_edit.AddWal(kLogNumber0, closed_wal);
    add_edit.AddWal(kLogNumber1, closed_wal);

    ASSERT_OK(LogAndApplyToDefaultCF(add_edit));
  }

  {
    // Delete WALs before a non-existing WAL.
    VersionEdit delete_edit;
    delete_edit.DeleteWalsBefore(kNonExistingNumber);

    ASSERT_OK(LogAndApplyToDefaultCF(delete_edit));
  }

  // Recover a new VersionSet, WAL0 is deleted, WAL1 is not.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    const auto kept_it = wals.find(kLogNumber1);
    ASSERT_TRUE(kept_it != wals.end());
  }
}
1618
// Deleting WALs before a number larger than every tracked WAL must leave the
// recovered WAL set empty.
TEST_F(VersionSetTest, DeleteAllWals) {
  NewDB();

  constexpr WalNumber kMaxLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  {
    // Add a closed WAL.
    VersionEdit add_edit;
    add_edit.AddWal(kMaxLogNumber, WalMetadata(kSizeInBytes));

    ASSERT_OK(LogAndApplyToDefaultCF(add_edit));
  }

  {
    // The cutoff lies above the only WAL that was added.
    VersionEdit delete_edit;
    delete_edit.DeleteWalsBefore(kMaxLogNumber + 10);

    ASSERT_OK(LogAndApplyToDefaultCF(delete_edit));
  }

  // Recover a new VersionSet, all WALs are deleted.
  {
    std::unique_ptr<VersionSet> recovered(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    ASSERT_OK(recovered->Recover(column_families_, false));

    const auto& wals = recovered->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 0);
  }
}
1652
// Applies an atomic group of 7 edits (5 WAL additions, one DB ID edit, one
// WAL deletion) and checks that recovery sees both the DB ID and the single
// WAL that survives the deletion.
TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
  NewDB();

  constexpr int kAtomicGroupSize = 7;
  constexpr uint64_t kNumWals = 5;
  const std::string kDBId = "db_db";

  // Each edit is tagged with the number of edits still to come in the group.
  int remaining = kAtomicGroupSize;
  autovector<std::unique_ptr<VersionEdit>> edits;
  // Add 5 WALs.
  for (uint64_t i = 1; i <= kNumWals; i++) {
    edits.emplace_back(new VersionEdit);
    // WAL's size equals its log number.
    edits.back()->AddWal(i, WalMetadata(i));
    edits.back()->MarkAtomicGroup(--remaining);
  }
  // One non-WAL edit that sets the DB ID.
  edits.emplace_back(new VersionEdit);
  edits.back()->SetDBId(kDBId);
  edits.back()->MarkAtomicGroup(--remaining);
  // Delete the first added 4 WALs.
  edits.emplace_back(new VersionEdit);
  edits.back()->DeleteWalsBefore(kNumWals);
  edits.back()->MarkAtomicGroup(--remaining);
  // The group must be complete before it is applied.
  ASSERT_EQ(remaining, 0);

  ASSERT_OK(LogAndApplyToDefaultCF(edits));

  // Recover a new VersionSet, the DB ID and the last WAL should be kept.
  {
    std::unique_ptr<VersionSet> new_versions(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    std::string db_id;
    ASSERT_OK(
        new_versions->Recover(column_families_, /*read_only=*/false, &db_id));

    ASSERT_EQ(db_id, kDBId);

    const auto& wals = new_versions->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    ASSERT_TRUE(wals.find(kNumWals) != wals.end());
    ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize());
    ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals);
  }
}
1701
1702 class VersionSetWithTimestampTest : public VersionSetTest {
1703 public:
1704 static const std::string kNewCfName;
1705
VersionSetWithTimestampTest()1706 explicit VersionSetWithTimestampTest() : VersionSetTest() {}
1707
SetUp()1708 void SetUp() override {
1709 NewDB();
1710 Options options;
1711 options.comparator = test::ComparatorWithU64Ts();
1712 cfd_ = CreateColumnFamily(kNewCfName, options);
1713 EXPECT_NE(nullptr, cfd_);
1714 EXPECT_NE(nullptr, cfd_->GetLatestMutableCFOptions());
1715 column_families_.emplace_back(kNewCfName, options);
1716 }
1717
TearDown()1718 void TearDown() override {
1719 for (auto* e : edits_) {
1720 delete e;
1721 }
1722 edits_.clear();
1723 }
1724
GenVersionEditsToSetFullHistoryTsLow(const std::vector<uint64_t> & ts_lbs)1725 void GenVersionEditsToSetFullHistoryTsLow(
1726 const std::vector<uint64_t>& ts_lbs) {
1727 for (const auto ts_lb : ts_lbs) {
1728 VersionEdit* edit = new VersionEdit;
1729 edit->SetColumnFamily(cfd_->GetID());
1730 std::string ts_str = test::EncodeInt(ts_lb);
1731 edit->SetFullHistoryTsLow(ts_str);
1732 edits_.emplace_back(edit);
1733 }
1734 }
1735
VerifyFullHistoryTsLow(uint64_t expected_ts_low)1736 void VerifyFullHistoryTsLow(uint64_t expected_ts_low) {
1737 std::unique_ptr<VersionSet> vset(
1738 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1739 &write_buffer_manager_, &write_controller_,
1740 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1741 ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
1742 /*db_id=*/nullptr));
1743 for (auto* cfd : *(vset->GetColumnFamilySet())) {
1744 ASSERT_NE(nullptr, cfd);
1745 if (cfd->GetName() == kNewCfName) {
1746 ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow());
1747 } else {
1748 ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty());
1749 }
1750 }
1751 }
1752
DoTest(const std::vector<uint64_t> & ts_lbs)1753 void DoTest(const std::vector<uint64_t>& ts_lbs) {
1754 if (ts_lbs.empty()) {
1755 return;
1756 }
1757
1758 GenVersionEditsToSetFullHistoryTsLow(ts_lbs);
1759
1760 Status s;
1761 mutex_.Lock();
1762 s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()),
1763 edits_, &mutex_);
1764 mutex_.Unlock();
1765 ASSERT_OK(s);
1766 VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end()));
1767 }
1768
1769 protected:
1770 ColumnFamilyData* cfd_{nullptr};
1771 // edits_ must contain and own pointers to heap-alloc VersionEdit objects.
1772 autovector<VersionEdit*> edits_;
1773 };
1774
1775 const std::string VersionSetWithTimestampTest::kNewCfName("new_cf");
1776
// Sets full_history_ts_low a single time and checks the recovered value.
TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) {
  const std::vector<uint64_t> ts_lbs{100};
  DoTest(ts_lbs);
}
1781
1782 // Simulate the application increasing full_history_ts_low.
TEST_F(VersionSetWithTimestampTest,IncreaseFullHistoryTsLb)1783 TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) {
1784 const std::vector<uint64_t> ts_lbs = {100, 101, 102, 103};
1785 DoTest(ts_lbs);
1786 }
1787
1788 // Simulate the application trying to decrease full_history_ts_low
1789 // unsuccessfully. If the application calls public API sequentially to
1790 // decrease the lower bound ts, RocksDB will return an InvalidArgument
1791 // status before involving VersionSet. Only when multiple threads trying
1792 // to decrease the lower bound concurrently will this case ever happen. Even
1793 // so, the lower bound cannot be decreased. The application will be notified
1794 // via return value of the API.
TEST_F(VersionSetWithTimestampTest,TryDecreaseFullHistoryTsLb)1795 TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) {
1796 const std::vector<uint64_t> ts_lbs = {103, 102, 101, 100};
1797 DoTest(ts_lbs);
1798 }
1799
1800 class VersionSetAtomicGroupTest : public VersionSetTestBase,
1801 public testing::Test {
1802 public:
VersionSetAtomicGroupTest()1803 VersionSetAtomicGroupTest()
1804 : VersionSetTestBase("version_set_atomic_group_test") {}
1805
SetUp()1806 void SetUp() override {
1807 PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
1808 SetupTestSyncPoints();
1809 }
1810
SetupValidAtomicGroup(int atomic_group_size)1811 void SetupValidAtomicGroup(int atomic_group_size) {
1812 edits_.resize(atomic_group_size);
1813 int remaining = atomic_group_size;
1814 for (size_t i = 0; i != edits_.size(); ++i) {
1815 edits_[i].SetLogNumber(0);
1816 edits_[i].SetNextFile(2);
1817 edits_[i].MarkAtomicGroup(--remaining);
1818 edits_[i].SetLastSequence(last_seqno_++);
1819 }
1820 ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1821 }
1822
SetupIncompleteTrailingAtomicGroup(int atomic_group_size)1823 void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
1824 edits_.resize(atomic_group_size);
1825 int remaining = atomic_group_size;
1826 for (size_t i = 0; i != edits_.size(); ++i) {
1827 edits_[i].SetLogNumber(0);
1828 edits_[i].SetNextFile(2);
1829 edits_[i].MarkAtomicGroup(--remaining);
1830 edits_[i].SetLastSequence(last_seqno_++);
1831 }
1832 ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1833 }
1834
SetupCorruptedAtomicGroup(int atomic_group_size)1835 void SetupCorruptedAtomicGroup(int atomic_group_size) {
1836 edits_.resize(atomic_group_size);
1837 int remaining = atomic_group_size;
1838 for (size_t i = 0; i != edits_.size(); ++i) {
1839 edits_[i].SetLogNumber(0);
1840 edits_[i].SetNextFile(2);
1841 if (i != ((size_t)atomic_group_size / 2)) {
1842 edits_[i].MarkAtomicGroup(--remaining);
1843 }
1844 edits_[i].SetLastSequence(last_seqno_++);
1845 }
1846 ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1847 }
1848
SetupIncorrectAtomicGroup(int atomic_group_size)1849 void SetupIncorrectAtomicGroup(int atomic_group_size) {
1850 edits_.resize(atomic_group_size);
1851 int remaining = atomic_group_size;
1852 for (size_t i = 0; i != edits_.size(); ++i) {
1853 edits_[i].SetLogNumber(0);
1854 edits_[i].SetNextFile(2);
1855 if (i != 1) {
1856 edits_[i].MarkAtomicGroup(--remaining);
1857 } else {
1858 edits_[i].MarkAtomicGroup(remaining--);
1859 }
1860 edits_[i].SetLastSequence(last_seqno_++);
1861 }
1862 ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1863 }
1864
SetupTestSyncPoints()1865 void SetupTestSyncPoints() {
1866 SyncPoint::GetInstance()->DisableProcessing();
1867 SyncPoint::GetInstance()->ClearAllCallBacks();
1868 SyncPoint::GetInstance()->SetCallBack(
1869 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
1870 VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
1871 EXPECT_EQ(edits_.front().DebugString(),
1872 e->DebugString()); // compare based on value
1873 first_in_atomic_group_ = true;
1874 });
1875 SyncPoint::GetInstance()->SetCallBack(
1876 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
1877 VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
1878 EXPECT_EQ(edits_.back().DebugString(),
1879 e->DebugString()); // compare based on value
1880 EXPECT_TRUE(first_in_atomic_group_);
1881 last_in_atomic_group_ = true;
1882 });
1883 SyncPoint::GetInstance()->SetCallBack(
1884 "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) {
1885 num_recovered_edits_ = *reinterpret_cast<int*>(arg);
1886 });
1887 SyncPoint::GetInstance()->SetCallBack(
1888 "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
1889 [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
1890 SyncPoint::GetInstance()->SetCallBack(
1891 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
1892 [&](void* arg) {
1893 corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
1894 });
1895 SyncPoint::GetInstance()->SetCallBack(
1896 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
1897 [&](void* arg) {
1898 edit_with_incorrect_group_size_ =
1899 *reinterpret_cast<VersionEdit*>(arg);
1900 });
1901 SyncPoint::GetInstance()->EnableProcessing();
1902 }
1903
AddNewEditsToLog(int num_edits)1904 void AddNewEditsToLog(int num_edits) {
1905 for (int i = 0; i < num_edits; i++) {
1906 std::string record;
1907 edits_[i].EncodeTo(&record);
1908 ASSERT_OK(log_writer_->AddRecord(record));
1909 }
1910 }
1911
TearDown()1912 void TearDown() override {
1913 SyncPoint::GetInstance()->DisableProcessing();
1914 SyncPoint::GetInstance()->ClearAllCallBacks();
1915 log_writer_.reset();
1916 }
1917
1918 protected:
1919 std::vector<ColumnFamilyDescriptor> column_families_;
1920 SequenceNumber last_seqno_;
1921 std::vector<VersionEdit> edits_;
1922 bool first_in_atomic_group_ = false;
1923 bool last_in_atomic_group_ = false;
1924 int num_edits_in_atomic_group_ = 0;
1925 int num_recovered_edits_ = 0;
1926 VersionEdit corrupted_edit_;
1927 VersionEdit edit_with_incorrect_group_size_;
1928 std::unique_ptr<log::Writer> log_writer_;
1929 };
1930
// Writes a complete atomic group to the log and recovers with VersionSet:
// both ends of the group must be observed and all of its edits must be
// counted as recovered.
TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  constexpr int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  EXPECT_OK(versions_->Recover(column_families_, /*read_only=*/false));

  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
}
1942
// Same scenario as the VersionSet variant, but recovery goes through the
// reactive version set, whose replay buffer must be empty afterwards.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  constexpr int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));

  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Recovery is expected to leave nothing behind in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
}
1963
// Recovers the reactive version set first, then writes a complete atomic
// group and lets ReadAndApply() pick it up.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  // Only the initial edits exist at this point.
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);

  AddNewEditsToLog(kAtomicGroupSize);

  InstrumentedMutex apply_mutex;
  std::unordered_set<ColumnFamilyData*> changed_cfds;
  apply_mutex.Lock();
  EXPECT_OK(reactive_versions_->ReadAndApply(&apply_mutex, &reader,
                                             reader_status.get(),
                                             &changed_cfds));
  apply_mutex.Unlock();

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Applying the complete group is expected to drain the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(kAtomicGroupSize, num_recovered_edits_);
}
1989
// Persists all but the last edit of an atomic group. Recovery must still
// succeed, see the start but not the end of the group, and count only the
// initial edits as recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  constexpr int kAtomicGroupSize = 4;
  constexpr int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);

  EXPECT_OK(versions_->Recover(column_families_, /*read_only=*/false));

  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
}
2004
// Persists all but the last edit of an atomic group, recovers the reactive
// version set (which must buffer the partial group), then writes the final
// edit and checks that ReadAndApply() drains the buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // Reactive version set should store the edits in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  // Write the last record. The reactive version set should now apply all
  // edits.
  std::string last_record;
  // A failed encode would make the rest of the test meaningless, so check it
  // the same way the other manifest-writing code in this file does.
  ASSERT_TRUE(edits_[kAtomicGroupSize - 1].EncodeTo(&last_record));
  EXPECT_OK(log_writer_->AddRecord(last_record));
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  EXPECT_OK(reactive_versions_->ReadAndApply(
      &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
  mu.Unlock();
  // Reactive version set should be empty now.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
}
2042
// Recovers the reactive version set from a MANIFEST without any atomic group,
// then writes an incomplete group and checks that ReadAndApply() keeps it in
// the replay buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kAtomicGroupSize = 4;
  constexpr int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  // The log holds no atomic-group edits yet.
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);

  // Append only part of the atomic group.
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);

  InstrumentedMutex apply_mutex;
  std::unordered_set<ColumnFamilyData*> changed_cfds;
  apply_mutex.Lock();
  EXPECT_OK(reactive_versions_->ReadAndApply(&apply_mutex, &reader,
                                             reader_status.get(),
                                             &changed_cfds));
  apply_mutex.Unlock();

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // The partial group is expected to sit in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
}
2074
// Writes an atomic group whose middle edit is not marked as a group member.
// VersionSet::Recover() must fail and report exactly that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithVersionSetRecover) {
  constexpr int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  EXPECT_NOK(versions_->Recover(column_families_, /*read_only=*/false));

  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& unmarked_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(unmarked_edit.DebugString(), corrupted_edit_.DebugString());
}
2086
// Reactive-version-set variant: recovering a MANIFEST that contains an atomic
// group with an unmarked middle edit must fail and report that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  constexpr int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_NOK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                         &reader_status));

  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& unmarked_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(unmarked_edit.DebugString(), corrupted_edit_.DebugString());
}
2103
// Recovers the reactive version set from a clean MANIFEST, then appends an
// atomic group with an unmarked middle edit: ReadAndApply() must fail and
// report that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);

  InstrumentedMutex apply_mutex;
  std::unordered_set<ColumnFamilyData*> changed_cfds;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));

  // Append the corrupted group after recovery.
  AddNewEditsToLog(kAtomicGroupSize);

  apply_mutex.Lock();
  EXPECT_NOK(reactive_versions_->ReadAndApply(&apply_mutex, &reader,
                                              reader_status.get(),
                                              &changed_cfds));
  apply_mutex.Unlock();

  const VersionEdit& unmarked_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(unmarked_edit.DebugString(), corrupted_edit_.DebugString());
}
2125
// Writes an atomic group whose second edit carries a wrong counter value.
// VersionSet::Recover() must fail and report exactly that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  constexpr int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  EXPECT_NOK(versions_->Recover(column_families_, /*read_only=*/false));

  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& miscounted_edit = edits_[1];
  EXPECT_EQ(miscounted_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2137
// Reactive-version-set variant: recovering a MANIFEST whose atomic group has a
// wrong counter on its second edit must fail and report that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  constexpr int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_NOK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                         &reader_status));

  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& miscounted_edit = edits_[1];
  EXPECT_EQ(miscounted_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2154
// Recovers the reactive version set from a clean MANIFEST, then appends an
// atomic group with a wrong counter on its second edit: ReadAndApply() must
// fail and report that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  constexpr int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);

  InstrumentedMutex apply_mutex;
  std::unordered_set<ColumnFamilyData*> changed_cfds;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));

  AddNewEditsToLog(kAtomicGroupSize);

  apply_mutex.Lock();
  EXPECT_NOK(reactive_versions_->ReadAndApply(&apply_mutex, &reader,
                                              reader_status.get(),
                                              &changed_cfds));
  apply_mutex.Unlock();

  const VersionEdit& miscounted_edit = edits_[1];
  EXPECT_EQ(miscounted_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2175
// Value-parameterized fixture for the dropped-column-family test below. The
// std::string parameter is the name of the column family to drop.
class VersionSetTestDropOneCF : public VersionSetTestBase,
                                public testing::TestWithParam<std::string> {
 public:
  VersionSetTestDropOneCF()
      : VersionSetTestBase("version_set_test_drop_one_cf") {}
};
2182
2183 // This test simulates the following execution sequence
2184 // Time thread1 bg_flush_thr
2185 // | Prepare version edits (e1,e2,e3) for atomic
2186 // | flush cf1, cf2, cf3
2187 // | Enqueue e to drop cfi
2188 // | to manifest_writers_
2189 // | Enqueue (e1,e2,e3) to manifest_writers_
2190 // |
2191 // | Apply e,
2192 // | cfi.IsDropped() is true
2193 // | Apply (e1,e2,e3),
2194 // | since cfi.IsDropped() == true, we need to
2195 // | drop ei and write the rest to MANIFEST.
2196 // V
2197 //
2198 // Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
2199 // last column family in an atomic group.
TEST_P(VersionSetTestDropOneCF,HandleDroppedColumnFamilyInAtomicGroup)2200 TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
2201 std::vector<ColumnFamilyDescriptor> column_families;
2202 SequenceNumber last_seqno;
2203 std::unique_ptr<log::Writer> log_writer;
2204 PrepareManifest(&column_families, &last_seqno, &log_writer);
2205 Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
2206 ASSERT_OK(s);
2207
2208 EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
2209 EXPECT_EQ(column_families.size(),
2210 versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
2211
2212 const int kAtomicGroupSize = 3;
2213 const std::vector<std::string> non_default_cf_names = {
2214 kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
2215
2216 // Drop one column family
2217 VersionEdit drop_cf_edit;
2218 drop_cf_edit.DropColumnFamily();
2219 const std::string cf_to_drop_name(GetParam());
2220 auto cfd_to_drop =
2221 versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
2222 ASSERT_NE(nullptr, cfd_to_drop);
2223 // Increase its refcount because cfd_to_drop is used later, and we need to
2224 // prevent it from being deleted.
2225 cfd_to_drop->Ref();
2226 drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
2227 mutex_.Lock();
2228 s = versions_->LogAndApply(cfd_to_drop,
2229 *cfd_to_drop->GetLatestMutableCFOptions(),
2230 &drop_cf_edit, &mutex_);
2231 mutex_.Unlock();
2232 ASSERT_OK(s);
2233
2234 std::vector<VersionEdit> edits(kAtomicGroupSize);
2235 uint32_t remaining = kAtomicGroupSize;
2236 size_t i = 0;
2237 autovector<ColumnFamilyData*> cfds;
2238 autovector<const MutableCFOptions*> mutable_cf_options_list;
2239 autovector<autovector<VersionEdit*>> edit_lists;
2240 for (const auto& cf_name : non_default_cf_names) {
2241 auto cfd = (cf_name != cf_to_drop_name)
2242 ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
2243 : cfd_to_drop;
2244 ASSERT_NE(nullptr, cfd);
2245 cfds.push_back(cfd);
2246 mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
2247 edits[i].SetColumnFamily(cfd->GetID());
2248 edits[i].SetLogNumber(0);
2249 edits[i].SetNextFile(2);
2250 edits[i].MarkAtomicGroup(--remaining);
2251 edits[i].SetLastSequence(last_seqno++);
2252 autovector<VersionEdit*> tmp_edits;
2253 tmp_edits.push_back(&edits[i]);
2254 edit_lists.emplace_back(tmp_edits);
2255 ++i;
2256 }
2257 int called = 0;
2258 SyncPoint::GetInstance()->DisableProcessing();
2259 SyncPoint::GetInstance()->ClearAllCallBacks();
2260 SyncPoint::GetInstance()->SetCallBack(
2261 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
2262 std::vector<VersionEdit*>* tmp_edits =
2263 reinterpret_cast<std::vector<VersionEdit*>*>(arg);
2264 EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
2265 for (const auto e : *tmp_edits) {
2266 bool found = false;
2267 for (const auto& e2 : edits) {
2268 if (&e2 == e) {
2269 found = true;
2270 break;
2271 }
2272 }
2273 ASSERT_TRUE(found);
2274 }
2275 ++called;
2276 });
2277 SyncPoint::GetInstance()->EnableProcessing();
2278 mutex_.Lock();
2279 s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
2280 &mutex_);
2281 mutex_.Unlock();
2282 ASSERT_OK(s);
2283 ASSERT_EQ(1, called);
2284 cfd_to_drop->UnrefAndTryDelete();
2285 }
2286
// Run every VersionSetTestDropOneCF test once per non-default column family
// name, so that each of the three column families is dropped in turn.
INSTANTIATE_TEST_CASE_P(
    AtomicGroup, VersionSetTestDropOneCF,
    testing::Values(VersionSetTestBase::kColumnFamilyName1,
                    VersionSetTestBase::kColumnFamilyName2,
                    VersionSetTestBase::kColumnFamilyName3));
2292
2293 class EmptyDefaultCfNewManifest : public VersionSetTestBase,
2294 public testing::Test {
2295 public:
EmptyDefaultCfNewManifest()2296 EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {}
2297 // Emulate DBImpl::NewDB()
PrepareManifest(std::vector<ColumnFamilyDescriptor> *,SequenceNumber *,std::unique_ptr<log::Writer> * log_writer)2298 void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
2299 SequenceNumber* /*last_seqno*/,
2300 std::unique_ptr<log::Writer>* log_writer) override {
2301 assert(log_writer != nullptr);
2302 VersionEdit new_db;
2303 new_db.SetLogNumber(0);
2304 const std::string manifest_path = DescriptorFileName(dbname_, 1);
2305 const auto& fs = env_->GetFileSystem();
2306 std::unique_ptr<WritableFileWriter> file_writer;
2307 Status s = WritableFileWriter::Create(
2308 fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
2309 &file_writer, nullptr);
2310 ASSERT_OK(s);
2311 log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
2312 std::string record;
2313 ASSERT_TRUE(new_db.EncodeTo(&record));
2314 s = (*log_writer)->AddRecord(record);
2315 ASSERT_OK(s);
2316 // Create new column family
2317 VersionEdit new_cf;
2318 new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
2319 new_cf.SetColumnFamily(1);
2320 new_cf.SetLastSequence(2);
2321 new_cf.SetNextFile(2);
2322 record.clear();
2323 ASSERT_TRUE(new_cf.EncodeTo(&record));
2324 s = (*log_writer)->AddRecord(record);
2325 ASSERT_OK(s);
2326 }
2327
2328 protected:
2329 bool write_dbid_to_manifest_ = false;
2330 std::unique_ptr<log::Writer> log_writer_;
2331 };
2332
2333 // Create db, create column family. Cf creation will switch to a new MANIFEST.
2334 // Then reopen db, trying to recover.
TEST_F(EmptyDefaultCfNewManifest,Recover)2335 TEST_F(EmptyDefaultCfNewManifest, Recover) {
2336 PrepareManifest(nullptr, nullptr, &log_writer_);
2337 log_writer_.reset();
2338 Status s =
2339 SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
2340 ASSERT_OK(s);
2341 std::string manifest_path;
2342 VerifyManifest(&manifest_path);
2343 std::vector<ColumnFamilyDescriptor> column_families;
2344 column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
2345 column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1,
2346 cf_options_);
2347 std::string db_id;
2348 bool has_missing_table_file = false;
2349 s = versions_->TryRecoverFromOneManifest(
2350 manifest_path, column_families, false, &db_id, &has_missing_table_file);
2351 ASSERT_OK(s);
2352 ASSERT_FALSE(has_missing_table_file);
2353 }
2354
2355 class VersionSetTestEmptyDb
2356 : public VersionSetTestBase,
2357 public testing::TestWithParam<
2358 std::tuple<bool, bool, std::vector<std::string>>> {
2359 public:
2360 static const std::string kUnknownColumnFamilyName;
VersionSetTestEmptyDb()2361 VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {}
2362
2363 protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> *,SequenceNumber *,std::unique_ptr<log::Writer> * log_writer)2364 void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
2365 SequenceNumber* /*last_seqno*/,
2366 std::unique_ptr<log::Writer>* log_writer) override {
2367 assert(nullptr != log_writer);
2368 VersionEdit new_db;
2369 if (db_options_.write_dbid_to_manifest) {
2370 DBOptions tmp_db_options;
2371 tmp_db_options.env = env_;
2372 std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
2373 std::string db_id;
2374 impl->GetDbIdentityFromIdentityFile(&db_id);
2375 new_db.SetDBId(db_id);
2376 }
2377 const std::string manifest_path = DescriptorFileName(dbname_, 1);
2378 const auto& fs = env_->GetFileSystem();
2379 std::unique_ptr<WritableFileWriter> file_writer;
2380 Status s = WritableFileWriter::Create(
2381 fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
2382 &file_writer, nullptr);
2383 ASSERT_OK(s);
2384 {
2385 log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
2386 std::string record;
2387 new_db.EncodeTo(&record);
2388 s = (*log_writer)->AddRecord(record);
2389 ASSERT_OK(s);
2390 }
2391 }
2392
2393 std::unique_ptr<log::Writer> log_writer_;
2394 };
2395
2396 const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown";
2397
// Recovers from a MANIFEST that contains nothing but the new-DB edit.
// Recovery must fail: with InvalidArgument when the default column family is
// not among the column families to open, with Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);
  log_writer_.reset();
  Status s =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  bool read_only = std::get<1>(GetParam());
  // Bind by reference, like the sibling tests, instead of copying the vector
  // of names out of the test parameter.
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());

  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : cf_names) {
    column_families.emplace_back(cf_name, cf_options_);
  }

  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  auto iter =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  if (iter == cf_names.end()) {
    ASSERT_TRUE(s.IsInvalidArgument());
  } else {
    ASSERT_TRUE(s.IsCorruption());
  }
}
2430
// Recovers from a MANIFEST that contains the new-DB edit plus the creation of
// a single column family. Recovery must fail: with InvalidArgument when the
// default column family is not among the column families to open, with
// Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);
  // Only a subset of column families in the MANIFEST.
  VersionEdit new_cf1;
  new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
  new_cf1.SetColumnFamily(1);
  Status s;
  {
    std::string record;
    // Check the encode result, as OpenFromInCompleteManifest2 does, so a bad
    // record cannot be written silently.
    ASSERT_TRUE(new_cf1.EncodeTo(&record));
    s = log_writer_->AddRecord(record);
    ASSERT_OK(s);
  }
  log_writer_.reset();
  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : cf_names) {
    column_families.emplace_back(cf_name, cf_options_);
  }
  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  auto iter =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  if (iter == cf_names.end()) {
    ASSERT_TRUE(s.IsInvalidArgument());
  } else {
    ASSERT_TRUE(s.IsCorruption());
  }
}
2471
// Recovers from a MANIFEST that creates every non-default column family but
// never records log_number, next_file_number or last_sequence. Recovery must
// fail: with InvalidArgument when the default column family is not among the
// column families to open, with Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);

  // Index 0 is the default column family, which gets no creation record; the
  // remaining names are added with column family ids 1, 2, 3.
  const std::vector<std::string> all_cf_names = {
      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
      kColumnFamilyName3};
  Status status;
  for (size_t idx = 1; idx != all_cf_names.size(); ++idx) {
    VersionEdit cf_edit;
    cf_edit.AddColumnFamily(all_cf_names[idx]);
    cf_edit.SetColumnFamily(static_cast<uint32_t>(idx));
    std::string encoded;
    ASSERT_TRUE(cf_edit.EncodeTo(&encoded));
    status = log_writer_->AddRecord(encoded);
    ASSERT_OK(status);
  }
  log_writer_.reset();
  status =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  const bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> cf_descriptors;
  for (const std::string& name : cf_names) {
    cf_descriptors.emplace_back(name, cf_options_);
  }

  std::string recovered_db_id;
  bool missing_table_file = false;
  status = versions_->TryRecoverFromOneManifest(
      manifest_path, cf_descriptors, read_only, &recovered_db_id,
      &missing_table_file);

  const bool opens_default_cf =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName) !=
      cf_names.end();
  if (opens_default_cf) {
    ASSERT_TRUE(status.IsCorruption());
  } else {
    ASSERT_TRUE(status.IsInvalidArgument());
  }
}
2517
// Records an AddColumnFamily edit for every non-default column family (ids
// 1..3) and then an edit that targets column family id 4, for which this
// test never writes an AddColumnFamily edit. Recovery is expected to fail:
// InvalidArgument when the default column family was not requested,
// Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);

  const std::vector<std::string> all_cf_names = {
      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
      kColumnFamilyName3};
  // Index 0 is the default column family, which gets no AddColumnFamily
  // edit; every other name is registered with an id equal to its index.
  for (uint32_t cf_id = 1; cf_id != all_cf_names.size(); ++cf_id) {
    VersionEdit add_cf_edit;
    add_cf_edit.AddColumnFamily(all_cf_names[cf_id]);
    add_cf_edit.SetColumnFamily(cf_id);
    std::string encoded;
    ASSERT_TRUE(add_cf_edit.EncodeTo(&encoded));
    const Status add_status = log_writer_->AddRecord(encoded);
    ASSERT_OK(add_status);
  }

  // Edit carrying log number, next file number and last sequence, addressed
  // to column family id 4, which no edit written above declares.
  {
    VersionEdit unknown_cf_edit;
    unknown_cf_edit.SetColumnFamily(4);
    unknown_cf_edit.SetLogNumber(0);
    unknown_cf_edit.SetNextFile(2);
    unknown_cf_edit.SetLastSequence(0);
    std::string encoded;
    ASSERT_TRUE(unknown_cf_edit.EncodeTo(&encoded));
    const Status add_status = log_writer_->AddRecord(encoded);
    ASSERT_OK(add_status);
  }

  // Close the MANIFEST writer, then make CURRENT refer to MANIFEST number 1.
  log_writer_.reset();
  const Status current_status =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(current_status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  const bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const std::string& name : cf_names) {
    column_families.emplace_back(name, cf_options_);
  }

  std::string db_id;
  bool has_missing_table_file = false;
  const Status s = versions_->TryRecoverFromOneManifest(
      manifest_path, column_families, read_only, &db_id,
      &has_missing_table_file);

  const bool default_cf_requested =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName) !=
      cf_names.end();
  if (default_cf_requested) {
    ASSERT_TRUE(s.IsCorruption());
  } else {
    ASSERT_TRUE(s.IsInvalidArgument());
  }
}
2574
// Records an AddColumnFamily edit for every non-default column family and
// then an edit (with no column family id set explicitly) carrying log number,
// next file number and last sequence. Expected outcome of recovery:
//  - default column family not requested        -> InvalidArgument
//  - read-only                                   -> OK, no missing table file
//  - read-write, fewer names than in MANIFEST    -> InvalidArgument
//  - read-write, same number of names            -> OK, no missing table file
//  - read-write, more names than in MANIFEST     -> OK, no missing table file,
//    and kUnknownColumnFamilyName is not present in the column family set
TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);

  const std::vector<std::string> all_cf_names = {
      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
      kColumnFamilyName3};
  // Index 0 is the default column family, which gets no AddColumnFamily
  // edit; every other name is registered with an id equal to its index.
  for (uint32_t cf_id = 1; cf_id != all_cf_names.size(); ++cf_id) {
    VersionEdit add_cf_edit;
    add_cf_edit.AddColumnFamily(all_cf_names[cf_id]);
    add_cf_edit.SetColumnFamily(cf_id);
    std::string encoded;
    ASSERT_TRUE(add_cf_edit.EncodeTo(&encoded));
    const Status add_status = log_writer_->AddRecord(encoded);
    ASSERT_OK(add_status);
  }

  // The edit that supplies log number, next file number and last sequence.
  {
    VersionEdit state_edit;
    state_edit.SetLogNumber(0);
    state_edit.SetNextFile(2);
    state_edit.SetLastSequence(0);
    std::string encoded;
    ASSERT_TRUE(state_edit.EncodeTo(&encoded));
    const Status add_status = log_writer_->AddRecord(encoded);
    ASSERT_OK(add_status);
  }

  // Close the MANIFEST writer, then make CURRENT refer to MANIFEST number 1.
  log_writer_.reset();
  const Status current_status =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(current_status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  const bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const std::string& name : cf_names) {
    column_families.emplace_back(name, cf_options_);
  }

  std::string db_id;
  bool has_missing_table_file = false;
  const Status s = versions_->TryRecoverFromOneManifest(
      manifest_path, column_families, read_only, &db_id,
      &has_missing_table_file);

  const bool default_cf_requested =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName) !=
      cf_names.end();
  if (!default_cf_requested) {
    ASSERT_TRUE(s.IsInvalidArgument());
    return;
  }
  if (!read_only && cf_names.size() < all_cf_names.size()) {
    // Read-write open with only part of the recorded column families.
    ASSERT_TRUE(s.IsInvalidArgument());
    return;
  }

  // Every remaining combination is expected to recover cleanly.
  ASSERT_OK(s);
  ASSERT_FALSE(has_missing_table_file);
  if (!read_only && cf_names.size() > all_cf_names.size()) {
    // The extra requested name must not show up as a column family.
    ColumnFamilyData* unknown_cfd =
        versions_->GetColumnFamilySet()->GetColumnFamily(
            kUnknownColumnFamilyName);
    ASSERT_EQ(nullptr, unknown_cfd);
  }
}
2642
2643 INSTANTIATE_TEST_CASE_P(
2644 BestEffortRecovery, VersionSetTestEmptyDb,
2645 testing::Combine(
2646 /*write_dbid_to_manifest=*/testing::Bool(),
2647 /*read_only=*/testing::Bool(),
2648 /*cf_names=*/
2649 testing::Values(
2650 std::vector<std::string>(),
2651 std::vector<std::string>({kDefaultColumnFamilyName}),
2652 std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1,
2653 VersionSetTestBase::kColumnFamilyName2,
2654 VersionSetTestBase::kColumnFamilyName3}),
2655 std::vector<std::string>({kDefaultColumnFamilyName,
2656 VersionSetTestBase::kColumnFamilyName1}),
2657 std::vector<std::string>({kDefaultColumnFamilyName,
2658 VersionSetTestBase::kColumnFamilyName1,
2659 VersionSetTestBase::kColumnFamilyName2,
2660 VersionSetTestBase::kColumnFamilyName3}),
2661 std::vector<std::string>(
2662 {kDefaultColumnFamilyName,
2663 VersionSetTestBase::kColumnFamilyName1,
2664 VersionSetTestBase::kColumnFamilyName2,
2665 VersionSetTestBase::kColumnFamilyName3,
2666 VersionSetTestEmptyDb::kUnknownColumnFamilyName}))));
2667
2668 class VersionSetTestMissingFiles : public VersionSetTestBase,
2669 public testing::Test {
2670 public:
VersionSetTestMissingFiles()2671 VersionSetTestMissingFiles()
2672 : VersionSetTestBase("version_set_test_missing_files"),
2673 block_based_table_options_(),
2674 table_factory_(std::make_shared<BlockBasedTableFactory>(
2675 block_based_table_options_)),
2676 internal_comparator_(
2677 std::make_shared<InternalKeyComparator>(options_.comparator)) {}
2678
2679 protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)2680 void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
2681 SequenceNumber* last_seqno,
2682 std::unique_ptr<log::Writer>* log_writer) override {
2683 assert(column_families != nullptr);
2684 assert(last_seqno != nullptr);
2685 assert(log_writer != nullptr);
2686 const std::string manifest = DescriptorFileName(dbname_, 1);
2687 const auto& fs = env_->GetFileSystem();
2688 std::unique_ptr<WritableFileWriter> file_writer;
2689 Status s = WritableFileWriter::Create(
2690 fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
2691 nullptr);
2692 ASSERT_OK(s);
2693 log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
2694 VersionEdit new_db;
2695 if (db_options_.write_dbid_to_manifest) {
2696 DBOptions tmp_db_options;
2697 tmp_db_options.env = env_;
2698 std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
2699 std::string db_id;
2700 impl->GetDbIdentityFromIdentityFile(&db_id);
2701 new_db.SetDBId(db_id);
2702 }
2703 {
2704 std::string record;
2705 ASSERT_TRUE(new_db.EncodeTo(&record));
2706 s = (*log_writer)->AddRecord(record);
2707 ASSERT_OK(s);
2708 }
2709 const std::vector<std::string> cf_names = {
2710 kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
2711 kColumnFamilyName3};
2712 uint32_t cf_id = 1; // default cf id is 0
2713 cf_options_.table_factory = table_factory_;
2714 for (const auto& cf_name : cf_names) {
2715 column_families->emplace_back(cf_name, cf_options_);
2716 if (cf_name == kDefaultColumnFamilyName) {
2717 continue;
2718 }
2719 VersionEdit new_cf;
2720 new_cf.AddColumnFamily(cf_name);
2721 new_cf.SetColumnFamily(cf_id);
2722 std::string record;
2723 ASSERT_TRUE(new_cf.EncodeTo(&record));
2724 s = (*log_writer)->AddRecord(record);
2725 ASSERT_OK(s);
2726
2727 VersionEdit cf_files;
2728 cf_files.SetColumnFamily(cf_id);
2729 cf_files.SetLogNumber(0);
2730 record.clear();
2731 ASSERT_TRUE(cf_files.EncodeTo(&record));
2732 s = (*log_writer)->AddRecord(record);
2733 ASSERT_OK(s);
2734 ++cf_id;
2735 }
2736 SequenceNumber seq = 2;
2737 {
2738 VersionEdit edit;
2739 edit.SetNextFile(7);
2740 edit.SetLastSequence(seq);
2741 std::string record;
2742 ASSERT_TRUE(edit.EncodeTo(&record));
2743 s = (*log_writer)->AddRecord(record);
2744 ASSERT_OK(s);
2745 }
2746 *last_seqno = seq + 1;
2747 }
2748
2749 struct SstInfo {
2750 uint64_t file_number;
2751 std::string column_family;
2752 std::string key; // the only key
2753 int level = 0;
SstInfoROCKSDB_NAMESPACE::VersionSetTestMissingFiles::SstInfo2754 SstInfo(uint64_t file_num, const std::string& cf_name,
2755 const std::string& _key)
2756 : SstInfo(file_num, cf_name, _key, 0) {}
SstInfoROCKSDB_NAMESPACE::VersionSetTestMissingFiles::SstInfo2757 SstInfo(uint64_t file_num, const std::string& cf_name,
2758 const std::string& _key, int lvl)
2759 : file_number(file_num),
2760 column_family(cf_name),
2761 key(_key),
2762 level(lvl) {}
2763 };
2764
2765 // Create dummy sst, return their metadata. Note that only file name and size
2766 // are used.
CreateDummyTableFiles(const std::vector<SstInfo> & file_infos,std::vector<FileMetaData> * file_metas)2767 void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos,
2768 std::vector<FileMetaData>* file_metas) {
2769 assert(file_metas != nullptr);
2770 for (const auto& info : file_infos) {
2771 uint64_t file_num = info.file_number;
2772 std::string fname = MakeTableFileName(dbname_, file_num);
2773 std::unique_ptr<FSWritableFile> file;
2774 Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
2775 ASSERT_OK(s);
2776 std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter(
2777 std::move(file), fname, FileOptions(), env_->GetSystemClock().get()));
2778 IntTblPropCollectorFactories int_tbl_prop_collector_factories;
2779
2780 std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder(
2781 TableBuilderOptions(
2782 immutable_cf_options_, mutable_cf_options_, *internal_comparator_,
2783 &int_tbl_prop_collector_factories, kNoCompression,
2784 CompressionOptions(),
2785 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
2786 info.column_family, info.level),
2787 fwriter.get()));
2788 InternalKey ikey(info.key, 0, ValueType::kTypeValue);
2789 builder->Add(ikey.Encode(), "value");
2790 ASSERT_OK(builder->Finish());
2791 fwriter->Flush();
2792 uint64_t file_size = 0;
2793 s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
2794 ASSERT_OK(s);
2795 ASSERT_NE(0, file_size);
2796 file_metas->emplace_back(file_num, /*file_path_id=*/0, file_size, ikey,
2797 ikey, 0, 0, false, 0, 0, 0, kUnknownFileChecksum,
2798 kUnknownFileChecksumFuncName);
2799 }
2800 }
2801
2802 // This method updates last_sequence_.
WriteFileAdditionAndDeletionToManifest(uint32_t cf,const std::vector<std::pair<int,FileMetaData>> & added_files,const std::vector<std::pair<int,uint64_t>> & deleted_files)2803 void WriteFileAdditionAndDeletionToManifest(
2804 uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
2805 const std::vector<std::pair<int, uint64_t>>& deleted_files) {
2806 VersionEdit edit;
2807 edit.SetColumnFamily(cf);
2808 for (const auto& elem : added_files) {
2809 int level = elem.first;
2810 edit.AddFile(level, elem.second);
2811 }
2812 for (const auto& elem : deleted_files) {
2813 int level = elem.first;
2814 edit.DeleteFile(level, elem.second);
2815 }
2816 edit.SetLastSequence(last_seqno_);
2817 ++last_seqno_;
2818 assert(log_writer_.get() != nullptr);
2819 std::string record;
2820 ASSERT_TRUE(edit.EncodeTo(&record));
2821 Status s = log_writer_->AddRecord(record);
2822 ASSERT_OK(s);
2823 }
2824
2825 BlockBasedTableOptions block_based_table_options_;
2826 std::shared_ptr<TableFactory> table_factory_;
2827 std::shared_ptr<InternalKeyComparator> internal_comparator_;
2828 std::vector<ColumnFamilyDescriptor> column_families_;
2829 SequenceNumber last_seqno_;
2830 std::unique_ptr<log::Writer> log_writer_;
2831 };
2832
// Table files 100, 102, 103, 107 and 110 exist on disk, while the MANIFEST
// only mentions files 10..14 (added to level 0 of column family 0) and then
// deletes file 10. Recovery is expected to succeed, report a missing table
// file, and leave level 0 of every column family empty.
TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
  const std::vector<SstInfo> on_disk_files = {
      SstInfo(100, kDefaultColumnFamilyName, "a"),
      SstInfo(102, kDefaultColumnFamilyName, "b"),
      SstInfo(103, kDefaultColumnFamilyName, "c"),
      SstInfo(107, kDefaultColumnFamilyName, "d"),
      SstInfo(110, kDefaultColumnFamilyName, "e")};
  std::vector<FileMetaData> file_metas;
  CreateDummyTableFiles(on_disk_files, &file_metas);

  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);

  // All MANIFEST-only files share the same key range ["a", "b"].
  const InternalKey smallest_ikey("a", 1, ValueType::kTypeValue);
  const InternalKey largest_ikey("b", 1, ValueType::kTypeValue);
  std::vector<std::pair<int, FileMetaData>> additions;
  for (uint64_t file_num = 10; file_num != 15; ++file_num) {
    const FileMetaData meta(file_num, /*file_path_id=*/0, /*file_size=*/12,
                            smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
                            kUnknownFileChecksum,
                            kUnknownFileChecksumFuncName);
    additions.emplace_back(0, meta);
  }
  const std::vector<std::pair<int, uint64_t>> no_deletions;
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, additions, no_deletions);

  // Second edit: delete file 10 from level 0.
  const std::vector<std::pair<int, FileMetaData>> no_additions;
  std::vector<std::pair<int, uint64_t>> deletions;
  deletions.emplace_back(0, 10);
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, no_additions, deletions);

  log_writer_.reset();
  const Status current_status =
      SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
  ASSERT_OK(current_status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  std::string db_id;
  bool has_missing_table_file = false;
  const Status s = versions_->TryRecoverFromOneManifest(
      manifest_path, column_families_, /*read_only=*/false, &db_id,
      &has_missing_table_file);
  ASSERT_OK(s);
  ASSERT_TRUE(has_missing_table_file);

  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
    const std::vector<FileMetaData*>& level0_files =
        cfd->current()->storage_info()->LevelFiles(0);
    ASSERT_TRUE(level0_files.empty());
  }
}
2880
// Table files 100, 102, 103, 107 and 110 exist on disk. The MANIFEST first
// adds the existing files 107 and 110, then adds files 120..129, which were
// never created. Recovery is expected to succeed, report a missing table
// file, and leave the default column family with exactly files 107 and 110
// in level 0; every other column family must have an empty level 0.
TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
  const std::vector<SstInfo> on_disk_files = {
      SstInfo(100, kDefaultColumnFamilyName, "a"),
      SstInfo(102, kDefaultColumnFamilyName, "b"),
      SstInfo(103, kDefaultColumnFamilyName, "c"),
      SstInfo(107, kDefaultColumnFamilyName, "d"),
      SstInfo(110, kDefaultColumnFamilyName, "e")};
  std::vector<FileMetaData> file_metas;
  CreateDummyTableFiles(on_disk_files, &file_metas);

  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);

  const std::vector<std::pair<int, uint64_t>> no_deletions;

  // First edit: the last two files created above (indices 3 and 4).
  std::vector<std::pair<int, FileMetaData>> additions;
  additions.emplace_back(0, file_metas[3]);
  additions.emplace_back(0, file_metas[4]);
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, additions, no_deletions);

  // Second edit: ten files that have no counterpart on disk.
  additions.clear();
  const InternalKey smallest_ikey("a", 1, ValueType::kTypeValue);
  const InternalKey largest_ikey("b", 1, ValueType::kTypeValue);
  for (uint64_t file_num = 120; file_num != 130; ++file_num) {
    const FileMetaData meta(file_num, /*file_path_id=*/0, /*file_size=*/12,
                            smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
                            kUnknownFileChecksum,
                            kUnknownFileChecksumFuncName);
    additions.emplace_back(0, meta);
  }
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, additions, no_deletions);

  log_writer_.reset();
  const Status current_status =
      SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
  ASSERT_OK(current_status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  std::string db_id;
  bool has_missing_table_file = false;
  const Status s = versions_->TryRecoverFromOneManifest(
      manifest_path, column_families_, /*read_only=*/false, &db_id,
      &has_missing_table_file);
  ASSERT_OK(s);
  ASSERT_TRUE(has_missing_table_file);

  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
    const std::vector<FileMetaData*>& level0_files =
        cfd->current()->storage_info()->LevelFiles(0);
    if (cfd->GetName() != kDefaultColumnFamilyName) {
      ASSERT_TRUE(level0_files.empty());
      continue;
    }
    ASSERT_EQ(2, level0_files.size());
    for (const FileMetaData* fmeta : level0_files) {
      const uint64_t number = fmeta->fd.GetNumber();
      ASSERT_TRUE(number == 107 || number == 110);
    }
  }
}
2940
// Every file the MANIFEST adds exists on disk; a second edit then deletes
// file 100. Recovery is expected to succeed without reporting a missing table
// file. Level 0 of the default column family must hold all created files
// except 100, and every other column family must have an empty level 0.
TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
  const std::vector<SstInfo> on_disk_files = {
      SstInfo(100, kDefaultColumnFamilyName, "a"),
      SstInfo(102, kDefaultColumnFamilyName, "b"),
      SstInfo(103, kDefaultColumnFamilyName, "c"),
      SstInfo(107, kDefaultColumnFamilyName, "d"),
      SstInfo(110, kDefaultColumnFamilyName, "e")};
  std::vector<FileMetaData> file_metas;
  CreateDummyTableFiles(on_disk_files, &file_metas);

  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);

  // First edit: add every created file to level 0.
  std::vector<std::pair<int, FileMetaData>> additions;
  for (const FileMetaData& meta : file_metas) {
    additions.emplace_back(0, meta);
  }
  const std::vector<std::pair<int, uint64_t>> no_deletions;
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, additions, no_deletions);

  // Second edit: remove one of them again.
  const uint64_t kDeletedFileNumber = 100;
  const std::vector<std::pair<int, FileMetaData>> no_additions;
  std::vector<std::pair<int, uint64_t>> deletions;
  deletions.emplace_back(/*level=*/0, kDeletedFileNumber);
  WriteFileAdditionAndDeletionToManifest(/*cf=*/0, no_additions, deletions);

  log_writer_.reset();
  const Status current_status =
      SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
  ASSERT_OK(current_status);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  std::string db_id;
  bool has_missing_table_file = false;
  const Status s = versions_->TryRecoverFromOneManifest(
      manifest_path, column_families_, /*read_only=*/false, &db_id,
      &has_missing_table_file);
  ASSERT_OK(s);
  ASSERT_FALSE(has_missing_table_file);

  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
    const std::vector<FileMetaData*>& level0_files =
        cfd->current()->storage_info()->LevelFiles(0);
    if (cfd->GetName() != kDefaultColumnFamilyName) {
      ASSERT_TRUE(level0_files.empty());
      continue;
    }
    ASSERT_EQ(on_disk_files.size() - deletions.size(), level0_files.size());
    // The deleted file must not have survived recovery.
    for (const FileMetaData* fmeta : level0_files) {
      ASSERT_NE(kDeletedFileNumber, fmeta->fd.GetNumber());
    }
  }
}
2992
// Applies an edit that adds one table file and sets the minimum log number to
// keep, then checks that min_log_number_to_keep_2pc() still reports the same
// value after each of several rounds of creating a new MANIFEST and
// reopening the DB.
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
  NewDB();

  const SstInfo sst(100, kDefaultColumnFamilyName, "a");
  std::vector<FileMetaData> file_metas;
  CreateDummyTableFiles({sst}, &file_metas);

  constexpr WalNumber kMinWalNumberToKeep2PC = 10;
  {
    VersionEdit edit;
    edit.AddFile(0, file_metas[0]);
    edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
    ASSERT_OK(LogAndApplyToDefaultCF(edit));
  }
  ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);

  constexpr int kReopenRounds = 3;
  for (int round = 0; round != kReopenRounds; ++round) {
    CreateNewManifest();
    ReopenDB();
    ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
  }
}
3013
3014 } // namespace ROCKSDB_NAMESPACE
3015
main(int argc,char ** argv)3016 int main(int argc, char** argv) {
3017 ::testing::InitGoogleTest(&argc, argv);
3018 return RUN_ALL_TESTS();
3019 }
3020