1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_set.h"
11 #include "db/db_impl/db_impl.h"
12 #include "db/log_writer.h"
13 #include "logging/logging.h"
14 #include "table/mock_table.h"
15 #include "test_util/testharness.h"
16 #include "test_util/testutil.h"
17 #include "util/string_util.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 
21 class GenerateLevelFilesBriefTest : public testing::Test {
22  public:
23   std::vector<FileMetaData*> files_;
24   LevelFilesBrief file_level_;
25   Arena arena_;
26 
GenerateLevelFilesBriefTest()27   GenerateLevelFilesBriefTest() { }
28 
~GenerateLevelFilesBriefTest()29   ~GenerateLevelFilesBriefTest() override {
30     for (size_t i = 0; i < files_.size(); i++) {
31       delete files_[i];
32     }
33   }
34 
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)35   void Add(const char* smallest, const char* largest,
36            SequenceNumber smallest_seq = 100,
37            SequenceNumber largest_seq = 100) {
38     FileMetaData* f = new FileMetaData(
39         files_.size() + 1, 0, 0,
40         InternalKey(smallest, smallest_seq, kTypeValue),
41         InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
42         largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
43         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
44         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
45     files_.push_back(f);
46   }
47 
Compare()48   int Compare() {
49     int diff = 0;
50     for (size_t i = 0; i < files_.size(); i++) {
51       if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
52         diff++;
53       }
54     }
55     return diff;
56   }
57 };
58 
// An empty file list must produce an empty LevelFilesBrief.
TEST_F(GenerateLevelFilesBriefTest, Empty) {
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  ASSERT_EQ(0u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
64 
// A single file must appear in the brief with the same file number.
TEST_F(GenerateLevelFilesBriefTest, Single) {
  Add("p", "q");
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  ASSERT_EQ(1u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
71 
// Several files must be carried over to the brief in their original order.
TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);
  ASSERT_EQ(4u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
81 
82 class CountingLogger : public Logger {
83  public:
CountingLogger()84   CountingLogger() : log_count(0) {}
85   using Logger::Logv;
Logv(const char *,va_list)86   void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
87   int log_count;
88 };
89 
GetOptionsWithNumLevels(int num_levels,std::shared_ptr<CountingLogger> logger)90 Options GetOptionsWithNumLevels(int num_levels,
91                                 std::shared_ptr<CountingLogger> logger) {
92   Options opt;
93   opt.num_levels = num_levels;
94   opt.info_log = logger;
95   return opt;
96 }
97 
98 class VersionStorageInfoTest : public testing::Test {
99  public:
100   const Comparator* ucmp_;
101   InternalKeyComparator icmp_;
102   std::shared_ptr<CountingLogger> logger_;
103   Options options_;
104   ImmutableCFOptions ioptions_;
105   MutableCFOptions mutable_cf_options_;
106   VersionStorageInfo vstorage_;
107 
GetInternalKey(const char * ukey,SequenceNumber smallest_seq=100)108   InternalKey GetInternalKey(const char* ukey,
109                              SequenceNumber smallest_seq = 100) {
110     return InternalKey(ukey, smallest_seq, kTypeValue);
111   }
112 
VersionStorageInfoTest()113   VersionStorageInfoTest()
114       : ucmp_(BytewiseComparator()),
115         icmp_(ucmp_),
116         logger_(new CountingLogger()),
117         options_(GetOptionsWithNumLevels(6, logger_)),
118         ioptions_(options_),
119         mutable_cf_options_(options_),
120         vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
121 
~VersionStorageInfoTest()122   ~VersionStorageInfoTest() override {
123     for (int i = 0; i < vstorage_.num_levels(); i++) {
124       for (auto* f : vstorage_.LevelFiles(i)) {
125         if (--f->refs == 0) {
126           delete f;
127         }
128       }
129     }
130   }
131 
Add(int level,uint32_t file_number,const char * smallest,const char * largest,uint64_t file_size=0)132   void Add(int level, uint32_t file_number, const char* smallest,
133            const char* largest, uint64_t file_size = 0) {
134     assert(level < vstorage_.num_levels());
135     FileMetaData* f = new FileMetaData(
136         file_number, 0, file_size, GetInternalKey(smallest, 0),
137         GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
138         /* marked_for_compact */ false, kInvalidBlobFileNumber,
139         kUnknownOldestAncesterTime, kUnknownFileCreationTime,
140         kUnknownFileChecksum, kUnknownFileChecksumFuncName);
141     f->compensated_file_size = file_size;
142     vstorage_.AddFile(level, f);
143   }
144 
Add(int level,uint32_t file_number,const InternalKey & smallest,const InternalKey & largest,uint64_t file_size=0)145   void Add(int level, uint32_t file_number, const InternalKey& smallest,
146            const InternalKey& largest, uint64_t file_size = 0) {
147     assert(level < vstorage_.num_levels());
148     FileMetaData* f = new FileMetaData(
149         file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
150         /* largest_seq */ 0, /* marked_for_compact */ false,
151         kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
152         kUnknownFileCreationTime, kUnknownFileChecksum,
153         kUnknownFileChecksumFuncName);
154     f->compensated_file_size = file_size;
155     vstorage_.AddFile(level, f);
156   }
157 
GetOverlappingFiles(int level,const InternalKey & begin,const InternalKey & end)158   std::string GetOverlappingFiles(int level, const InternalKey& begin,
159                                   const InternalKey& end) {
160     std::vector<FileMetaData*> inputs;
161     vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
162 
163     std::string result;
164     for (size_t i = 0; i < inputs.size(); ++i) {
165       if (i > 0) {
166         result += ",";
167       }
168       AppendNumberTo(&result, inputs[i]->fd.GetNumber());
169     }
170     return result;
171   }
172 };
173 
// With dynamic level sizing disabled, level targets grow geometrically from
// max_bytes_for_level_base: 10, 50, 250, 1250 for L1..L4 with multiplier 5,
// regardless of how much data the deeper levels hold. Nothing is logged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  ioptions_.level_compaction_dynamic_level_bytes = false;
  mutable_cf_options_.max_bytes_for_level_base = 10;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  Add(4, 100U, "1", "2");
  Add(5, 101U, "1", "2");

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U);

  ASSERT_EQ(0, logger_->log_count);
}
189 
// Grows the LSM step by step under dynamic level sizing and checks how the
// base level and its targets move after each CalculateBaseBytes() call.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 1000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  // Only 500 bytes in L5: the base level stays at L5.
  Add(5, 1U, "1", "2", 500U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.base_level(), 5);

  // L5 now holds 1050 bytes, more than the 1000-byte base, so L4 becomes the
  // base level with a 1000-byte target.
  Add(5, 2U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Data in L4 itself does not change the targets.
  Add(4, 3U, "3", "4", 550U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Files in L3 make it the base level; exactly one info-log message is
  // expected from this calculation.
  Add(3, 4U, "3", "4", 250U);
  Add(3, 5U, "5", "7", 300U);
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 3);

  // Files in L1 pull the base level up to L1. The counter is reset first so
  // only messages from this calculation are counted.
  Add(1, 6U, "3", "4", 5U);
  Add(1, 7U, "8", "9", 5U);
  logger_->log_count = 0;
  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 1);
}
231 
// With data in every level, the base level is L1 and the L1..L4 targets grow
// by the multiplier (2) from max_bytes_for_level_base (100): 100, 200, 400,
// 800. Nothing is logged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 100;
  mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  Add(0, 1U, "1", "2", 50U);
  Add(1, 2U, "1", "2", 50U);
  Add(2, 3U, "1", "2", 500U);
  Add(3, 4U, "1", "2", 500U);
  Add(4, 5U, "1", "2", 1700U);
  Add(5, 6U, "1", "2", 500U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
  ASSERT_EQ(vstorage_.base_level(), 1);
  ASSERT_EQ(0, logger_->log_count);
}
251 
// A multi-terabyte last level: the L3 and L4 targets are L5's size (3000 GB)
// divided by 100 and 10, and the base level L2 is expected at
// max_bytes_for_level_base (10 GB). Sizes use decimal gigabytes.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  const uint64_t kOneGB = 1000U * 1000U * 1000U;
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;
  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  Add(0, 1U, "1", "2", 50U);
  Add(3, 4U, "1", "2", 32U * kOneGB);
  Add(4, 5U, "1", "2", 500U * kOneGB);
  Add(5, 6U, "1", "2", 3000U * kOneGB);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(5), 3000U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 300U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 30U * kOneGB);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 10U * kOneGB);
  ASSERT_EQ(vstorage_.base_level(), 2);
  ASSERT_EQ(0, logger_->log_count);
}
270 
// L0 holds 30,000 bytes, more than level0_file_num_compaction_trigger files
// but still below max_bytes_for_level_base (40,000). Unlike the _2/_3
// variants, the configured multiplier is therefore expected to be kept.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier stays at the configured 5.
  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  // Working back from L5 (1,286,250) by the multiplier 5 gives L4 = 257,250
  // and L3 = 51,450; the base level L2 is expected at
  // max_bytes_for_level_base (40,000).
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
296 
// L0 holds 30,000 bytes (three 10,000-byte files), more than
// max_bytes_for_level_base (10,000). The base level L2 is expected to get a
// 30,000-byte target and a reduced multiplier of about 3.5, which still
// reaches the L5 size: 30,000 * 3.5^3 = 1,286,250.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  Add(0, 11U, "1", "2", 10000U);
  Add(0, 12U, "1", "2", 10000U);
  Add(0, 13U, "1", "2", 10000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // level multiplier should be 3.5
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  // Level size should be around 30,000, 105,000, 367,500
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
325 
// Same total L0 size as the _2 variant (30,000 bytes), but spread over six
// 5,000-byte files. The expected targets and multiplier are unchanged.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 10000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  Add(0, 11U, "1", "2", 5000U);
  Add(0, 12U, "1", "2", 5000U);
  Add(0, 13U, "1", "2", 5000U);
  Add(0, 14U, "1", "2", 5000U);
  Add(0, 15U, "1", "2", 5000U);
  Add(0, 16U, "1", "2", 5000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // level multiplier should be 3.5
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);
  // Level size should be around 30,000, 105,000, 367,500
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
357 
// Every shallower file overlaps the last-level range [4,7], so only the
// 10-byte last-level file is expected to count as live data. Each file gets
// a distinct file number, as in a real version.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  // Test whether the overlaps are detected as expected
  Add(1, 1U, "4", "7", 1U);  // Perfect overlap with last level
  Add(2, 2U, "3", "5", 1U);  // Partial overlap with last level
  Add(2, 3U, "6", "8", 1U);  // Partial overlap with last level
  Add(3, 4U, "1", "9", 1U);  // Contains range of last level
  Add(4, 5U, "4", "5", 1U);  // Inside range of last level
  Add(4, 6U, "6", "7", 1U);  // Inside range of last level
  Add(5, 7U, "4", "7", 10U);
  ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
}
369 
// Four non-overlapping ranges survive: [7,8] in L3, [2,3] in L2, [5,6] in L1
// and [9,9] in L0. Every file gets a distinct file number, as in a real
// version.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  Add(0, 1U, "9", "9", 1U);  // Level 0 is not ordered
  Add(0, 2U, "5", "6", 1U);  // Ignored because of [5,6] in l1
  Add(1, 3U, "1", "2", 1U);  // Ignored because of [2,3] in l2
  Add(1, 4U, "3", "4", 1U);  // Ignored because of [2,3] in l2
  Add(1, 5U, "5", "6", 1U);
  Add(2, 6U, "2", "3", 1U);
  Add(3, 7U, "7", "8", 1U);
  ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
}
380 
// Checks GetOverlappingInputs() on L1 files whose boundaries share a user
// key, either through a range-deletion sentinel (kMaxSequenceNumber,
// kTypeRangeDeletion) or through a kMaxSequenceNumber value key.
TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  // Two files that overlap at the range deletion tombstone sentinel.
  Add(1, 1U, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}, 1);
  Add(1, 2U, {"b", 0, kTypeValue}, {"c", 0, kTypeValue}, 1);
  // Two files that overlap at the same user key.
  Add(1, 3U, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeValue}, 1);
  Add(1, 4U, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}, 1);
  // Two files that do not overlap.
  Add(1, 5U, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}, 1);
  Add(1, 6U, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}, 1);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // A range ending exactly at the sentinel selects only file 1, and a range
  // starting at b@kMaxSequenceNumber selects only file 2. For the shared key
  // "e" (a value key, not a sentinel), both neighbours are expected in most
  // queries.
  ASSERT_EQ("1,2", GetOverlappingFiles(
      1, {"a", 0, kTypeValue}, {"b", 0, kTypeValue}));
  ASSERT_EQ("1", GetOverlappingFiles(
      1, {"a", 0, kTypeValue}, {"b", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("2", GetOverlappingFiles(
      1, {"b", kMaxSequenceNumber, kTypeValue}, {"c", 0, kTypeValue}));
  ASSERT_EQ("3,4", GetOverlappingFiles(
      1, {"d", 0, kTypeValue}, {"e", 0, kTypeValue}));
  ASSERT_EQ("3", GetOverlappingFiles(
      1, {"d", 0, kTypeValue}, {"e", kMaxSequenceNumber, kTypeRangeDeletion}));
  ASSERT_EQ("3,4", GetOverlappingFiles(
      1, {"e", kMaxSequenceNumber, kTypeValue}, {"f", 0, kTypeValue}));
  ASSERT_EQ("3,4", GetOverlappingFiles(
      1, {"e", 0, kTypeValue}, {"f", 0, kTypeValue}));
  ASSERT_EQ("5", GetOverlappingFiles(
      1, {"g", 0, kTypeValue}, {"h", 0, kTypeValue}));
  ASSERT_EQ("6", GetOverlappingFiles(
      1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue}));
}
413 
414 
415 class FindLevelFileTest : public testing::Test {
416  public:
417   LevelFilesBrief file_level_;
418   bool disjoint_sorted_files_;
419   Arena arena_;
420 
FindLevelFileTest()421   FindLevelFileTest() : disjoint_sorted_files_(true) { }
422 
~FindLevelFileTest()423   ~FindLevelFileTest() override {}
424 
LevelFileInit(size_t num=0)425   void LevelFileInit(size_t num = 0) {
426     char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
427     file_level_.files = new (mem)FdWithKeyRange[num];
428     file_level_.num_files = 0;
429   }
430 
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)431   void Add(const char* smallest, const char* largest,
432            SequenceNumber smallest_seq = 100,
433            SequenceNumber largest_seq = 100) {
434     InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
435     InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
436 
437     Slice smallest_slice = smallest_key.Encode();
438     Slice largest_slice = largest_key.Encode();
439 
440     char* mem = arena_.AllocateAligned(
441         smallest_slice.size() + largest_slice.size());
442     memcpy(mem, smallest_slice.data(), smallest_slice.size());
443     memcpy(mem + smallest_slice.size(), largest_slice.data(),
444         largest_slice.size());
445 
446     // add to file_level_
447     size_t num = file_level_.num_files;
448     auto& file = file_level_.files[num];
449     file.fd = FileDescriptor(num + 1, 0, 0);
450     file.smallest_key = Slice(mem, smallest_slice.size());
451     file.largest_key = Slice(mem + smallest_slice.size(),
452         largest_slice.size());
453     file_level_.num_files++;
454   }
455 
Find(const char * key)456   int Find(const char* key) {
457     InternalKey target(key, 100, kTypeValue);
458     InternalKeyComparator cmp(BytewiseComparator());
459     return FindFile(cmp, file_level_, target.Encode());
460   }
461 
Overlaps(const char * smallest,const char * largest)462   bool Overlaps(const char* smallest, const char* largest) {
463     InternalKeyComparator cmp(BytewiseComparator());
464     Slice s(smallest != nullptr ? smallest : "");
465     Slice l(largest != nullptr ? largest : "");
466     return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
467                                  (smallest != nullptr ? &s : nullptr),
468                                  (largest != nullptr ? &l : nullptr));
469   }
470 };
471 
// A level with no files: FindFile() returns 0 and nothing overlaps, not even
// a fully unbounded range.
TEST_F(FindLevelFileTest, LevelEmpty) {
  LevelFileInit(0);

  ASSERT_EQ(0, Find("foo"));
  ASSERT_FALSE(Overlaps("a", "z"));
  ASSERT_FALSE(Overlaps(nullptr, "z"));
  ASSERT_FALSE(Overlaps("a", nullptr));
  ASSERT_FALSE(Overlaps(nullptr, nullptr));
}
481 
// One file [p, q]. Keys up to and including "q" map to index 0 and later keys
// map to 1 (past the end). Overlap is inclusive at both file boundaries.
TEST_F(FindLevelFileTest, LevelSingle) {
  LevelFileInit(1);

  Add("p", "q");
  ASSERT_EQ(0, Find("a"));
  ASSERT_EQ(0, Find("p"));
  ASSERT_EQ(0, Find("p1"));
  ASSERT_EQ(0, Find("q"));
  ASSERT_EQ(1, Find("q1"));
  ASSERT_EQ(1, Find("z"));

  ASSERT_FALSE(Overlaps("a", "b"));
  ASSERT_FALSE(Overlaps("z1", "z2"));
  ASSERT_TRUE(Overlaps("a", "p"));
  ASSERT_TRUE(Overlaps("a", "q"));
  ASSERT_TRUE(Overlaps("a", "z"));
  ASSERT_TRUE(Overlaps("p", "p1"));
  ASSERT_TRUE(Overlaps("p", "q"));
  ASSERT_TRUE(Overlaps("p", "z"));
  ASSERT_TRUE(Overlaps("p1", "p2"));
  ASSERT_TRUE(Overlaps("p1", "z"));
  ASSERT_TRUE(Overlaps("q", "q"));
  ASSERT_TRUE(Overlaps("q", "q1"));

  ASSERT_FALSE(Overlaps(nullptr, "j"));
  ASSERT_FALSE(Overlaps("r", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "p"));
  ASSERT_TRUE(Overlaps(nullptr, "p1"));
  ASSERT_TRUE(Overlaps("q", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
}
513 
// Four disjoint sorted files. Each key maps to the first file whose range
// does not end before it. Gaps between files do not overlap anything.
TEST_F(FindLevelFileTest, LevelMultiple) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");
  ASSERT_EQ(0, Find("100"));
  ASSERT_EQ(0, Find("150"));
  ASSERT_EQ(0, Find("151"));
  ASSERT_EQ(0, Find("199"));
  ASSERT_EQ(0, Find("200"));
  ASSERT_EQ(1, Find("201"));
  ASSERT_EQ(1, Find("249"));
  ASSERT_EQ(1, Find("250"));
  ASSERT_EQ(2, Find("251"));
  ASSERT_EQ(2, Find("299"));
  ASSERT_EQ(2, Find("300"));
  ASSERT_EQ(2, Find("349"));
  ASSERT_EQ(2, Find("350"));
  ASSERT_EQ(3, Find("351"));
  ASSERT_EQ(3, Find("400"));
  ASSERT_EQ(3, Find("450"));
  ASSERT_EQ(4, Find("451"));

  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("251", "299"));
  ASSERT_FALSE(Overlaps("451", "500"));
  ASSERT_FALSE(Overlaps("351", "399"));

  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
}
553 
// Same four files as LevelMultiple, probed with ranges open at one or both
// ends (nullptr bounds).
TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");
  ASSERT_FALSE(Overlaps(nullptr, "149"));
  ASSERT_FALSE(Overlaps("451", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "150"));
  ASSERT_TRUE(Overlaps(nullptr, "199"));
  ASSERT_TRUE(Overlaps(nullptr, "200"));
  ASSERT_TRUE(Overlaps(nullptr, "201"));
  ASSERT_TRUE(Overlaps(nullptr, "400"));
  ASSERT_TRUE(Overlaps(nullptr, "800"));
  ASSERT_TRUE(Overlaps("100", nullptr));
  ASSERT_TRUE(Overlaps("200", nullptr));
  ASSERT_TRUE(Overlaps("449", nullptr));
  ASSERT_TRUE(Overlaps("450", nullptr));
}
575 
// A single-key file "200" whose smallest and largest keys carry different
// sequence numbers (5000 and 3000). Overlap on user key "200" must still be
// detected.
TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  LevelFileInit(1);

  Add("200", "200", 5000, 3000);
  ASSERT_FALSE(Overlaps("199", "199"));
  ASSERT_FALSE(Overlaps("201", "300"));
  ASSERT_TRUE(Overlaps("200", "200"));
  ASSERT_TRUE(Overlaps("190", "200"));
  ASSERT_TRUE(Overlaps("200", "210"));
}
586 
// Files that overlap each other ([150,600] contains [400,500]), checked with
// disjoint_sorted_files_ = false.
TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  LevelFileInit(2);

  Add("150", "600");
  Add("400", "500");
  disjoint_sorted_files_ = false;
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("601", "700"));
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
  ASSERT_TRUE(Overlaps("450", "700"));
  ASSERT_TRUE(Overlaps("600", "700"));
}
606 
607 class VersionSetTestBase {
608  public:
609   const static std::string kColumnFamilyName1;
610   const static std::string kColumnFamilyName2;
611   const static std::string kColumnFamilyName3;
612   int num_initial_edits_;
613 
VersionSetTestBase()614   VersionSetTestBase()
615       : env_(Env::Default()),
616         fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
617         dbname_(test::PerThreadDBPath("version_set_test")),
618         db_options_(),
619         mutable_cf_options_(cf_options_),
620         table_cache_(NewLRUCache(50000, 16)),
621         write_buffer_manager_(db_options_.db_write_buffer_size),
622         shutting_down_(false),
623         mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
624     EXPECT_OK(env_->CreateDirIfMissing(dbname_));
625 
626     db_options_.env = env_;
627     db_options_.fs = fs_;
628     versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
629                                    table_cache_.get(), &write_buffer_manager_,
630                                    &write_controller_,
631                                    /*block_cache_tracer=*/nullptr)),
632         reactive_versions_ = std::make_shared<ReactiveVersionSet>(
633             dbname_, &db_options_, env_options_, table_cache_.get(),
634             &write_buffer_manager_, &write_controller_);
635     db_options_.db_paths.emplace_back(dbname_,
636                                       std::numeric_limits<uint64_t>::max());
637   }
638 
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)639   void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
640                        SequenceNumber* last_seqno,
641                        std::unique_ptr<log::Writer>* log_writer) {
642     assert(column_families != nullptr);
643     assert(last_seqno != nullptr);
644     assert(log_writer != nullptr);
645     VersionEdit new_db;
646     if (db_options_.write_dbid_to_manifest) {
647       DBImpl* impl = new DBImpl(DBOptions(), dbname_);
648       std::string db_id;
649       impl->GetDbIdentityFromIdentityFile(&db_id);
650       new_db.SetDBId(db_id);
651     }
652     new_db.SetLogNumber(0);
653     new_db.SetNextFile(2);
654     new_db.SetLastSequence(0);
655 
656     const std::vector<std::string> cf_names = {
657         kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
658         kColumnFamilyName3};
659     const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
660     autovector<VersionEdit> new_cfs;
661     uint64_t last_seq = 1;
662     uint32_t cf_id = 1;
663     for (int i = 1; i != kInitialNumOfCfs; ++i) {
664       VersionEdit new_cf;
665       new_cf.AddColumnFamily(cf_names[i]);
666       new_cf.SetColumnFamily(cf_id++);
667       new_cf.SetLogNumber(0);
668       new_cf.SetNextFile(2);
669       new_cf.SetLastSequence(last_seq++);
670       new_cfs.emplace_back(new_cf);
671     }
672     *last_seqno = last_seq;
673     num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
674     const std::string manifest = DescriptorFileName(dbname_, 1);
675     std::unique_ptr<WritableFile> file;
676     Status s = env_->NewWritableFile(
677         manifest, &file, env_->OptimizeForManifestWrite(env_options_));
678     ASSERT_OK(s);
679     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
680         NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
681     {
682       log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
683       std::string record;
684       new_db.EncodeTo(&record);
685       s = (*log_writer)->AddRecord(record);
686       for (const auto& e : new_cfs) {
687         record.clear();
688         e.EncodeTo(&record);
689         s = (*log_writer)->AddRecord(record);
690         ASSERT_OK(s);
691       }
692     }
693     ASSERT_OK(s);
694 
695     cf_options_.table_factory = mock_table_factory_;
696     for (const auto& cf_name : cf_names) {
697       column_families->emplace_back(cf_name, cf_options_);
698     }
699   }
700 
701   // Create DB with 3 column families.
NewDB()702   void NewDB() {
703     std::vector<ColumnFamilyDescriptor> column_families;
704     SequenceNumber last_seqno;
705     std::unique_ptr<log::Writer> log_writer;
706     SetIdentityFile(env_, dbname_);
707     PrepareManifest(&column_families, &last_seqno, &log_writer);
708     log_writer.reset();
709     // Make "CURRENT" file point to the new manifest file.
710     Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
711     ASSERT_OK(s);
712 
713     EXPECT_OK(versions_->Recover(column_families, false));
714     EXPECT_EQ(column_families.size(),
715               versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
716   }
717 
718   Env* env_;
719   std::shared_ptr<FileSystem> fs_;
720   const std::string dbname_;
721   EnvOptions env_options_;
722   ImmutableDBOptions db_options_;
723   ColumnFamilyOptions cf_options_;
724   MutableCFOptions mutable_cf_options_;
725   std::shared_ptr<Cache> table_cache_;
726   WriteController write_controller_;
727   WriteBufferManager write_buffer_manager_;
728   std::shared_ptr<VersionSet> versions_;
729   std::shared_ptr<ReactiveVersionSet> reactive_versions_;
730   InstrumentedMutex mutex_;
731   std::atomic<bool> shutting_down_;
732   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
733 };
734 
735 const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
736 const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
737 const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
738 
739 class VersionSetTest : public VersionSetTestBase, public testing::Test {
740  public:
VersionSetTest()741   VersionSetTest() : VersionSetTestBase() {}
742 };
743 
// Commits five edits for the default column family as one LogAndApply()
// group. The SameColumnFamily sync point is expected to fire for every edit
// after the first, always reporting column family id 0.
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  NewDB();
  const int kGroupSize = 5;
  autovector<VersionEdit> edits;
  for (int i = 0; i != kGroupSize; ++i) {
    edits.emplace_back(VersionEdit());
  }
  autovector<ColumnFamilyData*> cfds;
  autovector<const MutableCFOptions*> all_mutable_cf_options;
  autovector<autovector<VersionEdit*>> edit_lists;
  for (int i = 0; i != kGroupSize; ++i) {
    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
    all_mutable_cf_options.emplace_back(&mutable_cf_options_);
    autovector<VersionEdit*> edit_list;
    edit_list.emplace_back(&edits[i]);
    edit_lists.emplace_back(edit_list);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
        uint32_t* cf_id = static_cast<uint32_t*>(arg);
        EXPECT_EQ(0u, *cf_id);
        ++count;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  mutex_.Lock();
  Status s =
      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  mutex_.Unlock();
  // The callback captures the local `count` by reference. Unregister it
  // before this scope ends, or a later test reaching the same sync point
  // would write through a dangling reference.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  EXPECT_OK(s);
  EXPECT_EQ(kGroupSize - 1, count);
}
779 
780 class VersionSetAtomicGroupTest : public VersionSetTestBase,
781                                   public testing::Test {
782  public:
VersionSetAtomicGroupTest()783   VersionSetAtomicGroupTest() : VersionSetTestBase() {}
784 
SetUp()785   void SetUp() override {
786     PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
787     SetupTestSyncPoints();
788   }
789 
SetupValidAtomicGroup(int atomic_group_size)790   void SetupValidAtomicGroup(int atomic_group_size) {
791     edits_.resize(atomic_group_size);
792     int remaining = atomic_group_size;
793     for (size_t i = 0; i != edits_.size(); ++i) {
794       edits_[i].SetLogNumber(0);
795       edits_[i].SetNextFile(2);
796       edits_[i].MarkAtomicGroup(--remaining);
797       edits_[i].SetLastSequence(last_seqno_++);
798     }
799     ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
800   }
801 
SetupIncompleteTrailingAtomicGroup(int atomic_group_size)802   void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
803     edits_.resize(atomic_group_size);
804     int remaining = atomic_group_size;
805     for (size_t i = 0; i != edits_.size(); ++i) {
806       edits_[i].SetLogNumber(0);
807       edits_[i].SetNextFile(2);
808       edits_[i].MarkAtomicGroup(--remaining);
809       edits_[i].SetLastSequence(last_seqno_++);
810     }
811     ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
812   }
813 
SetupCorruptedAtomicGroup(int atomic_group_size)814   void SetupCorruptedAtomicGroup(int atomic_group_size) {
815     edits_.resize(atomic_group_size);
816     int remaining = atomic_group_size;
817     for (size_t i = 0; i != edits_.size(); ++i) {
818       edits_[i].SetLogNumber(0);
819       edits_[i].SetNextFile(2);
820       if (i != ((size_t)atomic_group_size / 2)) {
821         edits_[i].MarkAtomicGroup(--remaining);
822       }
823       edits_[i].SetLastSequence(last_seqno_++);
824     }
825     ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
826   }
827 
SetupIncorrectAtomicGroup(int atomic_group_size)828   void SetupIncorrectAtomicGroup(int atomic_group_size) {
829     edits_.resize(atomic_group_size);
830     int remaining = atomic_group_size;
831     for (size_t i = 0; i != edits_.size(); ++i) {
832       edits_[i].SetLogNumber(0);
833       edits_[i].SetNextFile(2);
834       if (i != 1) {
835         edits_[i].MarkAtomicGroup(--remaining);
836       } else {
837         edits_[i].MarkAtomicGroup(remaining--);
838       }
839       edits_[i].SetLastSequence(last_seqno_++);
840     }
841     ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
842   }
843 
SetupTestSyncPoints()844   void SetupTestSyncPoints() {
845     SyncPoint::GetInstance()->DisableProcessing();
846     SyncPoint::GetInstance()->ClearAllCallBacks();
847     SyncPoint::GetInstance()->SetCallBack(
848         "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
849           VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
850           EXPECT_EQ(edits_.front().DebugString(),
851                     e->DebugString());  // compare based on value
852           first_in_atomic_group_ = true;
853         });
854     SyncPoint::GetInstance()->SetCallBack(
855         "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
856           VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
857           EXPECT_EQ(edits_.back().DebugString(),
858                     e->DebugString());  // compare based on value
859           EXPECT_TRUE(first_in_atomic_group_);
860           last_in_atomic_group_ = true;
861         });
862     SyncPoint::GetInstance()->SetCallBack(
863         "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
864           num_recovered_edits_ = *reinterpret_cast<int*>(arg);
865         });
866     SyncPoint::GetInstance()->SetCallBack(
867         "ReactiveVersionSet::ReadAndApply:AppliedEdits",
868         [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
869     SyncPoint::GetInstance()->SetCallBack(
870         "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
871         [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
872     SyncPoint::GetInstance()->SetCallBack(
873         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
874         [&](void* arg) {
875           corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
876         });
877     SyncPoint::GetInstance()->SetCallBack(
878         "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
879         [&](void* arg) {
880           edit_with_incorrect_group_size_ =
881               *reinterpret_cast<VersionEdit*>(arg);
882         });
883     SyncPoint::GetInstance()->EnableProcessing();
884   }
885 
AddNewEditsToLog(int num_edits)886   void AddNewEditsToLog(int num_edits) {
887     for (int i = 0; i < num_edits; i++) {
888       std::string record;
889       edits_[i].EncodeTo(&record);
890       ASSERT_OK(log_writer_->AddRecord(record));
891     }
892   }
893 
TearDown()894   void TearDown() override {
895     SyncPoint::GetInstance()->DisableProcessing();
896     SyncPoint::GetInstance()->ClearAllCallBacks();
897     log_writer_.reset();
898   }
899 
900  protected:
901   std::vector<ColumnFamilyDescriptor> column_families_;
902   SequenceNumber last_seqno_;
903   std::vector<VersionEdit> edits_;
904   bool first_in_atomic_group_ = false;
905   bool last_in_atomic_group_ = false;
906   int num_edits_in_atomic_group_ = 0;
907   int num_recovered_edits_ = 0;
908   int num_applied_edits_ = 0;
909   VersionEdit corrupted_edit_;
910   VersionEdit edit_with_incorrect_group_size_;
911   std::unique_ptr<log::Writer> log_writer_;
912 };
913 
// A complete atomic group already in the MANIFEST is replayed in full by
// VersionSet::Recover(); no edits are reported as applied by ReadAndApply.
TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recovery = versions_->Recover(column_families_, false);
  EXPECT_OK(recovery);
  const auto live_cf_count =
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  // Both ends of the group must have been observed by the read buffer.
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
926 
// A complete atomic group already in the MANIFEST is replayed entirely by
// ReactiveVersionSet::Recover(), leaving nothing in the replay buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  const int kGroupSize = 3;
  SetupValidAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recovery = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recovery);
  const auto live_cf_count =
      reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Nothing of the group may remain buffered once recovery has finished.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_ + kGroupSize, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
948 
// A valid atomic group appended after ReactiveVersionSet::Recover() is picked
// up by ReadAndApply(). All of its edits are applied and the replay buffer is
// left empty.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  // ASSERT (not EXPECT): ReadAndApply() below is handed manifest_reader, which
  // Recover() is responsible for setting up. Do not continue after a failure.
  ASSERT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  AddNewEditsToLog(kAtomicGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  // EXPECT rather than ASSERT so that `mu` is always unlocked below.
  EXPECT_OK(
      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  mu.Unlock();
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // Applying the complete group should leave the replay buffer empty.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
974 
// Only a prefix of an atomic group reaches the MANIFEST. VersionSet::Recover()
// still succeeds and sees the start of the group but not its end. Only the
// initial edits count as recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 4;
  const int kPersisted = kGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kGroupSize);
  AddNewEditsToLog(kPersisted);

  const Status recovery = versions_->Recover(column_families_, false);
  EXPECT_OK(recovery);
  const auto live_cf_count =
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kPersisted, num_edits_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
990 
// ReactiveVersionSet::Recover() buffers a partially persisted atomic group.
// Once the final edit is appended, ReadAndApply() applies the whole group and
// leaves the replay buffer empty.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  // ASSERT (not EXPECT): manifest_reader is handed to ReadAndApply() later and
  // is set up by Recover(). Do not continue after a failure.
  ASSERT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // Reactive version set should store the edits in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  // Write the last record. The reactive version set should now apply all
  // edits. Everything below depends on this record being written.
  std::string last_record;
  edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);
  ASSERT_OK(log_writer_->AddRecord(last_record));
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  // EXPECT rather than ASSERT so that `mu` is always unlocked below.
  EXPECT_OK(
      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  mu.Unlock();
  // The replay buffer should be empty now that the group is complete.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
1029 
// ReactiveVersionSet::Recover() runs before any atomic-group edit exists.
// ReadAndApply() then reads a partial group, buffers it and applies nothing.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  // The MANIFEST holds no atomic-group edits yet. ASSERT (not EXPECT) because
  // ReadAndApply() below is handed manifest_reader, which Recover() sets up.
  ASSERT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
  // Write a few edits in an atomic group.
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  // EXPECT rather than ASSERT so that `mu` is always unlocked below.
  EXPECT_OK(
      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  mu.Unlock();
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // Reactive version set should store the edits in the replay buffer.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
1062 
// The middle edit of the group is not marked as an atomic-group member.
// VersionSet::Recover() must fail, and the read buffer must report exactly
// that edit as mixed into the group.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithVersionSetRecover) {
  const int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recovery = versions_->Recover(column_families_, false);
  EXPECT_NOK(recovery);
  const auto live_cf_count =
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  const std::string expected_dump = edits_[kGroupSize / 2].DebugString();
  EXPECT_EQ(expected_dump, corrupted_edit_.DebugString());
}
1074 
// Same corruption as above, recovered with ReactiveVersionSet::Recover(),
// which must also fail and report the unmarked middle edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  const int kGroupSize = 4;
  SetupCorruptedAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recovery = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recovery);
  const auto live_cf_count =
      reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  const std::string expected_dump = edits_[kGroupSize / 2].DebugString();
  EXPECT_EQ(expected_dump, corrupted_edit_.DebugString());
}
1091 
// The corrupted group is appended only after a successful
// ReactiveVersionSet::Recover(). The test expects ReadAndApply() to return OK
// while the read buffer still reports the unmarked middle edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  // ASSERT (not EXPECT): ReadAndApply() below is handed manifest_reader, which
  // Recover() sets up. Do not continue after a failure.
  ASSERT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  // Write the corrupted edits.
  AddNewEditsToLog(kAtomicGroupSize);
  mu.Lock();
  // EXPECT rather than ASSERT so that `mu` is always unlocked below.
  EXPECT_OK(
      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  mu.Unlock();
  EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
            corrupted_edit_.DebugString());
}
1113 
// Edit 1 repeats edit 0's remaining-entries counter, so the group size is
// inconsistent. VersionSet::Recover() must fail and report edit 1.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  const int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  const Status recovery = versions_->Recover(column_families_, false);
  EXPECT_NOK(recovery);
  const auto live_cf_count =
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  const std::string expected_dump = edits_[1].DebugString();
  EXPECT_EQ(expected_dump, edit_with_incorrect_group_size_.DebugString());
}
1125 
// Same inconsistent group size as above, recovered with
// ReactiveVersionSet::Recover(), which must also fail and report edit 1.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  const int kGroupSize = 4;
  SetupIncorrectAtomicGroup(kGroupSize);
  AddNewEditsToLog(kGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recovery = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recovery);
  const auto live_cf_count =
      reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies();
  EXPECT_EQ(column_families_.size(), live_cf_count);

  const std::string expected_dump = edits_[1].DebugString();
  EXPECT_EQ(expected_dump, edit_with_incorrect_group_size_.DebugString());
}
1142 
// The group with an inconsistent size is appended only after a successful
// ReactiveVersionSet::Recover(). The test expects ReadAndApply() to return OK
// while the read buffer reports edit 1 as having the wrong group size.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);
  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
  std::unique_ptr<log::Reader::Reporter> manifest_reporter;
  std::unique_ptr<Status> manifest_reader_status;
  // ASSERT (not EXPECT): ReadAndApply() below is handed manifest_reader, which
  // Recover() sets up. Do not continue after a failure.
  ASSERT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
                                        &manifest_reporter,
                                        &manifest_reader_status));
  AddNewEditsToLog(kAtomicGroupSize);
  mu.Lock();
  // EXPECT rather than ASSERT so that `mu` is always unlocked below.
  EXPECT_OK(
      reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
  mu.Unlock();
  EXPECT_EQ(edits_[1].DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
1163 
// Value-parameterized fixture. The string parameter is the name of the column
// family that HandleDroppedColumnFamilyInAtomicGroup drops before committing
// an atomic group that still references it.
class VersionSetTestDropOneCF : public VersionSetTestBase,
                                public testing::TestWithParam<std::string> {
 public:
  VersionSetTestDropOneCF() : VersionSetTestBase() {}
};
1169 
1170 // This test simulates the following execution sequence
1171 // Time  thread1                  bg_flush_thr
1172 //  |                             Prepare version edits (e1,e2,e3) for atomic
1173 //  |                             flush cf1, cf2, cf3
1174 //  |    Enqueue e to drop cfi
1175 //  |    to manifest_writers_
1176 //  |                             Enqueue (e1,e2,e3) to manifest_writers_
1177 //  |
1178 //  |    Apply e,
1179 //  |    cfi.IsDropped() is true
1180 //  |                             Apply (e1,e2,e3),
1181 //  |                             since cfi.IsDropped() == true, we need to
1182 //  |                             drop ei and write the rest to MANIFEST.
1183 //  V
1184 //
1185 //  Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
1186 //  last column family in an atomic group.
TEST_P(VersionSetTestDropOneCF,HandleDroppedColumnFamilyInAtomicGroup)1187 TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
1188   std::vector<ColumnFamilyDescriptor> column_families;
1189   SequenceNumber last_seqno;
1190   std::unique_ptr<log::Writer> log_writer;
1191   PrepareManifest(&column_families, &last_seqno, &log_writer);
1192   Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
1193   ASSERT_OK(s);
1194 
1195   EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
1196   EXPECT_EQ(column_families.size(),
1197             versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
1198 
1199   const int kAtomicGroupSize = 3;
1200   const std::vector<std::string> non_default_cf_names = {
1201       kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
1202 
1203   // Drop one column family
1204   VersionEdit drop_cf_edit;
1205   drop_cf_edit.DropColumnFamily();
1206   const std::string cf_to_drop_name(GetParam());
1207   auto cfd_to_drop =
1208       versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
1209   ASSERT_NE(nullptr, cfd_to_drop);
1210   // Increase its refcount because cfd_to_drop is used later, and we need to
1211   // prevent it from being deleted.
1212   cfd_to_drop->Ref();
1213   drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
1214   mutex_.Lock();
1215   s = versions_->LogAndApply(cfd_to_drop,
1216                              *cfd_to_drop->GetLatestMutableCFOptions(),
1217                              &drop_cf_edit, &mutex_);
1218   mutex_.Unlock();
1219   ASSERT_OK(s);
1220 
1221   std::vector<VersionEdit> edits(kAtomicGroupSize);
1222   uint32_t remaining = kAtomicGroupSize;
1223   size_t i = 0;
1224   autovector<ColumnFamilyData*> cfds;
1225   autovector<const MutableCFOptions*> mutable_cf_options_list;
1226   autovector<autovector<VersionEdit*>> edit_lists;
1227   for (const auto& cf_name : non_default_cf_names) {
1228     auto cfd = (cf_name != cf_to_drop_name)
1229                    ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
1230                    : cfd_to_drop;
1231     ASSERT_NE(nullptr, cfd);
1232     cfds.push_back(cfd);
1233     mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
1234     edits[i].SetColumnFamily(cfd->GetID());
1235     edits[i].SetLogNumber(0);
1236     edits[i].SetNextFile(2);
1237     edits[i].MarkAtomicGroup(--remaining);
1238     edits[i].SetLastSequence(last_seqno++);
1239     autovector<VersionEdit*> tmp_edits;
1240     tmp_edits.push_back(&edits[i]);
1241     edit_lists.emplace_back(tmp_edits);
1242     ++i;
1243   }
1244   int called = 0;
1245   SyncPoint::GetInstance()->DisableProcessing();
1246   SyncPoint::GetInstance()->ClearAllCallBacks();
1247   SyncPoint::GetInstance()->SetCallBack(
1248       "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
1249         std::vector<VersionEdit*>* tmp_edits =
1250             reinterpret_cast<std::vector<VersionEdit*>*>(arg);
1251         EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
1252         for (const auto e : *tmp_edits) {
1253           bool found = false;
1254           for (const auto& e2 : edits) {
1255             if (&e2 == e) {
1256               found = true;
1257               break;
1258             }
1259           }
1260           ASSERT_TRUE(found);
1261         }
1262         ++called;
1263       });
1264   SyncPoint::GetInstance()->EnableProcessing();
1265   mutex_.Lock();
1266   s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
1267                              &mutex_);
1268   mutex_.Unlock();
1269   ASSERT_OK(s);
1270   ASSERT_EQ(1, called);
1271   if (cfd_to_drop->Unref()) {
1272     delete cfd_to_drop;
1273     cfd_to_drop = nullptr;
1274   }
1275 }
1276 
// Runs HandleDroppedColumnFamilyInAtomicGroup once per non-default column
// family, so the dropped column family is in turn the first, middle and last
// member of the atomic group.
INSTANTIATE_TEST_CASE_P(
    AtomicGroup, VersionSetTestDropOneCF,
    testing::Values(VersionSetTestBase::kColumnFamilyName1,
                    VersionSetTestBase::kColumnFamilyName2,
                    VersionSetTestBase::kColumnFamilyName3));
1282 }  // namespace ROCKSDB_NAMESPACE
1283 
main(int argc,char ** argv)1284 int main(int argc, char** argv) {
1285   ::testing::InitGoogleTest(&argc, argv);
1286   return RUN_ALL_TESTS();
1287 }
1288