1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/port.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/concurrent_task_limiter.h"
14 #include "rocksdb/experimental.h"
15 #include "rocksdb/sst_file_writer.h"
16 #include "rocksdb/utilities/convenience.h"
17 #include "test_util/fault_injection_test_env.h"
18 #include "test_util/sync_point.h"
19 #include "util/concurrent_task_limiter_impl.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
// Excluded from ROCKSDB_LITE builds. (SYNC_POINT, used heavily below, is
// also not supported in released Windows mode.)
24 #if !defined(ROCKSDB_LITE)
25
// Basic fixture: each test gets a fresh DB rooted at "/db_compaction_test".
class DBCompactionTest : public DBTestBase {
 public:
  DBCompactionTest() : DBTestBase("/db_compaction_test") {}
};
30
// Parameterized fixture: each test runs once per (max_subcompactions,
// exclusive_manual_compaction) tuple supplied by the test instantiation.
class DBCompactionTestWithParam
    : public DBTestBase,
      public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
 public:
  DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
    max_subcompactions_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  // Copied out of GetParam() in the constructor so tests can read them
  // directly.
  uint32_t max_subcompactions_;
  bool exclusive_manual_compaction_;
};
47
// Fixture parameterized on a bool (presumably whether direct I/O is enabled
// — confirm against the instantiation, which is outside this chunk).
class DBCompactionDirectIOTest : public DBCompactionTest,
                                 public ::testing::WithParamInterface<bool> {
 public:
  DBCompactionDirectIOTest() : DBCompactionTest() {}
};
53
54 namespace {
55
56 class FlushedFileCollector : public EventListener {
57 public:
FlushedFileCollector()58 FlushedFileCollector() {}
~FlushedFileCollector()59 ~FlushedFileCollector() override {}
60
OnFlushCompleted(DB *,const FlushJobInfo & info)61 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
62 std::lock_guard<std::mutex> lock(mutex_);
63 flushed_files_.push_back(info.file_path);
64 }
65
GetFlushedFiles()66 std::vector<std::string> GetFlushedFiles() {
67 std::lock_guard<std::mutex> lock(mutex_);
68 std::vector<std::string> result;
69 for (auto fname : flushed_files_) {
70 result.push_back(fname);
71 }
72 return result;
73 }
74
ClearFlushedFiles()75 void ClearFlushedFiles() { flushed_files_.clear(); }
76
77 private:
78 std::vector<std::string> flushed_files_;
79 std::mutex mutex_;
80 };
81
82 class CompactionStatsCollector : public EventListener {
83 public:
CompactionStatsCollector()84 CompactionStatsCollector()
85 : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
86 for (auto& v : compaction_completed_) {
87 v.store(0);
88 }
89 }
90
~CompactionStatsCollector()91 ~CompactionStatsCollector() override {}
92
OnCompactionCompleted(DB *,const CompactionJobInfo & info)93 void OnCompactionCompleted(DB* /* db */,
94 const CompactionJobInfo& info) override {
95 int k = static_cast<int>(info.compaction_reason);
96 int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
97 assert(k >= 0 && k < num_of_reasons);
98 compaction_completed_[k]++;
99 }
100
OnExternalFileIngested(DB *,const ExternalFileIngestionInfo &)101 void OnExternalFileIngested(
102 DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
103 int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
104 compaction_completed_[k]++;
105 }
106
OnFlushCompleted(DB *,const FlushJobInfo &)107 void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
108 int k = static_cast<int>(CompactionReason::kFlush);
109 compaction_completed_[k]++;
110 }
111
NumberOfCompactions(CompactionReason reason) const112 int NumberOfCompactions(CompactionReason reason) const {
113 int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
114 int k = static_cast<int>(reason);
115 assert(k >= 0 && k < num_of_reasons);
116 return compaction_completed_.at(k).load();
117 }
118
119 private:
120 std::vector<std::atomic<int>> compaction_completed_;
121 };
122
123 class SstStatsCollector : public EventListener {
124 public:
SstStatsCollector()125 SstStatsCollector() : num_ssts_creation_started_(0) {}
126
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)127 void OnTableFileCreationStarted(
128 const TableFileCreationBriefInfo& /* info */) override {
129 ++num_ssts_creation_started_;
130 }
131
num_ssts_creation_started()132 int num_ssts_creation_started() { return num_ssts_creation_started_; }
133
134 private:
135 std::atomic<int> num_ssts_creation_started_;
136 };
137
// Sizing knobs for the Compaction-Deletion-Trigger (CDT) tests below.
static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8;
// Returns `options` tuned so deletions quickly trigger compactions: tiny
// memtables, compaction on every L0 file, and small level size targets.
Options DeletionTriggerOptions(Options options) {
  options.compression = kNoCompression;
  // Memtable holds only a handful of key/value pairs (~24 bytes overhead
  // budgeted per entry).
  options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain = 0;
  options.num_levels = kCDTNumLevels;
  // Compact as soon as a single L0 file appears.
  options.level0_file_num_compaction_trigger = 1;
  options.target_file_size_base = options.write_buffer_size * 2;
  options.target_file_size_multiplier = 2;
  options.max_bytes_for_level_base =
      options.target_file_size_base * options.target_file_size_multiplier;
  options.max_bytes_for_level_multiplier = 2;
  options.disable_auto_compactions = false;
  return options;
}
156
HaveOverlappingKeyRanges(const Comparator * c,const SstFileMetaData & a,const SstFileMetaData & b)157 bool HaveOverlappingKeyRanges(
158 const Comparator* c,
159 const SstFileMetaData& a, const SstFileMetaData& b) {
160 if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
161 if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
162 // b.smallestkey <= a.smallestkey <= b.largestkey
163 return true;
164 }
165 } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
166 // a.smallestkey < b.smallestkey <= a.largestkey
167 return true;
168 }
169 if (c->Compare(a.largestkey, b.largestkey) <= 0) {
170 if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
171 // b.smallestkey <= a.largestkey <= b.largestkey
172 return true;
173 }
174 } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
175 // a.smallestkey <= b.largestkey < a.largestkey
176 return true;
177 }
178 return false;
179 }
180
// Identifies all files between level "min_level" and "max_level"
// which has overlapping key range with "input_file_meta".
void GetOverlappingFileNumbersForLevelCompaction(
    const ColumnFamilyMetaData& cf_meta,
    const Comparator* comparator,
    int min_level, int max_level,
    const SstFileMetaData* input_file_meta,
    std::set<std::string>* overlapping_file_names) {
  // Transitive closure: any file overlapping an already-included file is
  // itself included, so the set may keep growing as levels are scanned.
  std::set<const SstFileMetaData*> overlapping_files;
  overlapping_files.insert(input_file_meta);
  for (int m = min_level; m <= max_level; ++m) {
    for (auto& file : cf_meta.levels[m].files) {
      for (auto* included_file : overlapping_files) {
        if (HaveOverlappingKeyRanges(
                comparator, *included_file, file)) {
          // Inserting into overlapping_files mid-iteration is safe: we
          // break out of this inner loop immediately afterwards. Pointers
          // into cf_meta remain valid for the duration of this call.
          overlapping_files.insert(&file);
          overlapping_file_names->insert(file.name);
          break;
        }
      }
    }
  }
}
204
VerifyCompactionResult(const ColumnFamilyMetaData & cf_meta,const std::set<std::string> & overlapping_file_numbers)205 void VerifyCompactionResult(
206 const ColumnFamilyMetaData& cf_meta,
207 const std::set<std::string>& overlapping_file_numbers) {
208 #ifndef NDEBUG
209 for (auto& level : cf_meta.levels) {
210 for (auto& file : level.files) {
211 assert(overlapping_file_numbers.find(file.name) ==
212 overlapping_file_numbers.end());
213 }
214 }
215 #endif
216 }
217
/*
 * Verifies compaction stats of cfd are valid.
 *
 * For each level of cfd, its compaction stats are valid if
 * 1) sum(stat.counts) == stat.count, and
 * 2) stat.counts[i] == collector.NumberOfCompactions(i)
 */
