1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_set.h"
11 #include "db/db_impl/db_impl.h"
12 #include "db/log_writer.h"
13 #include "logging/logging.h"
14 #include "table/mock_table.h"
15 #include "test_util/testharness.h"
16 #include "test_util/testutil.h"
17 #include "util/string_util.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 class GenerateLevelFilesBriefTest : public testing::Test {
22 public:
23 std::vector<FileMetaData*> files_;
24 LevelFilesBrief file_level_;
25 Arena arena_;
26
GenerateLevelFilesBriefTest()27 GenerateLevelFilesBriefTest() { }
28
~GenerateLevelFilesBriefTest()29 ~GenerateLevelFilesBriefTest() override {
30 for (size_t i = 0; i < files_.size(); i++) {
31 delete files_[i];
32 }
33 }
34
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)35 void Add(const char* smallest, const char* largest,
36 SequenceNumber smallest_seq = 100,
37 SequenceNumber largest_seq = 100) {
38 FileMetaData* f = new FileMetaData(
39 files_.size() + 1, 0, 0,
40 InternalKey(smallest, smallest_seq, kTypeValue),
41 InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
42 largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
43 kUnknownOldestAncesterTime, kUnknownFileCreationTime,
44 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
45 files_.push_back(f);
46 }
47
Compare()48 int Compare() {
49 int diff = 0;
50 for (size_t i = 0; i < files_.size(); i++) {
51 if (file_level_.files[i].fd.GetNumber() != files_[i]->fd.GetNumber()) {
52 diff++;
53 }
54 }
55 return diff;
56 }
57 };
58
// Generating a brief from an empty file list must yield zero files.
TEST_F(GenerateLevelFilesBriefTest, Empty) {
  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  const int mismatches = Compare();
  ASSERT_EQ(0u, file_level_.num_files);
  ASSERT_EQ(0, mismatches);
}
64
// A single registered file must show up as the only entry of the brief.
TEST_F(GenerateLevelFilesBriefTest, Single) {
  const char* const kSmallest = "p";
  const char* const kLargest = "q";
  Add(kSmallest, kLargest);

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(1u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
71
// Four registered files must all appear in the brief, in registration order.
TEST_F(GenerateLevelFilesBriefTest, Multiple) {
  // {smallest, largest} user keys of the files to register.
  const char* const kRanges[][2] = {
      {"150", "200"}, {"200", "250"}, {"300", "350"}, {"400", "450"}};
  for (const auto& range : kRanges) {
    Add(range[0], range[1]);
  }

  DoGenerateLevelFilesBrief(&file_level_, files_, &arena_);

  ASSERT_EQ(4u, file_level_.num_files);
  ASSERT_EQ(0, Compare());
}
81
82 class CountingLogger : public Logger {
83 public:
CountingLogger()84 CountingLogger() : log_count(0) {}
85 using Logger::Logv;
Logv(const char *,va_list)86 void Logv(const char* /*format*/, va_list /*ap*/) override { log_count++; }
87 int log_count;
88 };
89
GetOptionsWithNumLevels(int num_levels,std::shared_ptr<CountingLogger> logger)90 Options GetOptionsWithNumLevels(int num_levels,
91 std::shared_ptr<CountingLogger> logger) {
92 Options opt;
93 opt.num_levels = num_levels;
94 opt.info_log = logger;
95 return opt;
96 }
97
98 class VersionStorageInfoTest : public testing::Test {
99 public:
100 const Comparator* ucmp_;
101 InternalKeyComparator icmp_;
102 std::shared_ptr<CountingLogger> logger_;
103 Options options_;
104 ImmutableCFOptions ioptions_;
105 MutableCFOptions mutable_cf_options_;
106 VersionStorageInfo vstorage_;
107
GetInternalKey(const char * ukey,SequenceNumber smallest_seq=100)108 InternalKey GetInternalKey(const char* ukey,
109 SequenceNumber smallest_seq = 100) {
110 return InternalKey(ukey, smallest_seq, kTypeValue);
111 }
112
VersionStorageInfoTest()113 VersionStorageInfoTest()
114 : ucmp_(BytewiseComparator()),
115 icmp_(ucmp_),
116 logger_(new CountingLogger()),
117 options_(GetOptionsWithNumLevels(6, logger_)),
118 ioptions_(options_),
119 mutable_cf_options_(options_),
120 vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr, false) {}
121
~VersionStorageInfoTest()122 ~VersionStorageInfoTest() override {
123 for (int i = 0; i < vstorage_.num_levels(); i++) {
124 for (auto* f : vstorage_.LevelFiles(i)) {
125 if (--f->refs == 0) {
126 delete f;
127 }
128 }
129 }
130 }
131
Add(int level,uint32_t file_number,const char * smallest,const char * largest,uint64_t file_size=0)132 void Add(int level, uint32_t file_number, const char* smallest,
133 const char* largest, uint64_t file_size = 0) {
134 assert(level < vstorage_.num_levels());
135 FileMetaData* f = new FileMetaData(
136 file_number, 0, file_size, GetInternalKey(smallest, 0),
137 GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
138 /* marked_for_compact */ false, kInvalidBlobFileNumber,
139 kUnknownOldestAncesterTime, kUnknownFileCreationTime,
140 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
141 f->compensated_file_size = file_size;
142 vstorage_.AddFile(level, f);
143 }
144
Add(int level,uint32_t file_number,const InternalKey & smallest,const InternalKey & largest,uint64_t file_size=0)145 void Add(int level, uint32_t file_number, const InternalKey& smallest,
146 const InternalKey& largest, uint64_t file_size = 0) {
147 assert(level < vstorage_.num_levels());
148 FileMetaData* f = new FileMetaData(
149 file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
150 /* largest_seq */ 0, /* marked_for_compact */ false,
151 kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
152 kUnknownFileCreationTime, kUnknownFileChecksum,
153 kUnknownFileChecksumFuncName);
154 f->compensated_file_size = file_size;
155 vstorage_.AddFile(level, f);
156 }
157
GetOverlappingFiles(int level,const InternalKey & begin,const InternalKey & end)158 std::string GetOverlappingFiles(int level, const InternalKey& begin,
159 const InternalKey& end) {
160 std::vector<FileMetaData*> inputs;
161 vstorage_.GetOverlappingInputs(level, &begin, &end, &inputs);
162
163 std::string result;
164 for (size_t i = 0; i < inputs.size(); ++i) {
165 if (i > 0) {
166 result += ",";
167 }
168 AppendNumberTo(&result, inputs[i]->fd.GetNumber());
169 }
170 return result;
171 }
172 };
173
// With dynamic level bytes disabled, the target of each level is expected to
// be the base size times the multiplier per level, with no log calls.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelStatic) {
  ioptions_.level_compaction_dynamic_level_bytes = false;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10;
  Add(4, 100U, "1", "2");
  Add(5, 101U, "1", "2");

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  // Expected targets for levels 1 through 4.
  const uint64_t kExpected[] = {10U, 50U, 250U, 1250U};
  for (int level = 1; level <= 4; ++level) {
    ASSERT_EQ(vstorage_.MaxBytesForLevel(level), kExpected[level - 1]);
  }

  ASSERT_EQ(0, logger_->log_count);
}
189
// Grows the tree step by step with dynamic level bytes enabled and checks the
// base level, the per-level targets and the number of log calls after each
// recalculation.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamic) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 1000;
  const auto recalculate = [this]() {
    vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  };

  // Step 1: a single 500-byte file in the last level.
  Add(5, 1U, "1", "2", 500U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.base_level(), 5);

  // Step 2: the last level grows to 1050 bytes.
  Add(5, 2U, "3", "4", 550U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Step 3: level 4 receives its first file.
  Add(4, 3U, "3", "4", 550U);
  recalculate();
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 4);

  // Step 4: two files land in level 3; exactly one log call is expected.
  Add(3, 4U, "3", "4", 250U);
  Add(3, 5U, "5", "7", 300U);
  recalculate();
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 3);

  // Step 5: two small files land in level 1. The log counter is reset first
  // so that only the log calls of this recalculation are counted.
  Add(1, 6U, "3", "4", 5U);
  Add(1, 7U, "8", "9", 5U);
  logger_->log_count = 0;
  recalculate();
  ASSERT_EQ(1, logger_->log_count);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 1000U);
  ASSERT_EQ(vstorage_.base_level(), 1);
}
231
// Every level holds data; the expected targets double from 100 at level 1 up
// to 800 at level 4, and no log calls are expected.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_multiplier = 2;
  mutable_cf_options_.max_bytes_for_level_base = 100;

  // One file per level; entry i is the size of the file in level i, and the
  // file number is i + 1.
  const uint64_t kFileSizes[] = {50U, 50U, 500U, 500U, 1700U, 500U};
  for (int level = 0; level < 6; ++level) {
    Add(level, static_cast<uint32_t>(level + 1), "1", "2", kFileSizes[level]);
  }

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U);
  ASSERT_EQ(vstorage_.base_level(), 1);
  ASSERT_EQ(0, logger_->log_count);
}
251
// Same scenario shape as the other dynamic tests but with multi-gigabyte
// levels, so the targets need 64-bit arithmetic.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicLargeLevel) {
  const uint64_t kOneGB = 1000U * 1000U * 1000U;
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
  mutable_cf_options_.max_bytes_for_level_base = 10U * kOneGB;

  // A tiny L0 file plus three large lower levels.
  Add(0, 1U, "1", "2", 50U);
  Add(3, 4U, "1", "2", 32U * kOneGB);
  Add(4, 5U, "1", "2", 500U * kOneGB);
  Add(5, 6U, "1", "2", 3000U * kOneGB);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);

  const uint64_t kExpectedLevel5 = 3000U * kOneGB;
  const uint64_t kExpectedLevel4 = 300U * kOneGB;
  const uint64_t kExpectedLevel3 = 30U * kOneGB;
  const uint64_t kExpectedLevel2 = 10U * kOneGB;
  ASSERT_EQ(vstorage_.MaxBytesForLevel(5), kExpectedLevel5);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(4), kExpectedLevel4);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(3), kExpectedLevel3);
  ASSERT_EQ(vstorage_.MaxBytesForLevel(2), kExpectedLevel2);
  ASSERT_EQ(vstorage_.base_level(), 2);
  ASSERT_EQ(0, logger_->log_count);
}
270
// Three 10,000-byte L0 files with max_bytes_for_level_base = 40000 and an L0
// compaction trigger of 2. In this configuration the level multiplier is
// expected to stay at the configured value.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_1) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.max_bytes_for_level_base = 40000;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;

  Add(0, 1U, "1", "2", 10000U);
  Add(0, 2U, "1", "2", 10000U);
  Add(0, 3U, "1", "2", 10000U);

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());
  // The level multiplier should remain at the configured value of 5.
  ASSERT_EQ(vstorage_.level_multiplier(), 5.0);
  // Level targets should be 40,000, 51,450 and 257,250 for levels 2, 3 and 4.
  ASSERT_EQ(40000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_EQ(51450U, vstorage_.MaxBytesForLevel(3));
  ASSERT_EQ(257250U, vstorage_.MaxBytesForLevel(4));
}
296
// Three 10,000-byte L0 files with max_bytes_for_level_base = 10000 and an L0
// compaction trigger of 2.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_2) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10000;

  // L0 files, numbered 11 through 13.
  for (uint32_t file_number = 11U; file_number <= 13U; ++file_number) {
    Add(0, file_number, "1", "2", 10000U);
  }

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());

  // The level multiplier should be about 3.5.
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);

  // Level targets should be around 30,000, 105,000 and 367,500.
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
325
// Six 5,000-byte L0 files with max_bytes_for_level_base = 10000 and an L0
// compaction trigger of 2.
TEST_F(VersionStorageInfoTest, MaxBytesForLevelDynamicWithLargeL0_3) {
  ioptions_.level_compaction_dynamic_level_bytes = true;
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
  mutable_cf_options_.max_bytes_for_level_multiplier = 5;
  mutable_cf_options_.max_bytes_for_level_base = 10000;

  // L0 files, numbered 11 through 16.
  for (uint32_t file_number = 11U; file_number <= 16U; ++file_number) {
    Add(0, file_number, "1", "2", 5000U);
  }

  Add(5, 4U, "1", "2", 1286250U);
  Add(4, 5U, "1", "2", 200000U);
  Add(3, 6U, "1", "2", 40000U);
  Add(2, 7U, "1", "2", 8000U);

  vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
  ASSERT_EQ(0, logger_->log_count);
  ASSERT_EQ(2, vstorage_.base_level());

  // The level multiplier should be about 3.5.
  ASSERT_LT(vstorage_.level_multiplier(), 3.6);
  ASSERT_GT(vstorage_.level_multiplier(), 3.4);

  // Level targets should be around 30,000, 105,000 and 367,500.
  ASSERT_EQ(30000U, vstorage_.MaxBytesForLevel(2));
  ASSERT_LT(vstorage_.MaxBytesForLevel(3), 110000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(3), 100000U);
  ASSERT_LT(vstorage_.MaxBytesForLevel(4), 370000U);
  ASSERT_GT(vstorage_.MaxBytesForLevel(4), 360000U);
}
357
// Only the 10-byte file in the last level is expected to count as live data;
// every file in the upper levels overlaps it in one way or another.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize) {
  // Test whether the overlaps are detected as expected.
  // Each file gets its own file number: distinct files in one version should
  // not share a number.
  Add(1, 1U, "4", "7", 1U);  // Perfect overlap with last level
  Add(2, 2U, "3", "5", 1U);  // Partial overlap with last level
  Add(2, 3U, "6", "8", 1U);  // Partial overlap with last level
  Add(3, 4U, "1", "9", 1U);  // Contains range of last level
  Add(4, 5U, "4", "5", 1U);  // Inside range of last level
  Add(4, 6U, "6", "7", 1U);  // Inside range of last level
  Add(5, 7U, "4", "7", 10U);
  ASSERT_EQ(10U, vstorage_.EstimateLiveDataSize());
}
369
// Mixes an unordered level 0 with files in levels 1-3; four one-byte files
// are expected to be counted as live data.
TEST_F(VersionStorageInfoTest, EstimateLiveDataSize2) {
  // Each file gets its own file number: distinct files in one version should
  // not share a number.
  Add(0, 1U, "9", "9", 1U);  // Level 0 is not ordered
  Add(0, 2U, "5", "6", 1U);  // Ignored because of [5,6] in l1
  Add(1, 3U, "1", "2", 1U);  // Ignored because of [2,3] in l2
  Add(1, 4U, "3", "4", 1U);  // Ignored because of [2,3] in l2
  Add(1, 5U, "5", "6", 1U);
  Add(2, 6U, "2", "3", 1U);
  Add(3, 7U, "7", "8", 1U);
  ASSERT_EQ(4U, vstorage_.EstimateLiveDataSize());
}
380
// Checks which level-1 files GetOverlappingInputs() reports for a set of
// ranges, including files that touch at a range-deletion sentinel key and
// files that share a user key at their boundary.
TEST_F(VersionStorageInfoTest, GetOverlappingInputs) {
  // Two files that overlap at the range deletion tombstone sentinel.
  Add(1, 1U, InternalKey("a", 0, kTypeValue),
      InternalKey("b", kMaxSequenceNumber, kTypeRangeDeletion), 1);
  Add(1, 2U, InternalKey("b", 0, kTypeValue), InternalKey("c", 0, kTypeValue),
      1);
  // Two files that overlap at the same user key.
  Add(1, 3U, InternalKey("d", 0, kTypeValue),
      InternalKey("e", kMaxSequenceNumber, kTypeValue), 1);
  Add(1, 4U, InternalKey("e", 0, kTypeValue), InternalKey("f", 0, kTypeValue),
      1);
  // Two files that do not overlap.
  Add(1, 5U, InternalKey("g", 0, kTypeValue), InternalKey("h", 0, kTypeValue),
      1);
  Add(1, 6U, InternalKey("i", 0, kTypeValue), InternalKey("j", 0, kTypeValue),
      1);
  vstorage_.UpdateNumNonEmptyLevels();
  vstorage_.GenerateLevelFilesBrief();

  // Files 1 and 2 (meet at the range deletion sentinel on "b").
  ASSERT_EQ("1,2",
            GetOverlappingFiles(1, InternalKey("a", 0, kTypeValue),
                                InternalKey("b", 0, kTypeValue)));
  ASSERT_EQ("1", GetOverlappingFiles(
                     1, InternalKey("a", 0, kTypeValue),
                     InternalKey("b", kMaxSequenceNumber, kTypeRangeDeletion)));
  ASSERT_EQ("2", GetOverlappingFiles(
                     1, InternalKey("b", kMaxSequenceNumber, kTypeValue),
                     InternalKey("c", 0, kTypeValue)));

  // Files 3 and 4 (share user key "e").
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, InternalKey("d", 0, kTypeValue),
                                InternalKey("e", 0, kTypeValue)));
  ASSERT_EQ("3", GetOverlappingFiles(
                     1, InternalKey("d", 0, kTypeValue),
                     InternalKey("e", kMaxSequenceNumber, kTypeRangeDeletion)));
  ASSERT_EQ("3,4", GetOverlappingFiles(
                       1, InternalKey("e", kMaxSequenceNumber, kTypeValue),
                       InternalKey("f", 0, kTypeValue)));
  ASSERT_EQ("3,4",
            GetOverlappingFiles(1, InternalKey("e", 0, kTypeValue),
                                InternalKey("f", 0, kTypeValue)));

  // Files 5 and 6 (disjoint).
  ASSERT_EQ("5", GetOverlappingFiles(1, InternalKey("g", 0, kTypeValue),
                                     InternalKey("h", 0, kTypeValue)));
  ASSERT_EQ("6", GetOverlappingFiles(1, InternalKey("i", 0, kTypeValue),
                                     InternalKey("j", 0, kTypeValue)));
}
413
414
415 class FindLevelFileTest : public testing::Test {
416 public:
417 LevelFilesBrief file_level_;
418 bool disjoint_sorted_files_;
419 Arena arena_;
420
FindLevelFileTest()421 FindLevelFileTest() : disjoint_sorted_files_(true) { }
422
~FindLevelFileTest()423 ~FindLevelFileTest() override {}
424
LevelFileInit(size_t num=0)425 void LevelFileInit(size_t num = 0) {
426 char* mem = arena_.AllocateAligned(num * sizeof(FdWithKeyRange));
427 file_level_.files = new (mem)FdWithKeyRange[num];
428 file_level_.num_files = 0;
429 }
430
Add(const char * smallest,const char * largest,SequenceNumber smallest_seq=100,SequenceNumber largest_seq=100)431 void Add(const char* smallest, const char* largest,
432 SequenceNumber smallest_seq = 100,
433 SequenceNumber largest_seq = 100) {
434 InternalKey smallest_key = InternalKey(smallest, smallest_seq, kTypeValue);
435 InternalKey largest_key = InternalKey(largest, largest_seq, kTypeValue);
436
437 Slice smallest_slice = smallest_key.Encode();
438 Slice largest_slice = largest_key.Encode();
439
440 char* mem = arena_.AllocateAligned(
441 smallest_slice.size() + largest_slice.size());
442 memcpy(mem, smallest_slice.data(), smallest_slice.size());
443 memcpy(mem + smallest_slice.size(), largest_slice.data(),
444 largest_slice.size());
445
446 // add to file_level_
447 size_t num = file_level_.num_files;
448 auto& file = file_level_.files[num];
449 file.fd = FileDescriptor(num + 1, 0, 0);
450 file.smallest_key = Slice(mem, smallest_slice.size());
451 file.largest_key = Slice(mem + smallest_slice.size(),
452 largest_slice.size());
453 file_level_.num_files++;
454 }
455
Find(const char * key)456 int Find(const char* key) {
457 InternalKey target(key, 100, kTypeValue);
458 InternalKeyComparator cmp(BytewiseComparator());
459 return FindFile(cmp, file_level_, target.Encode());
460 }
461
Overlaps(const char * smallest,const char * largest)462 bool Overlaps(const char* smallest, const char* largest) {
463 InternalKeyComparator cmp(BytewiseComparator());
464 Slice s(smallest != nullptr ? smallest : "");
465 Slice l(largest != nullptr ? largest : "");
466 return SomeFileOverlapsRange(cmp, disjoint_sorted_files_, file_level_,
467 (smallest != nullptr ? &s : nullptr),
468 (largest != nullptr ? &l : nullptr));
469 }
470 };
471
// With no files in the level, Find() returns 0 and nothing overlaps.
TEST_F(FindLevelFileTest, LevelEmpty) {
  LevelFileInit(0);

  ASSERT_EQ(0, Find("foo"));

  ASSERT_FALSE(Overlaps("a", "z"));
  ASSERT_FALSE(Overlaps(nullptr, "z"));
  ASSERT_FALSE(Overlaps("a", nullptr));
  ASSERT_FALSE(Overlaps(nullptr, nullptr));
}
481
// A level holding the single file ["p", "q"].
TEST_F(FindLevelFileTest, LevelSingle) {
  LevelFileInit(1);
  Add("p", "q");

  // Keys up to and including "q" resolve to index 0, later keys to index 1.
  const char* const kKeysAtIndex0[] = {"a", "p", "p1", "q"};
  for (const char* key : kKeysAtIndex0) {
    ASSERT_EQ(0, Find(key));
  }
  const char* const kKeysAtIndex1[] = {"q1", "z"};
  for (const char* key : kKeysAtIndex1) {
    ASSERT_EQ(1, Find(key));
  }

  // Ranges entirely before or entirely after the file.
  ASSERT_FALSE(Overlaps("a", "b"));
  ASSERT_FALSE(Overlaps("z1", "z2"));

  // Ranges that touch or cross the file.
  ASSERT_TRUE(Overlaps("a", "p"));
  ASSERT_TRUE(Overlaps("a", "q"));
  ASSERT_TRUE(Overlaps("a", "z"));
  ASSERT_TRUE(Overlaps("p", "p1"));
  ASSERT_TRUE(Overlaps("p", "q"));
  ASSERT_TRUE(Overlaps("p", "z"));
  ASSERT_TRUE(Overlaps("p1", "p2"));
  ASSERT_TRUE(Overlaps("p1", "z"));
  ASSERT_TRUE(Overlaps("q", "q"));
  ASSERT_TRUE(Overlaps("q", "q1"));

  // Ranges with a missing (nullptr) bound.
  ASSERT_FALSE(Overlaps(nullptr, "j"));
  ASSERT_FALSE(Overlaps("r", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, "p"));
  ASSERT_TRUE(Overlaps(nullptr, "p1"));
  ASSERT_TRUE(Overlaps("q", nullptr));
  ASSERT_TRUE(Overlaps(nullptr, nullptr));
}
513
// A level holding four files: [150,200], [200,250], [300,350], [400,450].
TEST_F(FindLevelFileTest, LevelMultiple) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  // Expected Find() result for each probe key.
  struct Lookup {
    const char* key;
    int expected_index;
  };
  const Lookup kLookups[] = {
      {"100", 0}, {"150", 0}, {"151", 0}, {"199", 0}, {"200", 0}, {"201", 1},
      {"249", 1}, {"250", 1}, {"251", 2}, {"299", 2}, {"300", 2}, {"349", 2},
      {"350", 2}, {"351", 3}, {"400", 3}, {"450", 3}, {"451", 4}};
  for (const Lookup& lookup : kLookups) {
    ASSERT_EQ(lookup.expected_index, Find(lookup.key));
  }

  // Ranges that fall before, between or after the files.
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("251", "299"));
  ASSERT_FALSE(Overlaps("451", "500"));
  ASSERT_FALSE(Overlaps("351", "399"));

  // Ranges that touch or cross at least one file.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
}
553
// Same four files as LevelMultiple, probed with ranges in which one or both
// bounds are nullptr.
TEST_F(FindLevelFileTest, LevelMultipleNullBoundaries) {
  LevelFileInit(4);

  Add("150", "200");
  Add("200", "250");
  Add("300", "350");
  Add("400", "450");

  // Ranges that end before the first file or start after the last one.
  ASSERT_FALSE(Overlaps(nullptr, "149"));
  ASSERT_FALSE(Overlaps("451", nullptr));

  // No bounds at all.
  ASSERT_TRUE(Overlaps(nullptr, nullptr));

  // No lower bound.
  ASSERT_TRUE(Overlaps(nullptr, "150"));
  ASSERT_TRUE(Overlaps(nullptr, "199"));
  ASSERT_TRUE(Overlaps(nullptr, "200"));
  ASSERT_TRUE(Overlaps(nullptr, "201"));
  ASSERT_TRUE(Overlaps(nullptr, "400"));
  ASSERT_TRUE(Overlaps(nullptr, "800"));

  // No upper bound.
  ASSERT_TRUE(Overlaps("100", nullptr));
  ASSERT_TRUE(Overlaps("200", nullptr));
  ASSERT_TRUE(Overlaps("449", nullptr));
  ASSERT_TRUE(Overlaps("450", nullptr));
}
575
// A single file whose smallest and largest keys share user key "200" and
// differ only in sequence number (5000 and 3000).
TEST_F(FindLevelFileTest, LevelOverlapSequenceChecks) {
  LevelFileInit(1);
  Add("200", "200", /* smallest_seq */ 5000, /* largest_seq */ 3000);

  ASSERT_FALSE(Overlaps("199", "199"));
  ASSERT_FALSE(Overlaps("201", "300"));

  ASSERT_TRUE(Overlaps("200", "200"));
  ASSERT_TRUE(Overlaps("190", "200"));
  ASSERT_TRUE(Overlaps("200", "210"));
}
586
// Two files whose ranges overlap each other, checked with
// disjoint_sorted_files_ set to false.
TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
  disjoint_sorted_files_ = false;
  LevelFileInit(2);

  Add("150", "600");
  Add("400", "500");

  // Ranges entirely outside ["150", "600"].
  ASSERT_FALSE(Overlaps("100", "149"));
  ASSERT_FALSE(Overlaps("601", "700"));

  // Ranges that touch or cross at least one of the files.
  ASSERT_TRUE(Overlaps("100", "150"));
  ASSERT_TRUE(Overlaps("100", "200"));
  ASSERT_TRUE(Overlaps("100", "300"));
  ASSERT_TRUE(Overlaps("100", "400"));
  ASSERT_TRUE(Overlaps("100", "500"));
  ASSERT_TRUE(Overlaps("375", "400"));
  ASSERT_TRUE(Overlaps("450", "450"));
  ASSERT_TRUE(Overlaps("450", "500"));
  ASSERT_TRUE(Overlaps("450", "700"));
  ASSERT_TRUE(Overlaps("600", "700"));
}
606
607 class VersionSetTestBase {
608 public:
609 const static std::string kColumnFamilyName1;
610 const static std::string kColumnFamilyName2;
611 const static std::string kColumnFamilyName3;
612 int num_initial_edits_;
613
VersionSetTestBase()614 VersionSetTestBase()
615 : env_(Env::Default()),
616 fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
617 dbname_(test::PerThreadDBPath("version_set_test")),
618 db_options_(),
619 mutable_cf_options_(cf_options_),
620 table_cache_(NewLRUCache(50000, 16)),
621 write_buffer_manager_(db_options_.db_write_buffer_size),
622 shutting_down_(false),
623 mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
624 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
625
626 db_options_.env = env_;
627 db_options_.fs = fs_;
628 versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
629 table_cache_.get(), &write_buffer_manager_,
630 &write_controller_,
631 /*block_cache_tracer=*/nullptr)),
632 reactive_versions_ = std::make_shared<ReactiveVersionSet>(
633 dbname_, &db_options_, env_options_, table_cache_.get(),
634 &write_buffer_manager_, &write_controller_);
635 db_options_.db_paths.emplace_back(dbname_,
636 std::numeric_limits<uint64_t>::max());
637 }
638
PrepareManifest(std::vector<ColumnFamilyDescriptor> * column_families,SequenceNumber * last_seqno,std::unique_ptr<log::Writer> * log_writer)639 void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
640 SequenceNumber* last_seqno,
641 std::unique_ptr<log::Writer>* log_writer) {
642 assert(column_families != nullptr);
643 assert(last_seqno != nullptr);
644 assert(log_writer != nullptr);
645 VersionEdit new_db;
646 if (db_options_.write_dbid_to_manifest) {
647 DBImpl* impl = new DBImpl(DBOptions(), dbname_);
648 std::string db_id;
649 impl->GetDbIdentityFromIdentityFile(&db_id);
650 new_db.SetDBId(db_id);
651 }
652 new_db.SetLogNumber(0);
653 new_db.SetNextFile(2);
654 new_db.SetLastSequence(0);
655
656 const std::vector<std::string> cf_names = {
657 kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
658 kColumnFamilyName3};
659 const int kInitialNumOfCfs = static_cast<int>(cf_names.size());
660 autovector<VersionEdit> new_cfs;
661 uint64_t last_seq = 1;
662 uint32_t cf_id = 1;
663 for (int i = 1; i != kInitialNumOfCfs; ++i) {
664 VersionEdit new_cf;
665 new_cf.AddColumnFamily(cf_names[i]);
666 new_cf.SetColumnFamily(cf_id++);
667 new_cf.SetLogNumber(0);
668 new_cf.SetNextFile(2);
669 new_cf.SetLastSequence(last_seq++);
670 new_cfs.emplace_back(new_cf);
671 }
672 *last_seqno = last_seq;
673 num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
674 const std::string manifest = DescriptorFileName(dbname_, 1);
675 std::unique_ptr<WritableFile> file;
676 Status s = env_->NewWritableFile(
677 manifest, &file, env_->OptimizeForManifestWrite(env_options_));
678 ASSERT_OK(s);
679 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
680 NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
681 {
682 log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
683 std::string record;
684 new_db.EncodeTo(&record);
685 s = (*log_writer)->AddRecord(record);
686 for (const auto& e : new_cfs) {
687 record.clear();
688 e.EncodeTo(&record);
689 s = (*log_writer)->AddRecord(record);
690 ASSERT_OK(s);
691 }
692 }
693 ASSERT_OK(s);
694
695 cf_options_.table_factory = mock_table_factory_;
696 for (const auto& cf_name : cf_names) {
697 column_families->emplace_back(cf_name, cf_options_);
698 }
699 }
700
701 // Create DB with 3 column families.
NewDB()702 void NewDB() {
703 std::vector<ColumnFamilyDescriptor> column_families;
704 SequenceNumber last_seqno;
705 std::unique_ptr<log::Writer> log_writer;
706 SetIdentityFile(env_, dbname_);
707 PrepareManifest(&column_families, &last_seqno, &log_writer);
708 log_writer.reset();
709 // Make "CURRENT" file point to the new manifest file.
710 Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
711 ASSERT_OK(s);
712
713 EXPECT_OK(versions_->Recover(column_families, false));
714 EXPECT_EQ(column_families.size(),
715 versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
716 }
717
718 Env* env_;
719 std::shared_ptr<FileSystem> fs_;
720 const std::string dbname_;
721 EnvOptions env_options_;
722 ImmutableDBOptions db_options_;
723 ColumnFamilyOptions cf_options_;
724 MutableCFOptions mutable_cf_options_;
725 std::shared_ptr<Cache> table_cache_;
726 WriteController write_controller_;
727 WriteBufferManager write_buffer_manager_;
728 std::shared_ptr<VersionSet> versions_;
729 std::shared_ptr<ReactiveVersionSet> reactive_versions_;
730 InstrumentedMutex mutex_;
731 std::atomic<bool> shutting_down_;
732 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
733 };
734
735 const std::string VersionSetTestBase::kColumnFamilyName1 = "alice";
736 const std::string VersionSetTestBase::kColumnFamilyName2 = "bob";
737 const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
738
739 class VersionSetTest : public VersionSetTestBase, public testing::Test {
740 public:
VersionSetTest()741 VersionSetTest() : VersionSetTestBase() {}
742 };
743
// Submits kGroupSize edits for the default column family in a single
// LogAndApply() call and checks, through a sync point, how many times the
// "SameColumnFamily" point is reached.
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
  NewDB();
  const int kGroupSize = 5;
  // Fill `edits` completely before taking any element addresses below.
  autovector<VersionEdit> edits;
  for (int i = 0; i != kGroupSize; ++i) {
    edits.emplace_back(VersionEdit());
  }
  autovector<ColumnFamilyData*> cfds;
  autovector<const MutableCFOptions*> all_mutable_cf_options;
  autovector<autovector<VersionEdit*>> edit_lists;
  for (int i = 0; i != kGroupSize; ++i) {
    cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
    all_mutable_cf_options.emplace_back(&mutable_cf_options_);
    autovector<VersionEdit*> edit_list;
    edit_list.emplace_back(&edits[i]);
    edit_lists.emplace_back(edit_list);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
        uint32_t* cf_id = static_cast<uint32_t*>(arg);
        EXPECT_EQ(0u, *cf_id);
        ++count;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  mutex_.Lock();
  Status s =
      versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_);
  mutex_.Unlock();
  // The callback captures the local `count` by reference, so unregister it
  // before this function returns and the reference dangles.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  EXPECT_OK(s);
  // The callback is expected to fire once for every edit after the first.
  EXPECT_EQ(kGroupSize - 1, count);
}
779
780 class VersionSetAtomicGroupTest : public VersionSetTestBase,
781 public testing::Test {
782 public:
VersionSetAtomicGroupTest()783 VersionSetAtomicGroupTest() : VersionSetTestBase() {}
784
SetUp()785 void SetUp() override {
786 PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
787 SetupTestSyncPoints();
788 }
789
SetupValidAtomicGroup(int atomic_group_size)790 void SetupValidAtomicGroup(int atomic_group_size) {
791 edits_.resize(atomic_group_size);
792 int remaining = atomic_group_size;
793 for (size_t i = 0; i != edits_.size(); ++i) {
794 edits_[i].SetLogNumber(0);
795 edits_[i].SetNextFile(2);
796 edits_[i].MarkAtomicGroup(--remaining);
797 edits_[i].SetLastSequence(last_seqno_++);
798 }
799 ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
800 }
801
SetupIncompleteTrailingAtomicGroup(int atomic_group_size)802 void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
803 edits_.resize(atomic_group_size);
804 int remaining = atomic_group_size;
805 for (size_t i = 0; i != edits_.size(); ++i) {
806 edits_[i].SetLogNumber(0);
807 edits_[i].SetNextFile(2);
808 edits_[i].MarkAtomicGroup(--remaining);
809 edits_[i].SetLastSequence(last_seqno_++);
810 }
811 ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
812 }
813
SetupCorruptedAtomicGroup(int atomic_group_size)814 void SetupCorruptedAtomicGroup(int atomic_group_size) {
815 edits_.resize(atomic_group_size);
816 int remaining = atomic_group_size;
817 for (size_t i = 0; i != edits_.size(); ++i) {
818 edits_[i].SetLogNumber(0);
819 edits_[i].SetNextFile(2);
820 if (i != ((size_t)atomic_group_size / 2)) {
821 edits_[i].MarkAtomicGroup(--remaining);
822 }
823 edits_[i].SetLastSequence(last_seqno_++);
824 }
825 ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
826 }
827
SetupIncorrectAtomicGroup(int atomic_group_size)828 void SetupIncorrectAtomicGroup(int atomic_group_size) {
829 edits_.resize(atomic_group_size);
830 int remaining = atomic_group_size;
831 for (size_t i = 0; i != edits_.size(); ++i) {
832 edits_[i].SetLogNumber(0);
833 edits_[i].SetNextFile(2);
834 if (i != 1) {
835 edits_[i].MarkAtomicGroup(--remaining);
836 } else {
837 edits_[i].MarkAtomicGroup(remaining--);
838 }
839 edits_[i].SetLastSequence(last_seqno_++);
840 }
841 ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
842 }
843
SetupTestSyncPoints()844 void SetupTestSyncPoints() {
845 SyncPoint::GetInstance()->DisableProcessing();
846 SyncPoint::GetInstance()->ClearAllCallBacks();
847 SyncPoint::GetInstance()->SetCallBack(
848 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
849 VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
850 EXPECT_EQ(edits_.front().DebugString(),
851 e->DebugString()); // compare based on value
852 first_in_atomic_group_ = true;
853 });
854 SyncPoint::GetInstance()->SetCallBack(
855 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
856 VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
857 EXPECT_EQ(edits_.back().DebugString(),
858 e->DebugString()); // compare based on value
859 EXPECT_TRUE(first_in_atomic_group_);
860 last_in_atomic_group_ = true;
861 });
862 SyncPoint::GetInstance()->SetCallBack(
863 "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
864 num_recovered_edits_ = *reinterpret_cast<int*>(arg);
865 });
866 SyncPoint::GetInstance()->SetCallBack(
867 "ReactiveVersionSet::ReadAndApply:AppliedEdits",
868 [&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
869 SyncPoint::GetInstance()->SetCallBack(
870 "AtomicGroupReadBuffer::AddEdit:AtomicGroup",
871 [&](void* /* arg */) { ++num_edits_in_atomic_group_; });
872 SyncPoint::GetInstance()->SetCallBack(
873 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
874 [&](void* arg) {
875 corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
876 });
877 SyncPoint::GetInstance()->SetCallBack(
878 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
879 [&](void* arg) {
880 edit_with_incorrect_group_size_ =
881 *reinterpret_cast<VersionEdit*>(arg);
882 });
883 SyncPoint::GetInstance()->EnableProcessing();
884 }
885
AddNewEditsToLog(int num_edits)886 void AddNewEditsToLog(int num_edits) {
887 for (int i = 0; i < num_edits; i++) {
888 std::string record;
889 edits_[i].EncodeTo(&record);
890 ASSERT_OK(log_writer_->AddRecord(record));
891 }
892 }
893
TearDown()894 void TearDown() override {
895 SyncPoint::GetInstance()->DisableProcessing();
896 SyncPoint::GetInstance()->ClearAllCallBacks();
897 log_writer_.reset();
898 }
899
900 protected:
901 std::vector<ColumnFamilyDescriptor> column_families_;
902 SequenceNumber last_seqno_;
903 std::vector<VersionEdit> edits_;
904 bool first_in_atomic_group_ = false;
905 bool last_in_atomic_group_ = false;
906 int num_edits_in_atomic_group_ = 0;
907 int num_recovered_edits_ = 0;
908 int num_applied_edits_ = 0;
909 VersionEdit corrupted_edit_;
910 VersionEdit edit_with_incorrect_group_size_;
911 std::unique_ptr<log::Writer> log_writer_;
912 };
913
// Writes a complete atomic group to the log and checks that
// VersionSet::Recover() reports both ends of the group and counts every edit
// of the group as recovered.
TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
  const int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  const Status recover_status =
      versions_->Recover(column_families_, /*read_only=*/false);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  // Nothing goes through ReadAndApply in this test.
  EXPECT_EQ(0, num_applied_edits_);
}
926
// Writes a complete atomic group to the log and checks that
// ReactiveVersionSet::Recover() consumes the whole group and leaves nothing
// behind in its replay buffer.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // After recovery no partially-read atomic group may remain buffered.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().empty());
  EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
