1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_set.h"
11 
12 #include "db/db_impl/db_impl.h"
13 #include "db/log_writer.h"
14 #include "rocksdb/file_system.h"
15 #include "table/block_based/block_based_table_factory.h"
16 #include "table/mock_table.h"
17 #include "test_util/testharness.h"
18 #include "test_util/testutil.h"
19 #include "util/string_util.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class GenerateLevelFilesBriefTest : public testing::Test {
24  public:
25   std::vector<FileMetaData*> files_;
26   LevelFilesBrief file_level_;
27   Arena arena_;
28 
GenerateLevelFilesBriefTest()29   GenerateLevelFilesBriefTest() { }
30 
~GenerateLevelFilesBriefTest()31   ~GenerateLevelFilesBriefTest() override {
32     for (size_t i = 0; i < files_.size(); i++) {
33       delete files_[i];
34     }
35   }
36 
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)37   void Add(const char* smallest, const char* largest,
38            SequenceNumber smallest_seq = 100,
39            SequenceNumber largest_seq = 100) {
40     FileMetaData* f = new FileMetaData(
41         files_.size() + 1, 0, 0,
42         InternalKey(smallest, smallest_seq, kTypeValue),
43         InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
44         largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
45         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
46         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
47     files_.push_back(f);
48   }
49 
Compare()50   int Compare() {
51     int diff = 0;
52     for (size_t i = 0; i < files_.size(); i++) {
53       if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
54         diff++;
55       }
56     }
57     return diff;
58   }
59 };
60 
TEST_F(GenerateLevelFilesBriefTest, Empty) {
  // No input files: the generated brief must be empty as well.
  constexpr size_t kExpectedNumFiles = 0;
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  ASSERT_EQ(kExpectedNumFiles, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
66 
TEST_F(GenerateLevelFilesBriefTest, Single) {
  // One file in, one matching entry out.
  Add("p", "q");
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  const size_t generated = file_level_.num_files;
  ASSERT_EQ(1u, generated);
  ASSERT_EQ(0, Compare());
}
73 
TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  // Four files; the first two share the boundary key "200".
  const char* const kRanges[][2] = {
      {"150", "200"}, {"200", "250"}, {"300", "350"}, {"400", "450"}};
  for (const auto& range : kRanges) {
    Add(range[0], range[1]);
  }
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  ASSERT_EQ(4u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
83 
84 class CountingLogger : public Logger {
85  public:
CountingLogger()86   CountingLogger() : log_count(0) {}
87   using Logger::Logv;
Logv(const char *,va_list)88   void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
89   int log_count;
90 };
91 
GetOptionsWithNumLevels(int num_levels,std::shared_ptr<CountingLogger> logger)92 Options GetOptionsWithNumLevels(int num_levels,
93                                 std::shared_ptr<CountingLogger> logger) {
94   Options opt;
95   opt.num_levels = num_levels;
96   opt.info_log = logger;
97   return opt;
98 }
99 
100 class VersionStorageInfoTestBase : public testing::Test {
101  public:
102   const Comparator* ucmp_;
103   InternalKeyComparator icmp_;
104   std::shared_ptr<CountingLogger> logger_;
105   Options options_;
106   ImmutableOptions ioptions_;
107   MutableCFOptions mutable_cf_options_;
108   VersionStorageInfo vstorage_;
109 
GetInternalKey(const char * ukey,SequenceNumber smallest_seq=100)110   InternalKey GetInternalKey(const char* ukey,
111                              SequenceNumber smallest_seq = 100) {
112     return InternalKey(ukey, smallest_seq, kTypeValue);
113   }
114 
VersionStorageInfoTestBase(const Comparator * ucmp)115   explicit VersionStorageInfoTestBase(const Comparator* ucmp)
116       : ucmp_(ucmp),
117         icmp_(ucmp_),
118         logger_(new CountingLogger()),
119         options_(GetOptionsWithNumLevels(6, logger_)),
120         ioptions_(options_),
121         mutable_cf_options_(options_),
122         vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel,
123                   /*src_vstorage=*/nullptr,
124                   /*_force_consistency_checks=*/false) {}
125 
~VersionStorageInfoTestBase()126   ~VersionStorageInfoTestBase() override {
127     for (int i = 0; i < vstorage_.num_levels(); ++i) {
128       for (auto* f : vstorage_.LevelFiles(i)) {
129         if (--f->refs == 0) {
130           delete f;
131         }
132       }
133     }
134   }
135 
Add(int level,uint32_t file_number,const char * smallest,const char * largest,uint64_t file_size=0)136   void Add(int level, uint32_t file_number, const char* smallest,
137            const char* largest, uint64_t file_size = 0) {
138     assert(level < vstorage_.num_levels());
139     FileMetaData* f = new FileMetaData(
140         file_number, 0, file_size, GetInternalKey(smallest, 0),
141         GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
142         /* marked_for_compact */ false, kInvalidBlobFileNumber,
143         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
144         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
145     f->compensated_file_size = file_size;
146     vstorage_.AddFile(level, f);
147   }
148 
Add(int level,uint32_t file_number,const InternalKey & smallest,const InternalKey & largest,uint64_t file_size=0)149   void Add(int level, uint32_t file_number, const InternalKey& smallest,
150            const InternalKey& largest, uint64_t file_size = 0) {
151     assert(level < vstorage_.num_levels());
152     FileMetaData* f = new FileMetaData(
153         file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
154         /* largest_seq */ 0, /* marked_for_compact */ false,
155         kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
156         kUnknownFileCreationTime, kUnknownFileChecksum,
157         kUnknownFileChecksumFuncName);
158     f->compensated_file_size = file_size;
159     vstorage_.AddFile(level, f);
160   }
161 
GetOverlappingFiles(int level,const InternalKey & begin,const InternalKey & end)162   std::string GetOverlappingFiles(int level, const InternalKey& begin,
163                                   const InternalKey& end) {
164     std::vector<FileMetaData*> inputs;
165     vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
166 
167     std::string result;
168     for (size_t i = 0; i < inputs.size(); ++i) {
169       if (i > 0) {
170         result += ",";
171       }
172       AppendNumberTo(&result, inputs[i]->fd.GetNumber());
173     }
174     return result;
175   }
176 };
177 
178 class VersionStorageInfoTest : public VersionStorageInfoTestBase {
179  public:
VersionStorageInfoTest()180   VersionStorageInfoTest() : VersionStorageInfoTestBase(BytewiseComparator()) {}
181 
~VersionStorageInfoTest()182   ~VersionStorageInfoTest() override {}
183 };
184 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  // With dynamic level bytes off, the level-N target is
  // base * multiplier^(N - 1): 10, 50, 250, 1250 for L1..L4.
  ioptions_.level_compaction_dynamic_level_bytes = false;
  mutable_cf_options_.max_bytes_for_level_base = 10;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  Add(4, 100U, "1", "2");
  Add(5, 101U, "1", "2");

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  uint64_t expected_target = 10U;
  for (int level = 1; level <= 4; ++level) {
    ASSERT_EQ(vstorage_.MaxBytesForLevel(level), expected_target);
    expected_target *= 5;
  }

  ASSERT_EQ(0, logger_->log_count);
}
200 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 1000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  // Recomputes the per-level targets from the current file layout.
  auto recalculate = [this]() {
    vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  };

  // 500 bytes in L5 only: L5 itself is the base level.
  Add(5, 1U, "1", "2", 500U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.base_level(), 5);

  // L5 grows to 1050 bytes: the base level moves up to L4 (target 1000).
  Add(5, 2U, "3", "4", 550U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Adding 550 bytes to L4 leaves the base level at L4.
  Add(4, 3U, "3", "4", 550U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Populating L3 makes it the base level; exactly one message is logged.
  Add(3, 4U, "3", "4", 250U);
  Add(3, 5U, "5", "7", 300U);
  recalculate();
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 3);

  // Populating L1 moves the base level up to L1. The counter is reset first
  // so that only messages from this recalculation are counted.
  Add(1, 6U, "3", "4", 5U);
  Add(1, 7U, "8", "9", 5U);
  logger_->log_count = 0;
  recalculate();
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 1);
}
242 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 100;
  mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  // One file on each of L0..L5; file numbers run 1..6 in level order.
  const uint64_t kLevelSizes[] = {50U, 50U, 500U, 500U, 1700U, 500U};
  for (int level = 0; level < 6; ++level) {
    Add(level, static_cast<uint32_t>(level + 1), "1", "2",
        kLevelSizes[level]);
  }

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  // Targets halve from 800 at L4 down to the 100-byte base at L1.
  uint64_t expected_target = 800U;
  for (int level = 4; level >= 1; --level) {
    ASSERT_EQ(vstorage_.MaxBytesForLevel(level), expected_target);
    expected_target /= 2;
  }
  ASSERT_EQ(vstorage_.base_level(), 1);
  ASSERT_EQ(0, logger_->log_count);
}
262 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  // Decimal gigabyte; a true constant, so it is constexpr as its k-prefix
  // name implies.
  constexpr uint64_t kOneGB = 1000U * 1000U * 1000U;
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  Add(0, 1U, "1", "2", 50U);
  Add(3, 4U, "1", "2", 32U * kOneGB);
  Add(4, 5U, "1", "2", 500U * kOneGB);
  Add(5, 6U, "1", "2", 3000U * kOneGB);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  // Targets step down by the multiplier (10) from the last level's actual
  // size; L2 is the base level with the configured 10 GB base.
  ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
  ASSERT_EQ(vstorage_.base_level(), 2);
  ASSERT_EQ(0, logger_->log_count);
}
281 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // 30000 bytes in L0, below the 40000-byte level base.
  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The configured level multiplier of 5 is kept unchanged.
  ASSERT_DOUBLE_EQ(5.0, vstorage_.level_multiplier());
  // Level targets: 40,000 (L2, equal to the level base), 51,450
  // (L3 = L5 size / 25) and 257,250 (L4 = L5 size / 5).
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
307 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // Three 10000-byte L0 files (30000 bytes total), more files than the
  // compaction trigger of 2.
  for (uint32_t file_number = 11U; file_number <= 13U; ++file_number) {
    Add(0, file_number, "1", "2", 10000U);
  }

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The multiplier is lowered from 5 to roughly 3.5.
  const double multiplier = vstorage_.level_multiplier();
  ASSERT_LT(multiplier, 3.6);
  ASSERT_GT(multiplier, 3.4);
  // Level targets are approximately 30,000 (L2), 105,000 (L3), 367,500 (L4).
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
336 
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  // Six 5000-byte L0 files: the same 30000-byte L0 total as the _2 variant,
  // split across more files.
  for (uint32_t file_number = 11U; file_number <= 16U; ++file_number) {
    Add(0, file_number, "1", "2", 5000U);
  }

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The multiplier is lowered from 5 to roughly 3.5.
  const double multiplier = vstorage_.level_multiplier();
  ASSERT_LT(multiplier, 3.6);
  ASSERT_GT(multiplier, 3.4);
  // Level targets are approximately 30,000 (L2), 105,000 (L3), 367,500 (L4).
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
368 
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  // Every upper-level file overlaps the last-level range ["4", "7"] in a
  // different way; the expected estimate is the last level's 10 bytes.
  struct FileSpec {
    int level;
    uint32_t number;
    const char* smallest;
    const char* largest;
    uint64_t size;
  };
  const FileSpec kFiles[] = {
      {1, 1U, "4", "7", 1U},   // Same range as the last level
      {2, 2U, "3", "5", 1U},   // Partially overlaps the last level
      {2, 3U, "6", "8", 1U},   // Partially overlaps the last level
      {3, 4U, "1", "9", 1U},   // Contains the last level's range
      {4, 5U, "4", "5", 1U},   // Inside the last level's range
      {4, 6U, "6", "7", 1U},   // Inside the last level's range
      {5, 7U, "4", "7", 10U},  // The last level itself
  };
  for (const FileSpec& spec : kFiles) {
    Add(spec.level, spec.number, spec.smallest, spec.largest, spec.size);
  }
  ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
}
380 
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  // Four 1-byte files are expected to count as live; the other three are
  // shadowed by overlapping files on deeper levels.
  struct FileSpec {
    int level;
    uint32_t number;
    const char* smallest;
    const char* largest;
    uint64_t size;
  };
  const FileSpec kFiles[] = {
      {0, 1U, "9", "9", 1U},  // Level 0 is not ordered
      {0, 2U, "5", "6", 1U},  // Ignored because of [5,6] in l1
      {1, 3U, "1", "2", 1U},  // Ignored because of [2,3] in l2
      {1, 4U, "3", "4", 1U},  // Ignored because of [2,3] in l2
      {1, 5U, "5", "6", 1U},
      {2, 6U, "2", "3", 1U},
      {3, 7U, "7", "8", 1U},
  };
  for (const FileSpec& spec : kFiles) {
    Add(spec.level, spec.number, spec.smallest, spec.largest, spec.size);
  }
  ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
}
391 
TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  // Files 1 and 2 meet at user key "b"; file 1's largest key there is a
  // range-deletion sentinel (kMaxSequenceNumber, kTypeRangeDeletion).
  Add(1, 1U, {"a", 0, kTypeValue},
      {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  // Files 3 and 4 meet at user key "e" with ordinary kTypeValue keys.
  Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  // Files 5 and 6 share no user keys with each other or the files above.
  Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // Queries around the range-deletion sentinel at "b".
  ASSERT_EQ("1,2", GetOverlappingFiles(1, {"a", 0, kTypeValue},
                                       {"b", 0, kTypeValue}));
  ASSERT_EQ("1", GetOverlappingFiles(
                     1, {"a", 0, kTypeValue},
                     {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("2", GetOverlappingFiles(1, {"b", kMaxSequenceNumber, kTypeValue},
                                     {"c", 0, kTypeValue}));
  // Queries around the shared user key "e".
  ASSERT_EQ("3,4", GetOverlappingFiles(1, {"d", 0, kTypeValue},
                                       {"e", 0, kTypeValue}));
  ASSERT_EQ("3", GetOverlappingFiles(
                     1, {"d", 0, kTypeValue},
                     {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, {"e", kMaxSequenceNumber, kTypeValue},
                                {"f", 0, kTypeValue}));
  ASSERT_EQ("3,4", GetOverlappingFiles(1, {"e", 0, kTypeValue},
                                       {"f", 0, kTypeValue}));
  // Disjoint files are reported on their own.
  ASSERT_EQ("5", GetOverlappingFiles(1, {"g", 0, kTypeValue},
                                     {"h", 0, kTypeValue}));
  ASSERT_EQ("6", GetOverlappingFiles(1, {"i", 0, kTypeValue},
                                     {"j", 0, kTypeValue}));
}
424 
TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) {
  Add(0, 11U, "1", "2", 5000U);
  Add(0, 12U, "1", "2", 5000U);

  Add(2, 7U, "1", "2", 8000U);

  // Each added file is located at (level, position within level) and has
  // retrievable metadata.
  struct ExpectedLocation {
    uint64_t file_number;
    int level;
    size_t position;
  };
  const ExpectedLocation kExpected[] = {
      {11U, 0, 0}, {12U, 0, 1}, {7U, 2, 0}};
  for (const ExpectedLocation& e : kExpected) {
    ASSERT_EQ(vstorage_.GetFileLocation(e.file_number),
              VersionStorageInfo::FileLocation(e.level, e.position));
    ASSERT_NE(vstorage_.GetFileMetaDataByNumber(e.file_number), nullptr);
  }

  // A file number that was never added has no location and no metadata.
  ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid());
  ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr);
}
446 
447 class VersionStorageInfoTimestampTest : public VersionStorageInfoTestBase {
448  public:
VersionStorageInfoTimestampTest()449   VersionStorageInfoTimestampTest()
450       : VersionStorageInfoTestBase(test::ComparatorWithU64Ts()) {}
~VersionStorageInfoTimestampTest()451   ~VersionStorageInfoTimestampTest() override {}
Timestamp(uint64_t ts) const452   std::string Timestamp(uint64_t ts) const {
453     std::string ret;
454     PutFixed64(&ret, ts);
455     return ret;
456   }
PackUserKeyAndTimestamp(const Slice & ukey,uint64_t ts) const457   std::string PackUserKeyAndTimestamp(const Slice& ukey, uint64_t ts) const {
458     std::string ret;
459     ret.assign(ukey.data(), ukey.size());
460     PutFixed64(&ret, ts);
461     return ret;
462   }
463 };
464 
TEST_F(VersionStorageInfoTimestampTest, GetOverlappingInputs) {
  // Builds a kTypeValue internal key at sequence 0 whose user key is `ukey`
  // with timestamp `ts` appended by PackUserKeyAndTimestamp().
  auto ikey = [this](const char* ukey, uint64_t ts) {
    return InternalKey(PackUserKeyAndTimestamp(ukey, ts), /*s=*/0,
                       kTypeValue);
  };

  Add(/*level=*/1, /*file_number=*/1, ikey("a", 9), ikey("a", 8),
      /*file_size=*/100);
  Add(/*level=*/1, /*file_number=*/2, ikey("a", 5), ikey("b", 10),
      /*file_size=*/100);
  Add(/*level=*/1, /*file_number=*/3, ikey("c", 12), ikey("d", 1),
      /*file_size=*/100);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // Query on user key "a" (timestamps 12..11) reports both files holding "a".
  ASSERT_EQ("1,2", GetOverlappingFiles(/*level=*/1, ikey("a", 12),
                                       ikey("a", 11)));
  // Query on user key "c" (timestamps 15..2) reports only file 3.
  ASSERT_EQ("3", GetOverlappingFiles(/*level=*/1, ikey("c", 15),
                                     ikey("c", 2)));
}
495 
496 class FindLevelFileTest : public testing::Test {
497  public:
498   LevelFilesBrief file_level_;
499   bool disjoint_sorted_files_;
500   Arena arena_;
501 
FindLevelFileTest()502   FindLevelFileTest() : disjoint_sorted_files_(true) { }
503 
~FindLevelFileTest()504   ~FindLevelFileTest() override {}
505 
LevelFileInit(size_t num=0)506   void LevelFileInit(size_t num = 0) {
507     char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
508     file_level_.files = new (mem)FdWithKeyRange[num];
509     file_level_.num_files = 0;
510   }
511 
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)512   void Add(const char* smallest, const char* largest,
513            SequenceNumber smallest_seq = 100,
514            SequenceNumber largest_seq = 100) {
515     InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
516     InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
517 
518     Slice smallest_slice = smallest_key.Encode();
519     Slice largest_slice = largest_key.Encode();
520 
521     char* mem = arena_.AllocateAligned(
522         smallest_slice.size() + largest_slice.size());
523     memcpy(mem, smallest_slice.data(), smallest_slice.size());
524     memcpy(mem + smallest_slice.size(), largest_slice.data(),
525         largest_slice.size());
526 
527     // add to file_level_
528     size_t num = file_level_.num_files;
529     auto& file = file_level_.files[num];
530     file.fd = FileDescriptor(num + 1, 0, 0);
531     file.smallest_key = Slice(mem, smallest_slice.size());
532     file.largest_key = Slice(mem + smallest_slice.size(),
533         largest_slice.size());
534     file_level_.num_files++;
535   }
536 
Find(const char * key)537   int Find(const char* key) {
538     InternalKey target(key, 100, kTypeValue);
539     InternalKeyComparator cmp(BytewiseComparator());
540     return FindFile(cmp, file_level_, target.Encode());
541   }
542 
Overlaps(const char * smallest,const char * largest)543   bool Overlaps(const char* smallest, const char* largest) {
544     InternalKeyComparator cmp(BytewiseComparator());
545     Slice s(smallest != nullptr ? smallest : "");
546     Slice l(largest != nullptr ? largest : "");
547     return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
548                                  (smallest != nullptr ? &s : nullptr),
549                                  (largest != nullptr ? &l : nullptr));
550   }
551 };
552 
TEST_F(FindLevelFileTest, LevelEmpty) {
  LevelFileInit(0);

  // With no files, Find() returns 0 and no range overlaps anything.
  ASSERT_EQ(0, Find("foo"));
  ASSERT_FALSE(Overlaps("a", "z"));
  ASSERT_FALSE(Overlaps(nullptr, "z"));
  ASSERT_FALSE(Overlaps("a", nullptr));
  ASSERT_FALSE(Overlaps(nullptr, nullptr));
}
562 
TEST_F(FindLevelFileTest, LevelSingle) {
  LevelFileInit(1);

  Add("p", "q");
  // Keys up to and including "q" map to file 0; keys past it map to 1.
  ASSERT_EQ(0, Find("a"));
  ASSERT_EQ(0, Find("p"));
  ASSERT_EQ(0, Find("p1"));
  ASSERT_EQ(0, Find("q"));
  ASSERT_EQ(1, Find("q1"));
  ASSERT_EQ(1, Find("z"));

  ASSERT_FALSE(Overlaps("a", "b"));
  ASSERT_FALSE(Overlaps("z1", "z2"));
  ASSERT_TRUE(Overlaps("a", "p"));
  ASSERT_TRUE(Overlaps("a", "q"));
  ASSERT_TRUE(Overlaps("a", "z"));
  ASSERT_TRUE(Overlaps("p", "p1"));
  ASSERT_TRUE(Overlaps("p", "q"));
  ASSERT_TRUE(Overlaps("p", "z"));
  ASSERT_TRUE(Overlaps("p1", "p2"));
  ASSERT_TRUE(Overlaps("p1", "z"));
  ASSERT_TRUE(Overlaps("q", "q"));
  ASSERT_TRUE(Overlaps("q", "q1"));

  // nullptr leaves that side of the query range open.
  ASSERT_FALSE(Overlaps(nullptr, "j"));
  ASSERT_FALSE(Overlaps("r", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "p"));
  ASSERT_TRUE(Overlaps(nullptr, "p1"));
  ASSERT_TRUE(Overlaps("q", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
}
594 
TEST_F(FindLevelFileTest, LevelMultiple) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");
  // Each expected index is the first file whose largest key is >= the
  // target, or 4 when the target is past every file.
  ASSERT_EQ(0, Find("100"));
  ASSERT_EQ(0, Find("150"));
  ASSERT_EQ(0, Find("151"));
  ASSERT_EQ(0, Find("199"));
  ASSERT_EQ(0, Find("200"));
  ASSERT_EQ(1, Find("201"));
  ASSERT_EQ(1, Find("249"));
  ASSERT_EQ(1, Find("250"));
  ASSERT_EQ(2, Find("251"));
  ASSERT_EQ(2, Find("299"));
  ASSERT_EQ(2, Find("300"));
  ASSERT_EQ(2, Find("349"));
  ASSERT_EQ(2, Find("350"));
  ASSERT_EQ(3, Find("351"));
  ASSERT_EQ(3, Find("400"));
  ASSERT_EQ(3, Find("450"));
  ASSERT_EQ(4, Find("451"));

  // Ranges that fall entirely in gaps between (or outside) the files.
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("251", "299"));
  ASSERT_FALSE(Overlaps("451", "500"));
  ASSERT_FALSE(Overlaps("351", "399"));

  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
}
634 
TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");
  // Open-ended ranges entirely before the first file or after the last one.
  ASSERT_FALSE(Overlaps(nullptr, "149"));
  ASSERT_FALSE(Overlaps("451", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "150"));
  ASSERT_TRUE(Overlaps(nullptr, "199"));
  ASSERT_TRUE(Overlaps(nullptr, "200"));
  ASSERT_TRUE(Overlaps(nullptr, "201"));
  ASSERT_TRUE(Overlaps(nullptr, "400"));
  ASSERT_TRUE(Overlaps(nullptr, "800"));
  ASSERT_TRUE(Overlaps("100", nullptr));
  ASSERT_TRUE(Overlaps("200", nullptr));
  ASSERT_TRUE(Overlaps("449", nullptr));
  ASSERT_TRUE(Overlaps("450", nullptr));
}
656 
TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  LevelFileInit(1);

  // One file whose bounds share user key "200": the smallest key is at
  // sequence 5000 and the largest at sequence 3000.
  Add("200", "200", 5000, 3000);
  ASSERT_FALSE(Overlaps("199", "199"));
  ASSERT_FALSE(Overlaps("201", "300"));
  ASSERT_TRUE(Overlaps("200", "200"));
  ASSERT_TRUE(Overlaps("190", "200"));
  ASSERT_TRUE(Overlaps("200", "210"));
}
667 
TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  LevelFileInit(2);

  // File 2 lies entirely inside file 1, so the files are not disjoint.
  Add("150", "600");
  Add("400", "500");
  disjoint_sorted_files_ = false;
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("601", "700"));
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
  ASSERT_TRUE(Overlaps("450", "700"));
  ASSERT_TRUE(Overlaps("600", "700"));
}
687 
688 class VersionSetTestBase {
689  public:
690   const static std::string kColumnFamilyName1;
691   const static std::string kColumnFamilyName2;
692   const static std::string kColumnFamilyName3;
693   int num_initial_edits_;
694 
  // Sets up a per-thread test DB directory and constructs a VersionSet and a
  // ReactiveVersionSet over it.
  //
  // The Env is chosen from the process environment:
  //   TEST_ENV_URI set -> Env::LoadEnv(uri)
  //   MEM_ENV set      -> in-memory Env wrapping Env::Default()
  //   otherwise        -> Env::Default()
  explicit VersionSetTestBase(const std::string& name)
      : env_(nullptr),
        dbname_(test::PerThreadDBPath(name)),
        options_(),
        db_options_(options_),
        cf_options_(options_),
        immutable_cf_options_(db_options_, cf_options_),
        mutable_cf_options_(cf_options_),
        table_cache_(NewLRUCache(50000, 16)),
        write_buffer_manager_(db_options_.db_write_buffer_size),
        shutting_down_(false),
        mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
    const char* test_env_uri = getenv("TEST_ENV_URI");
    if (test_env_uri) {
      Status s = Env::LoadEnv(test_env_uri, &env_, &env_guard_);
      EXPECT_OK(s);
    } else if (getenv("MEM_ENV")) {
      env_guard_.reset(NewMemEnv(Env::Default()));
      env_ = env_guard_.get();
    } else {
      env_ = Env::Default();
    }
    // NOTE(review): EXPECT_NE is non-fatal, so a null env_ would still be
    // dereferenced on the next statement (fatal ASSERT_* cannot be used in a
    // constructor).
    EXPECT_NE(nullptr, env_);

    fs_ = env_->GetFileSystem();
    EXPECT_OK(fs_->CreateDirIfMissing(dbname_, IOOptions(), nullptr));

    // The option objects above were copied before env_ was known; point them
    // at the selected Env/FileSystem/clock now.
    options_.env = env_;
    db_options_.env = env_;
    db_options_.fs = fs_;
    immutable_cf_options_.env = env_;
    immutable_cf_options_.fs = fs_;
    immutable_cf_options_.clock = env_->GetSystemClock().get();

    versions_.reset(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    reactive_versions_ = std::make_shared<ReactiveVersionSet>(
        dbname_, &db_options_, env_options_, table_cache_.get(),
        &write_buffer_manager_, &write_controller_, nullptr);
    // Appended after the version sets are built; both were given a pointer to
    // db_options_, so they presumably observe this path -- confirm.
    db_options_.db_paths.emplace_back(dbname_,
                                      std::numeric_limits<uint64_t>::max());
  }
739 
~VersionSetTestBase()740   virtual ~VersionSetTestBase() {
741     if (getenv("KEEP_DB")) {
742       fprintf(stdout, "DB is still at %s\n", dbname_.c_str());
743     } else {
744       Options options;
745       options.env = env_;
746       EXPECT_OK(DestroyDB(dbname_, options));
747     }
748   }
749 
750  protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)751   virtual void PrepareManifest(
752       std::vector<ColumnFamilyDescriptor>* column_families,
753       SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) {
754     assert(column_families != nullptr);
755     assert(last_seqno != nullptr);
756     assert(log_writer != nullptr);
757     VersionEdit new_db;
758     if (db_options_.write_dbid_to_manifest) {
759       DBOptions tmp_db_options;
760       tmp_db_options.env = env_;
761       std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
762       std::string db_id;
763       impl->GetDbIdentityFromIdentityFile(&db_id);
764       new_db.SetDBId(db_id);
765     }
766     new_db.SetLogNumber(0);
767     new_db.SetNextFile(2);
768     new_db.SetLastSequence(0);
769 
770     const std::vector<std::string> cf_names = {
771         kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
772         kColumnFamilyName3};
773     const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
774     autovector<VersionEdit> new_cfs;
775     uint64_t last_seq = 1;
776     uint32_t cf_id = 1;
777     for (int i = 1; i != kInitialNumOfCfs; ++i) {
778       VersionEdit new_cf;
779       new_cf.AddColumnFamily(cf_names[i]);
780       new_cf.SetColumnFamily(cf_id++);
781       new_cf.SetLogNumber(0);
782       new_cf.SetNextFile(2);
783       new_cf.SetLastSequence(last_seq++);
784       new_cfs.emplace_back(new_cf);
785     }
786     *last_seqno = last_seq;
787     num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
788     std::unique_ptr<WritableFileWriter> file_writer;
789     const std::string manifest = DescriptorFileName(dbname_, 1);
790     const auto& fs = env_->GetFileSystem();
791     Status s = WritableFileWriter::Create(
792         fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
793         nullptr);
794     ASSERT_OK(s);
795     {
796       log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
797       std::string record;
798       new_db.EncodeTo(&record);
799       s = (*log_writer)->AddRecord(record);
800       for (const auto& e : new_cfs) {
801         record.clear();
802         e.EncodeTo(&record);
803         s = (*log_writer)->AddRecord(record);
804         ASSERT_OK(s);
805       }
806     }
807     ASSERT_OK(s);
808 
809     cf_options_.table_factory = mock_table_factory_;
810     for (const auto& cf_name : cf_names) {
811       column_families->emplace_back(cf_name, cf_options_);
812     }
813   }
814 
815   // Create DB with 3 column families.
NewDB()816   void NewDB() {
817     SequenceNumber last_seqno;
818     std::unique_ptr<log::Writer> log_writer;
819     SetIdentityFile(env_, dbname_);
820     PrepareManifest(&column_families_, &last_seqno, &log_writer);
821     log_writer.reset();
822     // Make "CURRENT" file point to the new manifest file.
823     Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
824     ASSERT_OK(s);
825 
826     EXPECT_OK(versions_->Recover(column_families_, false));
827     EXPECT_EQ(column_families_.size(),
828               versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
829   }
830 
ReopenDB()831   void ReopenDB() {
832     versions_.reset(
833         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
834                        &write_buffer_manager_, &write_controller_,
835                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
836     EXPECT_OK(versions_->Recover(column_families_, false));
837   }
838 
VerifyManifest(std::string * manifest_path) const839   void VerifyManifest(std::string* manifest_path) const {
840     assert(manifest_path != nullptr);
841     uint64_t manifest_file_number = 0;
842     Status s = versions_->GetCurrentManifestPath(
843         dbname_, fs_.get(), manifest_path, &manifest_file_number);
844     ASSERT_OK(s);
845     ASSERT_EQ(1, manifest_file_number);
846   }
847 
LogAndApplyToDefaultCF(VersionEdit & edit)848   Status LogAndApplyToDefaultCF(VersionEdit& edit) {
849     mutex_.Lock();
850     Status s =
851         versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
852                                mutable_cf_options_, &edit, &mutex_);
853     mutex_.Unlock();
854     return s;
855   }
856 
LogAndApplyToDefaultCF(const autovector<std::unique_ptr<VersionEdit>> & edits)857   Status LogAndApplyToDefaultCF(
858       const autovector<std::unique_ptr<VersionEdit>>& edits) {
859     autovector<VersionEdit*> vedits;
860     for (auto& e : edits) {
861       vedits.push_back(e.get());
862     }
863     mutex_.Lock();
864     Status s =
865         versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
866                                mutable_cf_options_, vedits, &mutex_);
867     mutex_.Unlock();
868     return s;
869   }
870 
CreateNewManifest()871   void CreateNewManifest() {
872     constexpr FSDirectory* db_directory = nullptr;
873     constexpr bool new_descriptor_log = true;
874     mutex_.Lock();
875     VersionEdit dummy;
876     ASSERT_OK(versions_->LogAndApply(
877         versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_,
878         &dummy, &mutex_, db_directory, new_descriptor_log));
879     mutex_.Unlock();
880   }
881 
CreateColumnFamily(const std::string & cf_name,const ColumnFamilyOptions & cf_options)882   ColumnFamilyData* CreateColumnFamily(const std::string& cf_name,
883                                        const ColumnFamilyOptions& cf_options) {
884     VersionEdit new_cf;
885     new_cf.AddColumnFamily(cf_name);
886     uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
887     new_cf.SetColumnFamily(new_id);
888     new_cf.SetLogNumber(0);
889     new_cf.SetComparatorName(cf_options.comparator->Name());
890     Status s;
891     mutex_.Lock();
892     s = versions_->LogAndApply(/*column_family_data=*/nullptr,
893                                MutableCFOptions(cf_options), &new_cf, &mutex_,
894                                /*db_directory=*/nullptr,
895                                /*new_descriptor_log=*/false, &cf_options);
896     mutex_.Unlock();
897     EXPECT_OK(s);
898     ColumnFamilyData* cfd =
899         versions_->GetColumnFamilySet()->GetColumnFamily(cf_name);
900     EXPECT_NE(nullptr, cfd);
901     return cfd;
902   }
903 
  // NOTE(review): mem_env_ is not referenced in the visible code and is not
  // assigned in the visible part of the constructor; it may be unused —
  // confirm before relying on it.
  Env* mem_env_;
  // Env used throughout the fixture: loaded from TEST_ENV_URI, a MemEnv when
  // MEM_ENV is set, otherwise Env::Default() (chosen in the constructor).
  Env* env_;
  // May own env_ when the constructor created it via LoadEnv / NewMemEnv;
  // presumably empty when env_ is Env::Default() — confirm.
  std::shared_ptr<Env> env_guard_;
  // FileSystem obtained from env_->GetFileSystem().
  std::shared_ptr<FileSystem> fs_;
  // Directory of the test DB; created by the constructor and destroyed by the
  // destructor unless KEEP_DB is set.
  const std::string dbname_;
  EnvOptions env_options_;
  Options options_;
  // DB options handed to versions_ and reactive_versions_.
  ImmutableDBOptions db_options_;
  // Column family options used for every descriptor built by PrepareManifest.
  ColumnFamilyOptions cf_options_;
  ImmutableOptions immutable_cf_options_;
  // Mutable options passed by the default-column-family LogAndApply helpers.
  MutableCFOptions mutable_cf_options_;
  std::shared_ptr<Cache> table_cache_;
  WriteController write_controller_;
  WriteBufferManager write_buffer_manager_;
  // VersionSet under test; rebuilt by ReopenDB().
  std::shared_ptr<VersionSet> versions_;
  std::shared_ptr<ReactiveVersionSet> reactive_versions_;
  // Mutex the helpers hold around each LogAndApply call.
  InstrumentedMutex mutex_;
  // NOTE(review): not referenced in the visible code — confirm it is needed.
  std::atomic<bool> shutting_down_;
  // Table factory installed into cf_options_ by PrepareManifest.
  std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
  // Column family descriptors filled by NewDB() and used for Recover().
  std::vector<ColumnFamilyDescriptor> column_families_;
};
925 
926 const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
927 const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
928 const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
929 
930 class VersionSetTest : public VersionSetTestBase, public testing::Test {
931  public:
VersionSetTest()932   VersionSetTest() : VersionSetTestBase("version_set_test") {}
933 };
934 
// Group-commits kGroupSize single-edit writers that all target the default
// column family and checks how often the "SameColumnFamily" sync point fires.
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  NewDB();
  const int kGroupSize = 5;
  autovector<VersionEdit> edits;
  for (int i = 0; i != kGroupSize; ++i) {
    edits.emplace_back(VersionEdit());
  }
  autovector<ColumnFamilyData*> cfds;
  autovector<const MutableCFOptions*> all_mutable_cf_options;
  autovector<autovector<VersionEdit*>> edit_lists;
  // Every writer in the group targets the default column family.
  for (int i = 0; i != kGroupSize; ++i) {
    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
    all_mutable_cf_options.emplace_back(&mutable_cf_options_);
    autovector<VersionEdit*> edit_list;
    edit_list.emplace_back(&edits[i]);
    edit_lists.emplace_back(edit_list);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int count = 0;
  // Expected to fire kGroupSize - 1 times, always for column family 0.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
        uint32_t* cf_id = reinterpret_cast<uint32_t*>(arg);
        EXPECT_EQ(0u, *cf_id);
        ++count;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  mutex_.Lock();
  Status s =
      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  mutex_.Unlock();
  // The callback captures `count` by reference; unregister it before this
  // frame is gone so later tests cannot invoke a dangling lambda.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  EXPECT_OK(s);
  EXPECT_EQ(kGroupSize - 1, count);
}
970 
TEST_F(VersionSetTest,PersistBlobFileStateInNewManifest)971 TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) {
972   // Initialize the database and add a couple of blob files, one with some
973   // garbage in it, and one without any garbage.
974   NewDB();
975 
976   assert(versions_);
977   assert(versions_->GetColumnFamilySet());
978 
979   ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
980   assert(cfd);
981 
982   Version* const version = cfd->current();
983   assert(version);
984 
985   VersionStorageInfo* const storage_info = version->storage_info();
986   assert(storage_info);
987 
988   {
989     constexpr uint64_t blob_file_number = 123;
990     constexpr uint64_t total_blob_count = 456;
991     constexpr uint64_t total_blob_bytes = 77777777;
992     constexpr char checksum_method[] = "SHA1";
993     constexpr char checksum_value[] =
994         "bdb7f34a59dfa1592ce7f52e99f98c570c525cbd";
995 
996     auto shared_meta = SharedBlobFileMetaData::Create(
997         blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
998         checksum_value);
999 
1000     constexpr uint64_t garbage_blob_count = 89;
1001     constexpr uint64_t garbage_blob_bytes = 1000000;
1002 
1003     auto meta = BlobFileMetaData::Create(
1004         std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
1005         garbage_blob_count, garbage_blob_bytes);
1006 
1007     storage_info->AddBlobFile(std::move(meta));
1008   }
1009 
1010   {
1011     constexpr uint64_t blob_file_number = 234;
1012     constexpr uint64_t total_blob_count = 555;
1013     constexpr uint64_t total_blob_bytes = 66666;
1014     constexpr char checksum_method[] = "CRC32";
1015     constexpr char checksum_value[] = "3d87ff57";
1016 
1017     auto shared_meta = SharedBlobFileMetaData::Create(
1018         blob_file_number, total_blob_count, total_blob_bytes, checksum_method,
1019         checksum_value);
1020 
1021     constexpr uint64_t garbage_blob_count = 0;
1022     constexpr uint64_t garbage_blob_bytes = 0;
1023 
1024     auto meta = BlobFileMetaData::Create(
1025         std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
1026         garbage_blob_count, garbage_blob_bytes);
1027 
1028     storage_info->AddBlobFile(std::move(meta));
1029   }
1030 
1031   // Force the creation of a new manifest file and make sure metadata for
1032   // the blob files is re-persisted.
1033   size_t addition_encoded = 0;
1034   SyncPoint::GetInstance()->SetCallBack(
1035       "BlobFileAddition::EncodeTo::CustomFields",
1036       [&](void* /* arg */) { ++addition_encoded; });
1037 
1038   size_t garbage_encoded = 0;
1039   SyncPoint::GetInstance()->SetCallBack(
1040       "BlobFileGarbage::EncodeTo::CustomFields",
1041       [&](void* /* arg */) { ++garbage_encoded; });
1042   SyncPoint::GetInstance()->EnableProcessing();
1043 
1044   CreateNewManifest();
1045 
1046   ASSERT_EQ(addition_encoded, 2);
1047   ASSERT_EQ(garbage_encoded, 1);
1048 
1049   SyncPoint::GetInstance()->DisableProcessing();
1050   SyncPoint::GetInstance()->ClearAllCallBacks();
1051 }
1052 
TEST_F(VersionSetTest,AddLiveBlobFiles)1053 TEST_F(VersionSetTest, AddLiveBlobFiles) {
1054   // Initialize the database and add a blob file.
1055   NewDB();
1056 
1057   assert(versions_);
1058   assert(versions_->GetColumnFamilySet());
1059 
1060   ColumnFamilyData* const cfd = versions_->GetColumnFamilySet()->GetDefault();
1061   assert(cfd);
1062 
1063   Version* const first_version = cfd->current();
1064   assert(first_version);
1065 
1066   VersionStorageInfo* const first_storage_info = first_version->storage_info();
1067   assert(first_storage_info);
1068 
1069   constexpr uint64_t first_blob_file_number = 234;
1070   constexpr uint64_t first_total_blob_count = 555;
1071   constexpr uint64_t first_total_blob_bytes = 66666;
1072   constexpr char first_checksum_method[] = "CRC32";
1073   constexpr char first_checksum_value[] = "3d87ff57";
1074 
1075   auto first_shared_meta = SharedBlobFileMetaData::Create(
1076       first_blob_file_number, first_total_blob_count, first_total_blob_bytes,
1077       first_checksum_method, first_checksum_value);
1078 
1079   constexpr uint64_t garbage_blob_count = 0;
1080   constexpr uint64_t garbage_blob_bytes = 0;
1081 
1082   auto first_meta = BlobFileMetaData::Create(
1083       std::move(first_shared_meta), BlobFileMetaData::LinkedSsts(),
1084       garbage_blob_count, garbage_blob_bytes);
1085 
1086   first_storage_info->AddBlobFile(first_meta);
1087 
1088   // Reference the version so it stays alive even after the following version
1089   // edit.
1090   first_version->Ref();
1091 
1092   // Get live files directly from version.
1093   std::vector<uint64_t> version_table_files;
1094   std::vector<uint64_t> version_blob_files;
1095 
1096   first_version->AddLiveFiles(&version_table_files, &version_blob_files);
1097 
1098   ASSERT_EQ(version_blob_files.size(), 1);
1099   ASSERT_EQ(version_blob_files[0], first_blob_file_number);
1100 
1101   // Create a new version containing an additional blob file.
1102   versions_->TEST_CreateAndAppendVersion(cfd);
1103 
1104   Version* const second_version = cfd->current();
1105   assert(second_version);
1106   assert(second_version != first_version);
1107 
1108   VersionStorageInfo* const second_storage_info =
1109       second_version->storage_info();
1110   assert(second_storage_info);
1111 
1112   constexpr uint64_t second_blob_file_number = 456;
1113   constexpr uint64_t second_total_blob_count = 100;
1114   constexpr uint64_t second_total_blob_bytes = 2000000;
1115   constexpr char second_checksum_method[] = "CRC32B";
1116   constexpr char second_checksum_value[] = "6dbdf23a";
1117 
1118   auto second_shared_meta = SharedBlobFileMetaData::Create(
1119       second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
1120       second_checksum_method, second_checksum_value);
1121 
1122   auto second_meta = BlobFileMetaData::Create(
1123       std::move(second_shared_meta), BlobFileMetaData::LinkedSsts(),
1124       garbage_blob_count, garbage_blob_bytes);
1125 
1126   second_storage_info->AddBlobFile(std::move(first_meta));
1127   second_storage_info->AddBlobFile(std::move(second_meta));
1128 
1129   // Get all live files from version set. Note that the result contains
1130   // duplicates.
1131   std::vector<uint64_t> all_table_files;
1132   std::vector<uint64_t> all_blob_files;
1133 
1134   versions_->AddLiveFiles(&all_table_files, &all_blob_files);
1135 
1136   ASSERT_EQ(all_blob_files.size(), 3);
1137   ASSERT_EQ(all_blob_files[0], first_blob_file_number);
1138   ASSERT_EQ(all_blob_files[1], first_blob_file_number);
1139   ASSERT_EQ(all_blob_files[2], second_blob_file_number);
1140 
1141   // Clean up previous version.
1142   first_version->Unref();
1143 }
1144 
TEST_F(VersionSetTest,ObsoleteBlobFile)1145 TEST_F(VersionSetTest, ObsoleteBlobFile) {
1146   // Initialize the database and add a blob file that is entirely garbage
1147   // and thus can immediately be marked obsolete.
1148   NewDB();
1149 
1150   VersionEdit edit;
1151 
1152   constexpr uint64_t blob_file_number = 234;
1153   constexpr uint64_t total_blob_count = 555;
1154   constexpr uint64_t total_blob_bytes = 66666;
1155   constexpr char checksum_method[] = "CRC32";
1156   constexpr char checksum_value[] = "3d87ff57";
1157 
1158   edit.AddBlobFile(blob_file_number, total_blob_count, total_blob_bytes,
1159                    checksum_method, checksum_value);
1160 
1161   edit.AddBlobFileGarbage(blob_file_number, total_blob_count, total_blob_bytes);
1162 
1163   mutex_.Lock();
1164   Status s =
1165       versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
1166                              mutable_cf_options_, &edit, &mutex_);
1167   mutex_.Unlock();
1168 
1169   ASSERT_OK(s);
1170 
1171   // Make sure blob files from the pending number range are not returned
1172   // as obsolete.
1173   {
1174     std::vector<ObsoleteFileInfo> table_files;
1175     std::vector<ObsoleteBlobFileInfo> blob_files;
1176     std::vector<std::string> manifest_files;
1177     constexpr uint64_t min_pending_output = blob_file_number;
1178 
1179     versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
1180                                 min_pending_output);
1181 
1182     ASSERT_TRUE(blob_files.empty());
1183   }
1184 
1185   // Make sure the blob file is returned as obsolete if it's not in the pending
1186   // range.
1187   {
1188     std::vector<ObsoleteFileInfo> table_files;
1189     std::vector<ObsoleteBlobFileInfo> blob_files;
1190     std::vector<std::string> manifest_files;
1191     constexpr uint64_t min_pending_output = blob_file_number + 1;
1192 
1193     versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
1194                                 min_pending_output);
1195 
1196     ASSERT_EQ(blob_files.size(), 1);
1197     ASSERT_EQ(blob_files[0].GetBlobFileNumber(), blob_file_number);
1198   }
1199 
1200   // Make sure it's not returned a second time.
1201   {
1202     std::vector<ObsoleteFileInfo> table_files;
1203     std::vector<ObsoleteBlobFileInfo> blob_files;
1204     std::vector<std::string> manifest_files;
1205     constexpr uint64_t min_pending_output = blob_file_number + 1;
1206 
1207     versions_->GetObsoleteFiles(&table_files, &blob_files, &manifest_files,
1208                                 min_pending_output);
1209 
1210     ASSERT_TRUE(blob_files.empty());
1211   }
1212 }
1213 
TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) {
  NewDB();

  constexpr uint64_t kNumWals = 5;

  autovector<std::unique_ptr<VersionEdit>> edits;
  // Add some WALs.
  for (uint64_t i = 1; i <= kNumWals; i++) {
    edits.emplace_back(new VersionEdit);
    // WAL's size equals its log number.
    edits.back()->AddWal(i, WalMetadata(i));
  }
  // Delete the first half of the WALs.
  edits.emplace_back(new VersionEdit);
  edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);

  autovector<Version*> versions;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:NewVersion",
      [&](void* arg) { versions.push_back(static_cast<Version*>(arg)); });
  SyncPoint::GetInstance()->EnableProcessing();

  const Status s = LogAndApplyToDefaultCF(edits);

  // The callback captures `versions` by reference; unregister it before any
  // ASSERT below can return early and leave it installed.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_OK(s);

  // Since the edits are all WAL edits, no version should be created: the
  // sync point fires once, with a null Version.
  ASSERT_EQ(versions.size(), 1);
  ASSERT_EQ(versions[0], nullptr);
}
1245 
1246 // Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit.
TEST_F(VersionSetTest,NonWalEditsAppliedToVersion)1247 TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) {
1248   NewDB();
1249 
1250   const std::string kDBId = "db_db";
1251   constexpr uint64_t kNumWals = 5;
1252 
1253   autovector<std::unique_ptr<VersionEdit>> edits;
1254   // Add some WALs.
1255   for (uint64_t i = 1; i <= kNumWals; i++) {
1256     edits.emplace_back(new VersionEdit);
1257     // WAL's size equals its log number.
1258     edits.back()->AddWal(i, WalMetadata(i));
1259   }
1260   // Delete the first half of the WALs.
1261   edits.emplace_back(new VersionEdit);
1262   edits.back()->DeleteWalsBefore(kNumWals / 2 + 1);
1263   edits.emplace_back(new VersionEdit);
1264   edits.back()->SetDBId(kDBId);
1265 
1266   autovector<Version*> versions;
1267   SyncPoint::GetInstance()->SetCallBack(
1268       "VersionSet::ProcessManifestWrites:NewVersion",
1269       [&](void* arg) { versions.push_back(reinterpret_cast<Version*>(arg)); });
1270   SyncPoint::GetInstance()->EnableProcessing();
1271 
1272   ASSERT_OK(LogAndApplyToDefaultCF(edits));
1273 
1274   SyncPoint::GetInstance()->DisableProcessing();
1275   SyncPoint::GetInstance()->ClearAllCallBacks();
1276 
1277   // Since the edits are all WAL edits, no version should be created.
1278   ASSERT_EQ(versions.size(), 1);
1279   ASSERT_NE(versions[0], nullptr);
1280 }
1281 
TEST_F(VersionSetTest,WalAddition)1282 TEST_F(VersionSetTest, WalAddition) {
1283   NewDB();
1284 
1285   constexpr WalNumber kLogNumber = 10;
1286   constexpr uint64_t kSizeInBytes = 111;
1287 
1288   // A WAL is just created.
1289   {
1290     VersionEdit edit;
1291     edit.AddWal(kLogNumber);
1292 
1293     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1294 
1295     const auto& wals = versions_->GetWalSet().GetWals();
1296     ASSERT_EQ(wals.size(), 1);
1297     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1298     ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
1299   }
1300 
1301   // The WAL is synced for several times before closing.
1302   {
1303     for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) {
1304       uint64_t size = kSizeInBytes - size_delta;
1305       WalMetadata wal(size);
1306       VersionEdit edit;
1307       edit.AddWal(kLogNumber, wal);
1308 
1309       ASSERT_OK(LogAndApplyToDefaultCF(edit));
1310 
1311       const auto& wals = versions_->GetWalSet().GetWals();
1312       ASSERT_EQ(wals.size(), 1);
1313       ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1314       ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1315       ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size);
1316     }
1317   }
1318 
1319   // The WAL is closed.
1320   {
1321     WalMetadata wal(kSizeInBytes);
1322     VersionEdit edit;
1323     edit.AddWal(kLogNumber, wal);
1324 
1325     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1326 
1327     const auto& wals = versions_->GetWalSet().GetWals();
1328     ASSERT_EQ(wals.size(), 1);
1329     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1330     ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1331     ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
1332   }
1333 
1334   // Recover a new VersionSet.
1335   {
1336     std::unique_ptr<VersionSet> new_versions(
1337         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1338                        &write_buffer_manager_, &write_controller_,
1339                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1340     ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
1341     const auto& wals = new_versions->GetWalSet().GetWals();
1342     ASSERT_EQ(wals.size(), 1);
1343     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1344     ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1345     ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
1346   }
1347 }
1348 
TEST_F(VersionSetTest,WalCloseWithoutSync)1349 TEST_F(VersionSetTest, WalCloseWithoutSync) {
1350   NewDB();
1351 
1352   constexpr WalNumber kLogNumber = 10;
1353   constexpr uint64_t kSizeInBytes = 111;
1354   constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2;
1355 
1356   // A WAL is just created.
1357   {
1358     VersionEdit edit;
1359     edit.AddWal(kLogNumber);
1360 
1361     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1362 
1363     const auto& wals = versions_->GetWalSet().GetWals();
1364     ASSERT_EQ(wals.size(), 1);
1365     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1366     ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize());
1367   }
1368 
1369   // The WAL is synced before closing.
1370   {
1371     WalMetadata wal(kSyncedSizeInBytes);
1372     VersionEdit edit;
1373     edit.AddWal(kLogNumber, wal);
1374 
1375     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1376 
1377     const auto& wals = versions_->GetWalSet().GetWals();
1378     ASSERT_EQ(wals.size(), 1);
1379     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1380     ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1381     ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
1382   }
1383 
1384   // A new WAL with larger log number is created,
1385   // implicitly marking the current WAL closed.
1386   {
1387     VersionEdit edit;
1388     edit.AddWal(kLogNumber + 1);
1389     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1390 
1391     const auto& wals = versions_->GetWalSet().GetWals();
1392     ASSERT_EQ(wals.size(), 2);
1393     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1394     ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1395     ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
1396     ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end());
1397     ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize());
1398   }
1399 
1400   // Recover a new VersionSet.
1401   {
1402     std::unique_ptr<VersionSet> new_versions(
1403         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1404                        &write_buffer_manager_, &write_controller_,
1405                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1406     ASSERT_OK(new_versions->Recover(column_families_, false));
1407     const auto& wals = new_versions->GetWalSet().GetWals();
1408     ASSERT_EQ(wals.size(), 2);
1409     ASSERT_TRUE(wals.find(kLogNumber) != wals.end());
1410     ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize());
1411     ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes);
1412   }
1413 }
1414 
TEST_F(VersionSetTest,WalDeletion)1415 TEST_F(VersionSetTest, WalDeletion) {
1416   NewDB();
1417 
1418   constexpr WalNumber kClosedLogNumber = 10;
1419   constexpr WalNumber kNonClosedLogNumber = 20;
1420   constexpr uint64_t kSizeInBytes = 111;
1421 
1422   // Add a non-closed and a closed WAL.
1423   {
1424     VersionEdit edit;
1425     edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes));
1426     edit.AddWal(kNonClosedLogNumber);
1427 
1428     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1429 
1430     const auto& wals = versions_->GetWalSet().GetWals();
1431     ASSERT_EQ(wals.size(), 2);
1432     ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
1433     ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end());
1434     ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
1435     ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize());
1436     ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes);
1437   }
1438 
1439   // Delete the closed WAL.
1440   {
1441     VersionEdit edit;
1442     edit.DeleteWalsBefore(kNonClosedLogNumber);
1443 
1444     ASSERT_OK(LogAndApplyToDefaultCF(edit));
1445 
1446     const auto& wals = versions_->GetWalSet().GetWals();
1447     ASSERT_EQ(wals.size(), 1);
1448     ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
1449     ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
1450   }
1451 
1452   // Recover a new VersionSet, only the non-closed WAL should show up.
1453   {
1454     std::unique_ptr<VersionSet> new_versions(
1455         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1456                        &write_buffer_manager_, &write_controller_,
1457                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1458     ASSERT_OK(new_versions->Recover(column_families_, false));
1459     const auto& wals = new_versions->GetWalSet().GetWals();
1460     ASSERT_EQ(wals.size(), 1);
1461     ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
1462     ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
1463   }
1464 
1465   // Force the creation of a new MANIFEST file,
1466   // only the non-closed WAL should be written to the new MANIFEST.
1467   {
1468     std::vector<WalAddition> wal_additions;
1469     SyncPoint::GetInstance()->SetCallBack(
1470         "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) {
1471           VersionEdit* edit = reinterpret_cast<VersionEdit*>(arg);
1472           ASSERT_TRUE(edit->IsWalAddition());
1473           for (auto& addition : edit->GetWalAdditions()) {
1474             wal_additions.push_back(addition);
1475           }
1476         });
1477     SyncPoint::GetInstance()->EnableProcessing();
1478 
1479     CreateNewManifest();
1480 
1481     SyncPoint::GetInstance()->DisableProcessing();
1482     SyncPoint::GetInstance()->ClearAllCallBacks();
1483 
1484     ASSERT_EQ(wal_additions.size(), 1);
1485     ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber);
1486     ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize());
1487   }
1488 
1489   // Recover from the new MANIFEST, only the non-closed WAL should show up.
1490   {
1491     std::unique_ptr<VersionSet> new_versions(
1492         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1493                        &write_buffer_manager_, &write_controller_,
1494                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1495     ASSERT_OK(new_versions->Recover(column_families_, false));
1496     const auto& wals = new_versions->GetWalSet().GetWals();
1497     ASSERT_EQ(wals.size(), 1);
1498     ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end());
1499     ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize());
1500   }
1501 }
1502 
TEST_F(VersionSetTest, WalCreateTwice) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;

  // Applying the same "create WAL" edit a second time must be rejected as
  // corruption.
  VersionEdit edit;
  edit.AddWal(kLogNumber);
  ASSERT_OK(LogAndApplyToDefaultCF(edit));

  const Status second_attempt = LogAndApplyToDefaultCF(edit);
  const std::string message = second_attempt.ToString();
  ASSERT_TRUE(second_attempt.IsCorruption());
  ASSERT_TRUE(message.find("WAL 10 is created more than once") !=
              std::string::npos)
      << message;
}
1519 
TEST_F(VersionSetTest, WalCreateAfterClose) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  // One edit that both creates the WAL and records its synced size, so the
  // WAL ends up closed.
  VersionEdit create_and_close;
  create_and_close.AddWal(kLogNumber);
  create_and_close.AddWal(kLogNumber, WalMetadata(kSizeInBytes));
  ASSERT_OK(LogAndApplyToDefaultCF(create_and_close));

  // Creating the same WAL again must be reported as corruption.
  VersionEdit recreate;
  recreate.AddWal(kLogNumber);
  const Status s = LogAndApplyToDefaultCF(recreate);
  const std::string message = s.ToString();
  ASSERT_TRUE(s.IsCorruption());
  ASSERT_TRUE(message.find("WAL 10 is created more than once") !=
              std::string::npos)
      << message;
}
1548 
TEST_F(VersionSetTest, AddWalWithSmallerSize) {
  NewDB();

  constexpr WalNumber kLogNumber = 10;
  constexpr uint64_t kSizeInBytes = 111;

  // Record the WAL as closed with kSizeInBytes synced.
  VersionEdit close_edit;
  close_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes));
  ASSERT_OK(LogAndApplyToDefaultCF(close_edit));

  // A later edit must not shrink the synced size of the same WAL.
  VersionEdit shrink_edit;
  shrink_edit.AddWal(kLogNumber, WalMetadata(kSizeInBytes / 2));
  const Status s = LogAndApplyToDefaultCF(shrink_edit);
  const std::string message = s.ToString();
  ASSERT_TRUE(s.IsCorruption());
  ASSERT_TRUE(
      message.find(
          "WAL 10 must not have smaller synced size than previous one") !=
      std::string::npos)
      << message;
}
1579 
// DeleteWalsBefore() with a number that was never added as a WAL still removes
// every WAL below that number and keeps the ones above it.
TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
  NewDB();

  constexpr WalNumber kFirstWal = 10;
  constexpr WalNumber kSecondWal = 20;
  // Lies strictly between the two WALs and is never added itself.
  constexpr WalNumber kGapNumber = 15;
  constexpr uint64_t kSyncedBytes = 111;

  {
    // Record two closed WALs in a single edit.
    VersionEdit add_wals;
    WalMetadata closed_meta(kSyncedBytes);
    add_wals.AddWal(kFirstWal, closed_meta);
    add_wals.AddWal(kSecondWal, closed_meta);
    ASSERT_OK(LogAndApplyToDefaultCF(add_wals));
  }

  {
    VersionEdit delete_wals;
    delete_wals.DeleteWalsBefore(kGapNumber);
    ASSERT_OK(LogAndApplyToDefaultCF(delete_wals));
  }

  // A freshly recovered VersionSet only knows about the second WAL.
  std::unique_ptr<VersionSet> recovered(
      new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                     &write_buffer_manager_, &write_controller_,
                     /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
  ASSERT_OK(recovered->Recover(column_families_, false));
  const auto& surviving = recovered->GetWalSet().GetWals();
  ASSERT_EQ(surviving.size(), 1);
  ASSERT_TRUE(surviving.find(kSecondWal) != surviving.end());
}
1618 
// DeleteWalsBefore() with a number above every tracked WAL leaves no WALs
// behind after recovery.
TEST_F(VersionSetTest, DeleteAllWals) {
  NewDB();

  constexpr WalNumber kOnlyWal = 10;
  constexpr uint64_t kSyncedBytes = 111;

  {
    // Record a single closed WAL.
    VersionEdit add_edit;
    WalMetadata closed_meta(kSyncedBytes);
    add_edit.AddWal(kOnlyWal, closed_meta);
    ASSERT_OK(LogAndApplyToDefaultCF(add_edit));
  }

  {
    // Delete everything before a number well past the only WAL.
    VersionEdit delete_edit;
    delete_edit.DeleteWalsBefore(kOnlyWal + 10);
    ASSERT_OK(LogAndApplyToDefaultCF(delete_edit));
  }

  // A freshly recovered VersionSet tracks no WALs at all.
  std::unique_ptr<VersionSet> recovered(
      new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                     &write_buffer_manager_, &write_controller_,
                     /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
  ASSERT_OK(recovered->Recover(column_families_, false));
  const auto& surviving = recovered->GetWalSet().GetWals();
  ASSERT_EQ(surviving.size(), 0);
}
1652 
// WAL additions/deletions and a DB-id edit logged together as one atomic group
// must all take effect and survive recovery.
TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
  NewDB();

  constexpr uint64_t kNumWals = 5;
  // One edit per WAL, plus one edit that sets the DB id, plus one edit that
  // deletes WALs. Derived so it cannot drift from the edits built below.
  constexpr int kAtomicGroupSize = static_cast<int>(kNumWals) + 2;
  const std::string kDBId = "db_db";

  int remaining = kAtomicGroupSize;
  autovector<std::unique_ptr<VersionEdit>> edits;
  // Add kNumWals WALs, numbered 1..kNumWals.
  for (uint64_t i = 1; i <= kNumWals; i++) {
    edits.emplace_back(new VersionEdit);
    // WAL's size equals its log number.
    edits.back()->AddWal(i, WalMetadata(i));
    edits.back()->MarkAtomicGroup(--remaining);
  }
  // One edit that sets the DB id.
  edits.emplace_back(new VersionEdit);
  edits.back()->SetDBId(kDBId);
  edits.back()->MarkAtomicGroup(--remaining);
  // Delete WALs numbered below kNumWals, i.e. the first kNumWals - 1 added.
  edits.emplace_back(new VersionEdit);
  edits.back()->DeleteWalsBefore(kNumWals);
  edits.back()->MarkAtomicGroup(--remaining);
  // Every edit in the group must have been marked.
  ASSERT_EQ(remaining, 0);

  ASSERT_OK(LogAndApplyToDefaultCF(edits));

  // Recover a new VersionSet: the DB id and the last WAL (with its synced
  // size) should be kept.
  {
    std::unique_ptr<VersionSet> new_versions(
        new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
                       &write_buffer_manager_, &write_controller_,
                       /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
    std::string db_id;
    ASSERT_OK(
        new_versions->Recover(column_families_, /*read_only=*/false, &db_id));

    ASSERT_EQ(db_id, kDBId);

    const auto& wals = new_versions->GetWalSet().GetWals();
    ASSERT_EQ(wals.size(), 1);
    ASSERT_TRUE(wals.find(kNumWals) != wals.end());
    ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize());
    ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals);
  }
}
1701 
1702 class VersionSetWithTimestampTest : public VersionSetTest {
1703  public:
1704   static const std::string kNewCfName;
1705 
VersionSetWithTimestampTest()1706   explicit VersionSetWithTimestampTest() : VersionSetTest() {}
1707 
SetUp()1708   void SetUp() override {
1709     NewDB();
1710     Options options;
1711     options.comparator = test::ComparatorWithU64Ts();
1712     cfd_ = CreateColumnFamily(kNewCfName, options);
1713     EXPECT_NE(nullptr, cfd_);
1714     EXPECT_NE(nullptr, cfd_->GetLatestMutableCFOptions());
1715     column_families_.emplace_back(kNewCfName, options);
1716   }
1717 
TearDown()1718   void TearDown() override {
1719     for (auto* e : edits_) {
1720       delete e;
1721     }
1722     edits_.clear();
1723   }
1724 
GenVersionEditsToSetFullHistoryTsLow(const std::vector<uint64_t> & ts_lbs)1725   void GenVersionEditsToSetFullHistoryTsLow(
1726       const std::vector<uint64_t>& ts_lbs) {
1727     for (const auto ts_lb : ts_lbs) {
1728       VersionEdit* edit = new VersionEdit;
1729       edit->SetColumnFamily(cfd_->GetID());
1730       std::string ts_str = test::EncodeInt(ts_lb);
1731       edit->SetFullHistoryTsLow(ts_str);
1732       edits_.emplace_back(edit);
1733     }
1734   }
1735 
VerifyFullHistoryTsLow(uint64_t expected_ts_low)1736   void VerifyFullHistoryTsLow(uint64_t expected_ts_low) {
1737     std::unique_ptr<VersionSet> vset(
1738         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
1739                        &write_buffer_manager_, &write_controller_,
1740                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1741     ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
1742                             /*db_id=*/nullptr));
1743     for (auto* cfd : *(vset->GetColumnFamilySet())) {
1744       ASSERT_NE(nullptr, cfd);
1745       if (cfd->GetName() == kNewCfName) {
1746         ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow());
1747       } else {
1748         ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty());
1749       }
1750     }
1751   }
1752 
DoTest(const std::vector<uint64_t> & ts_lbs)1753   void DoTest(const std::vector<uint64_t>& ts_lbs) {
1754     if (ts_lbs.empty()) {
1755       return;
1756     }
1757 
1758     GenVersionEditsToSetFullHistoryTsLow(ts_lbs);
1759 
1760     Status s;
1761     mutex_.Lock();
1762     s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()),
1763                                edits_, &mutex_);
1764     mutex_.Unlock();
1765     ASSERT_OK(s);
1766     VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end()));
1767   }
1768 
1769  protected:
1770   ColumnFamilyData* cfd_{nullptr};
1771   // edits_ must contain and own pointers to heap-alloc VersionEdit objects.
1772   autovector<VersionEdit*> edits_;
1773 };
1774 
1775 const std::string VersionSetWithTimestampTest::kNewCfName("new_cf");
1776 
// A single edit setting full_history_ts_low to 100.
TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) {
  const std::vector<uint64_t> single_ts_low{100};
  DoTest(single_ts_low);
}
1781 
1782 // Simulate the application increasing full_history_ts_low.
TEST_F(VersionSetWithTimestampTest,IncreaseFullHistoryTsLb)1783 TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) {
1784   const std::vector<uint64_t> ts_lbs = {100, 101, 102, 103};
1785   DoTest(ts_lbs);
1786 }
1787 
1788 // Simulate the application trying to decrease full_history_ts_low
1789 // unsuccessfully. If the application calls public API sequentially to
1790 // decrease the lower bound ts, RocksDB will return an InvalidArgument
1791 // status before involving VersionSet. Only when multiple threads trying
1792 // to decrease the lower bound concurrently will this case ever happen. Even
1793 // so, the lower bound cannot be decreased. The application will be notified
1794 // via return value of the API.
TEST_F(VersionSetWithTimestampTest,TryDecreaseFullHistoryTsLb)1795 TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) {
1796   const std::vector<uint64_t> ts_lbs = {103, 102, 101, 100};
1797   DoTest(ts_lbs);
1798 }
1799 
1800 class VersionSetAtomicGroupTest : public VersionSetTestBase,
1801                                   public testing::Test {
1802  public:
VersionSetAtomicGroupTest()1803   VersionSetAtomicGroupTest()
1804       : VersionSetTestBase("version_set_atomic_group_test") {}
1805 
SetUp()1806   void SetUp() override {
1807     PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
1808     SetupTestSyncPoints();
1809   }
1810 
SetupValidAtomicGroup(int atomic_group_size)1811   void SetupValidAtomicGroup(int atomic_group_size) {
1812     edits_.resize(atomic_group_size);
1813     int remaining = atomic_group_size;
1814     for (size_t i = 0; i != edits_.size(); ++i) {
1815       edits_[i].SetLogNumber(0);
1816       edits_[i].SetNextFile(2);
1817       edits_[i].MarkAtomicGroup(--remaining);
1818       edits_[i].SetLastSequence(last_seqno_++);
1819     }
1820     ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1821   }
1822 
SetupIncompleteTrailingAtomicGroup(int atomic_group_size)1823   void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
1824     edits_.resize(atomic_group_size);
1825     int remaining = atomic_group_size;
1826     for (size_t i = 0; i != edits_.size(); ++i) {
1827       edits_[i].SetLogNumber(0);
1828       edits_[i].SetNextFile(2);
1829       edits_[i].MarkAtomicGroup(--remaining);
1830       edits_[i].SetLastSequence(last_seqno_++);
1831     }
1832     ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1833   }
1834 
SetupCorruptedAtomicGroup(int atomic_group_size)1835   void SetupCorruptedAtomicGroup(int atomic_group_size) {
1836     edits_.resize(atomic_group_size);
1837     int remaining = atomic_group_size;
1838     for (size_t i = 0; i != edits_.size(); ++i) {
1839       edits_[i].SetLogNumber(0);
1840       edits_[i].SetNextFile(2);
1841       if (i != ((size_t)atomic_group_size / 2)) {
1842         edits_[i].MarkAtomicGroup(--remaining);
1843       }
1844       edits_[i].SetLastSequence(last_seqno_++);
1845     }
1846     ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1847   }
1848 
SetupIncorrectAtomicGroup(int atomic_group_size)1849   void SetupIncorrectAtomicGroup(int atomic_group_size) {
1850     edits_.resize(atomic_group_size);
1851     int remaining = atomic_group_size;
1852     for (size_t i = 0; i != edits_.size(); ++i) {
1853       edits_[i].SetLogNumber(0);
1854       edits_[i].SetNextFile(2);
1855       if (i != 1) {
1856         edits_[i].MarkAtomicGroup(--remaining);
1857       } else {
1858         edits_[i].MarkAtomicGroup(remaining--);
1859       }
1860       edits_[i].SetLastSequence(last_seqno_++);
1861     }
1862     ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
1863   }
1864 
SetupTestSyncPoints()1865   void SetupTestSyncPoints() {
1866     SyncPoint::GetInstance()->DisableProcessing();
1867     SyncPoint::GetInstance()->ClearAllCallBacks();
1868     SyncPoint::GetInstance()->SetCallBack(
1869         "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
1870           VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
1871           EXPECT_EQ(edits_.front().DebugString(),
1872                     e->DebugString());  // compare based on value
1873           first_in_atomic_group_ = true;
1874         });
1875     SyncPoint::GetInstance()->SetCallBack(
1876         "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
1877           VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
1878           EXPECT_EQ(edits_.back().DebugString(),
1879                     e->DebugString());  // compare based on value
1880           EXPECT_TRUE(first_in_atomic_group_);
1881           last_in_atomic_group_ = true;
1882         });
1883     SyncPoint::GetInstance()->SetCallBack(
1884         "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) {
1885           num_recovered_edits_ = *reinterpret_cast<int*>(arg);
1886         });
1887     SyncPoint::GetInstance()->SetCallBack(
1888         "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
1889         [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
1890     SyncPoint::GetInstance()->SetCallBack(
1891         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
1892         [&](void* arg) {
1893           corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
1894         });
1895     SyncPoint::GetInstance()->SetCallBack(
1896         "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
1897         [&](void* arg) {
1898           edit_with_incorrect_group_size_ =
1899               *reinterpret_cast<VersionEdit*>(arg);
1900         });
1901     SyncPoint::GetInstance()->EnableProcessing();
1902   }
1903 
AddNewEditsToLog(int num_edits)1904   void AddNewEditsToLog(int num_edits) {
1905     for (int i = 0; i < num_edits; i++) {
1906       std::string record;
1907       edits_[i].EncodeTo(&record);
1908       ASSERT_OK(log_writer_->AddRecord(record));
1909     }
1910   }
1911 
TearDown()1912   void TearDown() override {
1913     SyncPoint::GetInstance()->DisableProcessing();
1914     SyncPoint::GetInstance()->ClearAllCallBacks();
1915     log_writer_.reset();
1916   }
1917 
1918  protected:
1919   std::vector<ColumnFamilyDescriptor> column_families_;
1920   SequenceNumber last_seqno_;
1921   std::vector<VersionEdit> edits_;
1922   bool first_in_atomic_group_ = false;
1923   bool last_in_atomic_group_ = false;
1924   int num_edits_in_atomic_group_ = 0;
1925   int num_recovered_edits_ = 0;
1926   VersionEdit corrupted_edit_;
1927   VersionEdit edit_with_incorrect_group_size_;
1928   std::unique_ptr<log::Writer> log_writer_;
1929 };
1930 
// A complete three-edit atomic group in the MANIFEST is fully recovered by
// VersionSet::Recover().
TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  constexpr int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  EXPECT_OK(versions_->Recover(column_families_, false));
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  // Both ends of the group were observed...
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // ...and every edit, initial plus grouped, was counted as recovered.
  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
}
1942 
// A complete atomic group already in the MANIFEST is fully recovered by
// ReactiveVersionSet::Recover(), leaving nothing buffered.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  constexpr int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  // Handed to Recover() by address.
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Recovery must leave the replay buffer empty.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
}
1963 
// A complete atomic group appended after ReactiveVersionSet::Recover() is
// applied in full by ReadAndApply().
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  // Only the initial edits existed at recovery time.
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);

  AddNewEditsToLog(kGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  {
    InstrumentedMutexLock guard(&mu);
    EXPECT_OK(reactive_versions_->ReadAndApply(&mu, &reader,
                                               reader_status.get(),
                                               &cfds_changed));
  }
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Applying the whole group must leave the replay buffer empty.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(kGroupSize, num_recovered_edits_);
}
1989 
// Only the first three edits of a four-edit group reach the MANIFEST; recovery
// succeeds but counts only the initial edits as recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  constexpr int kGroupSize = 4;
  constexpr int kPersisted = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);
  AddNewEditsToLog(kPersisted);

  EXPECT_OK(versions_->Recover(column_families_, false));
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  // The group was started but never finished.
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersisted, num_edits_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
}
2004 
// ReactiveVersionSet::Recover() buffers a partially persisted trailing group;
// once the missing edit is written, ReadAndApply() drains the buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  constexpr int kGroupSize = 4;
  constexpr int kPersisted = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);
  AddNewEditsToLog(kPersisted);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersisted, num_edits_in_atomic_group_);
  // The persisted edits are held in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kPersisted);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kGroupSize);

  // Persist the final edit of the group, then replay.
  std::string final_record;
  edits_[kGroupSize - 1].EncodeTo(&final_record);
  EXPECT_OK(log_writer_->AddRecord(final_record));
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  {
    InstrumentedMutexLock guard(&mu);
    EXPECT_OK(reactive_versions_->ReadAndApply(&mu, &reader,
                                               reader_status.get(),
                                               &cfds_changed));
  }
  // The replay buffer is empty again.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
}
2042 
// A partial atomic group appended after recovery is held in the replay buffer
// by ReadAndApply() rather than applied.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kGroupSize = 4;
  constexpr int kPersisted = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  // No atomic-group edits exist yet at recovery time.
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);

  // Append all but the last edit of the group, then replay.
  AddNewEditsToLog(kPersisted);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  {
    InstrumentedMutexLock guard(&mu);
    EXPECT_OK(reactive_versions_->ReadAndApply(&mu, &reader,
                                               reader_status.get(),
                                               &cfds_changed));
  }
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersisted, num_edits_in_atomic_group_);
  // The partial group stays in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kPersisted);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kGroupSize);
}
2074 
// An atomic group with an unmarked edit in its middle makes
// VersionSet::Recover() fail; the offending edit is reported via sync point.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithVersionSetRecover) {
  constexpr int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  EXPECT_NOK(versions_->Recover(column_families_, false));
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& unmarked = edits_[kGroupSize / 2];
  EXPECT_EQ(unmarked.DebugString(), corrupted_edit_.DebugString());
}
2086 
// Same corrupted group as above, recovered with ReactiveVersionSet::Recover().
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  constexpr int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_NOK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                         &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& unmarked = edits_[kGroupSize / 2];
  EXPECT_EQ(unmarked.DebugString(), corrupted_edit_.DebugString());
}
2103 
// A corrupted group appended after a clean recovery makes ReadAndApply() fail.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  constexpr int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));

  // Only now write the corrupted group, then replay it.
  AddNewEditsToLog(kGroupSize);
  {
    InstrumentedMutexLock guard(&mu);
    EXPECT_NOK(reactive_versions_->ReadAndApply(&mu, &reader,
                                                reader_status.get(),
                                                &cfds_changed));
  }
  const VersionEdit& unmarked = edits_[kGroupSize / 2];
  EXPECT_EQ(unmarked.DebugString(), corrupted_edit_.DebugString());
}
2125 
// A group whose second edit repeats the previous remaining count makes
// VersionSet::Recover() fail and reports that edit via sync point.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  constexpr int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  EXPECT_NOK(versions_->Recover(column_families_, false));
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& mismarked = edits_[1];
  EXPECT_EQ(mismarked.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2137 
// Same mis-sized group as above, recovered with ReactiveVersionSet::Recover().
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  constexpr int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_NOK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                         &reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  const VersionEdit& mismarked = edits_[1];
  EXPECT_EQ(mismarked.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2154 
// A mis-sized group appended after a clean recovery makes ReadAndApply() fail.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  constexpr int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  EXPECT_OK(reactive_versions_->Recover(column_families_, &reader, &reporter,
                                        &reader_status));

  AddNewEditsToLog(kGroupSize);
  {
    InstrumentedMutexLock guard(&mu);
    EXPECT_NOK(reactive_versions_->ReadAndApply(&mu, &reader,
                                                reader_status.get(),
                                                &cfds_changed));
  }
  const VersionEdit& mismarked = edits_[1];
  EXPECT_EQ(mismarked.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
2175 
2176 class VersionSetTestDropOneCF : public VersionSetTestBase,
2177                                 public testing::TestWithParam<std::string> {
2178  public:
VersionSetTestDropOneCF()2179   VersionSetTestDropOneCF()
2180       : VersionSetTestBase("version_set_test_drop_one_cf") {}
2181 };
2182 
2183 // This test simulates the following execution sequence
2184 // Time  thread1                  bg_flush_thr
2185 //  |                             Prepare version edits (e1,e2,e3) for atomic
2186 //  |                             flush cf1, cf2, cf3
2187 //  |    Enqueue e to drop cfi
2188 //  |    to manifest_writers_
2189 //  |                             Enqueue (e1,e2,e3) to manifest_writers_
2190 //  |
2191 //  |    Apply e,
2192 //  |    cfi.IsDropped() is true
2193 //  |                             Apply (e1,e2,e3),
2194 //  |                             since cfi.IsDropped() == true, we need to
2195 //  |                             drop ei and write the rest to MANIFEST.
2196 //  V
2197 //
2198 //  Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
2199 //  last column family in an atomic group.
TEST_P(VersionSetTestDropOneCF,HandleDroppedColumnFamilyInAtomicGroup)2200 TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
2201   std::vector<ColumnFamilyDescriptor> column_families;
2202   SequenceNumber last_seqno;
2203   std::unique_ptr<log::Writer> log_writer;
2204   PrepareManifest(&column_families, &last_seqno, &log_writer);
2205   Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
2206   ASSERT_OK(s);
2207 
2208   EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
2209   EXPECT_EQ(column_families.size(),
2210             versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
2211 
2212   const int kAtomicGroupSize = 3;
2213   const std::vector<std::string> non_default_cf_names = {
2214       kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
2215 
2216   // Drop one column family
2217   VersionEdit drop_cf_edit;
2218   drop_cf_edit.DropColumnFamily();
2219   const std::string cf_to_drop_name(GetParam());
2220   auto cfd_to_drop =
2221       versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
2222   ASSERT_NE(nullptr, cfd_to_drop);
2223   // Increase its refcount because cfd_to_drop is used later, and we need to
2224   // prevent it from being deleted.
2225   cfd_to_drop->Ref();
2226   drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
2227   mutex_.Lock();
2228   s = versions_->LogAndApply(cfd_to_drop,
2229                              *cfd_to_drop->GetLatestMutableCFOptions(),
2230                              &drop_cf_edit, &mutex_);
2231   mutex_.Unlock();
2232   ASSERT_OK(s);
2233 
2234   std::vector<VersionEdit> edits(kAtomicGroupSize);
2235   uint32_t remaining = kAtomicGroupSize;
2236   size_t i = 0;
2237   autovector<ColumnFamilyData*> cfds;
2238   autovector<const MutableCFOptions*> mutable_cf_options_list;
2239   autovector<autovector<VersionEdit*>> edit_lists;
2240   for (const auto& cf_name : non_default_cf_names) {
2241     auto cfd = (cf_name != cf_to_drop_name)
2242                    ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
2243                    : cfd_to_drop;
2244     ASSERT_NE(nullptr, cfd);
2245     cfds.push_back(cfd);
2246     mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
2247     edits[i].SetColumnFamily(cfd->GetID());
2248     edits[i].SetLogNumber(0);
2249     edits[i].SetNextFile(2);
2250     edits[i].MarkAtomicGroup(--remaining);
2251     edits[i].SetLastSequence(last_seqno++);
2252     autovector<VersionEdit*> tmp_edits;
2253     tmp_edits.push_back(&edits[i]);
2254     edit_lists.emplace_back(tmp_edits);
2255     ++i;
2256   }
2257   int called = 0;
2258   SyncPoint::GetInstance()->DisableProcessing();
2259   SyncPoint::GetInstance()->ClearAllCallBacks();
2260   SyncPoint::GetInstance()->SetCallBack(
2261       "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
2262         std::vector<VersionEdit*>* tmp_edits =
2263             reinterpret_cast<std::vector<VersionEdit*>*>(arg);
2264         EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
2265         for (const auto e : *tmp_edits) {
2266           bool found = false;
2267           for (const auto& e2 : edits) {
2268             if (&e2 == e) {
2269               found = true;
2270               break;
2271             }
2272           }
2273           ASSERT_TRUE(found);
2274         }
2275         ++called;
2276       });
2277   SyncPoint::GetInstance()->EnableProcessing();
2278   mutex_.Lock();
2279   s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
2280                              &mutex_);
2281   mutex_.Unlock();
2282   ASSERT_OK(s);
2283   ASSERT_EQ(1, called);
2284   cfd_to_drop->UnrefAndTryDelete();
2285 }
2286 
// Runs VersionSetTestDropOneCF once per non-default column family. In each run
// the column family named by GetParam() is dropped before the atomic group
// covering all three non-default column families is applied.
INSTANTIATE_TEST_CASE_P(
    AtomicGroup, VersionSetTestDropOneCF,
    testing::Values(VersionSetTestBase::kColumnFamilyName1,
                    VersionSetTestBase::kColumnFamilyName2,
                    VersionSetTestBase::kColumnFamilyName3));