void VerifyCompactionStats(ColumnFamilyData& cfd,
    const CompactionStatsCollector& collector) {
#ifndef NDEBUG
  InternalStats* internal_stats_ptr = cfd.internal_stats();
  ASSERT_TRUE(internal_stats_ptr != nullptr);
  const std::vector<InternalStats::CompactionStats>& comp_stats =
      internal_stats_ptr->TEST_GetCompactionStats();
  const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
  std::vector<int> counts(num_of_reasons, 0);
  // Count the number of compactions caused by each CompactionReason across
  // all levels.
  for (const auto& stat : comp_stats) {
    int sum = 0;
    for (int i = 0; i < num_of_reasons; i++) {
      counts[i] += stat.counts[i];
      sum += stat.counts[i];
    }
    // Per-level consistency: per-reason counts must add up to the level's
    // total compaction count.
    ASSERT_EQ(sum, stat.count);
  }
  // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
  // assuming that all compactions complete.
  for (int i = 0; i < num_of_reasons; i++) {
    ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
  }
#endif /* NDEBUG */
}
251
PickFileRandomly(const ColumnFamilyMetaData & cf_meta,Random * rand,int * level=nullptr)252 const SstFileMetaData* PickFileRandomly(
253 const ColumnFamilyMetaData& cf_meta,
254 Random* rand,
255 int* level = nullptr) {
256 auto file_id = rand->Uniform(static_cast<int>(
257 cf_meta.file_count)) + 1;
258 for (auto& level_meta : cf_meta.levels) {
259 if (file_id <= level_meta.files.size()) {
260 if (level != nullptr) {
261 *level = level_meta.level;
262 }
263 auto result = rand->Uniform(file_id);
264 return &(level_meta.files[result]);
265 }
266 file_id -= static_cast<uint32_t>(level_meta.files.size());
267 }
268 assert(false);
269 return nullptr;
270 }
271 } // anonymous namespace
272
273 #ifndef ROCKSDB_VALGRIND_RUN
274 // All the TEST_P tests run once with sub_compactions disabled (i.e.
275 // options.max_subcompactions = 1) and once with it enabled
TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
  // After deleting every key, deletion-triggered compactions must shrink
  // the DB to well under a third of its fully-populated size. Three passes:
  // default, skip-stats-on-open, and universal compaction.
  for (int tid = 0; tid < 3; ++tid) {
    uint64_t db_size[2];
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;

    if (tid == 1) {
      // the following only disable stats update in DB::Open()
      // and should not affect the result of this test.
      options.skip_stats_update_on_db_open = true;
    } else if (tid == 2) {
      // third pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
    }

    DestroyAndReopen(options);
    Random rnd(301);

    const int kTestSize = kCDTKeysPerBuffer * 1024;
    std::vector<std::string> values;
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    // Size of the fully populated DB.
    db_size[0] = Size(Key(0), Key(kTestSize - 1));

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    // Size after deleting everything and letting compaction run.
    db_size[1] = Size(Key(0), Key(kTestSize - 1));

    // must have much smaller db size.
    ASSERT_GT(db_size[0] / 3, db_size[1]);
  }
}
316 #endif // ROCKSDB_VALGRIND_RUN
317
TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
  // For each options type we test following
  // - Enable preserve_deletes
  // - write bunch of keys and deletes
  // - Set start_seqnum to the beginning; compact; check that keys are present
  // - rewind start_seqnum way forward; compact; check that keys are gone

  for (int tid = 0; tid < 3; ++tid) {
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;
    options.preserve_deletes=true;
    options.num_levels = 2;

    if (tid == 1) {
      options.skip_stats_update_on_db_open = true;
    } else if (tid == 2) {
      // third pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
    }

    DestroyAndReopen(options);
    Random rnd(301);
    // highlight the default; all deletes should be preserved
    SetPreserveDeletesSequenceNumber(0);

    const int kTestSize = kCDTKeysPerBuffer;
    std::vector<std::string> values;
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    // to ensure we tackle all tombstones
    CompactRangeOptions cro;
    cro.change_level = true;
    // NOTE(review): target_level is 2 while num_levels is 2 (valid levels
    // are 0..1) — confirm CompactRange clamps/accepts this; its Status is
    // not checked below.
    cro.target_level = 2;
    cro.bottommost_level_compaction =
        BottommostLevelCompaction::kForceOptimized;

    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->CompactRange(cro, nullptr, nullptr);

    // check that normal user iterator doesn't see anything
    Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
    int i = 0;
    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
      i++;
    }
    ASSERT_EQ(i, 0);
    delete db_iter;

    // check that iterator that sees internal keys sees tombstones
    ReadOptions ro;
    ro.iter_start_seqnum=1;
    db_iter = dbfull()->NewIterator(ro);
    i = 0;
    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
      i++;
    }
    // kTestSize (== kCDTKeysPerBuffer == 4) preserved tombstones.
    ASSERT_EQ(i, 4);
    delete db_iter;

    // now all deletes should be gone
    SetPreserveDeletesSequenceNumber(100000000);
    dbfull()->CompactRange(cro, nullptr, nullptr);

    db_iter = dbfull()->NewIterator(ro);
    i = 0;
    for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
      i++;
    }
    ASSERT_EQ(i, 0);
    delete db_iter;
  }
}
396
TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
  // This test verify UpdateAccumulatedStats is not on
  // if options.skip_stats_update_on_db_open = true
  // The test will need to be updated if the internal behavior changes.

  Options options = DeletionTriggerOptions(CurrentOptions());
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);
  Random rnd(301);

  const int kTestSize = kCDTKeysPerBuffer * 512;
  std::vector<std::string> values;
  for (int k = 0; k < kTestSize; ++k) {
    values.push_back(RandomString(&rnd, kCDTValueSize));
    ASSERT_OK(Put(Key(k), values[k]));
  }

  ASSERT_OK(Flush());

  Close();

  // Observe DB::Open's property scan via a sync point counter.
  int update_acc_stats_called = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionStorageInfo::UpdateAccumulatedStats",
      [&](void* /* arg */) { ++update_acc_stats_called; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Reopen the DB with stats-update disabled
  options.skip_stats_update_on_db_open = true;
  options.max_open_files = 20;
  Reopen(options);

  // With the skip flag set, opening must not trigger any stats update.
  ASSERT_EQ(update_acc_stats_called, 0);

  // Repeat the reopen process, but this time we enable
  // stats-update.
  options.skip_stats_update_on_db_open = false;
  Reopen(options);

  ASSERT_GT(update_acc_stats_called, 0);

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
442
TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
  // With new_table_reader_for_compaction_inputs enabled, compactions should
  // open their own table readers instead of going through the table cache.
  // The exact lookup/reader counts asserted below are tied to the current
  // implementation and will need updating if it changes.
  Options options = CurrentOptions();
  options.env = env_;
  options.new_table_reader_for_compaction_inputs = true;
  options.max_open_files = 20;
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);
  Random rnd(301);

  int num_table_cache_lookup = 0;
  int num_new_table_reader = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::FindTable:0", [&](void* arg) {
        assert(arg != nullptr);
        bool no_io = *(reinterpret_cast<bool*>(arg));
        if (!no_io) {
          // filter out cases for table properties queries.
          num_table_cache_lookup++;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::GetTableReader:0",
      [&](void* /*arg*/) { num_new_table_reader++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
    ASSERT_OK(Put(Key(k), Key(k)));
    ASSERT_OK(Put(Key(10 - k), "bar"));
    if (k < options.level0_file_num_compaction_trigger - 1) {
      num_table_cache_lookup = 0;
      Flush();
      dbfull()->TEST_WaitForCompact();
      // preloading iterator issues one table cache lookup and create
      // a new table reader, if not preloaded.
      int old_num_table_cache_lookup = num_table_cache_lookup;
      ASSERT_GE(num_table_cache_lookup, 1);
      ASSERT_EQ(num_new_table_reader, 1);

      num_table_cache_lookup = 0;
      num_new_table_reader = 0;
      ASSERT_EQ(Key(k), Get(Key(k)));
      // lookup iterator from table cache and no need to create a new one.
      ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
      ASSERT_EQ(num_new_table_reader, 0);
    }
  }

  num_table_cache_lookup = 0;
  num_new_table_reader = 0;
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Preloading iterator issues one table cache lookup and creates
  // a new table reader. One file is created for flush and one for compaction.
  // Compaction inputs make no table cache look-up for data/range deletion
  // iterators
  // May preload table cache too.
  ASSERT_GE(num_table_cache_lookup, 2);
  int old_num_table_cache_lookup2 = num_table_cache_lookup;

  // Create new iterator for:
  // (1) 1 for verifying flush results
  // (2) 1 for verifying compaction results.
  // (3) New TableReaders will not be created for compaction inputs
  ASSERT_EQ(num_new_table_reader, 2);

  num_table_cache_lookup = 0;
  num_new_table_reader = 0;
  ASSERT_EQ(Key(1), Get(Key(1)));
  ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
  ASSERT_EQ(num_new_table_reader, 0);

  num_table_cache_lookup = 0;
  num_new_table_reader = 0;
  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 2;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  db_->CompactRange(cro, nullptr, nullptr);
  // Only verifying compaction outputs issues one table cache lookup
  // for both data block and range deletion block).
  // May preload table cache too.
  ASSERT_GE(num_table_cache_lookup, 1);
  old_num_table_cache_lookup2 = num_table_cache_lookup;
  // One for verifying compaction results.
  // No new iterator created for compaction.
  ASSERT_EQ(num_new_table_reader, 1);

  num_table_cache_lookup = 0;
  num_new_table_reader = 0;
  ASSERT_EQ(Key(1), Get(Key(1)));
  ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
  ASSERT_EQ(num_new_table_reader, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
538
TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
  // Deletion-triggered compaction must survive a DB reopen: deletions made
  // while auto-compaction is off should still be compensated for once it is
  // re-enabled after reopening.
  for (int tid = 0; tid < 2; ++tid) {
    uint64_t db_size[3];
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;

    if (tid == 1) {
      // second pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
    }

    DestroyAndReopen(options);
    Random rnd(301);

    // round 1 --- insert key/value pairs.
    const int kTestSize = kCDTKeysPerBuffer * 512;
    std::vector<std::string> values;
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[0] = Size(Key(0), Key(kTestSize - 1));
    Close();

    // round 2 --- disable auto-compactions and issue deletions.
    options.create_if_missing = false;
    options.disable_auto_compactions = true;
    Reopen(options);

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    db_size[1] = Size(Key(0), Key(kTestSize - 1));
    Close();
    // as auto_compaction is off, we shouldn't see too much reduce
    // in db size.
    ASSERT_LT(db_size[0] / 3, db_size[1]);

    // round 3 --- reopen db with auto_compaction on and see if
    // deletion compensation still work.
    options.disable_auto_compactions = false;
    Reopen(options);
    // insert relatively small amount of data to trigger auto compaction.
    for (int k = 0; k < kTestSize / 10; ++k) {
      ASSERT_OK(Put(Key(k), values[k]));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[2] = Size(Key(0), Key(kTestSize - 1));
    // this time we're expecting significant drop in size.
    ASSERT_GT(db_size[0] / 3, db_size[2]);
  }
}
595
TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
  // Deletion compensation relies on accumulated stats gathered at DB::Open;
  // with skip_stats_update_on_db_open the size should NOT drop (test==0),
  // without it the drop should be significant (test==1).
  uint64_t db_size[3];
  for (int test = 0; test < 2; ++test) {
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.skip_stats_update_on_db_open = (test == 0);

    env_->random_read_counter_.Reset();
    DestroyAndReopen(options);
    Random rnd(301);

    // round 1 --- insert key/value pairs.
    const int kTestSize = kCDTKeysPerBuffer * 512;
    std::vector<std::string> values;
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[0] = Size(Key(0), Key(kTestSize - 1));
    Close();

    // round 2 --- disable auto-compactions and issue deletions.
    options.create_if_missing = false;
    options.disable_auto_compactions = true;

    env_->random_read_counter_.Reset();
    Reopen(options);

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    db_size[1] = Size(Key(0), Key(kTestSize - 1));
    Close();
    // as auto_compaction is off, we shouldn't see too much reduce
    // in db size.
    ASSERT_LT(db_size[0] / 3, db_size[1]);

    // round 3 --- reopen db with auto_compaction on and see if
    // deletion compensation still work.
    options.disable_auto_compactions = false;
    Reopen(options);
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[2] = Size(Key(0), Key(kTestSize - 1));

    if (options.skip_stats_update_on_db_open) {
      // If update stats on DB::Open is disable, we don't expect
      // deletion entries taking effect.
      ASSERT_LT(db_size[0] / 3, db_size[2]);
    } else {
      // Otherwise, we should see a significant drop in db size.
      ASSERT_GT(db_size[0] / 3, db_size[2]);
    }
  }
}
652
653
TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
  // Writing level0_file_num_compaction_trigger L0 files must kick off an
  // automatic L0->L1 compaction that empties L0 into a single L1 file.
  const int kNumKeysPerFile = 100;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.num_levels = 3;
  options.level0_file_num_compaction_trigger = 3;
  options.max_subcompactions = max_subcompactions_;
  // SpecialSkipListFactory caps each memtable at kNumKeysPerFile entries so
  // file boundaries are deterministic.
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);

  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    std::vector<std::string> values;
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < kNumKeysPerFile; i++) {
      values.push_back(RandomString(&rnd, 990));
      ASSERT_OK(Put(1, Key(i), values[i]));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(1, "", ""));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
  }

  // generate one more file in level-0, and should trigger level-0 compaction
  std::vector<std::string> values;
  for (int i = 0; i < kNumKeysPerFile; i++) {
    values.push_back(RandomString(&rnd, 990));
    ASSERT_OK(Put(1, Key(i), values[i]));
  }
  // put extra key to trigger flush
  ASSERT_OK(Put(1, "", ""));
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
695
TEST_F(DBCompactionTest, BGCompactionsAllowed) {
  // Create several column families. Make compaction triggers in all of them
  // and see number of compactions scheduled to be less than allowed.
  const int kNumKeysPerFile = 100;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.num_levels = 3;
  // Should speed up compaction when there are 4 files.
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 20;
  options.soft_pending_compaction_bytes_limit = 1 << 30;  // Infinitely large
  options.max_background_compactions = 3;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));

  // Block all threads in thread pool so scheduled compactions queue up and
  // the queue length can be inspected deterministically.
  const size_t kTotalTasks = 4;
  env_->SetBackgroundThreads(4, Env::LOW);
  test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
  for (size_t i = 0; i < kTotalTasks; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                   &sleeping_tasks[i], Env::Priority::LOW);
    sleeping_tasks[i].WaitUntilSleeping();
  }

  CreateAndReopenWithCF({"one", "two", "three"}, options);

  Random rnd(301);
  for (int cf = 0; cf < 4; cf++) {
    for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
      for (int i = 0; i < kNumKeysPerFile; i++) {
        ASSERT_OK(Put(cf, Key(i), ""));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put(cf, "", ""));
      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
      ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
    }
  }

  // Now all column families qualify compaction but only one should be
  // scheduled, because no column family hits speed up condition.
  ASSERT_EQ(1u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));

  // Create two more files for one column family, which triggers speed up
  // condition, three compactions will be scheduled.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    for (int i = 0; i < kNumKeysPerFile; i++) {
      ASSERT_OK(Put(2, Key(i), ""));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(2, "", ""));
    dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
    ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
              NumTableFilesAtLevel(0, 2));
  }
  ASSERT_EQ(3U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));

  // Unblock all threads to unblock all compactions.
  for (size_t i = 0; i < kTotalTasks; i++) {
    sleeping_tasks[i].WakeUp();
    sleeping_tasks[i].WaitUntilDone();
  }
  dbfull()->TEST_WaitForCompact();

  // Verify number of compactions allowed will come back to 1.

  for (size_t i = 0; i < kTotalTasks; i++) {
    sleeping_tasks[i].Reset();
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                   &sleeping_tasks[i], Env::Priority::LOW);
    sleeping_tasks[i].WaitUntilSleeping();
  }
  for (int cf = 0; cf < 4; cf++) {
    for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
      for (int i = 0; i < kNumKeysPerFile; i++) {
        ASSERT_OK(Put(cf, Key(i), ""));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put(cf, "", ""));
      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
      ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
    }
  }

  // Now all column families qualify compaction but only one should be
  // scheduled, because no column family hits speed up condition.
  ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));

  for (size_t i = 0; i < kTotalTasks; i++) {
    sleeping_tasks[i].WakeUp();
    sleeping_tasks[i].WaitUntilDone();
  }
}
791
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
  // Compacting 8MB of L0 data must split the output into multiple L1 files
  // (default target file size is much smaller than 8MB), without losing data.
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;  // Large write buffer
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);

  // Write 8MB (80 values, each 100K)
  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  std::vector<std::string> values;
  for (int i = 0; i < 80; i++) {
    values.push_back(RandomString(&rnd, 100000));
    ASSERT_OK(Put(1, Key(i), values[i]));
  }

  // Reopening moves updates to level-0
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                              true /* disallow trivial move */);

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
  for (int i = 0; i < 80; i++) {
    ASSERT_EQ(Get(1, Key(i)), values[i]);
  }
}
819
TEST_F(DBCompactionTest, MinorCompactionsHappen) {
  // With a tiny write buffer, inserting N large values must produce table
  // files (i.e. memtable flushes happen), and all data must survive both
  // the flushes and a reopen. Repeated for every ChangeCompactOptions()
  // configuration.
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 10000;
    CreateAndReopenWithCF({"pikachu"}, options);

    const int N = 500;

    int starting_num_tables = TotalTableFiles(1);
    for (int i = 0; i < N; i++) {
      ASSERT_OK(Put(1, Key(i), Key(i) + std::string(1000, 'v')));
    }
    int ending_num_tables = TotalTableFiles(1);
    ASSERT_GT(ending_num_tables, starting_num_tables);

    for (int i = 0; i < N; i++) {
      ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i)));
    }

    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    for (int i = 0; i < N; i++) {
      ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i)));
    }
  } while (ChangeCompactOptions());
}
846
TEST_F(DBCompactionTest, UserKeyCrossFile1) {
  // A Delete tombstone for key "3" must keep masking the older Put even
  // after compactions spread the user key's entries across files/levels.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  Put("4", "A");
  Put("3", "A");
  Flush();
  dbfull()->TEST_WaitForFlushMemTable();

  // Second file carries the tombstone for "3".
  Put("2", "A");
  Delete("3");
  Flush();
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Generate enough L0 files to trigger an automatic compaction into L1.
  for (int i = 0; i < 3; i++) {
    Put("2", "B");
    Flush();
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
879
TEST_F(DBCompactionTest, UserKeyCrossFile2) {
  // Same scenario as UserKeyCrossFile1 but with SingleDelete instead of
  // Delete: the tombstone must keep masking the older Put across compactions.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  Put("4", "A");
  Put("3", "A");
  Flush();
  dbfull()->TEST_WaitForFlushMemTable();

  // Second file carries the single-delete tombstone for "3".
  Put("2", "A");
  SingleDelete("3");
  Flush();
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Generate enough L0 files to trigger an automatic compaction into L1.
  for (int i = 0; i < 3; i++) {
    Put("2", "B");
    Flush();
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
912
// Exercises compaction when the oldest entry of a user key can be assigned
// sequence number zero: after all snapshots are released, a subsequent
// compaction over files whose boundaries share that user key must succeed.
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  // Collect names of flushed SSTs so they can be fed to CompactFiles() below.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  // compaction options
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  compact_opt.output_file_size_limit = 4096;
  // Size keys so a handful of them roughly fill one output file.
  const size_t key_len =
      static_cast<size_t>(compact_opt.output_file_size_limit) / 5;

  DestroyAndReopen(options);

  std::vector<const Snapshot*> snaps;

  // create first file and flush to l0; a snapshot after every write keeps
  // all versions of "3" alive for now
  for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // create second file and flush to l0
  for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // move both files down to l1
  ASSERT_OK(
      dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1));

  // release snap so that first instance of key(3) can have seqId=0
  for (auto snap : snaps) {
    dbfull()->ReleaseSnapshot(snap);
  }

  // create 3 files in l0 so to trigger compaction
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", std::string(1, 'A')));
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
  }

  dbfull()->TEST_WaitForCompact();
  // A final write confirms the DB is still healthy after the compaction.
  ASSERT_OK(Put("", ""));
}
966
// Regression test for github issue #2249: CompactFiles() must work when
// CompactionOptions::output_file_size_limit is left at its default (unset).
TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
  // github issue #2249
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);

  // create two files in l1 that we can compact
  for (int i = 0; i < 2; ++i) {
    for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
      // make l0 files' ranges overlap to avoid trivial move
      ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
      ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForFlushMemTable();
    }
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
  }

  // Gather the two L1 file names to pass to CompactFiles().
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_EQ(2, cf_meta.levels[1].files.size());
  std::vector<std::string> input_filenames;
  for (const auto& sst_file : cf_meta.levels[1].files) {
    input_filenames.push_back(sst_file.name);
  }

  // note CompactionOptions::output_file_size_limit is unset.
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
}
1001
1002 // Check that writes done during a memtable compaction are recovered
1003 // if the database is shutdown during the memtable compaction.
TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Trigger a long memtable compaction and reopen the database during it
    // The ~10MB value makes the flush slow enough that the reopen below can
    // race with it; "bar" lands in a fresh log file after the switch.
    ASSERT_OK(Put(1, "foo", "v1"));  // Goes to 1st log file
    ASSERT_OK(Put(1, "big1", std::string(10000000, 'x')));  // Fills memtable
    ASSERT_OK(Put(1, "big2", std::string(1000, 'y')));  // Triggers compaction
    ASSERT_OK(Put(1, "bar", "v2"));                     // Goes to new log file

    // All four writes must be recovered from the WALs regardless of how far
    // the in-flight flush got before the reopen.
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ(std::string(10000000, 'x'), Get(1, "big1"));
    ASSERT_EQ(std::string(1000, 'y'), Get(1, "big2"));
  } while (ChangeOptions());
}
1023
// A manual compaction of a single L0 file with nothing below it must be
// performed as a trivial move: the same physical file (same name and size)
// simply changes level, and the sync-point counter records exactly one move.
TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;  // large buffer: no flush mid-test
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  int32_t num_keys = 80;
  int32_t value_size = 100 * 1024;  // 100 KB

  Random rnd(301);
  std::vector<std::string> values;
  for (int i = 0; i < num_keys; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(Key(i), values[i]));
  }

  // Reopening moves updates to L0
  Reopen(options);
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1);  // 1 file in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // 0 files in L1

  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  LiveFileMetaData level0_file = metadata[0];  // L0 file meta

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Compaction will initiate a trivial move from L0 to L1
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));

  // File moved From L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // 1 file in L1

  // Trivial move keeps the identical physical file: same name and size.
  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
  ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);

  for (int i = 0; i < num_keys; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }

  ASSERT_EQ(trivial_move, 1);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
1079
// Non-overlapping L0 files must be compacted via trivial moves only; once two
// of the files overlap, a real (non-trivial) compaction is required instead.
TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  // non overlapping ranges
  std::vector<std::pair<int32_t, int32_t>> ranges = {
      {100, 199},
      {300, 399},
      {0, 99},
      {200, 299},
      {600, 699},
      {400, 499},
      {500, 550},
      {551, 599},
  };
  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::map<int32_t, std::string> values;
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
    ASSERT_OK(Flush());
  }

  int32_t level0_files = NumTableFilesAtLevel(0, 0);
  ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Since data is non-overlapping we expect compaction to initiate
  // a trivial move
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  // We expect that all the files were trivially moved from L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);

  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      ASSERT_EQ(Get(Key(j)), values[j]);
    }
  }

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  trivial_move = 0;
  non_trivial_move = 0;
  values.clear();
  DestroyAndReopen(options);
  // Same ranges as above but overlapping
  ranges = {
      {100, 199},
      {300, 399},
      {0, 99},
      {200, 299},
      {600, 699},
      {400, 499},
      {500, 560},  // this range overlap with the next one
      {551, 599},
  };
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
    ASSERT_OK(Flush());
  }

  // Overlap forces a rewrite: exactly one non-trivial compaction.
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      ASSERT_EQ(Get(Key(j)), values[j]);
    }
  }
  ASSERT_EQ(trivial_move, 0);
  ASSERT_EQ(non_trivial_move, 1);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