948
// Recovers first, then appends a complete atomic group to the log and checks
// that ReactiveVersionSet::ReadAndApply() applies the whole group.
TEST_F(VersionSetAtomicGroupTest,
       HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 3;
  SetupValidAtomicGroup(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  // The atomic group only shows up in the log after recovery has finished.
  AddNewEditsToLog(kAtomicGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_TRUE(last_in_atomic_group_);
  // ReadAndApply should leave no partially-read atomic group buffered.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().empty());
  // Only the initial edits were seen by recovery; the group was applied.
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
974
// Persists all but the last edit of an atomic group and checks that
// VersionSet::Recover() still succeeds but does not count any edit of the
// unfinished group as recovered.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);

  const Status recover_status =
      versions_->Recover(column_families_, /*read_only=*/false);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // The group was started but its last edit never arrived.
  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(0, num_applied_edits_);
}
990
// Persists all but the last edit of an atomic group, recovers with
// ReactiveVersionSet, and then completes the group. The partial group must be
// buffered after Recover() and fully applied by the following ReadAndApply().
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // The edits read so far are expected to be held in the replay buffer, which
  // is already sized for the whole group.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);

  // Append the missing final edit; the next ReadAndApply() should now apply
  // the complete group.
  std::string final_record;
  edits_[kAtomicGroupSize - 1].EncodeTo(&final_record);
  const Status append_status = log_writer_->AddRecord(final_record);
  EXPECT_OK(append_status);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  // Nothing may remain buffered once the group has been applied.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
  EXPECT_TRUE(reactive_versions_->replay_buffer().empty());
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
1029
// Recovers first, then appends all but the last edit of an atomic group and
// checks that ReadAndApply() buffers the partial group without applying it.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
  SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  // At this point the log contains no edits of the atomic group.
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // Now append the leading part of the atomic group.
  AddNewEditsToLog(kNumberOfPersistedVersionEdits);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  EXPECT_TRUE(first_in_atomic_group_);
  EXPECT_FALSE(last_in_atomic_group_);
  EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
  // The edits read so far are expected to be held in the replay buffer, which
  // is already sized for the whole group.
  EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
              kNumberOfPersistedVersionEdits);
  EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
  EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
  // The unfinished group must not have been applied.
  EXPECT_EQ(0, num_applied_edits_);
}
1062
// Writes an atomic group with an unmarked edit in the middle and checks that
// VersionSet::Recover() fails and reports exactly that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  const Status recover_status =
      versions_->Recover(column_families_, /*read_only=*/false);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // The edit at the middle index is the one left out of the group.
  const VersionEdit& expected_corrupted_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(expected_corrupted_edit.DebugString(),
            corrupted_edit_.DebugString());
}
1074
// Writes an atomic group with an unmarked edit in the middle and checks that
// ReactiveVersionSet::Recover() fails and reports exactly that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // The edit at the middle index is the one left out of the group.
  const VersionEdit& expected_corrupted_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(expected_corrupted_edit.DebugString(),
            corrupted_edit_.DebugString());
}
1091
// Recovers from a clean log, then appends an atomic group with an unmarked
// edit in the middle and checks that ReadAndApply() reports exactly that edit
// through the corruption callback.
TEST_F(VersionSetAtomicGroupTest,
       HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  SetupCorruptedAtomicGroup(kAtomicGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  // Only now do the corrupted edits reach the log.
  AddNewEditsToLog(kAtomicGroupSize);

  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  // The edit at the middle index is the one left out of the group.
  const VersionEdit& expected_corrupted_edit = edits_[kAtomicGroupSize / 2];
  EXPECT_EQ(expected_corrupted_edit.DebugString(),
            corrupted_edit_.DebugString());
}
1113
// Writes an atomic group whose second edit carries a wrong remaining-entries
// value and checks that VersionSet::Recover() fails and reports that edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  const Status recover_status =
      versions_->Recover(column_families_, /*read_only=*/false);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // edits_[1] is the edit that was marked with the wrong value.
  const VersionEdit& expected_bad_edit = edits_[1];
  EXPECT_EQ(expected_bad_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
1125
// Writes an atomic group whose second edit carries a wrong remaining-entries
// value and checks that ReactiveVersionSet::Recover() fails and reports that
// edit.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
  const int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);
  AddNewEditsToLog(kAtomicGroupSize);

  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_NOK(recover_status);
  EXPECT_EQ(column_families_.size(),
            reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());

  // edits_[1] is the edit that was marked with the wrong value.
  const VersionEdit& expected_bad_edit = edits_[1];
  EXPECT_EQ(expected_bad_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
1142
// Recovers from a clean log, then appends an atomic group whose second edit
// carries a wrong remaining-entries value and checks that ReadAndApply()
// reports that edit through the incorrect-group-size callback.
TEST_F(VersionSetAtomicGroupTest,
       HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
  const int kAtomicGroupSize = 4;
  SetupIncorrectAtomicGroup(kAtomicGroupSize);

  InstrumentedMutex mu;
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  std::unique_ptr<log::FragmentBufferedReader> reader;
  std::unique_ptr<log::Reader::Reporter> reporter;
  std::unique_ptr<Status> reader_status;
  const Status recover_status = reactive_versions_->Recover(
      column_families_, &reader, &reporter, &reader_status);
  EXPECT_OK(recover_status);

  // The malformed group is appended only after recovery.
  AddNewEditsToLog(kAtomicGroupSize);

  mu.Lock();
  const Status apply_status =
      reactive_versions_->ReadAndApply(&mu, &reader, &cfds_changed);
  mu.Unlock();
  EXPECT_OK(apply_status);

  // edits_[1] is the edit that was marked with the wrong value.
  const VersionEdit& expected_bad_edit = edits_[1];
  EXPECT_EQ(expected_bad_edit.DebugString(),
            edit_with_incorrect_group_size_.DebugString());
}
1163
1164 class VersionSetTestDropOneCF : public VersionSetTestBase,
1165 public testing::TestWithParam<std::string> {
1166 public:
VersionSetTestDropOneCF()1167 VersionSetTestDropOneCF() : VersionSetTestBase() {}
1168 };
1169
1170 // This test simulates the following execution sequence
1171 // Time thread1 bg_flush_thr
1172 // | Prepare version edits (e1,e2,e3) for atomic
1173 // | flush cf1, cf2, cf3
1174 // | Enqueue e to drop cfi
1175 // | to manifest_writers_
1176 // | Enqueue (e1,e2,e3) to manifest_writers_
1177 // |
1178 // | Apply e,
1179 // | cfi.IsDropped() is true
1180 // | Apply (e1,e2,e3),
1181 // | since cfi.IsDropped() == true, we need to
1182 // | drop ei and write the rest to MANIFEST.
1183 // V
1184 //
1185 // Repeat the test for i = 1, 2, 3 to simulate dropping the first, middle and
1186 // last column family in an atomic group.
TEST_P(VersionSetTestDropOneCF,HandleDroppedColumnFamilyInAtomicGroup)1187 TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
1188 std::vector<ColumnFamilyDescriptor> column_families;
1189 SequenceNumber last_seqno;
1190 std::unique_ptr<log::Writer> log_writer;
1191 PrepareManifest(&column_families, &last_seqno, &log_writer);
1192 Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
1193 ASSERT_OK(s);
1194
1195 EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
1196 EXPECT_EQ(column_families.size(),
1197 versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
1198
1199 const int kAtomicGroupSize = 3;
1200 const std::vector<std::string> non_default_cf_names = {
1201 kColumnFamilyName1, kColumnFamilyName2, kColumnFamilyName3};
1202
1203 // Drop one column family
1204 VersionEdit drop_cf_edit;
1205 drop_cf_edit.DropColumnFamily();
1206 const std::string cf_to_drop_name(GetParam());
1207 auto cfd_to_drop =
1208 versions_->GetColumnFamilySet()->GetColumnFamily(cf_to_drop_name);
1209 ASSERT_NE(nullptr, cfd_to_drop);
1210 // Increase its refcount because cfd_to_drop is used later, and we need to
1211 // prevent it from being deleted.
1212 cfd_to_drop->Ref();
1213 drop_cf_edit.SetColumnFamily(cfd_to_drop->GetID());
1214 mutex_.Lock();
1215 s = versions_->LogAndApply(cfd_to_drop,
1216 *cfd_to_drop->GetLatestMutableCFOptions(),
1217 &drop_cf_edit, &mutex_);
1218 mutex_.Unlock();
1219 ASSERT_OK(s);
1220
1221 std::vector<VersionEdit> edits(kAtomicGroupSize);
1222 uint32_t remaining = kAtomicGroupSize;
1223 size_t i = 0;
1224 autovector<ColumnFamilyData*> cfds;
1225 autovector<const MutableCFOptions*> mutable_cf_options_list;
1226 autovector<autovector<VersionEdit*>> edit_lists;
1227 for (const auto& cf_name : non_default_cf_names) {
1228 auto cfd = (cf_name != cf_to_drop_name)
1229 ? versions_->GetColumnFamilySet()->GetColumnFamily(cf_name)
1230 : cfd_to_drop;
1231 ASSERT_NE(nullptr, cfd);
1232 cfds.push_back(cfd);
1233 mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions());
1234 edits[i].SetColumnFamily(cfd->GetID());
1235 edits[i].SetLogNumber(0);
1236 edits[i].SetNextFile(2);
1237 edits[i].MarkAtomicGroup(--remaining);
1238 edits[i].SetLastSequence(last_seqno++);
1239 autovector<VersionEdit*> tmp_edits;
1240 tmp_edits.push_back(&edits[i]);
1241 edit_lists.emplace_back(tmp_edits);
1242 ++i;
1243 }
1244 int called = 0;
1245 SyncPoint::GetInstance()->DisableProcessing();
1246 SyncPoint::GetInstance()->ClearAllCallBacks();
1247 SyncPoint::GetInstance()->SetCallBack(
1248 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", [&](void* arg) {
1249 std::vector<VersionEdit*>* tmp_edits =
1250 reinterpret_cast<std::vector<VersionEdit*>*>(arg);
1251 EXPECT_EQ(kAtomicGroupSize - 1, tmp_edits->size());
1252 for (const auto e : *tmp_edits) {
1253 bool found = false;
1254 for (const auto& e2 : edits) {
1255 if (&e2 == e) {
1256 found = true;
1257 break;
1258 }
1259 }
1260 ASSERT_TRUE(found);
1261 }
1262 ++called;
1263 });
1264 SyncPoint::GetInstance()->EnableProcessing();
1265 mutex_.Lock();
1266 s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
1267 &mutex_);
1268 mutex_.Unlock();
1269 ASSERT_OK(s);
1270 ASSERT_EQ(1, called);
1271 if (cfd_to_drop->Unref()) {
1272 delete cfd_to_drop;
1273 cfd_to_drop = nullptr;
1274 }
1275 }
1276
1277 INSTANTIATE_TEST_CASE_P(
1278 AtomicGroup, VersionSetTestDropOneCF,
1279 testing::Values(VersionSetTestBase::kColumnFamilyName1,
1280 VersionSetTestBase::kColumnFamilyName2,
1281 VersionSetTestBase::kColumnFamilyName3));
1282 } // namespace ROCKSDB_NAMESPACE
1283
main(int argc,char ** argv)1284 int main(int argc, char** argv) {
1285 ::testing::InitGoogleTest(&argc, argv);
1286 return RUN_ALL_TESTS();
1287 }
1288