2292 
2293 class EmptyDefaultCfNewManifest : public VersionSetTestBase,
2294                                   public testing::Test {
2295  public:
EmptyDefaultCfNewManifest()2296   EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {}
2297   // Emulate DBImpl::NewDB()
PrepareManifest(std::vector<ColumnFamilyDescriptor> *,SequenceNumber *,std::unique_ptr<log::Writer> * log_writer)2298   void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
2299                        SequenceNumber* /*last_seqno*/,
2300                        std::unique_ptr<log::Writer>* log_writer) override {
2301     assert(log_writer != nullptr);
2302     VersionEdit new_db;
2303     new_db.SetLogNumber(0);
2304     const std::string manifest_path = DescriptorFileName(dbname_, 1);
2305     const auto& fs = env_->GetFileSystem();
2306     std::unique_ptr<WritableFileWriter> file_writer;
2307     Status s = WritableFileWriter::Create(
2308         fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
2309         &file_writer, nullptr);
2310     ASSERT_OK(s);
2311     log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
2312     std::string record;
2313     ASSERT_TRUE(new_db.EncodeTo(&record));
2314     s = (*log_writer)->AddRecord(record);
2315     ASSERT_OK(s);
2316     // Create new column family
2317     VersionEdit new_cf;
2318     new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
2319     new_cf.SetColumnFamily(1);
2320     new_cf.SetLastSequence(2);
2321     new_cf.SetNextFile(2);
2322     record.clear();
2323     ASSERT_TRUE(new_cf.EncodeTo(&record));
2324     s = (*log_writer)->AddRecord(record);
2325     ASSERT_OK(s);
2326   }
2327 
2328  protected:
2329   bool write_dbid_to_manifest_ = false;
2330   std::unique_ptr<log::Writer> log_writer_;
2331 };
2332 
// Create the DB and a column family; creating the column family switches to a
// new MANIFEST. Then reopen the DB and try to recover from that MANIFEST.
TEST_F(EmptyDefaultCfNewManifest, Recover) {
  // Recovery asks for the default column family plus the one the MANIFEST
  // creates.
  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options_);
  cf_descs.emplace_back(VersionSetTestBase::kColumnFamilyName1, cf_options_);

  PrepareManifest(nullptr, nullptr, &log_writer_);
  // Drop the writer before pointing CURRENT at MANIFEST-000001.
  log_writer_.reset();
  Status status =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(status);

  std::string manifest;
  VerifyManifest(&manifest);

  std::string recovered_db_id;
  bool missing_table_file = false;
  status = versions_->TryRecoverFromOneManifest(manifest, cf_descs,
                                                /*read_only=*/false,
                                                &recovered_db_id,
                                                &missing_table_file);
  ASSERT_OK(status);
  ASSERT_FALSE(missing_table_file);
}
2354 
2355 class VersionSetTestEmptyDb
2356     : public VersionSetTestBase,
2357       public testing::TestWithParam<
2358           std::tuple<bool, bool, std::vector<std::string>>> {
2359  public:
2360   static const std::string kUnknownColumnFamilyName;
VersionSetTestEmptyDb()2361   VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {}
2362 
2363  protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> *,SequenceNumber *,std::unique_ptr<log::Writer> * log_writer)2364   void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
2365                        SequenceNumber* /*last_seqno*/,
2366                        std::unique_ptr<log::Writer>* log_writer) override {
2367     assert(nullptr != log_writer);
2368     VersionEdit new_db;
2369     if (db_options_.write_dbid_to_manifest) {
2370       DBOptions tmp_db_options;
2371       tmp_db_options.env = env_;
2372       std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
2373       std::string db_id;
2374       impl->GetDbIdentityFromIdentityFile(&db_id);
2375       new_db.SetDBId(db_id);
2376     }
2377     const std::string manifest_path = DescriptorFileName(dbname_, 1);
2378     const auto& fs = env_->GetFileSystem();
2379     std::unique_ptr<WritableFileWriter> file_writer;
2380     Status s = WritableFileWriter::Create(
2381         fs, manifest_path, fs->OptimizeForManifestWrite(env_options_),
2382         &file_writer, nullptr);
2383     ASSERT_OK(s);
2384     {
2385       log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
2386       std::string record;
2387       new_db.EncodeTo(&record);
2388       s = (*log_writer)->AddRecord(record);
2389       ASSERT_OK(s);
2390     }
2391   }
2392 
2393   std::unique_ptr<log::Writer> log_writer_;
2394 };
2395 
// Column family name that VersionSetTestEmptyDb parameterizations may request
// but that is never created in any MANIFEST written by these tests.
const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown";
2397 
// The MANIFEST holds only the record written by PrepareManifest(): no
// non-default column family is created and no log number, next file number or
// last sequence is recorded. Recovery must fail: InvalidArgument when the
// default column family is not requested, Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);
  log_writer_.reset();
  Status s =
      SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  bool read_only = std::get<1>(GetParam());
  // Bind to the test parameter by reference; it outlives the test body, so a
  // copy of the vector is unnecessary.
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());

  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : cf_names) {
    column_families.emplace_back(cf_name, cf_options_);
  }

  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  auto iter =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  if (iter == cf_names.end()) {
    ASSERT_TRUE(s.IsInvalidArgument());
  } else {
    ASSERT_TRUE(s.IsCorruption());
  }
}
2430 
// After the initial record, the MANIFEST only creates kColumnFamilyName1
// (id 1). It never records a log number, next file number or last sequence.
// Recovery must fail: InvalidArgument when the default column family is not
// requested, Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);
  // Only a subset of column families in the MANIFEST.
  VersionEdit new_cf1;
  new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
  new_cf1.SetColumnFamily(1);
  Status s;
  {
    std::string record;
    // Check the encoding result instead of ignoring it, like the sibling tests.
    ASSERT_TRUE(new_cf1.EncodeTo(&record));
    s = log_writer_->AddRecord(record);
    ASSERT_OK(s);
  }
  log_writer_.reset();
  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : cf_names) {
    column_families.emplace_back(cf_name, cf_options_);
  }
  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  auto iter =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  if (iter == cf_names.end()) {
    ASSERT_TRUE(s.IsInvalidArgument());
  } else {
    ASSERT_TRUE(s.IsCorruption());
  }
}
2471 
// The MANIFEST creates every non-default column family but never records a
// log number, next file number or last sequence. Recovery must fail:
// Corruption when the default column family is requested, InvalidArgument
// otherwise.
TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);

  const std::vector<std::string> every_cf = {
      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
      kColumnFamilyName3};
  Status s;
  // Index 0 is the default column family; every other entry is created with
  // its index as the column family id.
  for (uint32_t id = 1; id < every_cf.size(); ++id) {
    VersionEdit add_cf_edit;
    add_cf_edit.AddColumnFamily(every_cf[id]);
    add_cf_edit.SetColumnFamily(id);
    std::string encoded;
    ASSERT_TRUE(add_cf_edit.EncodeTo(&encoded));
    s = log_writer_->AddRecord(encoded);
    ASSERT_OK(s);
  }
  log_writer_.reset();
  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  const bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& requested = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const std::string& name : requested) {
    column_families.emplace_back(name, cf_options_);
  }
  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  const bool default_cf_requested =
      std::find(requested.begin(), requested.end(),
                kDefaultColumnFamilyName) != requested.end();
  if (default_cf_requested) {
    ASSERT_TRUE(s.IsCorruption());
  } else {
    ASSERT_TRUE(s.IsInvalidArgument());
  }
}
2517 
// After creating every non-default column family (ids 1..3), the MANIFEST
// gets one more record. That record sets log_number, next_file_number and
// last_sequence but is addressed to a column family id that no record ever
// created. Recovery must fail: InvalidArgument when the default column family
// is not requested, Corruption otherwise.
TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
  db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
  PrepareManifest(nullptr, nullptr, &log_writer_);
  // Create every non-default column family. These records carry no
  // log_number, next_file_number or last_sequence.
  const std::vector<std::string> all_cf_names = {
      kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
      kColumnFamilyName3};
  uint32_t cf_id = 1;
  Status s;
  for (size_t i = 1; i != all_cf_names.size(); ++i) {
    VersionEdit new_cf;
    new_cf.AddColumnFamily(all_cf_names[i]);
    new_cf.SetColumnFamily(cf_id++);
    std::string record;
    ASSERT_TRUE(new_cf.EncodeTo(&record));
    s = log_writer_->AddRecord(record);
    ASSERT_OK(s);
  }
  {
    // cf_id is now one past the last id handed out above (i.e. 4), so it
    // names a column family that was never created.
    const uint32_t unknown_cf_id = cf_id;
    VersionEdit tmp_edit;
    tmp_edit.SetColumnFamily(unknown_cf_id);
    tmp_edit.SetLogNumber(0);
    tmp_edit.SetNextFile(2);
    tmp_edit.SetLastSequence(0);
    std::string record;
    ASSERT_TRUE(tmp_edit.EncodeTo(&record));
    s = log_writer_->AddRecord(record);
    ASSERT_OK(s);
  }
  log_writer_.reset();
  s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
  ASSERT_OK(s);

  std::string manifest_path;
  VerifyManifest(&manifest_path);

  bool read_only = std::get<1>(GetParam());
  const std::vector<std::string>& cf_names = std::get<2>(GetParam());
  std::vector<ColumnFamilyDescriptor> column_families;
  for (const auto& cf_name : cf_names) {
    column_families.emplace_back(cf_name, cf_options_);
  }
  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
                                           read_only, &db_id,
                                           &has_missing_table_file);
  auto iter =
      std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
  if (iter == cf_names.end()) {
    ASSERT_TRUE(s.IsInvalidArgument());
  } else {
    ASSERT_TRUE(s.IsCorruption());
  }
}
2574 
TEST_P(VersionSetTestEmptyDb,OpenCompleteManifest)2575 TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
2576   db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
2577   PrepareManifest(nullptr, nullptr, &log_writer_);
2578   // Write all column families but no log_number, next_file_number and
2579   // last_sequence.
2580   const std::vector<std::string> all_cf_names = {
2581       kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
2582       kColumnFamilyName3};
2583   uint32_t cf_id = 1;
2584   Status s;
2585   for (size_t i = 1; i != all_cf_names.size(); ++i) {
2586     VersionEdit new_cf;
2587     new_cf.AddColumnFamily(all_cf_names[i]);
2588     new_cf.SetColumnFamily(cf_id++);
2589     std::string record;
2590     ASSERT_TRUE(new_cf.EncodeTo(&record));
2591     s = log_writer_->AddRecord(record);
2592     ASSERT_OK(s);
2593   }
2594   {
2595     VersionEdit tmp_edit;
2596     tmp_edit.SetLogNumber(0);
2597     tmp_edit.SetNextFile(2);
2598     tmp_edit.SetLastSequence(0);
2599     std::string record;
2600     ASSERT_TRUE(tmp_edit.EncodeTo(&record));
2601     s = log_writer_->AddRecord(record);
2602     ASSERT_OK(s);
2603   }
2604   log_writer_.reset();
2605   s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
2606   ASSERT_OK(s);
2607 
2608   std::string manifest_path;
2609   VerifyManifest(&manifest_path);
2610 
2611   bool read_only = std::get<1>(GetParam());
2612   const std::vector<std::string>& cf_names = std::get<2>(GetParam());
2613   std::vector<ColumnFamilyDescriptor> column_families;
2614   for (const auto& cf_name : cf_names) {
2615     column_families.emplace_back(cf_name, cf_options_);
2616   }
2617   std::string db_id;
2618   bool has_missing_table_file = false;
2619   s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
2620                                            read_only, &db_id,
2621                                            &has_missing_table_file);
2622   auto iter =
2623       std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
2624   if (iter == cf_names.end()) {
2625     ASSERT_TRUE(s.IsInvalidArgument());
2626   } else if (read_only) {
2627     ASSERT_OK(s);
2628     ASSERT_FALSE(has_missing_table_file);
2629   } else if (cf_names.size() == all_cf_names.size()) {
2630     ASSERT_OK(s);
2631     ASSERT_FALSE(has_missing_table_file);
2632   } else if (cf_names.size() < all_cf_names.size()) {
2633     ASSERT_TRUE(s.IsInvalidArgument());
2634   } else {
2635     ASSERT_OK(s);
2636     ASSERT_FALSE(has_missing_table_file);
2637     ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(
2638         kUnknownColumnFamilyName);
2639     ASSERT_EQ(nullptr, cfd);
2640   }
2641 }
2642 
// Runs every VersionSetTestEmptyDb test for each combination of
// write_dbid_to_manifest, read_only and requested column family names. The
// cf_names values cover: no column family at all; only the default one; the
// three non-default ones without the default; default plus
// kColumnFamilyName1; all four; all four plus kUnknownColumnFamilyName.
INSTANTIATE_TEST_CASE_P(
    BestEffortRecovery, VersionSetTestEmptyDb,
    testing::Combine(
        /*write_dbid_to_manifest=*/testing::Bool(),
        /*read_only=*/testing::Bool(),
        /*cf_names=*/
        testing::Values(
            std::vector<std::string>(),
            std::vector<std::string>({kDefaultColumnFamilyName}),
            std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1,
                                      VersionSetTestBase::kColumnFamilyName2,
                                      VersionSetTestBase::kColumnFamilyName3}),
            std::vector<std::string>({kDefaultColumnFamilyName,
                                      VersionSetTestBase::kColumnFamilyName1}),
            std::vector<std::string>({kDefaultColumnFamilyName,
                                      VersionSetTestBase::kColumnFamilyName1,
                                      VersionSetTestBase::kColumnFamilyName2,
                                      VersionSetTestBase::kColumnFamilyName3}),
            std::vector<std::string>(
                {kDefaultColumnFamilyName,
                 VersionSetTestBase::kColumnFamilyName1,
                 VersionSetTestBase::kColumnFamilyName2,
                 VersionSetTestBase::kColumnFamilyName3,
                 VersionSetTestEmptyDb::kUnknownColumnFamilyName}))));