1178
// A manual compaction of two non-overlapping L0 files straight to the
// bottommost level (via change_level/target_level) must be done as a single
// trivial move, with no data rewrite, and leave all keys readable.
TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  // Count both compaction kinds so we can assert only trivial moves ran.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 300]
  for (int32_t i = 0; i <= 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [600 => 700]
  for (int32_t i = 600; i <= 700; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  // Everything was moved, nothing was rewritten.
  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  for (int32_t i = 0; i <= 300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
  for (int32_t i = 600; i <= 700; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1237
// Runs a manual range compaction concurrently with automatic compactions and
// uses sync points to pin the interleaving; checks the LSM shape before and
// after the two non-trivial compactions are installed.
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  bool first = true;
  // Purpose of dependencies:
  // 4 -> 1: ensure the order of two non-trivial compactions
  // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
  // are installed
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          first = false;
          TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
          TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
        } else {  // second non-trivial compaction
          TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.target_file_size_base = 1 << 23;  // 8 MB

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100]
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300]
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  // Trivial move the two non-overlapping files to level 6
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [ 0 => 200]
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 1 files in L0
  ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
  // Push the new file down one level at a time; each step is a trivial move
  // because nothing overlaps it until L5.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
  // 2 files in L6, 1 file in L5
  ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 6);
  ASSERT_EQ(non_trivial_move, 0);

  // Manual compaction of [0, 199] runs in the background; it is the first
  // non-trivial compaction, gated by the ManualPartial:4 -> :1 dependency.
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    // First non-trivial compaction is triggered
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
  // file 4 [300 => 400)
  for (int32_t i = 300; i <= 400; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 5 [400 => 500)
  for (int32_t i = 400; i <= 500; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 6 [500 => 600)
  for (int32_t i = 500; i <= 600; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  // Second non-trivial compaction is triggered
  ASSERT_OK(Flush());

  // Before two non-trivial compactions are installed, there are 3 files in L0
  ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
  TEST_SYNC_POINT("DBCompaction::ManualPartial:5");

  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  // After two non-trivial compactions are installed, there is 1 file in L6, and
  // 1 file in L1
  ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
  threads.join();

  for (int32_t i = 0; i < 600; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1381
1382 // Disable as the test is flaky.
// Disable as the test is flaky.
// Fills levels while a concurrent manual compaction of a sub-range runs, then
// verifies per-level size limits and data integrity. Interleaving is pinned
// with sync-point dependencies (PartialFill:4 -> :1, :2 -> :3).
TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  bool first = true;
  bool second = true;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
       {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          TEST_SYNC_POINT("DBCompaction::PartialFill:4");
          first = false;
          TEST_SYNC_POINT("DBCompaction::PartialFill:3");
        } else if (second) {
          // NOTE(review): intentionally empty — `second` is never cleared, so
          // later non-trivial compactions take this no-op branch.
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100]
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300]
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [ 0 => 200]
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L2, 1 in L0
  ASSERT_EQ("1,0,2", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  // 2 files in L2, 1 in L1
  ASSERT_EQ("0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 2);
  ASSERT_EQ(non_trivial_move, 0);

  // Manual compaction of [0, 199] proceeds in the background while the main
  // thread keeps loading data below.
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::PartialFill:1");
  // Many files 4 [300 => 4300)
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }

  // Verify level sizes
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  TEST_SYNC_POINT("DBCompaction::PartialFill:2");
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  threads.join();

  for (int32_t i = 0; i < 4300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1508
// With unordered_write enabled, a manual CompactRange must wait for pending
// writes (sync points force the compaction to start while a write has hit the
// WAL but not yet the memtable) and the write must survive both the
// compaction and a reopen.
TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("bar", "v1"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // The write of "foo"="v2" is paused between its WAL write and its memtable
  // insert by the dependencies above.
  port::Thread writer([&]() { ASSERT_OK(Put("foo", "v2")); });

  TEST_SYNC_POINT(
      "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  writer.join();
  ASSERT_EQ(Get("foo"), "v2");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // The update must also be recoverable from the WAL.
  Reopen(options);
  ASSERT_EQ(Get("foo"), "v2");
}
1539
// Exercises DeleteFilesInRange(): deleting whole SST files within a key
// range, within an empty range, and finally with an unbounded range, checking
// that keys disappear and the file count drops.
TEST_F(DBCompactionTest, DeleteFileRange) {
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100]
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300]
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  // file 3 [ 0 => 200]
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // Many files 4 [300 => 4300)
  // Bulk-load enough data (with overwrites) to spread files across levels.
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();

  // Verify level sizes
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  size_t old_num_files = CountFiles();
  std::string begin_string = Key(1000);
  std::string end_string = Key(2000);
  Slice begin(begin_string);
  Slice end(end_string);
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));

  // Only files fully contained in [1000, 2000] were dropped, so keys in that
  // range may or may not still exist; count how many actually disappeared.
  int32_t deleted_count = 0;
  for (int32_t i = 0; i < 4300; i++) {
    if (i < 1000 || i > 2000) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    } else {
      ReadOptions roptions;
      std::string result;
      Status s = db_->Get(roptions, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound() || s.ok());
      if (s.IsNotFound()) {
        deleted_count++;
      }
    }
  }
  ASSERT_GT(deleted_count, 0);
  begin_string = Key(5000);
  end_string = Key(6000);
  Slice begin1(begin_string);
  Slice end1(end_string);
  // Try deleting files in range which contain no keys
  ASSERT_OK(
      DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1));

  // Push data from level 0 to level 1 to force all data to be deleted
  // Note that we don't delete level 0 files
  compact_options.change_level = true;
  compact_options.target_level = 1;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  dbfull()->TEST_WaitForCompact();

  // Unbounded range: drop every (non-L0) file.
  ASSERT_OK(
      DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));

  int32_t deleted_count2 = 0;
  for (int32_t i = 0; i < 4300; i++) {
    ReadOptions roptions;
    std::string result;
    Status s = db_->Get(roptions, Key(i), &result);
    ASSERT_TRUE(s.IsNotFound());
    deleted_count2++;
  }
  ASSERT_GT(deleted_count2, deleted_count);
  size_t new_num_files = CountFiles();
  ASSERT_GT(old_num_files, new_num_files);
}
1660
// Exercises the multi-range form DeleteFilesInRanges(): deleting files whose
// keys fall in several ranges at once, with both inclusive and exclusive
// range endpoints, and finally with a fully-unbounded range.
TEST_F(DBCompactionTest, DeleteFilesInRanges) {
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.max_background_compactions = 3;
  options.disable_auto_compactions = true;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file [0 => 100), [100 => 200), ... [900, 1000)
  for (auto i = 0; i < 10; i++) {
    for (auto j = 0; j < 100; j++) {
      auto k = i * 100 + j;
      values[k] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("10", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,10", FilesPerLevel(0));

  // file [0 => 100), [200 => 300), ... [800, 900)
  // Rewrite every other 100-key chunk so L1 overlaps half of L2.
  for (auto i = 0; i < 10; i+=2) {
    for (auto j = 0; j < 100; j++) {
      auto k = i * 100 + j;
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("5,0,10", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_EQ("0,5,10", FilesPerLevel(0));

  // Delete files in range [0, 299] (inclusive)
  {
    auto begin_str1 = Key(0), end_str1 = Key(100);
    auto begin_str2 = Key(100), end_str2 = Key(200);
    auto begin_str3 = Key(200), end_str3 = Key(299);
    Slice begin1(begin_str1), end1(end_str1);
    Slice begin2(begin_str2), end2(end_str2);
    Slice begin3(begin_str3), end3(end_str3);
    std::vector<RangePtr> ranges;
    ranges.push_back(RangePtr(&begin1, &end1));
    ranges.push_back(RangePtr(&begin2, &end2));
    ranges.push_back(RangePtr(&begin3, &end3));
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
                                  ranges.data(), ranges.size()));
    ASSERT_EQ("0,3,7", FilesPerLevel(0));

    // Keys [0, 300) should not exist.
    for (auto i = 0; i < 300; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
    for (auto i = 300; i < 1000; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
  }

  // Delete files in range [600, 999) (exclusive)
  {
    auto begin_str1 = Key(600), end_str1 = Key(800);
    auto begin_str2 = Key(700), end_str2 = Key(900);
    auto begin_str3 = Key(800), end_str3 = Key(999);
    Slice begin1(begin_str1), end1(end_str1);
    Slice begin2(begin_str2), end2(end_str2);
    Slice begin3(begin_str3), end3(end_str3);
    std::vector<RangePtr> ranges;
    ranges.push_back(RangePtr(&begin1, &end1));
    ranges.push_back(RangePtr(&begin2, &end2));
    ranges.push_back(RangePtr(&begin3, &end3));
    // Last arg false => range ends are treated as exclusive.
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
                                  ranges.data(), ranges.size(), false));
    ASSERT_EQ("0,1,4", FilesPerLevel(0));

    // Keys [600, 900) should not exist.
    for (auto i = 600; i < 900; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
    for (auto i = 300; i < 600; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
    for (auto i = 900; i < 1000; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
  }

  // Delete all files.
  {
    // Default-constructed RangePtr has null bounds, i.e. unbounded range.
    RangePtr range;
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
    ASSERT_EQ("", FilesPerLevel(0));

    for (auto i = 0; i < 1000; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
  }
}
1776
// Regression test for #2833: DeleteFilesInRange must not split a group of
// files whose user keys overlap at the endpoints, which could resurrect an
// old version of a key.
TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
  // regression test for #2833: groups of files whose user-keys overlap at the
  // endpoints could be split by `DeleteFilesInRange`. This caused old data to
  // reappear, either because a new version of the key was removed, or a range
  // deletion was partially dropped. It could also cause non-overlapping
  // invariant to be violated if the files dropped by DeleteFilesInRange were
  // a subset of files that a range deletion spans.
  const int kNumL0Files = 2;
  const int kValSize = 8 << 10;  // 8KB
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  // The snapshot prevents key 1 from having its old version dropped. The low
  // `target_file_size_base` ensures two keys will be in each output file.
  const Snapshot* snapshot = nullptr;
  Random rnd(301);
  // The value indicates which flush the key belonged to, which is enough
  // for us to determine the keys' relative ages. After L0 flushes finish,
  // files look like:
  //
  // File 0: 0 -> vals[0], 1 -> vals[0]
  // File 1: 1 -> vals[1], 2 -> vals[1]
  //
  // Then L0->L1 compaction happens, which outputs keys as follows:
  //
  // File 0: 0 -> vals[0], 1 -> vals[1]
  // File 1: 1 -> vals[0], 2 -> vals[1]
  //
  // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
  // would cause `1 -> vals[0]` (an older key) to reappear.
  std::string vals[kNumL0Files];
  for (int i = 0; i < kNumL0Files; ++i) {
    vals[i] = RandomString(&rnd, kValSize);
    ASSERT_OK(Put(Key(i), vals[i]));
    ASSERT_OK(Put(Key(i + 1), vals[i]));
    ASSERT_OK(Flush());
    if (i == 0) {
      snapshot = db_->GetSnapshot();
    }
  }
  dbfull()->TEST_WaitForCompact();

  // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
  // "1 -> vals[0]" to reappear.
  std::string begin_str = Key(0), end_str = Key(1);
  Slice begin = begin_str, end = end_str;
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
  ASSERT_EQ(vals[1], Get(Key(1)));

  db_->ReleaseSnapshot(snapshot);
}
1830
TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
  // Count trivial vs. non-trivial compactions via sync-point callbacks.
  int32_t num_trivial = 0;
  int32_t num_non_trivial = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { ++num_trivial; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { ++num_non_trivial; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const int32_t kValueSize = 10 * 1024;  // 10 KB per value

  Random rnd(301);
  std::vector<std::string> values;
  // Writes keys [lo, hi) with fresh random values, then flushes them into a
  // single SST file.
  auto write_and_flush = [&](int lo, int hi) {
    for (int k = lo; k < hi; k++) {
      values.push_back(RandomString(&rnd, kValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  };

  // File with keys [ 0 => 99 ]
  write_and_flush(0, 100);

  ASSERT_EQ("1", FilesPerLevel(0));
  // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 3;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  ASSERT_EQ(num_trivial, 1);
  ASSERT_EQ(num_non_trivial, 0);

  // File with keys [ 100 => 199 ]
  write_and_flush(100, 200);

  ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;
  // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
  ASSERT_EQ(num_trivial, 4);
  ASSERT_EQ(num_non_trivial, 0);

  // All 200 keys remain readable with their original values.
  for (int k = 0; k < 200; k++) {
    ASSERT_EQ(Get(Key(k)), values[k]);
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
1891
TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;

  // Clear any leftover files out of the second path before reopening.
  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  for (const auto& fname : filenames) {
    env_->DeleteFile(options.db_paths[1].path + "/" + fname);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to 400K file to fill up first path
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4", FilesPerLevel(0));
  ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // Each additional file adds exactly one L2 output in the third path while
  // the L0/L1 shape stays (1, 4): the LSM walks through (1, 4, 1) up to
  // (1, 4, 8).
  for (int l2_files = 1; l2_files <= 8; l2_files++) {
    GenerateNewFile(&rnd, &key_idx);
    ASSERT_EQ("1,4," + std::to_string(l2_files), FilesPerLevel(0));
    ASSERT_EQ(l2_files, GetSstFileCount(options.db_paths[2].path));
    ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(1, GetSstFileCount(dbname_));
  }

  // Every written key must be readable; GenerateNewFile writes values of
  // length 1 or 990.
  auto verify_all_values = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };

  verify_all_values();

  Reopen(options);

  verify_all_values();

  Destroy(options);
}
2008
TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;

  // Clear any leftover files out of the second path before reopening.
  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  for (const auto& fname : filenames) {
    env_->DeleteFile(options.db_paths[1].path + "/" + fname);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The same key range is rewritten every round (key_idx reset to 0), so the
  // data always gets compacted into 1 Level-1 file plus 0/1 Level-0 file.
  for (int num = 0; num < 3; num++) {
    key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
  }

  key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,1", FilesPerLevel(0));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // From here on the shape alternates between "0,1" (the L0 file was just
  // consumed by compaction) and "1,1" (a fresh L0 file is present). The lone
  // L1 file always lives in the second path and nothing reaches the third.
  for (int round = 0; round < 8; round++) {
    key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
    const bool has_l0_file = (round % 2) != 0;
    ASSERT_EQ(has_l0_file ? "1,1" : "0,1", FilesPerLevel(0));
    ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(has_l0_file ? 1 : 0, GetSstFileCount(dbname_));
  }

  // Every written key must be readable; GenerateNewFile writes values of
  // length 1 or 990.
  auto verify_all_values = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };

  verify_all_values();

  Reopen(options);

  verify_all_values();

  Destroy(options);
}
2126
TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
  // Verify that each column family places its SST files in its own cf_paths
  // set (the default CF uses db_paths). All three families receive the same
  // workload, so their file placement and LSM shape should evolve in
  // lockstep.
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;

  std::vector<Options> option_vector;
  option_vector.emplace_back(options);
  ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
  // Configure CF1 specific paths.
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
  cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt1);
  CreateColumnFamilies({"one"},option_vector[1]);

  // Configure CF2 specific paths.
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
  cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
  option_vector.emplace_back(DBOptions(options), cf_opt2);
  CreateColumnFamilies({"two"},option_vector[2]);

  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  Random rnd(301);
  int key_idx = 0;
  int key_idx1 = 0;
  int key_idx2 = 0;

  // Feed one new file to each of the three column families.
  auto generate_file = [&]() {
    GenerateNewFile(0, &rnd, &key_idx);
    GenerateNewFile(1, &rnd, &key_idx1);
    GenerateNewFile(2, &rnd, &key_idx2);
  };

  // Expect `expected` SST files in path `path_id` of every family's path set.
  auto check_sstfilecount = [&](int path_id, int expected) {
    ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
    ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
  };

  // Expect the same files-per-level shape in all three column families.
  auto check_filesperlevel = [&](const std::string& expected) {
    ASSERT_EQ(expected, FilesPerLevel(0));
    ASSERT_EQ(expected, FilesPerLevel(1));
    ASSERT_EQ(expected, FilesPerLevel(2));
  };

  // Every written key must be readable in every family; GenerateNewFile
  // writes values of length 1 or 990.
  auto check_getvalues = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(0, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx1; i++) {
      auto v = Get(1, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }

    for (int i = 0; i < key_idx2; i++) {
      auto v = Get(2, Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };

  // Check that default column family uses db_paths.
  // And Column family "one" uses cf_paths.

  // First three 110KB files are not going to second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    generate_file();
  }

  // Another 110KB triggers a compaction to 400K file to fill up first path
  generate_file();
  check_sstfilecount(1, 3);

  // (1, 4)
  generate_file();
  check_filesperlevel("1,4");
  check_sstfilecount(1, 4);
  check_sstfilecount(0, 1);

  // (1, 4, 1)
  generate_file();
  check_filesperlevel("1,4,1");
  check_sstfilecount(2, 1);
  check_sstfilecount(1, 4);
  check_sstfilecount(0, 1);

  // (1, 4, 2)
  generate_file();
  check_filesperlevel("1,4,2");
  check_sstfilecount(2, 2);
  check_sstfilecount(1, 4);
  check_sstfilecount(0, 1);

  check_getvalues();

  // File placement and data must survive a reopen.
  ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);

  check_getvalues();

  Destroy(options, true);
}
2245
TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
  // Exercises migrating a DB from level compaction to universal compaction:
  // a direct reopen with fewer levels must be rejected while data still
  // lives below L0; the supported path is to first compact everything into
  // a single file and move it to L0.
  Random rnd(301);
  int max_key_level_insert = 200;
  int max_key_universal_insert = 600;

  // Stage 1: generate a db with level compaction
  Options options = CurrentOptions();
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_bytes_for_level_base = 500 << 10;  // 500KB
  options.max_bytes_for_level_multiplier = 1;
  options.target_file_size_base = 200 << 10;  // 200KB
  options.target_file_size_multiplier = 1;
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int i = 0; i <= max_key_level_insert; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  // Sanity check: data must have spilled beyond L0, otherwise stage 2 would
  // not exercise the multi-level -> universal conversion failure.
  ASSERT_GT(TotalTableFiles(1, 4), 1);
  int non_level0_num_files = 0;
  for (int i = 1; i < options.num_levels; i++) {
    non_level0_num_files += NumTableFilesAtLevel(i, 1);
  }
  ASSERT_GT(non_level0_num_files, 0);

  // Stage 2: reopen with universal compaction - should fail
  options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options = CurrentOptions(options);
  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(s.IsInvalidArgument());

  // Stage 3: compact into a single file and move the file to level 0
  options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.target_file_size_base = INT_MAX;
  options.target_file_size_multiplier = 1;
  options.max_bytes_for_level_base = INT_MAX;
  options.max_bytes_for_level_multiplier = 1;
  options.num_levels = 4;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 0;
  // cannot use kForceOptimized here because the compaction here is expected
  // to generate one output file
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kForce;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);

  // Only 1 file in L0
  ASSERT_EQ("1", FilesPerLevel(1));

  // Stage 4: re-open in universal compaction style and do some db operations
  options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 4;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 3;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Now that all data sits in L0, shrinking to a single level must succeed.
  options.num_levels = 1;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
  }
  dbfull()->Flush(FlushOptions());
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  // NOTE(review): with options.num_levels == 1 this loop body never
  // executes (i starts at 1 and the bound is 1) — the check is vacuous.
  for (int i = 1; i < options.num_levels; i++) {
    ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
  }

  // verify keys inserted in both level compaction style and universal
  // compaction style
  std::string keys_in_db;
  Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    keys_in_db.append(iter->key().ToString());
    keys_in_db.push_back(',');
  }
  delete iter;

  std::string expected_keys;
  for (int i = 0; i <= max_key_universal_insert; i++) {
    expected_keys.append(Key(i));
    expected_keys.push_back(',');
  }

  ASSERT_EQ(keys_in_db, expected_keys);
}
2352
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
  // Regression test for the historical L0 compaction bug (issue 44):
  // overlapping puts/deletes spread across several reopens (each reopen
  // produces a new L0 file) must not resurrect a deleted key, either before
  // or after compaction runs.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "b", "v"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "b"));
    ASSERT_OK(Delete(1, "a"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "a"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "a", "v"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // Only the final "a" -> "v" should be visible, both before and after
    // background compaction has had time to run.
    ASSERT_EQ("(a->v)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(a->v)", Contents(1));
  } while (ChangeCompactOptions());
}
2371
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
  // Companion to L0_CompactionBug_Issue44_a with a different mix of empty
  // keys, deletes and reopens. Write statuses are checked (matching the _a
  // variant) so a failed write fails here instead of surfacing as a
  // confusing Contents() mismatch at the end.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "e"));
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "c", "cv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "d", "dv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "d"));
    ASSERT_OK(Delete(1, "b"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // Only the empty key and "c" -> "cv" should survive, both before and
    // after background compaction has had time to run.
    ASSERT_EQ("(->)(c->cv)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(->)(c->cv)", Contents(1));
  } while (ChangeCompactOptions());
}
2399
TEST_F(DBCompactionTest, ManualAutoRace) {
  // Races a manual compaction on CF 1 against an automatic compaction on the
  // default CF. The sync-point dependencies below make the auto compaction's
  // background job wait until the manual compaction has registered itself,
  // so the auto compaction is cancelled and later rescheduled.
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
       {"DBImpl::RunManualCompaction:WaitScheduled",
        "BackgroundCallCompaction:0"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Two L0 files in CF 1 for the manual compaction to work on.
  Put(1, "foo", "");
  Put(1, "bar", "");
  Flush(1);
  Put(1, "foo", "");
  Put(1, "bar", "");
  // Generate four files in CF 0, which should trigger an auto compaction
  Put("foo", "");
  Put("bar", "");
  Flush();
  Put("foo", "");
  Put("bar", "");
  Flush();
  Put("foo", "");
  Put("bar", "");
  Flush();
  Put("foo", "");
  Put("bar", "");
  Flush();

  // The auto compaction is scheduled but waited until here
  TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
  // The auto compaction will wait until the manual compaction is registered
  // before processing so that it will be cancelled.
  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
  ASSERT_EQ("0,1", FilesPerLevel(1));

  // Eventually the cancelled compaction will be rescheduled and executed.
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,1", FilesPerLevel(0));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
2440
TEST_P(DBCompactionTestWithParam, ManualCompaction) {
  // Verifies manual compaction behavior for ranges that fall before, after,
  // and over the existing files, and that a full manual compaction does not
  // populate the block cache.
  Options options = CurrentOptions();
  options.max_subcompactions = max_subcompactions_;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    MakeTables(3, "p", "q", 1);
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range overlaps files
    Compact(1, "p1", "p9");
    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    // Populate a different range
    MakeTables(3, "c", "e", 1);
    ASSERT_EQ("1,1,2", FilesPerLevel(1));

    // Compact just the new range
    Compact(1, "b", "f");
    ASSERT_EQ("0,0,2", FilesPerLevel(1));

    // Compact all
    MakeTables(1, "a", "z", 1);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));

    uint64_t prev_block_cache_add =
        options.statistics->getTickerCount(BLOCK_CACHE_ADD);
    CompactRangeOptions cro;
    cro.exclusive_manual_compaction = exclusive_manual_compaction_;
    // Check the compaction status; a silent failure here would otherwise
    // surface only as a misleading FilesPerLevel mismatch below.
    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
    // Verify manual compaction doesn't fill block cache
    ASSERT_EQ(prev_block_cache_add,
              options.statistics->getTickerCount(BLOCK_CACHE_ADD));

    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    if (iter == 0) {
      // Rebuild the DB with only 3 levels for the second pass.
      options = CurrentOptions();
      options.num_levels = 3;
      options.create_if_missing = true;
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
2498
2499
TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
  // Manual compactions with an explicit output path id: outputs must land in
  // the requested db_paths entry, and file counts per path are verified
  // after each step.
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
  options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
  options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    for (int i = 0; i < 3; ++i) {
      ASSERT_OK(Put(1, "p", "begin"));
      ASSERT_OK(Put(1, "q", "end"));
      ASSERT_OK(Flush(1));
    }
    // All three flushed files start in path 0 (the first db_paths entry).
    ASSERT_EQ("3", FilesPerLevel(1));
    ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
    ASSERT_EQ(0, GetSstFileCount(dbname_));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("3", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("3", FilesPerLevel(1));

    // Compaction range overlaps files; output is directed to path id 1.
    Compact(1, "p1", "p9", 1);
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ("0,1", FilesPerLevel(1));
    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
    ASSERT_EQ(0, GetSstFileCount(dbname_));

    // Populate a different range
    for (int i = 0; i < 3; ++i) {
      ASSERT_OK(Put(1, "c", "begin"));
      ASSERT_OK(Put(1, "e", "end"));
      ASSERT_OK(Flush(1));
    }
    ASSERT_EQ("3,1", FilesPerLevel(1));

    // Compact just the new range, again into path id 1.
    Compact(1, "b", "f", 1);
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ("0,2", FilesPerLevel(1));
    ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
    ASSERT_EQ(0, GetSstFileCount(dbname_));

    // Compact all
    ASSERT_OK(Put(1, "a", "begin"));
    ASSERT_OK(Put(1, "z", "end"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("1,2", FilesPerLevel(1));
    ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
    CompactRangeOptions compact_options;
    compact_options.target_path_id = 1;
    compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
    db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    // Everything is merged into a single file living in path id 1.
    ASSERT_EQ("0,1", FilesPerLevel(1));
    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
    ASSERT_EQ(0, GetSstFileCount(dbname_));

    if (iter == 0) {
      // Rebuild the DB with only 3 levels for the second pass.
      DestroyAndReopen(options);
      options = CurrentOptions();
      options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
      options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
      options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
      options.max_background_flushes = 1;
      options.num_levels = 3;
      options.create_if_missing = true;
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
2583
TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v2"));
    Compact(1, "a", "z");
    // Repeated overwrite+compact cycles must not leak table files: the live
    // file count has to stay where it was after the first full compaction.
    const size_t baseline_file_count = CountLiveFiles();
    for (int round = 0; round < 10; ++round) {
      ASSERT_OK(Put(1, "foo", "v2"));
      Compact(1, "a", "z");
    }
    ASSERT_EQ(CountLiveFiles(), baseline_file_count);
  } while (ChangeCompactOptions());
}
2597
// Check level compaction with CompactFiles()
TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
  // Exercises CompactFiles() on a level-compacted DB: repeatedly pick a few
  // random input files, expand the set to all files overlapping them in the
  // output level, and verify those inputs no longer exist afterwards.
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.level0_stop_writes_trigger = 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  // Insert keys in descending order, one random 984-byte value each.
  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
  }
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForCompact();

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  // Always compact into the bottommost level.
  int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
  for (int file_picked = 5; file_picked > 0; --file_picked) {
    std::set<std::string> overlapping_file_names;
    std::vector<std::string> compaction_input_file_names;
    for (int f = 0; f < file_picked; ++f) {
      int level = 0;
      auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
      compaction_input_file_names.push_back(file_meta->name);
      GetOverlappingFileNumbersForLevelCompaction(
          cf_meta, options.comparator, level, output_level,
          file_meta, &overlapping_file_names);
    }

    ASSERT_OK(dbfull()->CompactFiles(
        CompactionOptions(), handles_[1],
        compaction_input_file_names,
        output_level));

    // Make sure all overlapping files do not exist after compaction
    dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
    VerifyCompactionResult(cf_meta, overlapping_file_names);
  }

  // make sure all key-values are still there.
  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
  }
}
2654
TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
  // Inject a file-creation failure into an L0->L1 compaction and verify the
  // failed compaction leaves the DB intact: no L1 output, all L0 inputs
  // still present, every key-value readable, and the DB reopenable.
  Options options;
  const int kKeySize = 16;
  const int kKvSize = 1000;
  const int kKeysPerBuffer = 100;
  const int kNumL1Files = 5;
  options.create_if_missing = true;
  options.write_buffer_size = kKeysPerBuffer * kKvSize;
  options.max_write_buffer_number = 2;
  options.target_file_size_base =
      options.write_buffer_size *
      (options.max_write_buffer_number - 1);
  options.level0_file_num_compaction_trigger = kNumL1Files;
  options.max_bytes_for_level_base =
      options.level0_file_num_compaction_trigger *
      options.target_file_size_base;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;

  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  // stop the compaction thread until we simulate the file creation failure.
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  options.env = env_;

  DestroyAndReopen(options);

  // Enough keys to fill level0_file_num_compaction_trigger L0 files so that
  // a compaction will be pending once the background thread is released.
  const int kNumInsertedKeys =
      options.level0_file_num_compaction_trigger *
      (options.max_write_buffer_number - 1) *
      kKeysPerBuffer;

  Random rnd(301);
  std::vector<std::string> keys;
  std::vector<std::string> values;
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    keys.emplace_back(RandomString(&rnd, kKeySize));
    values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
    ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
    dbfull()->TEST_WaitForFlushMemTable();
  }

  dbfull()->TEST_FlushMemTable(true);
  // Make sure the number of L0 files can trigger compaction.
  ASSERT_GE(NumTableFilesAtLevel(0),
            options.level0_file_num_compaction_trigger);

  auto previous_num_level0_files = NumTableFilesAtLevel(0);

  // Fail the first file creation.
  env_->non_writable_count_ = 1;
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();

  // Expect compaction to fail here as one file will fail its
  // creation.
  ASSERT_TRUE(!dbfull()->TEST_WaitForCompact().ok());

  // Verify L0 -> L1 compaction does fail.
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);

  // Verify all L0 files are still there.
  ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);

  // All key-values must exist after compaction fails.
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    ASSERT_EQ(values[k], Get(keys[k]));
  }

  // Re-enable writes before reopening so recovery can succeed.
  env_->non_writable_count_ = 0;

  // Make sure RocksDB will not get into corrupted state.
  Reopen(options);

  // Verify again after reopen.
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    ASSERT_EQ(values[k], Get(keys[k]));
  }
}
2738
TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
  // A file trivially moved to a lower level and later compacted away must
  // not be deleted from disk while a live iterator still references it; it
  // must be deleted exactly once after the iterator is released.
  // iter 1 -- delete_obsolete_files_period_micros == 0
  for (int iter = 0; iter < 2; ++iter) {
    // This test triggers move compaction and verifies that the file is not
    // deleted when it's part of move compaction
    Options options = CurrentOptions();
    options.env = env_;
    if (iter == 1) {
      options.delete_obsolete_files_period_micros = 0;
    }
    options.create_if_missing = true;
    options.level0_file_num_compaction_trigger =
        2;  // trigger compaction when we have 2 files
    OnFileDeletionListener* listener = new OnFileDeletionListener();
    options.listeners.emplace_back(listener);
    options.max_subcompactions = max_subcompactions_;
    DestroyAndReopen(options);

    Random rnd(301);
    // Create two 1MB sst files
    for (int i = 0; i < 2; ++i) {
      // Create 1MB sst file
      for (int j = 0; j < 100; ++j) {
        ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
      }
      ASSERT_OK(Flush());
    }
    // this should execute L0->L1
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ("0,1", FilesPerLevel(0));

    // block compactions
    test::SleepingBackgroundTask sleeping_task;
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
                   Env::Priority::LOW);

    options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
    Reopen(options);
    // Iterator pins the current version (and hence the L1 file) while
    // compactions are blocked.
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
    ASSERT_EQ("0,1", FilesPerLevel(0));
    // let compactions go
    sleeping_task.WakeUp();
    sleeping_task.WaitUntilDone();

    // this should execute L1->L2 (move)
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ("0,0,1", FilesPerLevel(0));

    std::vector<LiveFileMetaData> metadata;
    db_->GetLiveFilesMetaData(&metadata);
    ASSERT_EQ(metadata.size(), 1U);
    auto moved_file_name = metadata[0].name;

    // Create two more 1MB sst files
    for (int i = 0; i < 2; ++i) {
      // Create 1MB sst file
      for (int j = 0; j < 100; ++j) {
        ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
      }
      ASSERT_OK(Flush());
    }
    // this should execute both L0->L1 and L1->L2 (merge with previous file)
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ("0,0,2", FilesPerLevel(0));

    // iterator is holding the file
    ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));

    listener->SetExpectedFileName(dbname_ + moved_file_name);
    iterator.reset();

    // this file should have been compacted away
    ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name));
    listener->VerifyMatchedCount(1);
  }
}
2817
TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
  // With per-level compression, trivial moves are only allowed between
  // levels whose compression matches. Count trivial vs. non-trivial
  // compactions via sync points and verify they match the expected split.
  if (!Zlib_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;
  // First two levels have no compression, so that a trivial move between
  // them will be allowed. Level 2 has Zlib compression so that a trivial
  // move to level 3 will not be allowed
  options.compression_per_level = {kNoCompression, kNoCompression,
                                   kZlibCompression};
  int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Compaction::InputCompressionMatchesOutput:Matches",
      [&](void* /*arg*/) { matches++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Compaction::InputCompressionMatchesOutput:DidntMatch",
      [&](void* /*arg*/) { didnt_match++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are going to level 0
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to 400K file to fill up level 0
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(4, GetSstFileCount(dbname_));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4", FilesPerLevel(0));

  // (1, 4, 1)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,1", FilesPerLevel(0));

  // (1, 4, 2)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,2", FilesPerLevel(0));

  // (1, 4, 3)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,3", FilesPerLevel(0));

  // (1, 4, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,4", FilesPerLevel(0));

  // (1, 4, 5)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,5", FilesPerLevel(0));

  // (1, 4, 6)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,6", FilesPerLevel(0));

  // (1, 4, 7)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,7", FilesPerLevel(0));

  // (1, 4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4,8", FilesPerLevel(0));

  ASSERT_EQ(matches, 12);
  // Currently, the test relies on the number of calls to
  // InputCompressionMatchesOutput() per compaction.
  const int kCallsToInputCompressionMatch = 2;
  ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch);
  ASSERT_EQ(trivial_move, 12);
  ASSERT_EQ(non_trivial, 8);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // All keys must be readable; values written by GenerateNewFile() are
  // either 1 or 990 bytes.
  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  // Verify the same data survives a reopen.
  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}
2930
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
  // The soft pending-compaction limit is sanitized against the hard limit
  // on DB open: a zero soft limit inherits the hard limit, and a soft limit
  // above the hard limit is clamped down to it.
  Options options = CurrentOptions();
  options.create_if_missing = true;

  struct SanitizeCase {
    int max_background_compactions;
    uint64_t soft_limit;
    uint64_t hard_limit;
    uint64_t expected_soft_limit;
  };
  for (const auto& tc :
       {SanitizeCase{5, 0, 100, 100}, SanitizeCase{3, 200, 150, 150}}) {
    options.max_background_compactions = tc.max_background_compactions;
    options.soft_pending_compaction_bytes_limit = tc.soft_limit;
    options.hard_pending_compaction_bytes_limit = tc.hard_limit;
    DestroyAndReopen(options);
    ASSERT_EQ(tc.expected_soft_limit,
              db_->GetOptions().soft_pending_compaction_bytes_limit);
  }
}
2946
2947 // This tests for a bug that could cause two level0 compactions running
2948 // concurrently
2949 // TODO(aekmekji): Make sure that the reason this fails when run with
2950 // max_subcompactions > 1 is not a correctness issue but just inherent to
2951 // running parallel L0-L1 compactions
TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.compression = kNoCompression;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 98 << 10;
  options.max_write_buffer_number = 2;
  // Two background compaction slots, so a second L0 compaction could in
  // principle be scheduled while the first is running.
  options.max_background_compactions = 2;

  DestroyAndReopen(options);

  // fill up the DB
  Random rnd(301);
  for (int num = 0; num < 10; num++) {
    GenerateNewRandomFile(&rnd);
  }
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Hold the first compaction job open between sync points :1 and :2, so any
  // second L0 compaction attempt would have to overlap with it.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"CompactionJob::Run():Start",
        "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
       {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
        "CompactionJob::Run():End"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // trigger L0 compaction
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");

  // While the first compaction is held open, suggest a compaction and pile
  // up more L0 files that would trigger another L0 compaction.
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
  dbfull()->TEST_WaitForCompact();
}
3005
// Formats i as a fixed-width key "key0000".."key9999" (7 characters), which
// keeps every key within the 8-byte bound asserted by ShortKeyComparator.
static std::string ShortKey(int i) {
  assert(i < 10000);
  char key_buf[16];
  snprintf(key_buf, sizeof(key_buf), "key%04d", i);
  return key_buf;
}
3012
TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
  // Exercises CompactRange()'s bottommost_level_compaction modes:
  // kForceOptimized must rewrite the bottommost level (non-trivial
  // compaction) while kSkip must leave the bottommost level untouched.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // The key size is guaranteed to be <= 8
  class ShortKeyComparator : public Comparator {
    int Compare(const ROCKSDB_NAMESPACE::Slice& a,
                const ROCKSDB_NAMESPACE::Slice& b) const override {
      assert(a.size() <= 8);
      assert(b.size() <= 8);
      return BytewiseComparator()->Compare(a, b);
    }
    const char* Name() const override { return "ShortKeyComparator"; }
    void FindShortestSeparator(
        std::string* start,
        const ROCKSDB_NAMESPACE::Slice& limit) const override {
      return BytewiseComparator()->FindShortestSeparator(start, limit);
    }
    void FindShortSuccessor(std::string* key) const override {
      return BytewiseComparator()->FindShortSuccessor(key);
    }
  } short_key_cmp;
  Options options = CurrentOptions();
  // Large buffers/files so each Flush() below produces exactly one file and
  // no automatic compactions interfere with the counters.
  options.target_file_size_base = 100000000;
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  options.comparator = &short_key_cmp;
  DestroyAndReopen(options);

  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::vector<std::string> values;
  // File with keys [ 0 => 99 ]
  for (int i = 0; i < 100; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1", FilesPerLevel(0));
  // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 3;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // File with keys [ 100 => 199 ]
  for (int i = 100; i < 200; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
  // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
  // then compact the bottommost level L3=>L3 (non trivial move)
  compact_options = CompactRangeOptions();
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 4);
  ASSERT_EQ(non_trivial_move, 1);

  // File with keys [ 200 => 299 ]
  for (int i = 200; i < 300; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
  trivial_move = 0;
  non_trivial_move = 0;
  compact_options = CompactRangeOptions();
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kSkip;
  // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
  // and will skip bottommost level compaction
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 3);
  ASSERT_EQ(non_trivial_move, 0);

  // All inserted key-values must remain readable after the compactions.
  for (int i = 0; i < 300; i++) {
    ASSERT_EQ(Get(ShortKey(i)), values[i]);
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3114
TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
  // Verifies intra-L0 (L0->L0) compaction: while an L0->L1 compaction is in
  // flight, newly flushed L0 files should be compacted among themselves.
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  // Block the (first) compaction job from running until the picker has been
  // invoked, so the second batch of files must pick an intra-L0 compaction.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0   1   2   3   4   5   6   7   8   9
  // size:  1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
  // score:                     1.5 1.3 1.5 2.0 inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction.
  //
  // Files 6-9 are the longest span of available files for which
  // work-per-deleted-file decreases (see "score" row above).
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
    if (i == 5) {
      ASSERT_OK(Put(Key(i + 1), value + value));
    } else {
      ASSERT_OK(Put(Key(i + 1), value));
    }
    ASSERT_OK(Flush());
  }
  dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
  // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
  ASSERT_EQ(2, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0);
  for (int i = 0; i < 2; ++i) {
    ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21);
  }
}
3166
TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
  // regression test for issue #2722: L0->L0 compaction can resurrect deleted
  // keys from older L0 files if L1+ files' key-ranges do not include the key.
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  // Block the (first) compaction job until the picker has run, forcing the
  // second batch of files into an intra-L0 (L0->L0) compaction.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0    1    2    3    4   5   6   7   8   9
  // size:  1MB  1MB  1MB  1MB  1MB 1MB 1MB 1MB 1MB 1MB
  // score: 1.25 1.33 1.5  2.0  inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction. All files
  // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
  //
  // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
  // L0->L0 preserves the deletion such that the key remains deleted.
  for (int i = 0; i < 10; ++i) {
    // key 0 serves both to prevent trivial move and as the key we want to
    // verify is not resurrected by L0->L0 compaction.
    if (i < 5) {
      ASSERT_OK(Put(Key(0), ""));
    } else {
      ASSERT_OK(Delete(Key(0)));
    }
    ASSERT_OK(Put(Key(i + 1), value));
    ASSERT_OK(Flush());
  }
  dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
  // L0 has a single output file from L0->L0
  ASSERT_EQ(1, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0);
  ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);

  // The deletion from files 5-9 must still mask the key written in files 0-4.
  ReadOptions roptions;
  std::string result;
  ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
}
3225
TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
  // When a BOTTOM-priority thread pool is available, the full compaction
  // (whose output is the bottommost level) should run there exactly once,
  // for both level and universal compaction styles.
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  for (bool use_universal_compaction : {false, true}) {
    Options options = CurrentOptions();
    if (use_universal_compaction) {
      options.compaction_style = kCompactionStyleUniversal;
    } else {
      options.compaction_style = kCompactionStyleLevel;
      options.level_compaction_dynamic_level_bytes = true;
    }
    options.num_levels = 4;
    options.write_buffer_size = 100 << 10;     // 100KB
    options.target_file_size_base = 32 << 10;  // 32KB
    options.level0_file_num_compaction_trigger = kNumFilesTrigger;
    // Trigger compaction if size amplification exceeds 110%
    options.compaction_options_universal.max_size_amplification_percent = 110;
    DestroyAndReopen(options);

    // Count how many compactions get dispatched to the BOTTOM pool.
    int num_bottom_pri_compactions = 0;
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::BGWorkBottomCompaction",
        [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
    SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    for (int num = 0; num < kNumFilesTrigger; num++) {
      ASSERT_EQ(NumSortedRuns(), num);
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx);
    }
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ(1, num_bottom_pri_compactions);

    // Verify that size amplification did occur
    ASSERT_EQ(NumSortedRuns(), 1);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  // Restore the default of zero BOTTOM-priority threads.
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
3267
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
  // Deletions can be dropped when compacted to non-last level if they fall
  // outside the lower-level files' key-ranges.
  const int kNumL0Files = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // put key 1 and 3 in separate L1, L2 files.
  // So key 0, 2, and 4+ fall outside these levels' key-ranges.
  for (int level = 2; level >= 1; --level) {
    for (int i = 0; i < 2; ++i) {
      // Assert write/flush statuses, consistent with the rest of this file.
      ASSERT_OK(Put(Key(2 * i + 1), "val"));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(level);
    ASSERT_EQ(2, NumTableFilesAtLevel(level));
  }

  // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
  // - Tombstones for keys 2 and 4 can be dropped early.
  // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
  for (int i = 0; i < kNumL0Files; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Delete(Key(i + 1)));
    ASSERT_OK(Flush());
  }
  dbfull()->TEST_WaitForCompact();

  // All deleted keys must read as not-found.
  for (int i = 0; i < kNumL0Files; ++i) {
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
  }
  // Two tombstones dropped via the optimized path, two via the regular
  // obsolete-key path, matching the layout built above.
  ASSERT_EQ(2, options.statistics->getTickerCount(
                   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
  ASSERT_EQ(2,
            options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}
3307
TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
  // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
  // CompactFiles() had a bug where it failed to pick a compaction when an L0
  // compaction existed, but marked it as scheduled anyways. It'd never be
  // unmarked as scheduled, so future compactions or DB close could hang.
  const int kNumL0Files = 5;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files - 1;
  options.max_background_compactions = 2;
  DestroyAndReopen(options);

  // First dependency: block this thread until the automatic L0 compaction
  // has been picked. Second: hold that compaction's run open until after the
  // manual CompactFiles() call below completes.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompaction:Return",
        "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
       {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  auto schedule_multi_compaction_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  // Files 0-3 will be included in an L0->L1 compaction.
  //
  // File 4 will be included in a call to CompactFiles() while the first
  // compaction is running.
  for (int i = 0; i < kNumL0Files - 1; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Put(Key(i + 1), "val"));
    ASSERT_OK(Flush());
  }
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
  // file 4 flushed after 0-3 picked
  ASSERT_OK(Put(Key(kNumL0Files), "val"));
  ASSERT_OK(Flush());

  // previously DB close would hang forever as this situation caused scheduled
  // compactions count to never decrement to zero.
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  ASSERT_OK(dbfull()
                ->CompactFiles(CompactionOptions(), input_filenames,
                               0 /* output_level */));
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3356
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
  // Regression test for bug of not pulling in L0 files that overlap the user-
  // specified input files in time- and key-ranges.
  // Assert write/flush statuses, consistent with the rest of this file.
  ASSERT_OK(Put(Key(0), "old_val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(0), "new_val"));
  ASSERT_OK(Flush());

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_GE(cf_meta.levels.size(), 2);
  ASSERT_EQ(2, cf_meta.levels[0].files.size());

  // Compacting {new L0 file, L1 file} should pull in the old L0 file since it
  // overlaps in key-range and time-range.
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
                                   1 /* output_level */));
  // If the older overlapping L0 file had not been pulled in, the stale value
  // could win; the newest value must survive.
  ASSERT_EQ("new_val", Get(Key(0)));
}
3378
TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
  // bottom-level files may contain deletions due to snapshots protecting the
  // deleted keys. Once the snapshot is released, we should see files with many
  // such deletions undergo single-file compactions.
  const int kNumKeysPerFile = 1024;
  const int kNumLevelFiles = 4;
  const int kValueSize = 128;
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = kNumLevelFiles;
  // inflate it a bit to account for key/metadata overhead
  options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
  CreateAndReopenWithCF({"one"}, options);

  Random rnd(301);
  const Snapshot* snapshot = nullptr;
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    if (i == kNumLevelFiles - 1) {
      snapshot = db_->GetSnapshot();
      // delete every other key after grabbing a snapshot, so these deletions
      // and the keys they cover can't be dropped until after the snapshot is
      // released.
      for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
        ASSERT_OK(Delete(Key(j)));
      }
    }
    Flush();
    if (i < kNumLevelFiles - 1) {
      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
    }
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));

  std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
  db_->GetLiveFilesMetaData(&pre_release_metadata);
  // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
  // files does not need to be preserved in case of a future snapshot.
  ASSERT_OK(Put(Key(0), "val"));
  ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
  // release snapshot and wait for compactions to finish. Single-file
  // compactions should be triggered, which reduce the size of each bottom-level
  // file without changing file count.
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
  // Every compaction picked from here on must be for the bottommost-files
  // reason; the callback asserts that.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() ==
                    CompactionReason::kBottommostFiles);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->TEST_WaitForCompact();
  db_->GetLiveFilesMetaData(&post_release_metadata);
  ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());

  for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
    const auto& pre_file = pre_release_metadata[i];
    const auto& post_file = post_release_metadata[i];
    ASSERT_EQ(1, pre_file.level);
    ASSERT_EQ(1, post_file.level);
    // each file is smaller than it was before as it was rewritten without
    // deletion markers/deleted keys.
    ASSERT_LT(post_file.size, pre_file.size);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3450
// Verifies that files older than the `ttl` option are picked for compaction
// with CompactionReason::kTtl, both when the TTL is set at open time and when
// it is lowered later via SetOptions().
TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 1024;

  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.ttl = 24 * 60 * 60; // 24 hours
  options.max_open_files = -1;
  // Use the env's mock clock (addon_time_) so the test can advance time
  // instantly instead of sleeping.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  Random rnd(301);
  // Write two files' worth of live data; they end up in L3 below.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);
  ASSERT_EQ("0,0,0,2", FilesPerLevel());

  // Delete previously written keys.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("2,0,0,2", FilesPerLevel());
  MoveFilesToLevel(1);
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Advance mock time past the 24-hour TTL. Nothing is compacted yet: TTL is
  // only evaluated when a compaction gets scheduled.
  env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Just do a simple write + flush so that the Ttl expired files get
  // compacted.
  ASSERT_OK(Put("a", "1"));
  Flush();
  // Every compaction picked from here on must be TTL-motivated.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->TEST_WaitForCompact();
  // All non-L0 files are deleted, as they contained only deleted data.
  ASSERT_EQ("1", FilesPerLevel());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Test dynamically changing ttl.

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Rebuild the same LSM shape as above: live data in L3, deletions in L1.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);
  ASSERT_EQ("0,0,0,2", FilesPerLevel());

  // Delete previously written keys.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("2,0,0,2", FilesPerLevel());
  MoveFilesToLevel(1);
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Move time forward by 12 hours, and make sure that compaction still doesn't
  // trigger as ttl is set to 24 hours.
  env_->addon_time_.fetch_add(12 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("1,2,0,2", FilesPerLevel());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Dynamically change ttl to 10 hours (36000 seconds).
  // This should trigger a ttl compaction, as 12 hours have already passed.
  ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
  dbfull()->TEST_WaitForCompact();
  // All non-L0 files are deleted, as they contained only deleted data.
  ASSERT_EQ("1", FilesPerLevel());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3559
// Verifies that a TTL compaction output keeps the input's oldest ancestor
// time, so descendants of an expired file keep "cascading" down one level at
// a time until the data reaches the bottommost level.
TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
  const int kValueSize = 100;

  // Cover all combinations of (restart before TTL expiry) x (all table files
  // kept open via max_open_files == -1).
  for (bool if_restart : {false, true}) {
    for (bool if_open_all_files : {false, true}) {
      Options options = CurrentOptions();
      options.compression = kNoCompression;
      options.ttl = 24 * 60 * 60; // 24 hours
      if (if_open_all_files) {
        options.max_open_files = -1;
      } else {
        options.max_open_files = 20;
      }
      // RocksDB sanitize max open files to at least 20. Modify it back.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
            int* max_open_files = static_cast<int*>(arg);
            *max_open_files = 2;
          });
      // In the case where all files are opened and doing DB restart
      // forcing the oldest ancester time in manifest file to be 0 to
      // simulate the case of reading from an old version.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
            if (if_restart && if_open_all_files) {
              std::string* encoded_fieled = static_cast<std::string*>(arg);
              *encoded_fieled = "";
              PutVarint64(encoded_fieled, 0);
            }
          });

      // Use the env's mock clock so the test can advance time instantly.
      env_->time_elapse_only_sleep_ = false;
      options.env = env_;

      env_->addon_time_.store(0);
      DestroyAndReopen(options);

      // Count compactions whose reason is kTtl.
      int ttl_compactions = 0;
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
            Compaction* compaction = reinterpret_cast<Compaction*>(arg);
            auto compaction_reason = compaction->compaction_reason();
            if (compaction_reason == CompactionReason::kTtl) {
              ttl_compactions++;
            }
          });
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

      // Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
      Random rnd(301);
      for (int i = 1; i <= 100; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      // Get the first file's creation time. This will be the oldest file in
      // the DB. Compactions inolving this file's descendents should keep
      // getting this time.
      std::vector<std::vector<FileMetaData>> level_to_files;
      dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                      &level_to_files);
      uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time;
      // Add 1 hour and do another flush.
      env_->addon_time_.fetch_add(1 * 60 * 60);
      for (int i = 101; i <= 200; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      MoveFilesToLevel(6);
      ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());

      env_->addon_time_.fetch_add(1 * 60 * 60);
      // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
      for (int i = 1; i <= 50; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      env_->addon_time_.fetch_add(1 * 60 * 60);
      for (int i = 51; i <= 150; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      MoveFilesToLevel(4);
      ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());

      env_->addon_time_.fetch_add(1 * 60 * 60);
      // Add one L1 file with key range: [26, 75].
      for (int i = 26; i <= 75; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      dbfull()->TEST_WaitForCompact();
      MoveFilesToLevel(1);
      ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());

      // LSM tree:
      // L1:         [26 .. 75]
      // L4:     [1 .. 50][51 ..... 150]
      // L6: [1 ........ 100][101 .... 200]
      //
      // On TTL expiry, TTL compaction should be initiated on L1 file, and the
      // compactions should keep going on until the key range hits bottom
      // level. In other words: the compaction on this data range "cascasdes"
      // until reaching the bottom level.
      //
      // Order of events on TTL expiry:
      // 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
      //    ttl compaction.
      // 2. A TTL compaction happens between L3 and L4 files. Output file in
      //    L4.
      // 3. The new output file from L4 falls to L5 via 1 trival move initiated
      //    by the ttl compaction.
      // 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.

      // Add 25 hours and do a write
      env_->addon_time_.fetch_add(25 * 60 * 60);

      ASSERT_OK(Put(Key(1), "1"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
      // Steps 1-4 above amount to 5 TTL-initiated compactions.
      ASSERT_EQ(5, ttl_compactions);

      // The oldest ancestor time must have been propagated all the way down
      // to the bottommost file produced by the cascade.
      dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                      &level_to_files);
      ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time);

      env_->addon_time_.fetch_add(25 * 60 * 60);
      ASSERT_OK(Put(Key(2), "1"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
      // At least one more TTL compaction after the second expiry.
      ASSERT_GE(ttl_compactions, 6);

      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    }
  }
}
3705
// Verifies that files older than `periodic_compaction_seconds` are rewritten
// in place (same level, CompactionReason::kPeriodicCompaction), with and
// without a DB restart, and with/without all table files kept open.
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  for (bool if_restart : {false, true}) {
    for (bool if_open_all_files : {false, true}) {
      Options options = CurrentOptions();
      options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
      if (if_open_all_files) {
        options.max_open_files = -1; // needed for ttl compaction
      } else {
        options.max_open_files = 20;
      }
      // RocksDB sanitize max open files to at least 20. Modify it back.
      // NOTE(review): the sibling test LevelTtlCascadingCompactions lowers
      // this to 2 here; confirm that 0 is intended in this test.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
            int* max_open_files = static_cast<int*>(arg);
            *max_open_files = 0;
          });
      // In the case where all files are opened and doing DB restart
      // forcing the file creation time in manifest file to be 0 to
      // simulate the case of reading from an old version.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "VersionEdit::EncodeTo:VarintFileCreationTime", [&](void* arg) {
            if (if_restart && if_open_all_files) {
              std::string* encoded_fieled = static_cast<std::string*>(arg);
              *encoded_fieled = "";
              PutVarint64(encoded_fieled, 0);
            }
          });

      // Use the env's mock clock so the test can advance time instantly.
      env_->time_elapse_only_sleep_ = false;
      options.env = env_;

      env_->addon_time_.store(0);
      DestroyAndReopen(options);

      // Count compactions whose reason is kPeriodicCompaction.
      int periodic_compactions = 0;
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
            Compaction* compaction = reinterpret_cast<Compaction*>(arg);
            auto compaction_reason = compaction->compaction_reason();
            if (compaction_reason == CompactionReason::kPeriodicCompaction) {
              periodic_compactions++;
            }
          });
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

      Random rnd(301);
      for (int i = 0; i < kNumLevelFiles; ++i) {
        for (int j = 0; j < kNumKeysPerFile; ++j) {
          ASSERT_OK(Put(Key(i * kNumKeysPerFile + j),
                        RandomString(&rnd, kValueSize)));
        }
        Flush();
      }
      dbfull()->TEST_WaitForCompact();

      ASSERT_EQ("2", FilesPerLevel());
      ASSERT_EQ(0, periodic_compactions);

      // Add 50 hours and do a write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("a", "1"));
      Flush();
      dbfull()->TEST_WaitForCompact();
      // Assert that the files stay in the same level
      ASSERT_EQ("3", FilesPerLevel());
      // The two old files go through the periodic compaction process
      ASSERT_EQ(2, periodic_compactions);

      MoveFilesToLevel(1);
      ASSERT_EQ("0,3", FilesPerLevel());

      // Add another 50 hours and do another write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("b", "2"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,3", FilesPerLevel());
      // The three old files now go through the periodic compaction process. 2
      // + 3.
      ASSERT_EQ(5, periodic_compactions);

      // Add another 50 hours and do another write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("c", "3"));
      Flush();
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("2,3", FilesPerLevel());
      // The four old files now go through the periodic compaction process. 5
      // + 4.
      ASSERT_EQ(9, periodic_compactions);

      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    }
  }
}
3809
TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
  // This test makes sure that periodic compactions are working with a DB
  // where file_creation_time of some files is 0.
  // After compactions the new files are created with a valid
  // file_creation_time

  const int kNumKeysPerFile = 32;
  const int kNumFiles = 4;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  // Use the env's mock clock so the test can advance time instantly.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  int periodic_compactions = 0;
  bool set_file_creation_time_to_zero = true;
  bool set_creation_time_to_zero = true;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        }
      });
  // Simulate files written by an old RocksDB version by zeroing out the
  // creation-time table properties as each file is written.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
        TableProperties* props = reinterpret_cast<TableProperties*>(arg);
        if (set_file_creation_time_to_zero) {
          props->file_creation_time = 0;
        }
        if (set_creation_time_to_zero) {
          props->creation_time = 0;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
    // Move the first two files to L2.
    if (i == 1) {
      MoveFilesToLevel(2);
      set_creation_time_to_zero = false;
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("2,0,2", FilesPerLevel());
  // Periodic compaction is not enabled yet.
  ASSERT_EQ(0, periodic_compactions);

  Close();

  set_file_creation_time_to_zero = false;
  // Forward the clock by 2 days.
  env_->addon_time_.fetch_add(2 * 24 * 60 * 60);
  options.periodic_compaction_seconds = 1 * 24 * 60 * 60; // 1 day

  Reopen(options);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("2,0,2", FilesPerLevel());
  // Make sure that all files go through periodic compaction.
  ASSERT_EQ(kNumFiles, periodic_compactions);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3882
// Verifies the interaction between ttl and periodic_compaction_seconds when
// both are set: TTL compactions push non-bottom files down one level at a
// time, while periodic compactions rewrite bottom-level files in place.
TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  options.ttl = 10 * 60 * 60; // 10 hours
  options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
  options.max_open_files = -1; // needed for both periodic and ttl compactions
  // Use the env's mock clock so the test can advance time instantly.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Count compactions by reason.
  int periodic_compactions = 0;
  int ttl_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        } else if (compaction_reason == CompactionReason::kTtl) {
          ttl_compactions++;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();

  MoveFilesToLevel(3);

  ASSERT_EQ("0,0,0,2", FilesPerLevel());
  ASSERT_EQ(0, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add some time greater than periodic_compaction_time.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Files in the bottom level go through periodic compactions.
  ASSERT_EQ("1,0,0,2", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add a little more time than ttl
  env_->addon_time_.fetch_add(11 * 60 * 60);
  ASSERT_OK(Put("b", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Notice that the previous file in level 1 falls down to the bottom level
  // due to ttl compactions, one level at a time.
  // And bottom level files don't get picked up for ttl compactions.
  ASSERT_EQ("1,0,0,3", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(3, ttl_compactions);

  // Add some time greater than periodic_compaction_time.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("c", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Previous L0 file falls one level at a time to bottom level due to ttl.
  // And all 4 bottom files go through periodic compactions.
  ASSERT_EQ("1,0,0,4", FilesPerLevel());
  ASSERT_EQ(6, periodic_compactions);
  ASSERT_EQ(6, ttl_compactions);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3963
// Verifies that setting a compaction filter (or factory) causes
// periodic_compaction_seconds to be sanitized from its "disabled" sentinel to
// 30 days, and that periodic compactions then actually run.
TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
  // Minimal no-op filter/factory; only their presence matters.
  class TestCompactionFilter : public CompactionFilter {
    const char* Name() const override { return "TestCompactionFilter"; }
  };
  class TestCompactionFilterFactory : public CompactionFilterFactory {
    const char* Name() const override { return "TestCompactionFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context& /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
    }
  };

  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Random rnd(301);

  Options options = CurrentOptions();
  TestCompactionFilter test_compaction_filter;
  // Use the env's mock clock so the test can advance time instantly.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;
  env_->addon_time_.store(0);

  enum CompactionFilterType {
    kUseCompactionFilter,
    kUseCompactionFilterFactory
  };

  for (CompactionFilterType comp_filter_type :
       {kUseCompactionFilter, kUseCompactionFilterFactory}) {
    // Assert that periodic compactions are not enabled.
    ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);

    if (comp_filter_type == kUseCompactionFilter) {
      options.compaction_filter = &test_compaction_filter;
      options.compaction_filter_factory.reset();
    } else if (comp_filter_type == kUseCompactionFilterFactory) {
      options.compaction_filter = nullptr;
      options.compaction_filter_factory.reset(
          new TestCompactionFilterFactory());
    }
    DestroyAndReopen(options);

    // periodic_compaction_seconds should be set to the sanitized value when
    // a compaction filter or a compaction filter factory is used.
    ASSERT_EQ(30 * 24 * 60 * 60,
              dbfull()->GetOptions().periodic_compaction_seconds);

    // Count compactions whose reason is kPeriodicCompaction.
    int periodic_compactions = 0;
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
          Compaction* compaction = reinterpret_cast<Compaction*>(arg);
          auto compaction_reason = compaction->compaction_reason();
          if (compaction_reason == CompactionReason::kPeriodicCompaction) {
            periodic_compactions++;
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    for (int i = 0; i < kNumLevelFiles; ++i) {
      for (int j = 0; j < kNumKeysPerFile; ++j) {
        ASSERT_OK(
            Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
      }
      Flush();
    }
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ("2", FilesPerLevel());
    ASSERT_EQ(0, periodic_compactions);

    // Add 31 days and do a write
    env_->addon_time_.fetch_add(31 * 24 * 60 * 60);
    ASSERT_OK(Put("a", "1"));
    Flush();
    dbfull()->TEST_WaitForCompact();
    // Assert that the files stay in the same level
    ASSERT_EQ("3", FilesPerLevel());
    // The two old files go through the periodic compaction process
    ASSERT_EQ(2, periodic_compactions);

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4049
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
  // manual compaction only triggers flush after it's sure stall won't be
  // triggered for L0 file count going too high.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  // i == 0: verifies normal case where stall is avoided by delay
  // i == 1: verifies no delay in edge case where stall trigger is same as
  //         compaction trigger, so stall can't be avoided
  for (int i = 0; i < 2; ++i) {
    Options options = CurrentOptions();
    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
    if (i == 0) {
      options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
    } else {
      options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
    }
    Reopen(options);

    if (i == 0) {
      // ensure the auto compaction doesn't finish until manual compaction has
      // had a chance to be delayed.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
            "CompactionJob::Run():End"}});
    } else {
      // ensure the auto-compaction doesn't finish until manual compaction has
      // continued without delay.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::FlushMemTable:StallWaitDone",
            "CompactionJob::Run():End"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Fill L0 to one file below the slowdown limit so the manual compaction's
    // flush would push it over if not delayed.
    Random rnd(301);
    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
      for (int k = 0; k < 2; ++k) {
        ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
      }
      Flush();
    }
    auto manual_compaction_thread = port::Thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      db_->CompactRange(cro, nullptr, nullptr);
    });

    manual_compaction_thread.join();
    dbfull()->TEST_WaitForCompact();
    // Everything should have been compacted out of L0 into L1.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4104
TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
  // manual compaction only triggers flush after it's sure stall won't be
  // triggered for immutable memtable count going too high.
  const int kNumImmMemTableLimit = 8;
  // i == 0: verifies normal case where stall is avoided by delay
  // i == 1: verifies no delay in edge case where stall trigger is same as
  //         flush trigger, so stall can't be avoided
  for (int i = 0; i < 2; ++i) {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    // the delay limit is one less than the stop limit. This test focuses on
    // avoiding delay limit, but this option sets stop limit, so add one.
    options.max_write_buffer_number = kNumImmMemTableLimit + 1;
    if (i == 1) {
      options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
    }
    Reopen(options);

    if (i == 0) {
      // ensure the flush doesn't finish until manual compaction has had a
      // chance to be delayed.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
            "FlushJob::WriteLevel0Table"}});
    } else {
      // ensure the flush doesn't finish until manual compaction has continued
      // without delay.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::FlushMemTable:StallWaitDone",
            "FlushJob::WriteLevel0Table"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Pile up immutable memtables (non-waiting flushes) to one below the
    // stall limit.
    Random rnd(301);
    for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
      ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
      FlushOptions flush_opts;
      flush_opts.wait = false;
      flush_opts.allow_write_stall = true;
      dbfull()->Flush(flush_opts);
    }

    auto manual_compaction_thread = port::Thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      db_->CompactRange(cro, nullptr, nullptr);
    });

    manual_compaction_thread.join();
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4161
TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
  // does not hang if CF is dropped or DB is closed
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
  // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
  //         simulate what happens during Close as we can't call Close (it
  //         blocks on the auto-compaction, making a cycle).
  for (int i = 0; i < 2; ++i) {
    CreateAndReopenWithCF({"one"}, options);
    // The calls to close CF/DB wait until the manual compaction stalls.
    // The auto-compaction waits until the manual compaction finishes to ensure
    // the signal comes from closing CF/DB, not from compaction making
    // progress.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
          "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
         {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
          "CompactionJob::Run():End"}});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Fill CF "one"'s L0 to one file below the slowdown limit so the manual
    // compaction's flush stalls and the compaction is delayed.
    Random rnd(301);
    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
      for (int k = 0; k < 2; ++k) {
        ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
      }
      Flush(1);
    }
    auto manual_compaction_thread = port::Thread([this, i]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      // Issue the manual compaction exactly once; the stalled call must be
      // unblocked by the CF drop / background-work cancellation performed by
      // the main thread, and return the matching error status. (Calling
      // CompactRange a second time here would race with the shutdown.)
      Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
      if (i == 0) {
        ASSERT_TRUE(s.IsColumnFamilyDropped());
      } else {
        ASSERT_TRUE(s.IsShutdownInProgress());
      }
    });

    TEST_SYNC_POINT(
        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
    if (i == 0) {
      ASSERT_OK(db_->DropColumnFamily(handles_[1]));
    } else {
      dbfull()->CancelAllBackgroundWork(false /* wait */);
    }
    manual_compaction_thread.join();
    TEST_SYNC_POINT(
        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
    dbfull()->TEST_WaitForCompact();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4220
TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
  // CompactRange skips its flush if the delay is long enough that the
  // memtables existing at the beginning of the call have already been flushed.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  Reopen(options);

  Random rnd(301);
  // The manual flush includes the memtable that was active when CompactRange
  // began. So it unblocks CompactRange and precludes its flush. Throughout the
  // test, stall conditions are upheld via high L0 file count.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
        "DBImpl::FlushMemTable:StallWaitDone"},
       {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Used for the delayable flushes.
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  // Fill L0 to one file below the slowdown limit so CompactRange's flush
  // would stall and thus gets delayed.
  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    dbfull()->Flush(flush_opts);
  }
  auto manual_compaction_thread = port::Thread([this]() {
    CompactRangeOptions cro;
    cro.allow_write_stall = false;
    db_->CompactRange(cro, nullptr, nullptr);
  });

  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
  // Flush the memtable that was active when CompactRange began, then write
  // one more key into the fresh memtable.
  Put(ToString(0), RandomString(&rnd, 1024));
  dbfull()->Flush(flush_opts);
  Put(ToString(0), RandomString(&rnd, 1024));
  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
  manual_compaction_thread.join();

  // If CompactRange's flush was skipped, the final Put above will still be
  // in the active memtable.
  std::string num_keys_in_memtable;
  db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
  ASSERT_EQ(ToString(1), num_keys_in_memtable);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4274
TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
  // Verify memtable only gets flushed if it contains data overlapping the range
  // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
  const int kNumEndpointKeys = 5;
  std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // One extra iteration for nullptr, which means left side of interval is
  // unbounded.
  for (int i = 0; i <= kNumEndpointKeys; ++i) {
    Slice begin;
    Slice* begin_ptr;
    if (i == 0) {
      begin_ptr = nullptr;
    } else {
      begin = keys[i - 1];
      begin_ptr = &begin;
    }
    // Start at `i` so right endpoint comes after left endpoint. One extra
    // iteration for nullptr, which means right side of interval is unbounded.
    for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
      Slice end;
      Slice* end_ptr;
      if (j == kNumEndpointKeys) {
        end_ptr = nullptr;
      } else {
        end = keys[j];
        end_ptr = &end;
      }
      // The memtable's key-range is ["b", "d"] in every iteration.
      ASSERT_OK(Put("b", "val"));
      ASSERT_OK(Put("d", "val"));
      CompactRangeOptions compact_range_opts;
      ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));

      // Sum entries across immutable and active memtables to determine whether
      // the flush happened.
      uint64_t get_prop_tmp, num_memtable_entries = 0;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      if (begin_ptr == nullptr || end_ptr == nullptr ||
          (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
        // In this case `CompactRange`'s range overlapped in some way with the
        // memtable's range, so flush should've happened. Then "b" and "d" won't
        // be in the memtable.
        ASSERT_EQ(0, num_memtable_entries);
      } else {
        ASSERT_EQ(2, num_memtable_entries);
        // flush anyways to prepare for next iteration
        db_->Flush(FlushOptions());
      }
    }
  }
}
4332
TEST_F(DBCompactionTest, CompactionStatsTest) {
  // Generate enough flushes to drive a series of compactions, then check that
  // the per-reason compaction counts observed by the listener agree with the
  // column family's internal stats.
  const int kNumFlushes = 32;
  const int kKeysPerFlush = 5000;

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  CompactionStatsCollector* collector = new CompactionStatsCollector();
  options.listeners.emplace_back(collector);
  DestroyAndReopen(options);

  for (int file = 0; file < kNumFlushes; ++file) {
    for (int key = 0; key < kKeysPerFlush; ++key) {
      Put(std::to_string(key), std::string(1, 'A'));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  dbfull()->TEST_WaitForCompact();

  auto* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  VerifyCompactionStats(*cfh->cfd(), *collector);
}
4354
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
  // LSM setup:
  // L1: [ba bz]
  // L2: [a b] [c d]
  // L3: [a b] [c d]
  //
  // Thread 1: Thread 2:
  // Begin compacting all L2->L3
  // Compact [ba bz] L1->L3
  // End compacting all L2->L3
  //
  // The compaction operation in thread 2 should be disallowed because the range
  // overlaps with the compaction in thread 1, which also covers that range in
  // L3.
  Options options = CurrentOptions();
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);
  Reopen(options);

  // Build two overlapping files per level on L2 and L3.
  for (int level = 3; level >= 2; --level) {
    ASSERT_OK(Put("a", "val"));
    ASSERT_OK(Put("b", "val"));
    ASSERT_OK(Flush());
    ASSERT_OK(Put("c", "val"));
    ASSERT_OK(Put("d", "val"));
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  // The last flushed file, [ba bz], goes to L1.
  ASSERT_OK(Put("ba", "val"));
  ASSERT_OK(Put("bz", "val"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  // Force thread 2's CompactFiles attempt to happen while thread 1's
  // CompactFiles is between its begin (0) and end (1) sync points.
  SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:0",
       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
      {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
       "CompactFilesImpl:1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  auto bg_thread = port::Thread([&]() {
    // Thread 1
    std::vector<std::string> filenames = collector->GetFlushedFiles();
    filenames.pop_back();
    ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
                                3 /* output_level */));
  });

  // Thread 2
  TEST_SYNC_POINT(
      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
  std::string filename = collector->GetFlushedFiles().back();
  // Compacting the L1 file must fail: its output range in L3 conflicts with
  // thread 1's ongoing L2->L3 compaction.
  ASSERT_FALSE(
      db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
          .ok());
  TEST_SYNC_POINT(
      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");

  bg_thread.join();
}
4416
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
  // A compaction whose inputs fully cancel out (every put matched by a
  // deletion) should not even begin creating an output SST file.
  Options options = CurrentOptions();
  SstStatsCollector* stats_collector = new SstStatsCollector();
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(stats_collector);
  Reopen(options);

  // Two L0 files with the same key range so the compaction cannot be a
  // trivial move: one holding the puts, one holding the matching deletes.
  ASSERT_OK(Put("a", "val"));
  ASSERT_OK(Put("b", "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Delete("a"));
  ASSERT_OK(Delete("b"));
  ASSERT_OK(Flush());

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);

  // Each flush starts exactly one SST creation; the compaction starts none
  // because it writes no keys.
  ASSERT_EQ(2, stats_collector->num_ssts_creation_started());
}
4440
TEST_F(DBCompactionTest, CompactionLimiter) {
  // End-to-end test for ConcurrentTaskLimiter: many column families share
  // three limiters (plus a CF with no limiter and a CF with an effectively
  // unlimited one). Verifies per-limiter concurrency bookkeeping and that
  // manual compaction still runs under a fully throttled limiter.
  const int kNumKeysPerFile = 10;
  const int kMaxBackgroundThreads = 64;

  struct CompactionLimiter {
    std::string name;
    int limit_tasks;  // configured max outstanding compactions
    int max_tasks;    // observed peak (see NOTE in the callback below)
    int tasks;        // currently running compactions
    std::shared_ptr<ConcurrentTaskLimiter> limiter;
  };

  std::vector<CompactionLimiter> limiter_settings;
  limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});

  for (auto& ls : limiter_settings) {
    ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
  }

  // Limit of -1 means "no limit".
  std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
      NewConcurrentTaskLimiter("unique_limiter", -1));

  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
      "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
  const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];

  std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 * 1024;  // 110KB
  options.arena_block_size = 4096;
  options.num_levels = 3;
  options.level0_file_num_compaction_trigger = 4;
  options.level0_slowdown_writes_trigger = 64;
  options.level0_stop_writes_trigger = 64;
  options.max_background_jobs = kMaxBackgroundThreads;  // Enough threads
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  options.max_write_buffer_number = 10;  // Enough memtables
  DestroyAndReopen(options);

  std::vector<Options> option_vector;
  option_vector.reserve(cf_count);

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ColumnFamilyOptions cf_opt(options);
    if (cf == 0) {
      // "Default" CF doesn't use compaction limiter
      cf_opt.compaction_thread_limiter = nullptr;
    } else if (cf == 1) {
      // "1" CF uses bypass compaction limiter
      unique_limiter->SetMaxOutstandingTask(-1);
      cf_opt.compaction_thread_limiter = unique_limiter;
    } else {
      // Assign limiter by mod
      auto& ls = limiter_settings[cf % 3];
      cf_opt.compaction_thread_limiter = ls.limiter;
      cf_to_limiter[cf_names[cf]] = &ls;
    }
    option_vector.emplace_back(DBOptions(options), cf_opt);
  }

  for (unsigned int cf = 1; cf < cf_count; cf++) {
    CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
  }

  ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
                                                    cf_names + cf_count),
                           option_vector);

  // Protects the counters inside the CompactionLimiter structs, which are
  // updated from background compaction threads.
  port::Mutex mutex;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          // Running tasks must never exceed the limiter's configured limit.
          ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
          // NOTE(review): the peak is recorded as `limit_tasks` rather than
          // the current `tasks` count, so `max_tasks` equals `limit_tasks`
          // once any compaction runs — confirm this is intentional.
          iter->second->max_tasks =
              std::max(iter->second->max_tasks, iter->second->limit_tasks);
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          ASSERT_GE(--iter->second->tasks, 0);
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Block all compact threads in thread pool.
  const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
  const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
  env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH);
  env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW);

  test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];

  // Block all compaction threads in thread pool.
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                   &sleeping_compact_tasks[i], Env::LOW);
    sleeping_compact_tasks[i].WaitUntilSleeping();
  }

  int keyIndex = 0;

  // Build up exactly level0_file_num_compaction_trigger L0 files per CF so
  // every CF becomes eligible for compaction at the same time.
  for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
    for (unsigned int cf = 0; cf < cf_count; cf++) {
      for (int i = 0; i < kNumKeysPerFile; i++) {
        ASSERT_OK(Put(cf, Key(keyIndex++), ""));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put(cf, "", ""));
    }

    for (unsigned int cf = 0; cf < cf_count; cf++) {
      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
    }
  }

  // Enough L0 files to trigger compaction
  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ASSERT_EQ(NumTableFilesAtLevel(0, cf),
              options.level0_file_num_compaction_trigger);
  }

  // Create more files for one column family, which triggers speed up
  // condition, all compactions will be scheduled.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    for (int i = 0; i < kNumKeysPerFile; i++) {
      ASSERT_OK(Put(0, Key(i), ""));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(0, "", ""));
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
              NumTableFilesAtLevel(0, 0));
  }

  // All CFs are pending compaction
  ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));

  // Unblock all compaction threads
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    sleeping_compact_tasks[i].WakeUp();
    sleeping_compact_tasks[i].WaitUntilDone();
  }

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Max outstanding compact tasks reached limit
  for (auto& ls : limiter_settings) {
    ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
    ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
  }

  // test manual compaction under a fully throttled limiter
  int cf_test = 1;
  unique_limiter->SetMaxOutstandingTask(0);

  // flush one more file to cf 1
  for (int i = 0; i < kNumKeysPerFile; i++) {
    ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
  }
  // put extra key to trigger flush
  ASSERT_OK(Put(cf_test, "", ""));

  dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
  ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));

  // Manual compaction must still make progress even though the limiter allows
  // zero outstanding tasks.
  Compact(cf_test, Key(0), Key(keyIndex));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