2667 
2668 class VersionSetTestMissingFiles : public VersionSetTestBase,
2669                                    public testing::Test {
2670  public:
VersionSetTestMissingFiles()2671   VersionSetTestMissingFiles()
2672       : VersionSetTestBase("version_set_test_missing_files"),
2673         block_based_table_options_(),
2674         table_factory_(std::make_shared<BlockBasedTableFactory>(
2675             block_based_table_options_)),
2676         internal_comparator_(
2677             std::make_shared<InternalKeyComparator>(options_.comparator)) {}
2678 
2679  protected:
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)2680   void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
2681                        SequenceNumber* last_seqno,
2682                        std::unique_ptr<log::Writer>* log_writer) override {
2683     assert(column_families != nullptr);
2684     assert(last_seqno != nullptr);
2685     assert(log_writer != nullptr);
2686     const std::string manifest = DescriptorFileName(dbname_, 1);
2687     const auto& fs = env_->GetFileSystem();
2688     std::unique_ptr<WritableFileWriter> file_writer;
2689     Status s = WritableFileWriter::Create(
2690         fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
2691         nullptr);
2692     ASSERT_OK(s);
2693     log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
2694     VersionEdit new_db;
2695     if (db_options_.write_dbid_to_manifest) {
2696       DBOptions tmp_db_options;
2697       tmp_db_options.env = env_;
2698       std::unique_ptr<DBImpl> impl(new DBImpl(tmp_db_options, dbname_));
2699       std::string db_id;
2700       impl->GetDbIdentityFromIdentityFile(&db_id);
2701       new_db.SetDBId(db_id);
2702     }
2703     {
2704       std::string record;
2705       ASSERT_TRUE(new_db.EncodeTo(&record));
2706       s = (*log_writer)->AddRecord(record);
2707       ASSERT_OK(s);
2708     }
2709     const std::vector<std::string> cf_names = {
2710         kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
2711         kColumnFamilyName3};
2712     uint32_t cf_id = 1;  // default cf id is 0
2713     cf_options_.table_factory = table_factory_;
2714     for (const auto& cf_name : cf_names) {
2715       column_families->emplace_back(cf_name, cf_options_);
2716       if (cf_name == kDefaultColumnFamilyName) {
2717         continue;
2718       }
2719       VersionEdit new_cf;
2720       new_cf.AddColumnFamily(cf_name);
2721       new_cf.SetColumnFamily(cf_id);
2722       std::string record;
2723       ASSERT_TRUE(new_cf.EncodeTo(&record));
2724       s = (*log_writer)->AddRecord(record);
2725       ASSERT_OK(s);
2726 
2727       VersionEdit cf_files;
2728       cf_files.SetColumnFamily(cf_id);
2729       cf_files.SetLogNumber(0);
2730       record.clear();
2731       ASSERT_TRUE(cf_files.EncodeTo(&record));
2732       s = (*log_writer)->AddRecord(record);
2733       ASSERT_OK(s);
2734       ++cf_id;
2735     }
2736     SequenceNumber seq = 2;
2737     {
2738       VersionEdit edit;
2739       edit.SetNextFile(7);
2740       edit.SetLastSequence(seq);
2741       std::string record;
2742       ASSERT_TRUE(edit.EncodeTo(&record));
2743       s = (*log_writer)->AddRecord(record);
2744       ASSERT_OK(s);
2745     }
2746     *last_seqno = seq + 1;
2747   }
2748 
2749   struct SstInfo {
2750     uint64_t file_number;
2751     std::string column_family;
2752     std::string key;  // the only key
2753     int level = 0;
SstInfoROCKSDB_NAMESPACE::VersionSetTestMissingFiles::SstInfo2754     SstInfo(uint64_t file_num, const std::string& cf_name,
2755             const std::string& _key)
2756         : SstInfo(file_num, cf_name, _key, 0) {}
SstInfoROCKSDB_NAMESPACE::VersionSetTestMissingFiles::SstInfo2757     SstInfo(uint64_t file_num, const std::string& cf_name,
2758             const std::string& _key, int lvl)
2759         : file_number(file_num),
2760           column_family(cf_name),
2761           key(_key),
2762           level(lvl) {}
2763   };
2764 
2765   // Create dummy sst, return their metadata. Note that only file name and size
2766   // are used.
CreateDummyTableFiles(const std::vector<SstInfo> & file_infos,std::vector<FileMetaData> * file_metas)2767   void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos,
2768                              std::vector<FileMetaData>* file_metas) {
2769     assert(file_metas != nullptr);
2770     for (const auto& info : file_infos) {
2771       uint64_t file_num = info.file_number;
2772       std::string fname = MakeTableFileName(dbname_, file_num);
2773       std::unique_ptr<FSWritableFile> file;
2774       Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
2775       ASSERT_OK(s);
2776       std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter(
2777           std::move(file), fname, FileOptions(), env_->GetSystemClock().get()));
2778       IntTblPropCollectorFactories int_tbl_prop_collector_factories;
2779 
2780       std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder(
2781           TableBuilderOptions(
2782               immutable_cf_options_, mutable_cf_options_, *internal_comparator_,
2783               &int_tbl_prop_collector_factories, kNoCompression,
2784               CompressionOptions(),
2785               TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
2786               info.column_family, info.level),
2787           fwriter.get()));
2788       InternalKey ikey(info.key, 0, ValueType::kTypeValue);
2789       builder->Add(ikey.Encode(), "value");
2790       ASSERT_OK(builder->Finish());
2791       fwriter->Flush();
2792       uint64_t file_size = 0;
2793       s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
2794       ASSERT_OK(s);
2795       ASSERT_NE(0, file_size);
2796       file_metas->emplace_back(file_num, /*file_path_id=*/0, file_size, ikey,
2797                                ikey, 0, 0, false, 0, 0, 0, kUnknownFileChecksum,
2798                                kUnknownFileChecksumFuncName);
2799     }
2800   }
2801 
2802   // This method updates last_sequence_.
WriteFileAdditionAndDeletionToManifest(uint32_t cf,const std::vector<std::pair<int,FileMetaData>> & added_files,const std::vector<std::pair<int,uint64_t>> & deleted_files)2803   void WriteFileAdditionAndDeletionToManifest(
2804       uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
2805       const std::vector<std::pair<int, uint64_t>>& deleted_files) {
2806     VersionEdit edit;
2807     edit.SetColumnFamily(cf);
2808     for (const auto& elem : added_files) {
2809       int level = elem.first;
2810       edit.AddFile(level, elem.second);
2811     }
2812     for (const auto& elem : deleted_files) {
2813       int level = elem.first;
2814       edit.DeleteFile(level, elem.second);
2815     }
2816     edit.SetLastSequence(last_seqno_);
2817     ++last_seqno_;
2818     assert(log_writer_.get() != nullptr);
2819     std::string record;
2820     ASSERT_TRUE(edit.EncodeTo(&record));
2821     Status s = log_writer_->AddRecord(record);
2822     ASSERT_OK(s);
2823   }
2824 
2825   BlockBasedTableOptions block_based_table_options_;
2826   std::shared_ptr<TableFactory> table_factory_;
2827   std::shared_ptr<InternalKeyComparator> internal_comparator_;
2828   std::vector<ColumnFamilyDescriptor> column_families_;
2829   SequenceNumber last_seqno_;
2830   std::unique_ptr<log::Writer> log_writer_;
2831 };
2832 
// The SSTs on disk (100..110) are far ahead of the MANIFEST, which only ever
// references files 10..14 (none of which exist) and then deletes file 10.
// Recovery must succeed, report missing table files, and leave L0 empty in
// every column family.
TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
  const std::vector<SstInfo> ssts_on_disk = {
      SstInfo(100, kDefaultColumnFamilyName, "a"),
      SstInfo(102, kDefaultColumnFamilyName, "b"),
      SstInfo(103, kDefaultColumnFamilyName, "c"),
      SstInfo(107, kDefaultColumnFamilyName, "d"),
      SstInfo(110, kDefaultColumnFamilyName, "e")};
  std::vector<FileMetaData> on_disk_metas;
  CreateDummyTableFiles(ssts_on_disk, &on_disk_metas);

  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);

  // Register files 10..14, all spanning ["a", "b"], in L0 of the default
  // column family.
  const InternalKey smallest_ikey("a", 1, ValueType::kTypeValue);
  const InternalKey largest_ikey("b", 1, ValueType::kTypeValue);
  std::vector<std::pair<int, FileMetaData>> additions;
  for (uint64_t number = 10; number < 15; ++number) {
    additions.emplace_back(
        0, FileMetaData(number, /*file_path_id=*/0, /*file_size=*/12,
                        smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
                        kUnknownFileChecksum, kUnknownFileChecksumFuncName));
  }
  WriteFileAdditionAndDeletionToManifest(
      /*cf=*/0, additions, std::vector<std::pair<int, uint64_t>>());

  // Then delete file 10 from L0 again.
  std::vector<std::pair<int, uint64_t>> deletions;
  deletions.emplace_back(0, 10);
  WriteFileAdditionAndDeletionToManifest(
      /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deletions);
  log_writer_.reset();

  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
  ASSERT_OK(s);
  std::string manifest_path;
  VerifyManifest(&manifest_path);

  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
                                           /*read_only=*/false, &db_id,
                                           &has_missing_table_file);
  ASSERT_OK(s);
  ASSERT_TRUE(has_missing_table_file);
  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
    ASSERT_TRUE(cfd->current()->storage_info()->LevelFiles(0).empty());
  }
}
2880 
TEST_F(VersionSetTestMissingFiles,ManifestAheadofSst)2881 TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
2882   std::vector<SstInfo> existing_files = {
2883       SstInfo(100, kDefaultColumnFamilyName, "a"),
2884       SstInfo(102, kDefaultColumnFamilyName, "b"),
2885       SstInfo(103, kDefaultColumnFamilyName, "c"),
2886       SstInfo(107, kDefaultColumnFamilyName, "d"),
2887       SstInfo(110, kDefaultColumnFamilyName, "e")};
2888   std::vector<FileMetaData> file_metas;
2889   CreateDummyTableFiles(existing_files, &file_metas);
2890 
2891   PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
2892   std::vector<std::pair<int, FileMetaData>> added_files;
2893   for (size_t i = 3; i != 5; ++i) {
2894     added_files.emplace_back(0, file_metas[i]);
2895   }
2896   WriteFileAdditionAndDeletionToManifest(
2897       /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
2898 
2899   added_files.clear();
2900   for (uint64_t file_num = 120; file_num < 130; ++file_num) {
2901     std::string smallest_ukey = "a";
2902     std::string largest_ukey = "b";
2903     InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
2904     InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
2905     FileMetaData meta =
2906         FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12,
2907                      smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
2908                      kUnknownFileChecksum, kUnknownFileChecksumFuncName);
2909     added_files.emplace_back(0, meta);
2910   }
2911   WriteFileAdditionAndDeletionToManifest(
2912       /*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
2913   log_writer_.reset();
2914   Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
2915   ASSERT_OK(s);
2916   std::string manifest_path;
2917   VerifyManifest(&manifest_path);
2918   std::string db_id;
2919   bool has_missing_table_file = false;
2920   s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
2921                                            /*read_only=*/false, &db_id,
2922                                            &has_missing_table_file);
2923   ASSERT_OK(s);
2924   ASSERT_TRUE(has_missing_table_file);
2925   for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
2926     VersionStorageInfo* vstorage = cfd->current()->storage_info();
2927     const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
2928     if (cfd->GetName() == kDefaultColumnFamilyName) {
2929       ASSERT_EQ(2, files.size());
2930       for (const auto* fmeta : files) {
2931         if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) {
2932           ASSERT_FALSE(true);
2933         }
2934       }
2935     } else {
2936       ASSERT_TRUE(files.empty());
2937     }
2938   }
2939 }
2940 
// The MANIFEST adds all five on-disk SSTs to L0 of the default column family
// and then deletes file 100, so every file it still references exists.
// Recovery must report no missing table file. The default column family must
// keep the four remaining files, and the other column families must be empty.
TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
  std::vector<SstInfo> existing_files = {
      SstInfo(100, kDefaultColumnFamilyName, "a"),
      SstInfo(102, kDefaultColumnFamilyName, "b"),
      SstInfo(103, kDefaultColumnFamilyName, "c"),
      SstInfo(107, kDefaultColumnFamilyName, "d"),
      SstInfo(110, kDefaultColumnFamilyName, "e")};
  std::vector<FileMetaData> file_metas;
  CreateDummyTableFiles(existing_files, &file_metas);

  PrepareManifest(&column_families_, &last_seqno_, &log_writer_);

  std::vector<std::pair<int, FileMetaData>> additions;
  for (size_t idx = 0; idx < file_metas.size(); ++idx) {
    additions.emplace_back(0, file_metas[idx]);
  }
  WriteFileAdditionAndDeletionToManifest(
      /*cf=*/0, additions, std::vector<std::pair<int, uint64_t>>());

  std::vector<std::pair<int, uint64_t>> deleted_files;
  deleted_files.emplace_back(/*level=*/0, 100);
  WriteFileAdditionAndDeletionToManifest(
      /*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
  log_writer_.reset();

  Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
  ASSERT_OK(s);
  std::string manifest_path;
  VerifyManifest(&manifest_path);

  std::string db_id;
  bool has_missing_table_file = false;
  s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
                                           /*read_only=*/false, &db_id,
                                           &has_missing_table_file);
  ASSERT_OK(s);
  ASSERT_FALSE(has_missing_table_file);

  for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
    const std::vector<FileMetaData*>& l0_files =
        cfd->current()->storage_info()->LevelFiles(0);
    if (cfd->GetName() != kDefaultColumnFamilyName) {
      ASSERT_TRUE(l0_files.empty());
      continue;
    }
    ASSERT_EQ(existing_files.size() - deleted_files.size(), l0_files.size());
    bool has_deleted_file = false;
    for (size_t idx = 0; idx < l0_files.size() && !has_deleted_file; ++idx) {
      has_deleted_file = (l0_files[idx]->fd.GetNumber() == 100);
    }
    ASSERT_FALSE(has_deleted_file);
  }
}
2992 
// Checks that the 2PC minimum WAL number survives MANIFEST rewrites and DB
// reopens.
// - A VersionEdit adds one dummy table file to L0 and records the 2PC
//   minimum log number to keep. The edit is applied to the default column
//   family.
// - The value reported by min_log_number_to_keep_2pc() is then checked.
// - The same check is repeated after each of three rounds of
//   CreateNewManifest() + ReopenDB().
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
  NewDB();

  std::vector<FileMetaData> metas;
  CreateDummyTableFiles({SstInfo(100, kDefaultColumnFamilyName, "a")}, &metas);

  constexpr WalNumber kExpectedMinWal2PC = 10;
  VersionEdit edit;
  edit.AddFile(/*level=*/0, metas[0]);
  edit.SetMinLogNumberToKeep(kExpectedMinWal2PC);
  ASSERT_OK(LogAndApplyToDefaultCF(edit));
  ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kExpectedMinWal2PC);

  // The recorded value must still be reported after every rewrite/reopen.
  for (int round = 0; round != 3; ++round) {
    CreateNewManifest();
    ReopenDB();
    ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kExpectedMinWal2PC);
  }
}
3013 
3014 }  // namespace ROCKSDB_NAMESPACE
3015 
main(int argc,char ** argv)3016 int main(int argc, char** argv) {
3017   ::testing::InitGoogleTest(&argc, argv);
3018   return RUN_ALL_TESTS();
3019 }
3020