4626
// Run DBCompactionTestWithParam over all combinations of
// (max_subcompactions, exclusive_manual_compaction).
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                        ::testing::Values(std::make_tuple(1, true),
                                          std::make_tuple(1, false),
                                          std::make_tuple(4, true),
                                          std::make_tuple(4, false)));
4632
TEST_P(DBCompactionDirectIOTest, DirectIO) {
  // Verify compaction output files are opened with direct writes exactly when
  // `use_direct_io_for_flush_and_compaction` is set (the test parameter), and
  // that enabling it triggers the direct-I/O option-sanitization path.
  Options options = CurrentOptions();
  Destroy(options);
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  options.env = new MockEnv(Env::Default());
  Reopen(options);
  bool readahead = false;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });
  if (options.use_direct_io_for_flush_and_compaction) {
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
          readahead = true;
        });
  }
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  ASSERT_EQ("1,1,1", FilesPerLevel(1));
  Compact(1, "p1", "p9");
  // `readahead` is only set via the sanitize callback, so it tracks whether
  // direct reads were enabled during option sanitization.
  ASSERT_EQ(readahead, options.use_direct_reads);
  ASSERT_EQ("0,0,1", FilesPerLevel(1));
  // Destroy the DB before deleting the env it points at.
  Destroy(options);
  delete options.env;
}
4664
// Run DirectIO with direct I/O both enabled and disabled.
INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
                        testing::Bool());
4667
// Parameterized over the raw value of the `CompactionPri` heuristic used to
// pick files for compaction.
class CompactionPriTest : public DBTestBase,
                          public testing::WithParamInterface<uint32_t> {
 public:
  CompactionPriTest() : DBTestBase("/compaction_pri_test") {
    compaction_pri_ = GetParam();
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  // Raw enum value of CompactionPri for this instantiation.
  uint32_t compaction_pri_;
};
4681
TEST_P(CompactionPriTest, Test) {
  // Smoke test: write keys in random order under the parameterized
  // compaction-priority heuristic and verify all keys remain readable after
  // compactions settle.
  Options options = CurrentOptions();
  options.write_buffer_size = 16 * 1024;
  options.compaction_pri = static_cast<CompactionPri>(compaction_pri_);
  options.hard_pending_compaction_bytes_limit = 256 * 1024;
  options.max_bytes_for_level_base = 64 * 1024;
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;

  DestroyAndReopen(options);

  Random rnd(301);
  const int kNKeys = 5000;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  // Fisher-Yates shuffle driven by the test's deterministic RNG.
  // (`std::random_shuffle` was deprecated in C++14 and removed in C++17,
  // so it must not be used here.)
  for (int i = kNKeys - 1; i > 0; i--) {
    std::swap(keys[i], keys[rnd.Uniform(i + 1)]);
  }

  for (int i = 0; i < kNKeys; i++) {
    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 102)));
  }

  dbfull()->TEST_WaitForCompact();
  // Every key must be present regardless of the file-picking heuristic.
  for (int i = 0; i < kNKeys; i++) {
    ASSERT_NE("NOT_FOUND", Get(Key(i)));
  }
}
4710
// Exercise every compaction-priority heuristic.
INSTANTIATE_TEST_CASE_P(
    CompactionPriTest, CompactionPriTest,
    ::testing::Values(CompactionPri::kByCompensatedSize,
                      CompactionPri::kOldestLargestSeqFirst,
                      CompactionPri::kOldestSmallestSeqFirst,
                      CompactionPri::kMinOverlappingRatio));
4717
4718 class NoopMergeOperator : public MergeOperator {
4719 public:
NoopMergeOperator()4720 NoopMergeOperator() {}
4721
FullMergeV2(const MergeOperationInput &,MergeOperationOutput * merge_out) const4722 bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
4723 MergeOperationOutput* merge_out) const override {
4724 std::string val("bar");
4725 merge_out->new_value = val;
4726 return true;
4727 }
4728
Name() const4729 const char* Name() const override { return "Noop"; }
4730 };
4731
TEST_F(DBCompactionTest, PartialManualCompaction) {
  // Set max_compaction_bytes to half the live SST size so the manual
  // compaction must be split into multiple partial compactions, and verify it
  // completes under kForceOptimized.
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 10;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 10240;
  DestroyAndReopen(opts);

  Random rnd(301);
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      Merge("foo", RandomString(&rnd, 1024));
    }
    Flush();
  }

  MoveFilesToLevel(2);

  std::string prop;
  EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
  // Use stoull rather than atoi: the property is a uint64_t and atoi would
  // silently truncate/overflow for sizes above INT_MAX.
  uint64_t max_compaction_bytes = std::stoull(prop) / 2;
  ASSERT_OK(dbfull()->SetOptions(
      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  dbfull()->CompactRange(cro, nullptr, nullptr);
}
4761
TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
  // Regression test for bug where manual compaction hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  const int kNumL0Files = 4;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.env = mock_env.get();
  DestroyAndReopen(opts);

  Random rnd(301);
  for (int i = 0; i < kNumL0Files; ++i) {
    // Make sure files are overlapping in key-range to prevent trivial move.
    Put("key1", RandomString(&rnd, 1024));
    Put("key2", RandomString(&rnd, 1024));
    Flush();
  }
  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));

  // Enter read-only mode by failing a write.
  mock_env->SetFilesystemActive(false);
  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
  // early trying to flush memtable.
  ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));

  // In the bug scenario, the first manual compaction would fail and forget to
  // unregister itself, causing the second one to hang forever due to conflict
  // with a non-running compaction.
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = false;
  Slice begin_key("key1");
  Slice end_key("key2");
  // Both calls must return (with an error), proving the first one
  // unregistered itself on failure.
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));

  // Close before mock_env destruct.
  Close();
}
4801
4802 // ManualCompactionBottomLevelOptimization tests the bottom level manual
4803 // compaction optimization to skip recompacting files created by Ln-1 to Ln
4804 // compaction
TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 5;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 1024;
  opts.max_bytes_for_level_multiplier = 2;
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();
  InternalStats* internal_stats_ptr = cfd->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  // First batch ("foo" keys) is pushed to the bottom level (L2)...
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    Flush();
  }

  MoveFilesToLevel(2);

  // ...second batch ("bar" keys) remains in L0.
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("bar" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    Flush();
  }
  // Baseline: no compaction has yet read input files from its own output
  // level (index 2 == bottom level stats).
  const std::vector<InternalStats::CompactionStats>& comp_stats =
      internal_stats_ptr->TEST_GetCompactionStats();
  int num = comp_stats[2].num_input_files_in_output_level;
  ASSERT_EQ(num, 0);

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  dbfull()->CompactRange(cro, nullptr, nullptr);

  // With kForceOptimized, files freshly written to the bottom level by the
  // preceding Ln-1 -> Ln step must not be re-picked as inputs from their own
  // output level, so the counter stays zero.
  const std::vector<InternalStats::CompactionStats>& comp_stats2 =
      internal_stats_ptr->TEST_GetCompactionStats();
  num = comp_stats2[2].num_input_files_in_output_level;
  ASSERT_EQ(num, 0);
}
4853
TEST_F(DBCompactionTest, CompactionDuringShutdown) {
  // Verify that a compaction interrupted by DB shutdown does not register a
  // background error: shutdown is an expected reason for early exit.
  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = 2;
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();
  InternalStats* internal_stats_ptr = cfd->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  for (auto i = 0; i < 2; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    Flush();
  }

  // Flip the shutdown flag right before the compaction job runs so the
  // compaction aborts mid-flight.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
      [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // The aborted compaction must not have left a background error behind.
  ASSERT_OK(dbfull()->error_handler_.GetBGError());
}
4881
4882 // FixFileIngestionCompactionDeadlock tests and verifies that compaction and
4883 // file ingestion do not cause deadlock in the event of write stall triggered
4884 // by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
  const int kNumKeysPerFile = 100;
  // Generate SST files.
  Options options = CurrentOptions();

  // Generate an external SST file containing a single key, i.e. 99
  std::string sst_files_dir = dbname_ + "/sst_files/";
  test::DestroyDir(env_, sst_files_dir);
  ASSERT_OK(env_->CreateDir(sst_files_dir));
  SstFileWriter sst_writer(EnvOptions(), options);
  const std::string sst_file_path = sst_files_dir + "test.sst";
  ASSERT_OK(sst_writer.Open(sst_file_path));
  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
  ASSERT_OK(sst_writer.Finish());

  // Force the background compaction to run only after the ingestion has
  // incremented the pending-ingestion counter, reproducing the deadlock
  // window.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
       "BackgroundCallCompaction:0"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 110 << 10;  // 110KB
  options.level0_file_num_compaction_trigger =
      options.level0_stop_writes_trigger;
  options.max_subcompactions = max_subcompactions_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  Random rnd(301);

  // Generate level0_stop_writes_trigger L0 files to trigger write stop
  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    for (int j = 0; j != kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
    }
    if (0 == i) {
      // When we reach here, the memtables have kNumKeysPerFile keys. Note that
      // flush is not yet triggered. We need to write an extra key so that the
      // write path will call PreprocessWrite and flush the previous key-value
      // pairs to be flushed. After that, there will be the newest key in the
      // memtable, and a bunch of L0 files. Since there is already one key in
      // the memtable, then for i = 1, 2, ..., we do not have to write this
      // extra key to trigger flush.
      ASSERT_OK(Put("", ""));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
  }
  // When we reach this point, there will be level0_stop_writes_trigger L0
  // files and one extra key (99) in memory, which overlaps with the external
  // SST file. Write stall triggers, and can be cleared only after compaction
  // reduces the number of L0 files.

  // Compaction will also be triggered since we have reached the threshold for
  // auto compaction. Note that compaction may begin after the following file
  // ingestion thread and waits for ingestion to finish.

  // Thread to ingest file with overlapping key range with the current
  // memtable. Consequently ingestion will trigger a flush. The flush MUST
  // proceed without waiting for the write stall condition to clear, otherwise
  // deadlock can happen.
  port::Thread ingestion_thr([&]() {
    IngestExternalFileOptions ifo;
    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
    ASSERT_OK(s);
  });

  // More write to trigger write stop
  ingestion_thr.join();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  Close();
}
4958
TEST_F(DBCompactionTest, ConsistencyFailTest) {
  // Verify that a version-consistency-check failure during flush surfaces as
  // a write error rather than being ignored.
  Options options = CurrentOptions();
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistency", [&](void* arg) {
        auto p =
            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
        // just swap the two FileMetaData so that we hit error
        // in CheckConsistency function
        FileMetaData* temp = *(p->first);
        *(p->first) = *(p->second);
        *(p->second) = temp;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Two flushes so that CheckConsistency has a pair of adjacent files to
  // compare (and, via the callback, find out of order).
  for (int k = 0; k < 2; ++k) {
    ASSERT_OK(Put("foo", "bar"));
    Flush();
  }

  // The induced inconsistency puts the DB in error mode; new writes fail.
  ASSERT_NOK(Put("foo", "bar"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4984
IngestOneKeyValue(DBImpl * db,const std::string & key,const std::string & value,const Options & options)4985 void IngestOneKeyValue(DBImpl* db, const std::string& key,
4986 const std::string& value, const Options& options) {
4987 ExternalSstFileInfo info;
4988 std::string f = test::PerThreadDBPath("sst_file" + key);
4989 EnvOptions env;
4990 ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
4991 auto s = writer.Open(f);
4992 ASSERT_OK(s);
4993 // ASSERT_OK(writer.Put(Key(), ""));
4994 ASSERT_OK(writer.Put(key, value));
4995
4996 ASSERT_OK(writer.Finish(&info));
4997 IngestExternalFileOptions ingest_opt;
4998
4999 ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
5000 }
5001
TEST_P(DBCompactionTestWithParam,
       FlushAfterIntraL0CompactionCheckConsistencyFail) {
  // Regression test: a flush completing after an intra-L0 compaction must not
  // trip the force_consistency_checks validation of L0 sequence numbers.
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::atomic<int> pick_intra_l0_count(0);
  std::string value(RandomString(&rnd, kValueSize));

  // Hold the compaction job until the test has staged all files and writes.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FindIntraL0Compaction",
      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // prevents trivial move
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
  }
  ASSERT_OK(Flush());
  Compact("", Key(99));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  // Flush 5 L0 sst.
  for (int i = 0; i < 5; ++i) {
    ASSERT_OK(Put(Key(i + 1), value));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ(5, NumTableFilesAtLevel(0));

  // Put one key, to make smallest log sequence number in this memtable is less
  // than sst which would be ingested in next step.
  ASSERT_OK(Put(Key(0), "a"));

  ASSERT_EQ(5, NumTableFilesAtLevel(0));

  // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
  for (int i = 5; i < 10; i++) {
    IngestOneKeyValue(dbfull(), Key(i), value, options);
    ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
  }

  TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
  // Put one key, to make biggest log sequence number in this memtable is bigger
  // than sst which would be ingested in next step.
  ASSERT_OK(Put(Key(2), "b"));
  ASSERT_EQ(10, NumTableFilesAtLevel(0));
  dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // The intra-L0 compaction must have run, leaving files in L0.
  ASSERT_GT(level_to_files[0].size(), 0);
  ASSERT_GT(pick_intra_l0_count.load(), 0);

  // The final flush must succeed without a consistency-check failure.
  ASSERT_OK(Flush());
}
5068
TEST_P(DBCompactionTestWithParam,
       IntraL0CompactionAfterFlushCheckConsistencyFail) {
  // Regression test: an intra-L0 compaction picked while a flush is pending
  // must not produce an LSM consistency violation when ingested files and
  // flushed memtables interleave in L0. With force_consistency_checks any
  // such violation surfaces as a background error, which we assert is absent.
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  options.write_buffer_size = 2 << 20;
  options.max_write_buffer_number = 6;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));
  std::string value2(RandomString(&rnd, kValueSize));
  std::string bigvalue = value + value;

  // Seed keys 0..9 so later compactions overlap existing data and cannot
  // use trivial move.
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
  }
  ASSERT_OK(Flush());
  Compact("", Key(99));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  std::atomic<int> pick_intra_l0_count(0);
  // Hold the compaction job at its start until the test signals sync point
  // ":1" below, i.e. until the pending flush has completed, so the intra-L0
  // compaction runs against the post-flush L0 file set.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
        "CompactionJob::Run():Start"}});
  // Count how many times an intra-L0 compaction is picked.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FindIntraL0Compaction",
      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Make 6 L0 ssts, alternating ingestion and flush so their sequence
  // numbers interleave in L0.
  for (int i = 0; i < 6; ++i) {
    if (i % 2 == 0) {
      IngestOneKeyValue(dbfull(), Key(i), value, options);
    } else {
      ASSERT_OK(Put(Key(i), value));
      ASSERT_OK(Flush());
    }
  }

  ASSERT_EQ(6, NumTableFilesAtLevel(0));

  // Park the single HIGH-priority (flush) thread so no flush job can run
  // until we wake the sleeping task.
  env_->SetBackgroundThreads(1, Env::HIGH);
  test::SleepingBackgroundTask sleeping_tasks;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
                 Env::Priority::HIGH);
  sleeping_tasks.WaitUntilSleeping();

  // Overwrite keys 0..5 with large values so the memtable requests a flush;
  // the flush stays queued behind the sleeping task.
  for (int i = 0; i < 6; ++i) {
    ASSERT_OK(Put(Key(i), bigvalue));
  }

  ASSERT_EQ(6, NumTableFilesAtLevel(0));
  // Ingest more files to push L0 past level0_file_num_compaction_trigger
  // and trigger IntraL0Compaction.
  for (int i = 6; i < 10; ++i) {
    ASSERT_EQ(i, NumTableFilesAtLevel(0));
    IngestOneKeyValue(dbfull(), Key(i), value2, options);
  }
  ASSERT_EQ(10, NumTableFilesAtLevel(0));

  // Let the queued flush run, then release the held compaction job.
  sleeping_tasks.WakeUp();
  sleeping_tasks.WaitUntilDone();
  TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // The property read must itself succeed, otherwise error_count would keep
  // its initial value and the check below would pass vacuously.
  uint64_t error_count = 0;
  ASSERT_TRUE(db_->GetIntProperty("rocksdb.background-errors", &error_count));
  ASSERT_EQ(error_count, 0);
  ASSERT_GT(pick_intra_l0_count.load(), 0);
  // Reads must reflect the flushed overwrites (keys 0..5) and the ingested
  // values (keys 6..9).
  for (int i = 0; i < 6; ++i) {
    ASSERT_EQ(bigvalue, Get(Key(i)));
  }
  for (int i = 6; i < 10; ++i) {
    ASSERT_EQ(value2, Get(Key(i)));
  }
}
5153
5154 #endif // !defined(ROCKSDB_LITE)
5155 } // namespace ROCKSDB_NAMESPACE
5156
main(int argc,char ** argv)5157 int main(int argc, char** argv) {
5158 #if !defined(ROCKSDB_LITE)
5159 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
5160 ::testing::InitGoogleTest(&argc, argv);
5161 return RUN_ALL_TESTS();
5162 #else
5163 (void) argc;
5164 (void) argv;
5165 return 0;
5166 #endif
5167 }
5168