1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
#include <memory>

#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/experimental.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/utilities/convenience.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "util/concurrent_task_limiter_impl.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 // SYNC_POINT is not supported in released Windows mode.
24 #if !defined(ROCKSDB_LITE)
25 
26 class DBCompactionTest : public DBTestBase {
27  public:
DBCompactionTest()28   DBCompactionTest() : DBTestBase("/db_compaction_test") {}
29 };
30 
31 class DBCompactionTestWithParam
32     : public DBTestBase,
33       public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
34  public:
DBCompactionTestWithParam()35   DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
36     max_subcompactions_ = std::get<0>(GetParam());
37     exclusive_manual_compaction_ = std::get<1>(GetParam());
38   }
39 
40   // Required if inheriting from testing::WithParamInterface<>
SetUpTestCase()41   static void SetUpTestCase() {}
TearDownTestCase()42   static void TearDownTestCase() {}
43 
44   uint32_t max_subcompactions_;
45   bool exclusive_manual_compaction_;
46 };
47 
48 class DBCompactionDirectIOTest : public DBCompactionTest,
49                                  public ::testing::WithParamInterface<bool> {
50  public:
DBCompactionDirectIOTest()51   DBCompactionDirectIOTest() : DBCompactionTest() {}
52 };
53 
54 namespace {
55 
56 class FlushedFileCollector : public EventListener {
57  public:
FlushedFileCollector()58   FlushedFileCollector() {}
~FlushedFileCollector()59   ~FlushedFileCollector() override {}
60 
OnFlushCompleted(DB *,const FlushJobInfo & info)61   void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
62     std::lock_guard<std::mutex> lock(mutex_);
63     flushed_files_.push_back(info.file_path);
64   }
65 
GetFlushedFiles()66   std::vector<std::string> GetFlushedFiles() {
67     std::lock_guard<std::mutex> lock(mutex_);
68     std::vector<std::string> result;
69     for (auto fname : flushed_files_) {
70       result.push_back(fname);
71     }
72     return result;
73   }
74 
ClearFlushedFiles()75   void ClearFlushedFiles() { flushed_files_.clear(); }
76 
77  private:
78   std::vector<std::string> flushed_files_;
79   std::mutex mutex_;
80 };
81 
82 class CompactionStatsCollector : public EventListener {
83 public:
CompactionStatsCollector()84   CompactionStatsCollector()
85       : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
86     for (auto& v : compaction_completed_) {
87       v.store(0);
88     }
89   }
90 
~CompactionStatsCollector()91   ~CompactionStatsCollector() override {}
92 
OnCompactionCompleted(DB *,const CompactionJobInfo & info)93   void OnCompactionCompleted(DB* /* db */,
94                              const CompactionJobInfo& info) override {
95     int k = static_cast<int>(info.compaction_reason);
96     int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
97     assert(k >= 0 && k < num_of_reasons);
98     compaction_completed_[k]++;
99   }
100 
OnExternalFileIngested(DB *,const ExternalFileIngestionInfo &)101   void OnExternalFileIngested(
102       DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
103     int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
104     compaction_completed_[k]++;
105   }
106 
OnFlushCompleted(DB *,const FlushJobInfo &)107   void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
108     int k = static_cast<int>(CompactionReason::kFlush);
109     compaction_completed_[k]++;
110   }
111 
NumberOfCompactions(CompactionReason reason) const112   int NumberOfCompactions(CompactionReason reason) const {
113     int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
114     int k = static_cast<int>(reason);
115     assert(k >= 0 && k < num_of_reasons);
116     return compaction_completed_.at(k).load();
117   }
118 
119 private:
120   std::vector<std::atomic<int>> compaction_completed_;
121 };
122 
123 class SstStatsCollector : public EventListener {
124  public:
SstStatsCollector()125   SstStatsCollector() : num_ssts_creation_started_(0) {}
126 
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)127   void OnTableFileCreationStarted(
128       const TableFileCreationBriefInfo& /* info */) override {
129     ++num_ssts_creation_started_;
130   }
131 
num_ssts_creation_started()132   int num_ssts_creation_started() { return num_ssts_creation_started_; }
133 
134  private:
135   std::atomic<int> num_ssts_creation_started_;
136 };
137 
// Shared sizing constants for the compaction-deletion-trigger (CDT) tests.
constexpr int kCDTValueSize = 1000;   // bytes per value written by the tests
constexpr int kCDTKeysPerBuffer = 4;  // values one write buffer is sized for
constexpr int kCDTNumLevels = 8;      // num_levels used by the CDT options
DeletionTriggerOptions(Options options)141 Options DeletionTriggerOptions(Options options) {
142   options.compression = kNoCompression;
143   options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
144   options.min_write_buffer_number_to_merge = 1;
145   options.max_write_buffer_size_to_maintain = 0;
146   options.num_levels = kCDTNumLevels;
147   options.level0_file_num_compaction_trigger = 1;
148   options.target_file_size_base = options.write_buffer_size * 2;
149   options.target_file_size_multiplier = 2;
150   options.max_bytes_for_level_base =
151       options.target_file_size_base * options.target_file_size_multiplier;
152   options.max_bytes_for_level_multiplier = 2;
153   options.disable_auto_compactions = false;
154   return options;
155 }
156 
HaveOverlappingKeyRanges(const Comparator * c,const SstFileMetaData & a,const SstFileMetaData & b)157 bool HaveOverlappingKeyRanges(
158     const Comparator* c,
159     const SstFileMetaData& a, const SstFileMetaData& b) {
160   if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
161     if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
162       // b.smallestkey <= a.smallestkey <= b.largestkey
163       return true;
164     }
165   } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
166     // a.smallestkey < b.smallestkey <= a.largestkey
167     return true;
168   }
169   if (c->Compare(a.largestkey, b.largestkey) <= 0) {
170     if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
171       // b.smallestkey <= a.largestkey <= b.largestkey
172       return true;
173     }
174   } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
175     // a.smallestkey <= b.largestkey < a.largestkey
176     return true;
177   }
178   return false;
179 }
180 
181 // Identifies all files between level "min_level" and "max_level"
182 // which has overlapping key range with "input_file_meta".
GetOverlappingFileNumbersForLevelCompaction(const ColumnFamilyMetaData & cf_meta,const Comparator * comparator,int min_level,int max_level,const SstFileMetaData * input_file_meta,std::set<std::string> * overlapping_file_names)183 void GetOverlappingFileNumbersForLevelCompaction(
184     const ColumnFamilyMetaData& cf_meta,
185     const Comparator* comparator,
186     int min_level, int max_level,
187     const SstFileMetaData* input_file_meta,
188     std::set<std::string>* overlapping_file_names) {
189   std::set<const SstFileMetaData*> overlapping_files;
190   overlapping_files.insert(input_file_meta);
191   for (int m = min_level; m <= max_level; ++m) {
192     for (auto& file : cf_meta.levels[m].files) {
193       for (auto* included_file : overlapping_files) {
194         if (HaveOverlappingKeyRanges(
195                 comparator, *included_file, file)) {
196           overlapping_files.insert(&file);
197           overlapping_file_names->insert(file.name);
198           break;
199         }
200       }
201     }
202   }
203 }
204 
VerifyCompactionResult(const ColumnFamilyMetaData & cf_meta,const std::set<std::string> & overlapping_file_numbers)205 void VerifyCompactionResult(
206     const ColumnFamilyMetaData& cf_meta,
207     const std::set<std::string>& overlapping_file_numbers) {
208 #ifndef NDEBUG
209   for (auto& level : cf_meta.levels) {
210     for (auto& file : level.files) {
211       assert(overlapping_file_numbers.find(file.name) ==
212              overlapping_file_numbers.end());
213     }
214   }
215 #endif
216 }
217 
218 /*
219  * Verifies compaction stats of cfd are valid.
220  *
221  * For each level of cfd, its compaction stats are valid if
222  * 1) sum(stat.counts) == stat.count, and
223  * 2) stat.counts[i] == collector.NumberOfCompactions(i)
224  */
VerifyCompactionStats(ColumnFamilyData & cfd,const CompactionStatsCollector & collector)225 void VerifyCompactionStats(ColumnFamilyData& cfd,
226     const CompactionStatsCollector& collector) {
227 #ifndef NDEBUG
228   InternalStats* internal_stats_ptr = cfd.internal_stats();
229   ASSERT_TRUE(internal_stats_ptr != nullptr);
230   const std::vector<InternalStats::CompactionStats>& comp_stats =
231       internal_stats_ptr->TEST_GetCompactionStats();
232   const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
233   std::vector<int> counts(num_of_reasons, 0);
234   // Count the number of compactions caused by each CompactionReason across
235   // all levels.
236   for (const auto& stat : comp_stats) {
237     int sum = 0;
238     for (int i = 0; i < num_of_reasons; i++) {
239       counts[i] += stat.counts[i];
240       sum += stat.counts[i];
241     }
242     ASSERT_EQ(sum, stat.count);
243   }
244   // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
245   // assuming that all compactions complete.
246   for (int i = 0; i < num_of_reasons; i++) {
247     ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
248   }
249 #endif /* NDEBUG */
250 }
251 
PickFileRandomly(const ColumnFamilyMetaData & cf_meta,Random * rand,int * level=nullptr)252 const SstFileMetaData* PickFileRandomly(
253     const ColumnFamilyMetaData& cf_meta,
254     Random* rand,
255     int* level = nullptr) {
256   auto file_id = rand->Uniform(static_cast<int>(
257       cf_meta.file_count)) + 1;
258   for (auto& level_meta : cf_meta.levels) {
259     if (file_id <= level_meta.files.size()) {
260       if (level != nullptr) {
261         *level = level_meta.level;
262       }
263       auto result = rand->Uniform(file_id);
264       return &(level_meta.files[result]);
265     }
266     file_id -= static_cast<uint32_t>(level_meta.files.size());
267   }
268   assert(false);
269   return nullptr;
270 }
271 }  // anonymous namespace
272 
273 #ifndef ROCKSDB_VALGRIND_RUN
274 // All the TEST_P tests run once with sub_compactions disabled (i.e.
275 // options.max_subcompactions = 1) and once with it enabled
// Writes a batch of keys, deletes all of them, and checks that the size
// reported for the key range shrinks to less than a third once flushes and
// compactions have settled. Runs three passes: default options, stats update
// skipped on open, and universal compaction with a single level.
TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
  for (int tid = 0; tid < 3; ++tid) {
    uint64_t db_size[2];
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;

    if (tid == 1) {
      // the following only disable stats update in DB::Open()
      // and should not affect the result of this test.
      options.skip_stats_update_on_db_open = true;
    } else if (tid == 2) {
      // third pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
    }

    DestroyAndReopen(options);
    Random rnd(301);

    const int kTestSize = kCDTKeysPerBuffer * 1024;
    std::vector<std::string> values;
    values.reserve(kTestSize);
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    // Fail fast if background work reported an error instead of silently
    // measuring a DB in an unexpected state.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[0] = Size(Key(0), Key(kTestSize - 1));

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[1] = Size(Key(0), Key(kTestSize - 1));

    // must have much smaller db size.
    ASSERT_GT(db_size[0] / 3, db_size[1]);
  }
}
316 #endif  // ROCKSDB_VALGRIND_RUN
317 
TEST_P(DBCompactionTestWithParam,CompactionsPreserveDeletes)318 TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
319   //  For each options type we test following
320   //  - Enable preserve_deletes
321   //  - write bunch of keys and deletes
322   //  - Set start_seqnum to the beginning; compact; check that keys are present
323   //  - rewind start_seqnum way forward; compact; check that keys are gone
324 
325   for (int tid = 0; tid < 3; ++tid) {
326     Options options = DeletionTriggerOptions(CurrentOptions());
327     options.max_subcompactions = max_subcompactions_;
328     options.preserve_deletes=true;
329     options.num_levels = 2;
330 
331     if (tid == 1) {
332       options.skip_stats_update_on_db_open = true;
333     } else if (tid == 2) {
334       // third pass with universal compaction
335       options.compaction_style = kCompactionStyleUniversal;
336     }
337 
338     DestroyAndReopen(options);
339     Random rnd(301);
340     // highlight the default; all deletes should be preserved
341     SetPreserveDeletesSequenceNumber(0);
342 
343     const int kTestSize = kCDTKeysPerBuffer;
344     std::vector<std::string> values;
345     for (int k = 0; k < kTestSize; ++k) {
346       values.push_back(RandomString(&rnd, kCDTValueSize));
347       ASSERT_OK(Put(Key(k), values[k]));
348     }
349 
350     for (int k = 0; k < kTestSize; ++k) {
351       ASSERT_OK(Delete(Key(k)));
352     }
353     // to ensure we tackle all tombstones
354     CompactRangeOptions cro;
355     cro.change_level = true;
356     cro.target_level = 2;
357     cro.bottommost_level_compaction =
358         BottommostLevelCompaction::kForceOptimized;
359 
360     dbfull()->TEST_WaitForFlushMemTable();
361     dbfull()->CompactRange(cro, nullptr, nullptr);
362 
363     // check that normal user iterator doesn't see anything
364     Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
365     int i = 0;
366     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
367       i++;
368     }
369     ASSERT_EQ(i, 0);
370     delete db_iter;
371 
372     // check that iterator that sees internal keys sees tombstones
373     ReadOptions ro;
374     ro.iter_start_seqnum=1;
375     db_iter = dbfull()->NewIterator(ro);
376     i = 0;
377     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
378       i++;
379     }
380     ASSERT_EQ(i, 4);
381     delete db_iter;
382 
383     // now all deletes should be gone
384     SetPreserveDeletesSequenceNumber(100000000);
385     dbfull()->CompactRange(cro, nullptr, nullptr);
386 
387     db_iter = dbfull()->NewIterator(ro);
388     i = 0;
389     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
390       i++;
391     }
392     ASSERT_EQ(i, 0);
393     delete db_iter;
394   }
395 }
396 
// Verifies that the "VersionStorageInfo::UpdateAccumulatedStats" sync point
// is not hit while reopening with options.skip_stats_update_on_db_open set,
// and is hit once the option is cleared again.
// The test will need to be updated if the internal behavior changes.
TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
  Options options = DeletionTriggerOptions(CurrentOptions());
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);
  Random rnd(301);

  // Populate the DB and flush it to disk before closing.
  const int kTestSize = kCDTKeysPerBuffer * 512;
  std::vector<std::string> values;
  for (int k = 0; k < kTestSize; ++k) {
    values.push_back(RandomString(&rnd, kCDTValueSize));
    ASSERT_OK(Put(Key(k), values[k]));
  }

  ASSERT_OK(Flush());

  Close();

  // Count every hit of the stats-update sync point from here on.
  int stats_update_hits = 0;
  auto sync_point = SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "VersionStorageInfo::UpdateAccumulatedStats",
      [&](void* /* arg */) { ++stats_update_hits; });
  sync_point->EnableProcessing();

  // First reopen: stats update disabled, so no hits are expected.
  options.skip_stats_update_on_db_open = true;
  options.max_open_files = 20;
  Reopen(options);

  ASSERT_EQ(stats_update_hits, 0);

  // Second reopen: stats update enabled again, so hits are expected.
  options.skip_stats_update_on_db_open = false;
  Reopen(options);

  ASSERT_GT(stats_update_hits, 0);

  sync_point->ClearAllCallBacks();
  sync_point->DisableProcessing();
}
442 
TEST_F(DBCompactionTest,TestTableReaderForCompaction)443 TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
444   Options options = CurrentOptions();
445   options.env = env_;
446   options.new_table_reader_for_compaction_inputs = true;
447   options.max_open_files = 20;
448   options.level0_file_num_compaction_trigger = 3;
449   DestroyAndReopen(options);
450   Random rnd(301);
451 
452   int num_table_cache_lookup = 0;
453   int num_new_table_reader = 0;
454   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
455       "TableCache::FindTable:0", [&](void* arg) {
456         assert(arg != nullptr);
457         bool no_io = *(reinterpret_cast<bool*>(arg));
458         if (!no_io) {
459           // filter out cases for table properties queries.
460           num_table_cache_lookup++;
461         }
462       });
463   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
464       "TableCache::GetTableReader:0",
465       [&](void* /*arg*/) { num_new_table_reader++; });
466   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
467 
468   for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
469     ASSERT_OK(Put(Key(k), Key(k)));
470     ASSERT_OK(Put(Key(10 - k), "bar"));
471     if (k < options.level0_file_num_compaction_trigger - 1) {
472       num_table_cache_lookup = 0;
473       Flush();
474       dbfull()->TEST_WaitForCompact();
475       // preloading iterator issues one table cache lookup and create
476       // a new table reader, if not preloaded.
477       int old_num_table_cache_lookup = num_table_cache_lookup;
478       ASSERT_GE(num_table_cache_lookup, 1);
479       ASSERT_EQ(num_new_table_reader, 1);
480 
481       num_table_cache_lookup = 0;
482       num_new_table_reader = 0;
483       ASSERT_EQ(Key(k), Get(Key(k)));
484       // lookup iterator from table cache and no need to create a new one.
485       ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
486       ASSERT_EQ(num_new_table_reader, 0);
487     }
488   }
489 
490   num_table_cache_lookup = 0;
491   num_new_table_reader = 0;
492   Flush();
493   dbfull()->TEST_WaitForCompact();
494   // Preloading iterator issues one table cache lookup and creates
495   // a new table reader. One file is created for flush and one for compaction.
496   // Compaction inputs make no table cache look-up for data/range deletion
497   // iterators
498   // May preload table cache too.
499   ASSERT_GE(num_table_cache_lookup, 2);
500   int old_num_table_cache_lookup2 = num_table_cache_lookup;
501 
502   // Create new iterator for:
503   // (1) 1 for verifying flush results
504   // (2) 1 for verifying compaction results.
505   // (3) New TableReaders will not be created for compaction inputs
506   ASSERT_EQ(num_new_table_reader, 2);
507 
508   num_table_cache_lookup = 0;
509   num_new_table_reader = 0;
510   ASSERT_EQ(Key(1), Get(Key(1)));
511   ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
512   ASSERT_EQ(num_new_table_reader, 0);
513 
514   num_table_cache_lookup = 0;
515   num_new_table_reader = 0;
516   CompactRangeOptions cro;
517   cro.change_level = true;
518   cro.target_level = 2;
519   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
520   db_->CompactRange(cro, nullptr, nullptr);
521   // Only verifying compaction outputs issues one table cache lookup
522   // for both data block and range deletion block).
523   // May preload table cache too.
524   ASSERT_GE(num_table_cache_lookup, 1);
525   old_num_table_cache_lookup2 = num_table_cache_lookup;
526   // One for verifying compaction results.
527   // No new iterator created for compaction.
528   ASSERT_EQ(num_new_table_reader, 1);
529 
530   num_table_cache_lookup = 0;
531   num_new_table_reader = 0;
532   ASSERT_EQ(Key(1), Get(Key(1)));
533   ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
534   ASSERT_EQ(num_new_table_reader, 0);
535 
536   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
537 }
538 
// Three rounds per pass: fill the DB, reopen with auto compactions disabled
// and delete everything (size should stay large), then reopen with auto
// compactions enabled and write a little data (size should drop). The second
// pass repeats this with universal compaction and a single level.
TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
  for (int tid = 0; tid < 2; ++tid) {
    uint64_t db_size[3];
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;

    if (tid == 1) {
      // second pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
    }

    DestroyAndReopen(options);
    Random rnd(301);

    // round 1 --- insert key/value pairs.
    const int kTestSize = kCDTKeysPerBuffer * 512;
    std::vector<std::string> values;
    values.reserve(kTestSize);
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    // Surface background errors instead of discarding the returned Status.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[0] = Size(Key(0), Key(kTestSize - 1));
    Close();

    // round 2 --- disable auto-compactions and issue deletions.
    options.create_if_missing = false;
    options.disable_auto_compactions = true;
    Reopen(options);

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    db_size[1] = Size(Key(0), Key(kTestSize - 1));
    Close();
    // as auto_compaction is off, we shouldn't see too much reduce
    // in db size.
    ASSERT_LT(db_size[0] / 3, db_size[1]);

    // round 3 --- reopen db with auto_compaction on and see if
    // deletion compensation still work.
    options.disable_auto_compactions = false;
    Reopen(options);
    // insert relatively small amount of data to trigger auto compaction.
    for (int k = 0; k < kTestSize / 10; ++k) {
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[2] = Size(Key(0), Key(kTestSize - 1));
    // this time we're expecting significant drop in size.
    ASSERT_GT(db_size[0] / 3, db_size[2]);
  }
}
595 
// Runs the fill / delete / reopen sequence twice: first with
// skip_stats_update_on_db_open enabled (the final size is expected to stay
// large), then with it disabled (the final size is expected to drop).
TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
  uint64_t db_size[3];
  for (int test = 0; test < 2; ++test) {
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.skip_stats_update_on_db_open = (test == 0);

    env_->random_read_counter_.Reset();
    DestroyAndReopen(options);
    Random rnd(301);

    // round 1 --- insert key/value pairs.
    const int kTestSize = kCDTKeysPerBuffer * 512;
    std::vector<std::string> values;
    values.reserve(kTestSize);
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    // Surface background errors instead of discarding the returned Status.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[0] = Size(Key(0), Key(kTestSize - 1));
    Close();

    // round 2 --- disable auto-compactions and issue deletions.
    options.create_if_missing = false;
    options.disable_auto_compactions = true;

    env_->random_read_counter_.Reset();
    Reopen(options);

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    db_size[1] = Size(Key(0), Key(kTestSize - 1));
    Close();
    // as auto_compaction is off, we shouldn't see too much reduce
    // in db size.
    ASSERT_LT(db_size[0] / 3, db_size[1]);

    // round 3 --- reopen db with auto_compaction on and see if
    // deletion compensation still work.
    options.disable_auto_compactions = false;
    Reopen(options);
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    db_size[2] = Size(Key(0), Key(kTestSize - 1));

    if (options.skip_stats_update_on_db_open) {
      // If update stats on DB::Open is disable, we don't expect
      // deletion entries taking effect.
      ASSERT_LT(db_size[0] / 3, db_size[2]);
    } else {
      // Otherwise, we should see a significant drop in db size.
      ASSERT_GT(db_size[0] / 3, db_size[2]);
    }
  }
}
652 
653 
// Creates level0_file_num_compaction_trigger - 1 level-0 files in column
// family 1, then writes one more file and checks that level 0 ends up empty
// and level 1 holds exactly one file.
TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
  const int kNumKeysPerFile = 100;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.num_levels = 3;
  options.level0_file_num_compaction_trigger = 3;
  options.max_subcompactions = max_subcompactions_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);

  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    std::vector<std::string> values;
    values.reserve(kNumKeysPerFile);
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < kNumKeysPerFile; i++) {
      values.push_back(RandomString(&rnd, 990));
      ASSERT_OK(Put(1, Key(i), values[i]));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(1, "", ""));
    // Surface flush errors instead of discarding the returned Status.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
    ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
  }

  // generate one more file in level-0, and should trigger level-0 compaction
  std::vector<std::string> values;
  values.reserve(kNumKeysPerFile);
  for (int i = 0; i < kNumKeysPerFile; i++) {
    values.push_back(RandomString(&rnd, 990));
    ASSERT_OK(Put(1, Key(i), values[i]));
  }
  // put extra key to trigger flush
  ASSERT_OK(Put(1, "", ""));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
695 
TEST_F(DBCompactionTest,BGCompactionsAllowed)696 TEST_F(DBCompactionTest, BGCompactionsAllowed) {
697   // Create several column families. Make compaction triggers in all of them
698   // and see number of compactions scheduled to be less than allowed.
699   const int kNumKeysPerFile = 100;
700 
701   Options options = CurrentOptions();
702   options.write_buffer_size = 110 << 10;  // 110KB
703   options.arena_block_size = 4 << 10;
704   options.num_levels = 3;
705   // Should speed up compaction when there are 4 files.
706   options.level0_file_num_compaction_trigger = 2;
707   options.level0_slowdown_writes_trigger = 20;
708   options.soft_pending_compaction_bytes_limit = 1 << 30;  // Infinitely large
709   options.max_background_compactions = 3;
710   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
711 
712   // Block all threads in thread pool.
713   const size_t kTotalTasks = 4;
714   env_->SetBackgroundThreads(4, Env::LOW);
715   test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
716   for (size_t i = 0; i < kTotalTasks; i++) {
717     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
718                    &sleeping_tasks[i], Env::Priority::LOW);
719     sleeping_tasks[i].WaitUntilSleeping();
720   }
721 
722   CreateAndReopenWithCF({"one", "two", "three"}, options);
723 
724   Random rnd(301);
725   for (int cf = 0; cf < 4; cf++) {
726     for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
727       for (int i = 0; i < kNumKeysPerFile; i++) {
728         ASSERT_OK(Put(cf, Key(i), ""));
729       }
730       // put extra key to trigger flush
731       ASSERT_OK(Put(cf, "", ""));
732       dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
733       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
734     }
735   }
736 
737   // Now all column families qualify compaction but only one should be
738   // scheduled, because no column family hits speed up condition.
739   ASSERT_EQ(1u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
740 
741   // Create two more files for one column family, which triggers speed up
742   // condition, three compactions will be scheduled.
743   for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
744     for (int i = 0; i < kNumKeysPerFile; i++) {
745       ASSERT_OK(Put(2, Key(i), ""));
746     }
747     // put extra key to trigger flush
748     ASSERT_OK(Put(2, "", ""));
749     dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
750     ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
751               NumTableFilesAtLevel(0, 2));
752   }
753   ASSERT_EQ(3U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
754 
755   // Unblock all threads to unblock all compactions.
756   for (size_t i = 0; i < kTotalTasks; i++) {
757     sleeping_tasks[i].WakeUp();
758     sleeping_tasks[i].WaitUntilDone();
759   }
760   dbfull()->TEST_WaitForCompact();
761 
762   // Verify number of compactions allowed will come back to 1.
763 
764   for (size_t i = 0; i < kTotalTasks; i++) {
765     sleeping_tasks[i].Reset();
766     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
767                    &sleeping_tasks[i], Env::Priority::LOW);
768     sleeping_tasks[i].WaitUntilSleeping();
769   }
770   for (int cf = 0; cf < 4; cf++) {
771     for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
772       for (int i = 0; i < kNumKeysPerFile; i++) {
773         ASSERT_OK(Put(cf, Key(i), ""));
774       }
775       // put extra key to trigger flush
776       ASSERT_OK(Put(cf, "", ""));
777       dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
778       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
779     }
780   }
781 
782   // Now all column families qualify compaction but only one should be
783   // scheduled, because no column family hits speed up condition.
784   ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
785 
786   for (size_t i = 0; i < kTotalTasks; i++) {
787     sleeping_tasks[i].WakeUp();
788     sleeping_tasks[i].WaitUntilDone();
789   }
790 }
791 
// Writes 80 large values into column family 1, reopens, compacts level 0
// with trivial moves disallowed, and checks that level 1 ends up with more
// than one file while every value is still readable.
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;        // Large write buffer
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);

  // Write 8MB (80 values, each 100K)
  const int kNumValues = 80;
  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  std::vector<std::string> values;
  values.reserve(kNumValues);
  for (int i = 0; i < kNumValues; i++) {
    values.push_back(RandomString(&rnd, 100000));
    ASSERT_OK(Put(1, Key(i), values[i]));
  }

  // Reopening moves updates to level-0
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // A failed compaction must fail the test here rather than show up later as
  // a confusing file-count mismatch.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                        true /* disallow trivial move */));

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
  for (int i = 0; i < kNumValues; i++) {
    ASSERT_EQ(Get(1, Key(i)), values[i]);
  }
}
819 
// Pushes ~500KB through a 10000-byte write buffer so that new table files
// must appear, then verifies every value both before and after a reopen.
// The scenario repeats for each configuration that ChangeCompactOptions()
// cycles through.
TEST_F(DBCompactionTest, MinorCompactionsHappen) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 10000;
    CreateAndReopenWithCF({"pikachu"}, options);

    const int kNumKeys = 500;
    const std::string kValueSuffix(1000, 'v');

    const int tables_before = TotalTableFiles(1);
    for (int k = 0; k < kNumKeys; ++k) {
      ASSERT_OK(Put(1, Key(k), Key(k) + kValueSuffix));
    }
    const int tables_after = TotalTableFiles(1);
    ASSERT_GT(tables_after, tables_before);

    // Values must be readable straight after the writes...
    for (int k = 0; k < kNumKeys; ++k) {
      ASSERT_EQ(Key(k) + kValueSuffix, Get(1, Key(k)));
    }

    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    // ...and again once the database has been reopened.
    for (int k = 0; k < kNumKeys; ++k) {
      ASSERT_EQ(Key(k) + kValueSuffix, Get(1, Key(k)));
    }
  } while (ChangeCompactOptions());
}
846 
// Checks that key "3", deleted in a second flushed file, stays NOT_FOUND
// after both files are manually compacted and again after three further L0
// flushes (level0_file_num_compaction_trigger == 3) followed by waiting for
// background compaction.
TEST_F(DBCompactionTest, UserKeyCrossFile1) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // second file carries the tombstone for "3"
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(Delete("3"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // write as many L0 files as the compaction trigger configured above
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
879 
// Same scenario as UserKeyCrossFile1 but removes key "3" with SingleDelete:
// the key must stay NOT_FOUND after the manual compaction and after three
// further L0 flushes followed by waiting for background compaction.
TEST_F(DBCompactionTest, UserKeyCrossFile2) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // second file carries the single-delete for "3"
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(SingleDelete("3"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // write as many L0 files as the compaction trigger configured above
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
912 
// Builds two L0 files containing repeated versions of key "3" (each version
// pinned by a snapshot), compacts them to L1 with CompactFiles using a small
// output file size limit, releases the snapshots, then triggers an automatic
// L0 compaction and checks that the DB still accepts writes afterwards.
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  // The listener records the names of flushed files so they can be handed to
  // CompactFiles() below; options.listeners holds the pointer from here on.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  // compaction options
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  compact_opt.output_file_size_limit = 4096;
  // Each value is a fifth of the output file size limit.
  const size_t key_len =
      static_cast<size_t>(compact_opt.output_file_size_limit) / 5;

  DestroyAndReopen(options);

  std::vector<const Snapshot*> snaps;

  // create first file and flush to l0
  for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // create second file and flush to l0
  for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();

  // move both files down to l1
  ASSERT_OK(
      dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1));

  // release snap so that first instance of key(3) can have seqId=0
  for (auto snap : snaps) {
    dbfull()->ReleaseSnapshot(snap);
  }

  // create 3 files in l0 so to trigger compaction
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", std::string(1, 'A')));
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
  }

  dbfull()->TEST_WaitForCompact();
  ASSERT_OK(Put("", ""));
}
966 
// Runs CompactFiles() over two L1 files while leaving
// CompactionOptions::output_file_size_limit at its default, and requires the
// call to succeed (github issue #2249).
TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
  // github issue #2249
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);

  // create two files in l1 that we can compact
  for (int i = 0; i < 2; ++i) {
    for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
      // make l0 files' ranges overlap to avoid trivial move
      ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
      ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForFlushMemTable();
    }
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
  }

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  // Unsigned literal keeps the comparison against size() sign-consistent.
  ASSERT_EQ(2U, cf_meta.levels[1].files.size());
  std::vector<std::string> input_filenames;
  for (const auto& sst_file : cf_meta.levels[1].files) {
    input_filenames.push_back(sst_file.name);
  }

  // note CompactionOptions::output_file_size_limit is unset.
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
}
1001 
1002 // Check that writes done during a memtable compaction are recovered
1003 // if the database is shutdown during the memtable compaction.
// Writes a small key, a 10MB value, a 1000-byte value and another small key,
// then reopens the database and expects all four values to be readable.
// Repeated for every configuration that ChangeOptions() cycles through.
TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    CreateAndReopenWithCF({"pikachu"}, options);

    const std::string big1_value(10000000, 'x');
    const std::string big2_value(1000, 'y');

    // Trigger a long memtable compaction and reopen the database during it
    ASSERT_OK(Put(1, "foo", "v1"));         // Goes to 1st log file
    ASSERT_OK(Put(1, "big1", big1_value));  // Fills memtable
    ASSERT_OK(Put(1, "big2", big2_value));  // Triggers compaction
    ASSERT_OK(Put(1, "bar", "v2"));         // Goes to new log file

    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ(big1_value, Get(1, "big1"));
    ASSERT_EQ(big2_value, Get(1, "big2"));
  } while (ChangeOptions());
}
1023 
// Checks that a manual CompactRange over a single L0 file results in exactly
// one trivial move: the same file (same name and size) ends up in L1 and all
// values remain readable.
TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
  // Incremented by the sync point callback each time a trivial move happens.
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  int32_t num_keys = 80;
  int32_t value_size = 100 * 1024;  // 100 KB

  Random rnd(301);
  std::vector<std::string> values;
  values.reserve(num_keys);
  for (int i = 0; i < num_keys; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(Key(i), values[i]));
  }

  // Reopening moves updates to L0
  Reopen(options);
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1);  // 1 file in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // 0 files in L1

  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  LiveFileMetaData level0_file = metadata[0];  // L0 file meta

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Compaction will initiate a trivial move from L0 to L1
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));

  // File moved From L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // 1 file in L1

  // The live file must be the very same file that was in L0 before.
  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
  ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);

  for (int i = 0; i < num_keys; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }

  ASSERT_EQ(trivial_move, 1);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
1079 
TEST_P(DBCompactionTestWithParam,TrivialMoveNonOverlappingFiles)1080 TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
1081   int32_t trivial_move = 0;
1082   int32_t non_trivial_move = 0;
1083   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1084       "DBImpl::BackgroundCompaction:TrivialMove",
1085       [&](void* /*arg*/) { trivial_move++; });
1086   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1087       "DBImpl::BackgroundCompaction:NonTrivial",
1088       [&](void* /*arg*/) { non_trivial_move++; });
1089   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1090 
1091   Options options = CurrentOptions();
1092   options.disable_auto_compactions = true;
1093   options.write_buffer_size = 10 * 1024 * 1024;
1094   options.max_subcompactions = max_subcompactions_;
1095 
1096   DestroyAndReopen(options);
1097   // non overlapping ranges
1098   std::vector<std::pair<int32_t, int32_t>> ranges = {
1099     {100, 199},
1100     {300, 399},
1101     {0, 99},
1102     {200, 299},
1103     {600, 699},
1104     {400, 499},
1105     {500, 550},
1106     {551, 599},
1107   };
1108   int32_t value_size = 10 * 1024;  // 10 KB
1109 
1110   Random rnd(301);
1111   std::map<int32_t, std::string> values;
1112   for (size_t i = 0; i < ranges.size(); i++) {
1113     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1114       values[j] = RandomString(&rnd, value_size);
1115       ASSERT_OK(Put(Key(j), values[j]));
1116     }
1117     ASSERT_OK(Flush());
1118   }
1119 
1120   int32_t level0_files = NumTableFilesAtLevel(0, 0);
1121   ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
1122   ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1
1123 
1124   CompactRangeOptions cro;
1125   cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1126 
1127   // Since data is non-overlapping we expect compaction to initiate
1128   // a trivial move
1129   db_->CompactRange(cro, nullptr, nullptr);
1130   // We expect that all the files were trivially moved from L0 to L1
1131   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
1132   ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
1133 
1134   for (size_t i = 0; i < ranges.size(); i++) {
1135     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1136       ASSERT_EQ(Get(Key(j)), values[j]);
1137     }
1138   }
1139 
1140   ASSERT_EQ(trivial_move, 1);
1141   ASSERT_EQ(non_trivial_move, 0);
1142 
1143   trivial_move = 0;
1144   non_trivial_move = 0;
1145   values.clear();
1146   DestroyAndReopen(options);
1147   // Same ranges as above but overlapping
1148   ranges = {
1149     {100, 199},
1150     {300, 399},
1151     {0, 99},
1152     {200, 299},
1153     {600, 699},
1154     {400, 499},
1155     {500, 560},  // this range overlap with the next one
1156     {551, 599},
1157   };
1158   for (size_t i = 0; i < ranges.size(); i++) {
1159     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1160       values[j] = RandomString(&rnd, value_size);
1161       ASSERT_OK(Put(Key(j), values[j]));
1162     }
1163     ASSERT_OK(Flush());
1164   }
1165 
1166   db_->CompactRange(cro, nullptr, nullptr);
1167 
1168   for (size_t i = 0; i < ranges.size(); i++) {
1169     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1170       ASSERT_EQ(Get(Key(j)), values[j]);
1171     }
1172   }
1173   ASSERT_EQ(trivial_move, 0);
1174   ASSERT_EQ(non_trivial_move, 1);
1175 
1176   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1177 }
1178 
// Two non-overlapping L0 files are compacted manually with change_level set
// and target_level 6; both must land in L6 through a single trivial move,
// with no non-trivial compaction and all values still readable.
TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
  int32_t trivial_moves = 0;
  int32_t non_trivial_moves = 0;
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->SetCallBack("DBImpl::BackgroundCompaction:TrivialMove",
                          [&](void* /*arg*/) { ++trivial_moves; });
  sync_point->SetCallBack("DBImpl::BackgroundCompaction:NonTrivial",
                          [&](void* /*arg*/) { ++non_trivial_moves; });
  sync_point->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const int32_t kValueSize = 10 * 1024;  // 10 KB
  Random rnd(301);
  std::map<int32_t, std::string> expected;

  // First file: keys 0 through 300 (inclusive).
  for (int32_t key = 0; key <= 300; ++key) {
    expected[key] = RandomString(&rnd, kValueSize);
    ASSERT_OK(Put(Key(key), expected[key]));
  }
  ASSERT_OK(Flush());

  // Second file: keys 600 through 700 (inclusive), disjoint from the first.
  for (int32_t key = 600; key <= 700; ++key) {
    expected[key] = RandomString(&rnd, kValueSize);
    ASSERT_OK(Put(Key(key), expected[key]));
  }
  ASSERT_OK(Flush());

  // Both files start out in L0.
  ASSERT_EQ("2", FilesPerLevel(0));

  CompactRangeOptions cro;
  cro.change_level = true;
  cro.target_level = 6;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // Both files are now in L6.
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_moves, 1);
  ASSERT_EQ(non_trivial_moves, 0);

  for (int32_t key = 0; key <= 300; ++key) {
    ASSERT_EQ(Get(Key(key)), expected[key]);
  }
  for (int32_t key = 600; key <= 700; ++key) {
    ASSERT_EQ(Get(Key(key)), expected[key]);
  }
}
1237 
// Exercises a non-exclusive manual CompactRange over a key sub-range running
// on a separate thread while the main thread keeps flushing new L0 files.
// Sync points order the two resulting non-trivial compactions so that the
// intermediate and final file layouts can be asserted.
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
  // Counters bumped by the sync point callbacks registered below.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  // Distinguishes the first AfterRun callback invocation from later ones.
  bool first = true;
  // Purpose of dependencies:
  // 4 -> 1: ensure the order of two non-trivial compactions
  // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
  // are installed
  // NOTE(review): each pair is presumably {predecessor, successor} as defined
  // by SyncPoint::LoadDependency -- confirm in test_util/sync_point.h.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          first = false;
          TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
          TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
        } else {  // second non-trivial compaction
          TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.target_file_size_base = 1 << 23;  // 8 MB

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  // Latest value written for each key; used for the final verification.
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  // Trivial move the two non-overlapping files to level 6
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200), overwriting keys already stored in L6
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 1 file in L0
  ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
  // Push file 3 down one level at a time, from L0 to L5.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
  // 2 files in L6, 1 file in L5
  ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));

  // 1 earlier trivial move plus one for each of the 5 calls above.
  ASSERT_EQ(trivial_move, 6);
  ASSERT_EQ(non_trivial_move, 0);

  // Background thread issuing the manual compaction over keys [0, 199].
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    // First non-trivial compaction is triggered
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
  // file 4 [300 => 400]
  for (int32_t i = 300; i <= 400; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 5 [400 => 500] (key 400 is rewritten here)
  for (int32_t i = 400; i <= 500; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 6 [500 => 600] (key 500 is rewritten here)
  for (int32_t i = 500; i <= 600; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  // Second non-trivial compaction is triggered
  ASSERT_OK(Flush());

  // Before two non-trivial compactions are installed, there are 3 files in L0
  ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
  TEST_SYNC_POINT("DBCompaction::ManualPartial:5");

  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  // After two non-trivial compactions are installed, there is 1 file in L6, and
  // 1 file in L1
  ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
  threads.join();

  // NOTE(review): key 600 was written above but is not covered by this loop.
  for (int32_t i = 0; i < 600; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1381 
1382 // Disable as the test is flaky.
// Runs a non-exclusive manual CompactRange over keys [0, 199] on a separate
// thread while the main thread writes and flushes a large amount of data,
// then checks the per-level size bounds and that every key is readable.
// The DISABLED_ prefix keeps gtest from running it by default.
TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  // Counters bumped by the sync point callbacks registered below.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  // `first` marks the first AfterRun callback invocation. `second` is never
  // modified and only guards an empty branch in the callback.
  bool first = true;
  bool second = true;
  // NOTE(review): each pair is presumably {predecessor, successor} as defined
  // by SyncPoint::LoadDependency -- confirm in test_util/sync_point.h.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
       {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          TEST_SYNC_POINT("DBCompaction::PartialFill:4");
          first = false;
          TEST_SYNC_POINT("DBCompaction::PartialFill:3");
        } else if (second) {
          // Intentionally empty: later invocations do nothing.
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  // Latest value written for each key; used for the final verification.
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200), overwriting keys already stored in L2
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L2, 1 in L0
  ASSERT_EQ("1,0,2", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  // 2 files in L2, 1 in L1
  ASSERT_EQ("0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 2);
  ASSERT_EQ(non_trivial_move, 0);

  // Background thread issuing the manual compaction over keys [0, 199].
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::PartialFill:1");
  // Many files 4 [300 => 4300): the whole range is written six times, with
  // an explicit flush each time the pass reaches key 2300.
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }

  // Verify level sizes: the bound starts at 4 * max_bytes_for_level_base for
  // L1 and is scaled by max_bytes_for_level_multiplier for each deeper level.
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  TEST_SYNC_POINT("DBCompaction::PartialFill:2");
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  threads.join();

  for (int32_t i = 0; i < 4300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1508 
// With unordered_write enabled, starts a Put("foo", "v2") on a writer thread,
// runs a full manual compaction once the "WaitWriteWAL" sync point is
// released, and checks that "foo" reads back as "v2" both right after the
// writer finishes and after reopening the database.
TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
  // NOTE(review): each pair is presumably {predecessor, successor} as defined
  // by SyncPoint::LoadDependency -- confirm in test_util/sync_point.h.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("bar", "v1"));
  // Sync points are enabled only now, so the writes above are not affected by
  // the dependencies loaded at the top.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // A failed write is reported as a test failure instead of being dropped.
  port::Thread writer([&]() { ASSERT_OK(Put("foo", "v2")); });

  TEST_SYNC_POINT(
      "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  writer.join();
  ASSERT_EQ(Get("foo"), "v2");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  Reopen(options);
  ASSERT_EQ(Get("foo"), "v2");
}
1539 
TEST_F(DBCompactionTest,DeleteFileRange)1540 TEST_F(DBCompactionTest, DeleteFileRange) {
1541   Options options = CurrentOptions();
1542   options.write_buffer_size = 10 * 1024 * 1024;
1543   options.max_bytes_for_level_multiplier = 2;
1544   options.num_levels = 4;
1545   options.level0_file_num_compaction_trigger = 3;
1546   options.max_background_compactions = 3;
1547 
1548   DestroyAndReopen(options);
1549   int32_t value_size = 10 * 1024;  // 10 KB
1550 
1551   // Add 2 non-overlapping files
1552   Random rnd(301);
1553   std::map<int32_t, std::string> values;
1554 
1555   // file 1 [0 => 100]
1556   for (int32_t i = 0; i < 100; i++) {
1557     values[i] = RandomString(&rnd, value_size);
1558     ASSERT_OK(Put(Key(i), values[i]));
1559   }
1560   ASSERT_OK(Flush());
1561 
1562   // file 2 [100 => 300]
1563   for (int32_t i = 100; i < 300; i++) {
1564     values[i] = RandomString(&rnd, value_size);
1565     ASSERT_OK(Put(Key(i), values[i]));
1566   }
1567   ASSERT_OK(Flush());
1568 
1569   // 2 files in L0
1570   ASSERT_EQ("2", FilesPerLevel(0));
1571   CompactRangeOptions compact_options;
1572   compact_options.change_level = true;
1573   compact_options.target_level = 2;
1574   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1575   // 2 files in L2
1576   ASSERT_EQ("0,0,2", FilesPerLevel(0));
1577 
1578   // file 3 [ 0 => 200]
1579   for (int32_t i = 0; i < 200; i++) {
1580     values[i] = RandomString(&rnd, value_size);
1581     ASSERT_OK(Put(Key(i), values[i]));
1582   }
1583   ASSERT_OK(Flush());
1584 
1585   // Many files 4 [300 => 4300)
1586   for (int32_t i = 0; i <= 5; i++) {
1587     for (int32_t j = 300; j < 4300; j++) {
1588       if (j == 2300) {
1589         ASSERT_OK(Flush());
1590         dbfull()->TEST_WaitForFlushMemTable();
1591       }
1592       values[j] = RandomString(&rnd, value_size);
1593       ASSERT_OK(Put(Key(j), values[j]));
1594     }
1595   }
1596   ASSERT_OK(Flush());
1597   dbfull()->TEST_WaitForFlushMemTable();
1598   dbfull()->TEST_WaitForCompact();
1599 
1600   // Verify level sizes
1601   uint64_t target_size = 4 * options.max_bytes_for_level_base;
1602   for (int32_t i = 1; i < options.num_levels; i++) {
1603     ASSERT_LE(SizeAtLevel(i), target_size);
1604     target_size = static_cast<uint64_t>(target_size *
1605                                         options.max_bytes_for_level_multiplier);
1606   }
1607 
1608   size_t old_num_files = CountFiles();
1609   std::string begin_string = Key(1000);
1610   std::string end_string = Key(2000);
1611   Slice begin(begin_string);
1612   Slice end(end_string);
1613   ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
1614 
1615   int32_t deleted_count = 0;
1616   for (int32_t i = 0; i < 4300; i++) {
1617     if (i < 1000 || i > 2000) {
1618       ASSERT_EQ(Get(Key(i)), values[i]);
1619     } else {
1620       ReadOptions roptions;
1621       std::string result;
1622       Status s = db_->Get(roptions, Key(i), &result);
1623       ASSERT_TRUE(s.IsNotFound() || s.ok());
1624       if (s.IsNotFound()) {
1625         deleted_count++;
1626       }
1627     }
1628   }
1629   ASSERT_GT(deleted_count, 0);
1630   begin_string = Key(5000);
1631   end_string = Key(6000);
1632   Slice begin1(begin_string);
1633   Slice end1(end_string);
1634   // Try deleting files in range which contain no keys
1635   ASSERT_OK(
1636       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1));
1637 
1638   // Push data from level 0 to level 1 to force all data to be deleted
1639   // Note that we don't delete level 0 files
1640   compact_options.change_level = true;
1641   compact_options.target_level = 1;
1642   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1643   dbfull()->TEST_WaitForCompact();
1644 
1645   ASSERT_OK(
1646       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
1647 
1648   int32_t deleted_count2 = 0;
1649   for (int32_t i = 0; i < 4300; i++) {
1650     ReadOptions roptions;
1651     std::string result;
1652     Status s = db_->Get(roptions, Key(i), &result);
1653     ASSERT_TRUE(s.IsNotFound());
1654     deleted_count2++;
1655   }
1656   ASSERT_GT(deleted_count2, deleted_count);
1657   size_t new_num_files = CountFiles();
1658   ASSERT_GT(old_num_files, new_num_files);
1659 }
1660 
TEST_F(DBCompactionTest,DeleteFilesInRanges)1661 TEST_F(DBCompactionTest, DeleteFilesInRanges) {
1662   Options options = CurrentOptions();
1663   options.write_buffer_size = 10 * 1024 * 1024;
1664   options.max_bytes_for_level_multiplier = 2;
1665   options.num_levels = 4;
1666   options.max_background_compactions = 3;
1667   options.disable_auto_compactions = true;
1668 
1669   DestroyAndReopen(options);
1670   int32_t value_size = 10 * 1024;  // 10 KB
1671 
1672   Random rnd(301);
1673   std::map<int32_t, std::string> values;
1674 
1675   // file [0 => 100), [100 => 200), ... [900, 1000)
1676   for (auto i = 0; i < 10; i++) {
1677     for (auto j = 0; j < 100; j++) {
1678       auto k = i * 100 + j;
1679       values[k] = RandomString(&rnd, value_size);
1680       ASSERT_OK(Put(Key(k), values[k]));
1681     }
1682     ASSERT_OK(Flush());
1683   }
1684   ASSERT_EQ("10", FilesPerLevel(0));
1685   CompactRangeOptions compact_options;
1686   compact_options.change_level = true;
1687   compact_options.target_level = 2;
1688   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1689   ASSERT_EQ("0,0,10", FilesPerLevel(0));
1690 
1691   // file [0 => 100), [200 => 300), ... [800, 900)
1692   for (auto i = 0; i < 10; i+=2) {
1693     for (auto j = 0; j < 100; j++) {
1694       auto k = i * 100 + j;
1695       ASSERT_OK(Put(Key(k), values[k]));
1696     }
1697     ASSERT_OK(Flush());
1698   }
1699   ASSERT_EQ("5,0,10", FilesPerLevel(0));
1700   ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
1701   ASSERT_EQ("0,5,10", FilesPerLevel(0));
1702 
1703   // Delete files in range [0, 299] (inclusive)
1704   {
1705     auto begin_str1 = Key(0), end_str1 = Key(100);
1706     auto begin_str2 = Key(100), end_str2 = Key(200);
1707     auto begin_str3 = Key(200), end_str3 = Key(299);
1708     Slice begin1(begin_str1), end1(end_str1);
1709     Slice begin2(begin_str2), end2(end_str2);
1710     Slice begin3(begin_str3), end3(end_str3);
1711     std::vector<RangePtr> ranges;
1712     ranges.push_back(RangePtr(&begin1, &end1));
1713     ranges.push_back(RangePtr(&begin2, &end2));
1714     ranges.push_back(RangePtr(&begin3, &end3));
1715     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1716                                   ranges.data(), ranges.size()));
1717     ASSERT_EQ("0,3,7", FilesPerLevel(0));
1718 
1719     // Keys [0, 300) should not exist.
1720     for (auto i = 0; i < 300; i++) {
1721       ReadOptions ropts;
1722       std::string result;
1723       auto s = db_->Get(ropts, Key(i), &result);
1724       ASSERT_TRUE(s.IsNotFound());
1725     }
1726     for (auto i = 300; i < 1000; i++) {
1727       ASSERT_EQ(Get(Key(i)), values[i]);
1728     }
1729   }
1730 
1731   // Delete files in range [600, 999) (exclusive)
1732   {
1733     auto begin_str1 = Key(600), end_str1 = Key(800);
1734     auto begin_str2 = Key(700), end_str2 = Key(900);
1735     auto begin_str3 = Key(800), end_str3 = Key(999);
1736     Slice begin1(begin_str1), end1(end_str1);
1737     Slice begin2(begin_str2), end2(end_str2);
1738     Slice begin3(begin_str3), end3(end_str3);
1739     std::vector<RangePtr> ranges;
1740     ranges.push_back(RangePtr(&begin1, &end1));
1741     ranges.push_back(RangePtr(&begin2, &end2));
1742     ranges.push_back(RangePtr(&begin3, &end3));
1743     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1744                                   ranges.data(), ranges.size(), false));
1745     ASSERT_EQ("0,1,4", FilesPerLevel(0));
1746 
1747     // Keys [600, 900) should not exist.
1748     for (auto i = 600; i < 900; i++) {
1749       ReadOptions ropts;
1750       std::string result;
1751       auto s = db_->Get(ropts, Key(i), &result);
1752       ASSERT_TRUE(s.IsNotFound());
1753     }
1754     for (auto i = 300; i < 600; i++) {
1755       ASSERT_EQ(Get(Key(i)), values[i]);
1756     }
1757     for (auto i = 900; i < 1000; i++) {
1758       ASSERT_EQ(Get(Key(i)), values[i]);
1759     }
1760   }
1761 
1762   // Delete all files.
1763   {
1764     RangePtr range;
1765     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
1766     ASSERT_EQ("", FilesPerLevel(0));
1767 
1768     for (auto i = 0; i < 1000; i++) {
1769       ReadOptions ropts;
1770       std::string result;
1771       auto s = db_->Get(ropts, Key(i), &result);
1772       ASSERT_TRUE(s.IsNotFound());
1773     }
1774   }
1775 }
1776 
TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
  // regression test for #2833: groups of files whose user-keys overlap at the
  // endpoints could be split by `DeleteFilesInRange`. This caused old data to
  // reappear, either because a new version of the key was removed, or a range
  // deletion was partially dropped. It could also cause non-overlapping
  // invariant to be violated if the files dropped by DeleteFilesInRange were
  // a subset of files that a range deletion spans.
  const int kNumL0Files = 2;
  const int kValSize = 8 << 10;  // 8KB
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  // The snapshot prevents key 1 from having its old version dropped. The low
  // `target_file_size_base` ensures two keys will be in each output file.
  const Snapshot* snapshot = nullptr;
  Random rnd(301);
  // The value indicates which flush the key belonged to, which is enough
  // for us to determine the keys' relative ages. After L0 flushes finish,
  // files look like:
  //
  // File 0: 0 -> vals[0], 1 -> vals[0]
  // File 1:               1 -> vals[1], 2 -> vals[1]
  //
  // Then L0->L1 compaction happens, which outputs keys as follows:
  //
  // File 0: 0 -> vals[0], 1 -> vals[1]
  // File 1:               1 -> vals[0], 2 -> vals[1]
  //
  // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
  // would cause `1 -> vals[0]` (an older key) to reappear.
  std::string vals[kNumL0Files];
  for (int i = 0; i < kNumL0Files; ++i) {
    vals[i] = RandomString(&rnd, kValSize);
    // Check each setup step: a write or flush that fails silently would leave
    // a different file layout and make the final assertion meaningless.
    ASSERT_OK(Put(Key(i), vals[i]));
    ASSERT_OK(Put(Key(i + 1), vals[i]));
    ASSERT_OK(Flush());
    if (i == 0) {
      // Taken after the first flush, i.e. while key 1 still maps to vals[0].
      snapshot = db_->GetSnapshot();
    }
  }
  dbfull()->TEST_WaitForCompact();

  // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
  // "1 -> vals[0]" to reappear.
  std::string begin_str = Key(0), end_str = Key(1);
  Slice begin = begin_str, end = end_str;
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
  ASSERT_EQ(vals[1], Get(Key(1)));

  db_->ReleaseSnapshot(snapshot);
}
1830 
TEST_P(DBCompactionTestWithParam,TrivialMoveToLastLevelWithFiles)1831 TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
1832   int32_t trivial_move = 0;
1833   int32_t non_trivial_move = 0;
1834   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1835       "DBImpl::BackgroundCompaction:TrivialMove",
1836       [&](void* /*arg*/) { trivial_move++; });
1837   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1838       "DBImpl::BackgroundCompaction:NonTrivial",
1839       [&](void* /*arg*/) { non_trivial_move++; });
1840   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1841 
1842   Options options = CurrentOptions();
1843   options.write_buffer_size = 100000000;
1844   options.max_subcompactions = max_subcompactions_;
1845   DestroyAndReopen(options);
1846 
1847   int32_t value_size = 10 * 1024;  // 10 KB
1848 
1849   Random rnd(301);
1850   std::vector<std::string> values;
1851   // File with keys [ 0 => 99 ]
1852   for (int i = 0; i < 100; i++) {
1853     values.push_back(RandomString(&rnd, value_size));
1854     ASSERT_OK(Put(Key(i), values[i]));
1855   }
1856   ASSERT_OK(Flush());
1857 
1858   ASSERT_EQ("1", FilesPerLevel(0));
1859   // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
1860   CompactRangeOptions compact_options;
1861   compact_options.change_level = true;
1862   compact_options.target_level = 3;
1863   compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1864   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1865   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
1866   ASSERT_EQ(trivial_move, 1);
1867   ASSERT_EQ(non_trivial_move, 0);
1868 
1869   // File with keys [ 100 => 199 ]
1870   for (int i = 100; i < 200; i++) {
1871     values.push_back(RandomString(&rnd, value_size));
1872     ASSERT_OK(Put(Key(i), values[i]));
1873   }
1874   ASSERT_OK(Flush());
1875 
1876   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
1877   CompactRangeOptions cro;
1878   cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1879   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
1880   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1881   ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
1882   ASSERT_EQ(trivial_move, 4);
1883   ASSERT_EQ(non_trivial_move, 0);
1884 
1885   for (int i = 0; i < 200; i++) {
1886     ASSERT_EQ(Get(Key(i)), values[i]);
1887   }
1888 
1889   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1890 }
1891 
TEST_P(DBCompactionTestWithParam,LevelCompactionThirdPath)1892 TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
1893   Options options = CurrentOptions();
1894   options.db_paths.emplace_back(dbname_, 500 * 1024);
1895   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
1896   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
1897   options.memtable_factory.reset(
1898       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
1899   options.compaction_style = kCompactionStyleLevel;
1900   options.write_buffer_size = 110 << 10;  // 110KB
1901   options.arena_block_size = 4 << 10;
1902   options.level0_file_num_compaction_trigger = 2;
1903   options.num_levels = 4;
1904   options.max_bytes_for_level_base = 400 * 1024;
1905   options.max_subcompactions = max_subcompactions_;
1906   //  options = CurrentOptions(options);
1907 
1908   std::vector<std::string> filenames;
1909   env_->GetChildren(options.db_paths[1].path, &filenames);
1910   // Delete archival files.
1911   for (size_t i = 0; i < filenames.size(); ++i) {
1912     env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
1913   }
1914   env_->DeleteDir(options.db_paths[1].path);
1915   Reopen(options);
1916 
1917   Random rnd(301);
1918   int key_idx = 0;
1919 
1920   // First three 110KB files are not going to second path.
1921   // After that, (100K, 200K)
1922   for (int num = 0; num < 3; num++) {
1923     GenerateNewFile(&rnd, &key_idx);
1924   }
1925 
1926   // Another 110KB triggers a compaction to 400K file to fill up first path
1927   GenerateNewFile(&rnd, &key_idx);
1928   ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));
1929 
1930   // (1, 4)
1931   GenerateNewFile(&rnd, &key_idx);
1932   ASSERT_EQ("1,4", FilesPerLevel(0));
1933   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1934   ASSERT_EQ(1, GetSstFileCount(dbname_));
1935 
1936   // (1, 4, 1)
1937   GenerateNewFile(&rnd, &key_idx);
1938   ASSERT_EQ("1,4,1", FilesPerLevel(0));
1939   ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
1940   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1941   ASSERT_EQ(1, GetSstFileCount(dbname_));
1942 
1943   // (1, 4, 2)
1944   GenerateNewFile(&rnd, &key_idx);
1945   ASSERT_EQ("1,4,2", FilesPerLevel(0));
1946   ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path));
1947   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1948   ASSERT_EQ(1, GetSstFileCount(dbname_));
1949 
1950   // (1, 4, 3)
1951   GenerateNewFile(&rnd, &key_idx);
1952   ASSERT_EQ("1,4,3", FilesPerLevel(0));
1953   ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path));
1954   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1955   ASSERT_EQ(1, GetSstFileCount(dbname_));
1956 
1957   // (1, 4, 4)
1958   GenerateNewFile(&rnd, &key_idx);
1959   ASSERT_EQ("1,4,4", FilesPerLevel(0));
1960   ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path));
1961   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1962   ASSERT_EQ(1, GetSstFileCount(dbname_));
1963 
1964   // (1, 4, 5)
1965   GenerateNewFile(&rnd, &key_idx);
1966   ASSERT_EQ("1,4,5", FilesPerLevel(0));
1967   ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path));
1968   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1969   ASSERT_EQ(1, GetSstFileCount(dbname_));
1970 
1971   // (1, 4, 6)
1972   GenerateNewFile(&rnd, &key_idx);
1973   ASSERT_EQ("1,4,6", FilesPerLevel(0));
1974   ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path));
1975   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1976   ASSERT_EQ(1, GetSstFileCount(dbname_));
1977 
1978   // (1, 4, 7)
1979   GenerateNewFile(&rnd, &key_idx);
1980   ASSERT_EQ("1,4,7", FilesPerLevel(0));
1981   ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path));
1982   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1983   ASSERT_EQ(1, GetSstFileCount(dbname_));
1984 
1985   // (1, 4, 8)
1986   GenerateNewFile(&rnd, &key_idx);
1987   ASSERT_EQ("1,4,8", FilesPerLevel(0));
1988   ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path));
1989   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1990   ASSERT_EQ(1, GetSstFileCount(dbname_));
1991 
1992   for (int i = 0; i < key_idx; i++) {
1993     auto v = Get(Key(i));
1994     ASSERT_NE(v, "NOT_FOUND");
1995     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1996   }
1997 
1998   Reopen(options);
1999 
2000   for (int i = 0; i < key_idx; i++) {
2001     auto v = Get(Key(i));
2002     ASSERT_NE(v, "NOT_FOUND");
2003     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2004   }
2005 
2006   Destroy(options);
2007 }
2008 
TEST_P(DBCompactionTestWithParam,LevelCompactionPathUse)2009 TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
2010   Options options = CurrentOptions();
2011   options.db_paths.emplace_back(dbname_, 500 * 1024);
2012   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2013   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2014   options.memtable_factory.reset(
2015       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2016   options.compaction_style = kCompactionStyleLevel;
2017   options.write_buffer_size = 110 << 10;  // 110KB
2018   options.arena_block_size = 4 << 10;
2019   options.level0_file_num_compaction_trigger = 2;
2020   options.num_levels = 4;
2021   options.max_bytes_for_level_base = 400 * 1024;
2022   options.max_subcompactions = max_subcompactions_;
2023   //  options = CurrentOptions(options);
2024 
2025   std::vector<std::string> filenames;
2026   env_->GetChildren(options.db_paths[1].path, &filenames);
2027   // Delete archival files.
2028   for (size_t i = 0; i < filenames.size(); ++i) {
2029     env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
2030   }
2031   env_->DeleteDir(options.db_paths[1].path);
2032   Reopen(options);
2033 
2034   Random rnd(301);
2035   int key_idx = 0;
2036 
2037   // Always gets compacted into 1 Level1 file,
2038   // 0/1 Level 0 file
2039   for (int num = 0; num < 3; num++) {
2040     key_idx = 0;
2041     GenerateNewFile(&rnd, &key_idx);
2042   }
2043 
2044   key_idx = 0;
2045   GenerateNewFile(&rnd, &key_idx);
2046   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2047 
2048   key_idx = 0;
2049   GenerateNewFile(&rnd, &key_idx);
2050   ASSERT_EQ("1,1", FilesPerLevel(0));
2051   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2052   ASSERT_EQ(1, GetSstFileCount(dbname_));
2053 
2054   key_idx = 0;
2055   GenerateNewFile(&rnd, &key_idx);
2056   ASSERT_EQ("0,1", FilesPerLevel(0));
2057   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2058   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2059   ASSERT_EQ(0, GetSstFileCount(dbname_));
2060 
2061   key_idx = 0;
2062   GenerateNewFile(&rnd, &key_idx);
2063   ASSERT_EQ("1,1", FilesPerLevel(0));
2064   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2065   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2066   ASSERT_EQ(1, GetSstFileCount(dbname_));
2067 
2068   key_idx = 0;
2069   GenerateNewFile(&rnd, &key_idx);
2070   ASSERT_EQ("0,1", FilesPerLevel(0));
2071   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2072   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2073   ASSERT_EQ(0, GetSstFileCount(dbname_));
2074 
2075   key_idx = 0;
2076   GenerateNewFile(&rnd, &key_idx);
2077   ASSERT_EQ("1,1", FilesPerLevel(0));
2078   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2079   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2080   ASSERT_EQ(1, GetSstFileCount(dbname_));
2081 
2082   key_idx = 0;
2083   GenerateNewFile(&rnd, &key_idx);
2084   ASSERT_EQ("0,1", FilesPerLevel(0));
2085   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2086   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2087   ASSERT_EQ(0, GetSstFileCount(dbname_));
2088 
2089   key_idx = 0;
2090   GenerateNewFile(&rnd, &key_idx);
2091   ASSERT_EQ("1,1", FilesPerLevel(0));
2092   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2093   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2094   ASSERT_EQ(1, GetSstFileCount(dbname_));
2095 
2096   key_idx = 0;
2097   GenerateNewFile(&rnd, &key_idx);
2098   ASSERT_EQ("0,1", FilesPerLevel(0));
2099   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2100   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2101   ASSERT_EQ(0, GetSstFileCount(dbname_));
2102 
2103   key_idx = 0;
2104   GenerateNewFile(&rnd, &key_idx);
2105   ASSERT_EQ("1,1", FilesPerLevel(0));
2106   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2107   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2108   ASSERT_EQ(1, GetSstFileCount(dbname_));
2109 
2110   for (int i = 0; i < key_idx; i++) {
2111     auto v = Get(Key(i));
2112     ASSERT_NE(v, "NOT_FOUND");
2113     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2114   }
2115 
2116   Reopen(options);
2117 
2118   for (int i = 0; i < key_idx; i++) {
2119     auto v = Get(Key(i));
2120     ASSERT_NE(v, "NOT_FOUND");
2121     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2122   }
2123 
2124   Destroy(options);
2125 }
2126 
TEST_P(DBCompactionTestWithParam,LevelCompactionCFPathUse)2127 TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
2128   Options options = CurrentOptions();
2129   options.db_paths.emplace_back(dbname_, 500 * 1024);
2130   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2131   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2132   options.memtable_factory.reset(
2133     new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2134   options.compaction_style = kCompactionStyleLevel;
2135   options.write_buffer_size = 110 << 10;  // 110KB
2136   options.arena_block_size = 4 << 10;
2137   options.level0_file_num_compaction_trigger = 2;
2138   options.num_levels = 4;
2139   options.max_bytes_for_level_base = 400 * 1024;
2140   options.max_subcompactions = max_subcompactions_;
2141 
2142   std::vector<Options> option_vector;
2143   option_vector.emplace_back(options);
2144   ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
2145   // Configure CF1 specific paths.
2146   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
2147   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
2148   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
2149   option_vector.emplace_back(DBOptions(options), cf_opt1);
2150   CreateColumnFamilies({"one"},option_vector[1]);
2151 
2152   // Configura CF2 specific paths.
2153   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
2154   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
2155   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
2156   option_vector.emplace_back(DBOptions(options), cf_opt2);
2157   CreateColumnFamilies({"two"},option_vector[2]);
2158 
2159   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2160 
2161   Random rnd(301);
2162   int key_idx = 0;
2163   int key_idx1 = 0;
2164   int key_idx2 = 0;
2165 
2166   auto generate_file = [&]() {
2167     GenerateNewFile(0, &rnd, &key_idx);
2168     GenerateNewFile(1, &rnd, &key_idx1);
2169     GenerateNewFile(2, &rnd, &key_idx2);
2170   };
2171 
2172   auto check_sstfilecount = [&](int path_id, int expected) {
2173     ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
2174     ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
2175     ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
2176   };
2177 
2178   auto check_filesperlevel = [&](const std::string& expected) {
2179     ASSERT_EQ(expected, FilesPerLevel(0));
2180     ASSERT_EQ(expected, FilesPerLevel(1));
2181     ASSERT_EQ(expected, FilesPerLevel(2));
2182   };
2183 
2184   auto check_getvalues = [&]() {
2185     for (int i = 0; i < key_idx; i++) {
2186       auto v = Get(0, Key(i));
2187       ASSERT_NE(v, "NOT_FOUND");
2188       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2189     }
2190 
2191     for (int i = 0; i < key_idx1; i++) {
2192       auto v = Get(1, Key(i));
2193       ASSERT_NE(v, "NOT_FOUND");
2194       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2195     }
2196 
2197     for (int i = 0; i < key_idx2; i++) {
2198       auto v = Get(2, Key(i));
2199       ASSERT_NE(v, "NOT_FOUND");
2200       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2201     }
2202   };
2203 
2204   // Check that default column family uses db_paths.
2205   // And Column family "one" uses cf_paths.
2206 
2207   // First three 110KB files are not going to second path.
2208   // After that, (100K, 200K)
2209   for (int num = 0; num < 3; num++) {
2210     generate_file();
2211   }
2212 
2213   // Another 110KB triggers a compaction to 400K file to fill up first path
2214   generate_file();
2215   check_sstfilecount(1, 3);
2216 
2217   // (1, 4)
2218   generate_file();
2219   check_filesperlevel("1,4");
2220   check_sstfilecount(1, 4);
2221   check_sstfilecount(0, 1);
2222 
2223   // (1, 4, 1)
2224   generate_file();
2225   check_filesperlevel("1,4,1");
2226   check_sstfilecount(2, 1);
2227   check_sstfilecount(1, 4);
2228   check_sstfilecount(0, 1);
2229 
2230   // (1, 4, 2)
2231   generate_file();
2232   check_filesperlevel("1,4,2");
2233   check_sstfilecount(2, 2);
2234   check_sstfilecount(1, 4);
2235   check_sstfilecount(0, 1);
2236 
2237   check_getvalues();
2238 
2239   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2240 
2241   check_getvalues();
2242 
2243   Destroy(options, true);
2244 }
2245 
TEST_P(DBCompactionTestWithParam,ConvertCompactionStyle)2246 TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
2247   Random rnd(301);
2248   int max_key_level_insert = 200;
2249   int max_key_universal_insert = 600;
2250 
2251   // Stage 1: generate a db with level compaction
2252   Options options = CurrentOptions();
2253   options.write_buffer_size = 110 << 10;  // 110KB
2254   options.arena_block_size = 4 << 10;
2255   options.num_levels = 4;
2256   options.level0_file_num_compaction_trigger = 3;
2257   options.max_bytes_for_level_base = 500 << 10;  // 500KB
2258   options.max_bytes_for_level_multiplier = 1;
2259   options.target_file_size_base = 200 << 10;  // 200KB
2260   options.target_file_size_multiplier = 1;
2261   options.max_subcompactions = max_subcompactions_;
2262   CreateAndReopenWithCF({"pikachu"}, options);
2263 
2264   for (int i = 0; i <= max_key_level_insert; i++) {
2265     // each value is 10K
2266     ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2267   }
2268   ASSERT_OK(Flush(1));
2269   dbfull()->TEST_WaitForCompact();
2270 
2271   ASSERT_GT(TotalTableFiles(1, 4), 1);
2272   int non_level0_num_files = 0;
2273   for (int i = 1; i < options.num_levels; i++) {
2274     non_level0_num_files += NumTableFilesAtLevel(i, 1);
2275   }
2276   ASSERT_GT(non_level0_num_files, 0);
2277 
2278   // Stage 2: reopen with universal compaction - should fail
2279   options = CurrentOptions();
2280   options.compaction_style = kCompactionStyleUniversal;
2281   options.num_levels = 1;
2282   options = CurrentOptions(options);
2283   Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
2284   ASSERT_TRUE(s.IsInvalidArgument());
2285 
2286   // Stage 3: compact into a single file and move the file to level 0
2287   options = CurrentOptions();
2288   options.disable_auto_compactions = true;
2289   options.target_file_size_base = INT_MAX;
2290   options.target_file_size_multiplier = 1;
2291   options.max_bytes_for_level_base = INT_MAX;
2292   options.max_bytes_for_level_multiplier = 1;
2293   options.num_levels = 4;
2294   options = CurrentOptions(options);
2295   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2296 
2297   CompactRangeOptions compact_options;
2298   compact_options.change_level = true;
2299   compact_options.target_level = 0;
2300   // cannot use kForceOptimized here because the compaction here is expected
2301   // to generate one output file
2302   compact_options.bottommost_level_compaction =
2303       BottommostLevelCompaction::kForce;
2304   compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2305   dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2306 
2307   // Only 1 file in L0
2308   ASSERT_EQ("1", FilesPerLevel(1));
2309 
2310   // Stage 4: re-open in universal compaction style and do some db operations
2311   options = CurrentOptions();
2312   options.compaction_style = kCompactionStyleUniversal;
2313   options.num_levels = 4;
2314   options.write_buffer_size = 110 << 10;  // 110KB
2315   options.arena_block_size = 4 << 10;
2316   options.level0_file_num_compaction_trigger = 3;
2317   options = CurrentOptions(options);
2318   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2319 
2320   options.num_levels = 1;
2321   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2322 
2323   for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
2324     ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2325   }
2326   dbfull()->Flush(FlushOptions());
2327   ASSERT_OK(Flush(1));
2328   dbfull()->TEST_WaitForCompact();
2329 
2330   for (int i = 1; i < options.num_levels; i++) {
2331     ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
2332   }
2333 
2334   // verify keys inserted in both level compaction style and universal
2335   // compaction style
2336   std::string keys_in_db;
2337   Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
2338   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2339     keys_in_db.append(iter->key().ToString());
2340     keys_in_db.push_back(',');
2341   }
2342   delete iter;
2343 
2344   std::string expected_keys;
2345   for (int i = 0; i <= max_key_universal_insert; i++) {
2346     expected_keys.append(Key(i));
2347     expected_keys.push_back(',');
2348   }
2349 
2350   ASSERT_EQ(keys_in_db, expected_keys);
2351 }
2352 
// Replays a fixed sequence of puts and deletes on column family 1, reopening
// the DB between steps, and checks that the visible contents are "(a->v)"
// both immediately and after a one-second wait. Repeated for every option
// configuration offered by ChangeCompactOptions().
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
  const std::vector<std::string> kColumnFamilies = {"default", "pikachu"};
  // Reopens with the options of the configuration currently under test.
  auto reopen = [&]() {
    ReopenWithColumnFamilies(kColumnFamilies, CurrentOptions());
  };

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    ASSERT_OK(Put(1, "b", "v"));
    reopen();

    ASSERT_OK(Delete(1, "b"));
    ASSERT_OK(Delete(1, "a"));
    reopen();

    ASSERT_OK(Delete(1, "a"));
    reopen();

    ASSERT_OK(Put(1, "a", "v"));
    reopen();
    reopen();

    ASSERT_EQ("(a->v)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(a->v)", Contents(1));
  } while (ChangeCompactOptions());
}
2371 
// Replays a fixed sequence of puts and deletes on column family 1, reopening
// the DB between steps, and checks that the visible contents are
// "(->)(c->cv)" both immediately and after a one-second wait. Repeated for
// every option configuration offered by ChangeCompactOptions().
//
// Every write is wrapped in ASSERT_OK so that a failed write is reported at
// the step that failed instead of as a contents mismatch at the end.
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "e"));
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "c", "cv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "d", "dv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "d"));
    ASSERT_OK(Delete(1, "b"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("(->)(c->cv)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(->)(c->cv)", Contents(1));
  } while (ChangeCompactOptions());
}
2399 
// Uses sync point dependencies to order an automatic compaction on the
// default column family against a manual compaction on column family 1, then
// checks that both column families end up with a single L1 file.
//
// All writes, flushes and the manual compaction are wrapped in ASSERT_OK so
// that a failing step is reported where it happens.
TEST_F(DBCompactionTest, ManualAutoRace) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
       {"DBImpl::RunManualCompaction:WaitScheduled",
        "BackgroundCallCompaction:0"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Column family 1: one flushed file plus unflushed writes.
  ASSERT_OK(Put(1, "foo", ""));
  ASSERT_OK(Put(1, "bar", ""));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", ""));
  ASSERT_OK(Put(1, "bar", ""));
  // Generate four files in CF 0, which should trigger an auto compaction
  for (int file = 0; file < 4; ++file) {
    ASSERT_OK(Put("foo", ""));
    ASSERT_OK(Put("bar", ""));
    ASSERT_OK(Flush());
  }

  // The auto compaction is scheduled but waited until here
  TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
  // The auto compaction will wait until the manual compaction is registered
  // before processing so that it will be cancelled.
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(1));

  // Eventually the cancelled compaction will be rescheduled and executed.
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,1", FilesPerLevel(0));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
2440 
// Checks manual compaction of column family 1 for ranges that fall before,
// after and across the existing files, followed by a full-range CompactRange()
// that must not add anything to the block cache. The scenario runs twice: once
// with the initial options and once after recreating the DB with
// num_levels = 3.
TEST_P(DBCompactionTestWithParam, ManualCompaction) {
  Options options = CurrentOptions();
  options.max_subcompactions = max_subcompactions_;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    MakeTables(3, "p", "q", 1);
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range overlaps files
    Compact(1, "p1", "p9");
    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    // Populate a different range
    MakeTables(3, "c", "e", 1);
    ASSERT_EQ("1,1,2", FilesPerLevel(1));

    // Compact just the new range
    Compact(1, "b", "f");
    ASSERT_EQ("0,0,2", FilesPerLevel(1));

    // Compact all
    MakeTables(1, "a", "z", 1);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));

    uint64_t prev_block_cache_add =
        options.statistics->getTickerCount(BLOCK_CACHE_ADD);
    CompactRangeOptions cro;
    cro.exclusive_manual_compaction = exclusive_manual_compaction_;
    // Check the status: if the compaction failed, the block cache and
    // file-count assertions below would not be testing what they claim to.
    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
    // Verify manual compaction doesn't fill block cache
    ASSERT_EQ(prev_block_cache_add,
              options.statistics->getTickerCount(BLOCK_CACHE_ADD));

    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    if (iter == 0) {
      // NOTE(review): options is rebuilt from CurrentOptions() here and
      // max_subcompactions is not assigned again, so the second iteration
      // appears to run without the parameterized value. Confirm this is
      // intended.
      options = CurrentOptions();
      options.num_levels = 3;
      options.create_if_missing = true;
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
2498 
2499 
TEST_P(DBCompactionTestWithParam,ManualLevelCompactionOutputPathId)2500 TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
2501   Options options = CurrentOptions();
2502   options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2503   options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2504   options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2505   options.max_subcompactions = max_subcompactions_;
2506   CreateAndReopenWithCF({"pikachu"}, options);
2507 
2508   // iter - 0 with 7 levels
2509   // iter - 1 with 3 levels
2510   for (int iter = 0; iter < 2; ++iter) {
2511     for (int i = 0; i < 3; ++i) {
2512       ASSERT_OK(Put(1, "p", "begin"));
2513       ASSERT_OK(Put(1, "q", "end"));
2514       ASSERT_OK(Flush(1));
2515     }
2516     ASSERT_EQ("3", FilesPerLevel(1));
2517     ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
2518     ASSERT_EQ(0, GetSstFileCount(dbname_));
2519 
2520     // Compaction range falls before files
2521     Compact(1, "", "c");
2522     ASSERT_EQ("3", FilesPerLevel(1));
2523 
2524     // Compaction range falls after files
2525     Compact(1, "r", "z");
2526     ASSERT_EQ("3", FilesPerLevel(1));
2527 
2528     // Compaction range overlaps files
2529     Compact(1, "p1", "p9", 1);
2530     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2531     ASSERT_EQ("0,1", FilesPerLevel(1));
2532     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2533     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2534     ASSERT_EQ(0, GetSstFileCount(dbname_));
2535 
2536     // Populate a different range
2537     for (int i = 0; i < 3; ++i) {
2538       ASSERT_OK(Put(1, "c", "begin"));
2539       ASSERT_OK(Put(1, "e", "end"));
2540       ASSERT_OK(Flush(1));
2541     }
2542     ASSERT_EQ("3,1", FilesPerLevel(1));
2543 
2544     // Compact just the new range
2545     Compact(1, "b", "f", 1);
2546     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2547     ASSERT_EQ("0,2", FilesPerLevel(1));
2548     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2549     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2550     ASSERT_EQ(0, GetSstFileCount(dbname_));
2551 
2552     // Compact all
2553     ASSERT_OK(Put(1, "a", "begin"));
2554     ASSERT_OK(Put(1, "z", "end"));
2555     ASSERT_OK(Flush(1));
2556     ASSERT_EQ("1,2", FilesPerLevel(1));
2557     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2558     ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
2559     CompactRangeOptions compact_options;
2560     compact_options.target_path_id = 1;
2561     compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2562     db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2563     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2564 
2565     ASSERT_EQ("0,1", FilesPerLevel(1));
2566     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2567     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2568     ASSERT_EQ(0, GetSstFileCount(dbname_));
2569 
2570     if (iter == 0) {
2571       DestroyAndReopen(options);
2572       options = CurrentOptions();
2573       options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2574       options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2575       options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2576       options.max_background_flushes = 1;
2577       options.num_levels = 3;
2578       options.create_if_missing = true;
2579       CreateAndReopenWithCF({"pikachu"}, options);
2580     }
2581   }
2582 }
2583 
// Repeatedly overwrites a single key and manually compacts it, checking that
// the live-file count afterwards equals the count taken after the very first
// compaction. Repeated for as long as ChangeCompactOptions() returns true.
TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) {
  constexpr int kExtraRounds = 10;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    // Baseline: one write followed by one manual compaction.
    ASSERT_OK(Put(1, "foo", "v2"));
    Compact(1, "a", "z");
    const size_t baseline_live_files = CountLiveFiles();

    // Further overwrite + compact rounds must not accumulate live files.
    int round = 0;
    while (round < kExtraRounds) {
      ASSERT_OK(Put(1, "foo", "v2"));
      Compact(1, "a", "z");
      ++round;
    }

    ASSERT_EQ(CountLiveFiles(), baseline_live_files);
  } while (ChangeCompactOptions());
}
2597 
// Check level compaction with compact files
TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
  // Fills column family 1 with ~1000-byte entries, then repeatedly picks a few
  // random files and compacts them to the last level via CompactFiles(),
  // checking after each round that the overlapping input files are gone and,
  // at the end, that every key is still readable.
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  Options options;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.level0_stop_writes_trigger = 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
  }
  // Let background work settle before sampling file metadata; a failure in
  // either wait is reported immediately instead of being ignored.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  // Always compact into the last level reported by the metadata.
  int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
  for (int file_picked = 5; file_picked > 0; --file_picked) {
    std::set<std::string> overlapping_file_names;
    std::vector<std::string> compaction_input_file_names;
    for (int f = 0; f < file_picked; ++f) {
      int level = 0;
      auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
      compaction_input_file_names.push_back(file_meta->name);
      GetOverlappingFileNumbersForLevelCompaction(
          cf_meta, options.comparator, level, output_level,
          file_meta, &overlapping_file_names);
    }

    ASSERT_OK(dbfull()->CompactFiles(
        CompactionOptions(), handles_[1],
        compaction_input_file_names,
        output_level));

    // Make sure all overlapping files do not exist after compaction
    dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
    VerifyCompactionResult(cf_meta, overlapping_file_names);
  }

  // make sure all key-values are still there.
  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
  }
}
2654 
TEST_P(DBCompactionTestWithParam,PartialCompactionFailure)2655 TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
2656   Options options;
2657   const int kKeySize = 16;
2658   const int kKvSize = 1000;
2659   const int kKeysPerBuffer = 100;
2660   const int kNumL1Files = 5;
2661   options.create_if_missing = true;
2662   options.write_buffer_size = kKeysPerBuffer * kKvSize;
2663   options.max_write_buffer_number = 2;
2664   options.target_file_size_base =
2665       options.write_buffer_size *
2666       (options.max_write_buffer_number - 1);
2667   options.level0_file_num_compaction_trigger = kNumL1Files;
2668   options.max_bytes_for_level_base =
2669       options.level0_file_num_compaction_trigger *
2670       options.target_file_size_base;
2671   options.max_bytes_for_level_multiplier = 2;
2672   options.compression = kNoCompression;
2673   options.max_subcompactions = max_subcompactions_;
2674 
2675   env_->SetBackgroundThreads(1, Env::HIGH);
2676   env_->SetBackgroundThreads(1, Env::LOW);
2677   // stop the compaction thread until we simulate the file creation failure.
2678   test::SleepingBackgroundTask sleeping_task_low;
2679   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
2680                  Env::Priority::LOW);
2681 
2682   options.env = env_;
2683 
2684   DestroyAndReopen(options);
2685 
2686   const int kNumInsertedKeys =
2687       options.level0_file_num_compaction_trigger *
2688       (options.max_write_buffer_number - 1) *
2689       kKeysPerBuffer;
2690 
2691   Random rnd(301);
2692   std::vector<std::string> keys;
2693   std::vector<std::string> values;
2694   for (int k = 0; k < kNumInsertedKeys; ++k) {
2695     keys.emplace_back(RandomString(&rnd, kKeySize));
2696     values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
2697     ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
2698     dbfull()->TEST_WaitForFlushMemTable();
2699   }
2700 
2701   dbfull()->TEST_FlushMemTable(true);
2702   // Make sure the number of L0 files can trigger compaction.
2703   ASSERT_GE(NumTableFilesAtLevel(0),
2704             options.level0_file_num_compaction_trigger);
2705 
2706   auto previous_num_level0_files = NumTableFilesAtLevel(0);
2707 
2708   // Fail the first file creation.
2709   env_->non_writable_count_ = 1;
2710   sleeping_task_low.WakeUp();
2711   sleeping_task_low.WaitUntilDone();
2712 
2713   // Expect compaction to fail here as one file will fail its
2714   // creation.
2715   ASSERT_TRUE(!dbfull()->TEST_WaitForCompact().ok());
2716 
2717   // Verify L0 -> L1 compaction does fail.
2718   ASSERT_EQ(NumTableFilesAtLevel(1), 0);
2719 
2720   // Verify all L0 files are still there.
2721   ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
2722 
2723   // All key-values must exist after compaction fails.
2724   for (int k = 0; k < kNumInsertedKeys; ++k) {
2725     ASSERT_EQ(values[k], Get(keys[k]));
2726   }
2727 
2728   env_->non_writable_count_ = 0;
2729 
2730   // Make sure RocksDB will not get into corrupted state.
2731   Reopen(options);
2732 
2733   // Verify again after reopen.
2734   for (int k = 0; k < kNumInsertedKeys; ++k) {
2735     ASSERT_EQ(values[k], Get(keys[k]));
2736   }
2737 }
2738 
TEST_P(DBCompactionTestWithParam,DeleteMovedFileAfterCompaction)2739 TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
2740   // iter 1 -- delete_obsolete_files_period_micros == 0
2741   for (int iter = 0; iter < 2; ++iter) {
2742     // This test triggers move compaction and verifies that the file is not
2743     // deleted when it's part of move compaction
2744     Options options = CurrentOptions();
2745     options.env = env_;
2746     if (iter == 1) {
2747       options.delete_obsolete_files_period_micros = 0;
2748     }
2749     options.create_if_missing = true;
2750     options.level0_file_num_compaction_trigger =
2751         2;  // trigger compaction when we have 2 files
2752     OnFileDeletionListener* listener = new OnFileDeletionListener();
2753     options.listeners.emplace_back(listener);
2754     options.max_subcompactions = max_subcompactions_;
2755     DestroyAndReopen(options);
2756 
2757     Random rnd(301);
2758     // Create two 1MB sst files
2759     for (int i = 0; i < 2; ++i) {
2760       // Create 1MB sst file
2761       for (int j = 0; j < 100; ++j) {
2762         ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
2763       }
2764       ASSERT_OK(Flush());
2765     }
2766     // this should execute L0->L1
2767     dbfull()->TEST_WaitForCompact();
2768     ASSERT_EQ("0,1", FilesPerLevel(0));
2769 
2770     // block compactions
2771     test::SleepingBackgroundTask sleeping_task;
2772     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2773                    Env::Priority::LOW);
2774 
2775     options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
2776     Reopen(options);
2777     std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
2778     ASSERT_EQ("0,1", FilesPerLevel(0));
2779     // let compactions go
2780     sleeping_task.WakeUp();
2781     sleeping_task.WaitUntilDone();
2782 
2783     // this should execute L1->L2 (move)
2784     dbfull()->TEST_WaitForCompact();
2785 
2786     ASSERT_EQ("0,0,1", FilesPerLevel(0));
2787 
2788     std::vector<LiveFileMetaData> metadata;
2789     db_->GetLiveFilesMetaData(&metadata);
2790     ASSERT_EQ(metadata.size(), 1U);
2791     auto moved_file_name = metadata[0].name;
2792 
2793     // Create two more 1MB sst files
2794     for (int i = 0; i < 2; ++i) {
2795       // Create 1MB sst file
2796       for (int j = 0; j < 100; ++j) {
2797         ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
2798       }
2799       ASSERT_OK(Flush());
2800     }
2801     // this should execute both L0->L1 and L1->L2 (merge with previous file)
2802     dbfull()->TEST_WaitForCompact();
2803 
2804     ASSERT_EQ("0,0,2", FilesPerLevel(0));
2805 
2806     // iterator is holding the file
2807     ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));
2808 
2809     listener->SetExpectedFileName(dbname_ + moved_file_name);
2810     iterator.reset();
2811 
2812     // this file should have been compacted away
2813     ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name));
2814     listener->VerifyMatchedCount(1);
2815   }
2816 }
2817 
TEST_P(DBCompactionTestWithParam,CompressLevelCompaction)2818 TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
2819   if (!Zlib_Supported()) {
2820     return;
2821   }
2822   Options options = CurrentOptions();
2823   options.memtable_factory.reset(
2824       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2825   options.compaction_style = kCompactionStyleLevel;
2826   options.write_buffer_size = 110 << 10;  // 110KB
2827   options.arena_block_size = 4 << 10;
2828   options.level0_file_num_compaction_trigger = 2;
2829   options.num_levels = 4;
2830   options.max_bytes_for_level_base = 400 * 1024;
2831   options.max_subcompactions = max_subcompactions_;
2832   // First two levels have no compression, so that a trivial move between
2833   // them will be allowed. Level 2 has Zlib compression so that a trivial
2834   // move to level 3 will not be allowed
2835   options.compression_per_level = {kNoCompression, kNoCompression,
2836                                    kZlibCompression};
2837   int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;
2838 
2839   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2840       "Compaction::InputCompressionMatchesOutput:Matches",
2841       [&](void* /*arg*/) { matches++; });
2842   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2843       "Compaction::InputCompressionMatchesOutput:DidntMatch",
2844       [&](void* /*arg*/) { didnt_match++; });
2845   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2846       "DBImpl::BackgroundCompaction:NonTrivial",
2847       [&](void* /*arg*/) { non_trivial++; });
2848   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2849       "DBImpl::BackgroundCompaction:TrivialMove",
2850       [&](void* /*arg*/) { trivial_move++; });
2851   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2852 
2853   Reopen(options);
2854 
2855   Random rnd(301);
2856   int key_idx = 0;
2857 
2858   // First three 110KB files are going to level 0
2859   // After that, (100K, 200K)
2860   for (int num = 0; num < 3; num++) {
2861     GenerateNewFile(&rnd, &key_idx);
2862   }
2863 
2864   // Another 110KB triggers a compaction to 400K file to fill up level 0
2865   GenerateNewFile(&rnd, &key_idx);
2866   ASSERT_EQ(4, GetSstFileCount(dbname_));
2867 
2868   // (1, 4)
2869   GenerateNewFile(&rnd, &key_idx);
2870   ASSERT_EQ("1,4", FilesPerLevel(0));
2871 
2872   // (1, 4, 1)
2873   GenerateNewFile(&rnd, &key_idx);
2874   ASSERT_EQ("1,4,1", FilesPerLevel(0));
2875 
2876   // (1, 4, 2)
2877   GenerateNewFile(&rnd, &key_idx);
2878   ASSERT_EQ("1,4,2", FilesPerLevel(0));
2879 
2880   // (1, 4, 3)
2881   GenerateNewFile(&rnd, &key_idx);
2882   ASSERT_EQ("1,4,3", FilesPerLevel(0));
2883 
2884   // (1, 4, 4)
2885   GenerateNewFile(&rnd, &key_idx);
2886   ASSERT_EQ("1,4,4", FilesPerLevel(0));
2887 
2888   // (1, 4, 5)
2889   GenerateNewFile(&rnd, &key_idx);
2890   ASSERT_EQ("1,4,5", FilesPerLevel(0));
2891 
2892   // (1, 4, 6)
2893   GenerateNewFile(&rnd, &key_idx);
2894   ASSERT_EQ("1,4,6", FilesPerLevel(0));
2895 
2896   // (1, 4, 7)
2897   GenerateNewFile(&rnd, &key_idx);
2898   ASSERT_EQ("1,4,7", FilesPerLevel(0));
2899 
2900   // (1, 4, 8)
2901   GenerateNewFile(&rnd, &key_idx);
2902   ASSERT_EQ("1,4,8", FilesPerLevel(0));
2903 
2904   ASSERT_EQ(matches, 12);
2905   // Currently, the test relies on the number of calls to
2906   // InputCompressionMatchesOutput() per compaction.
2907   const int kCallsToInputCompressionMatch = 2;
2908   ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch);
2909   ASSERT_EQ(trivial_move, 12);
2910   ASSERT_EQ(non_trivial, 8);
2911 
2912   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2913 
2914   for (int i = 0; i < key_idx; i++) {
2915     auto v = Get(Key(i));
2916     ASSERT_NE(v, "NOT_FOUND");
2917     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2918   }
2919 
2920   Reopen(options);
2921 
2922   for (int i = 0; i < key_idx; i++) {
2923     auto v = Get(Key(i));
2924     ASSERT_NE(v, "NOT_FOUND");
2925     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2926   }
2927 
2928   Destroy(options);
2929 }
2930 
// Opens the DB with soft/hard pending-compaction-bytes limits that are not
// usable as given and checks the soft limit reported back by GetOptions().
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
  Options opts = CurrentOptions();
  opts.create_if_missing = true;

  // Soft limit of 0 with a hard limit of 100: the reported soft limit is 100.
  opts.max_background_compactions = 5;
  opts.soft_pending_compaction_bytes_limit = 0;
  opts.hard_pending_compaction_bytes_limit = 100;
  DestroyAndReopen(opts);
  ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);

  // Soft limit (200) above the hard limit (150): the reported soft limit is
  // 150.
  opts.max_background_compactions = 3;
  opts.soft_pending_compaction_bytes_limit = 200;
  opts.hard_pending_compaction_bytes_limit = 150;
  DestroyAndReopen(opts);
  ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
}
2946 
2947 // This tests for a bug that could cause two level0 compactions running
2948 // concurrently
2949 // TODO(aekmekji): Make sure that the reason this fails when run with
2950 // max_subcompactions > 1 is not a correctness issue but just inherent to
2951 // running parallel L0-L1 compactions
TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.compression = kNoCompression;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 98 << 10;
  options.max_write_buffer_number = 2;
  options.max_background_compactions = 2;

  DestroyAndReopen(options);

  // fill up the DB
  Random rnd(301);
  for (int num = 0; num < 10; num++) {
    GenerateNewRandomFile(&rnd);
  }
  // Setup step; a failure here would invalidate everything that follows.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Each pair is {predecessor, successor}: test point :1 is held until a
  // compaction job has started, and the job's end is held until test point :2.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"CompactionJob::Run():Start",
        "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
       {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
        "CompactionJob::Run():End"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // trigger L0 compaction
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");

  // With the first compaction job still held open, add more L0 data and
  // suggest a compaction over the whole key range.
  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Stop sync-point processing once the test is done with it, as the other
  // sync-point based tests in this file do.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3005 
// Builds the key string "key" followed by `i` zero-padded to at least four
// digits (for example, 7 becomes "key0007"). Debug builds assert i < 10000.
static std::string ShortKey(int i) {
  assert(i < 10000);
  char formatted[100];
  snprintf(formatted, sizeof(formatted), "key%04d", i);
  std::string key(formatted);
  return key;
}
3012 
TEST_P(DBCompactionTestWithParam,ForceBottommostLevelCompaction)3013 TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
3014   int32_t trivial_move = 0;
3015   int32_t non_trivial_move = 0;
3016   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3017       "DBImpl::BackgroundCompaction:TrivialMove",
3018       [&](void* /*arg*/) { trivial_move++; });
3019   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3020       "DBImpl::BackgroundCompaction:NonTrivial",
3021       [&](void* /*arg*/) { non_trivial_move++; });
3022   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3023 
3024   // The key size is guaranteed to be <= 8
3025   class ShortKeyComparator : public Comparator {
3026     int Compare(const ROCKSDB_NAMESPACE::Slice& a,
3027                 const ROCKSDB_NAMESPACE::Slice& b) const override {
3028       assert(a.size() <= 8);
3029       assert(b.size() <= 8);
3030       return BytewiseComparator()->Compare(a, b);
3031     }
3032     const char* Name() const override { return "ShortKeyComparator"; }
3033     void FindShortestSeparator(
3034         std::string* start,
3035         const ROCKSDB_NAMESPACE::Slice& limit) const override {
3036       return BytewiseComparator()->FindShortestSeparator(start, limit);
3037     }
3038     void FindShortSuccessor(std::string* key) const override {
3039       return BytewiseComparator()->FindShortSuccessor(key);
3040     }
3041   } short_key_cmp;
3042   Options options = CurrentOptions();
3043   options.target_file_size_base = 100000000;
3044   options.write_buffer_size = 100000000;
3045   options.max_subcompactions = max_subcompactions_;
3046   options.comparator = &short_key_cmp;
3047   DestroyAndReopen(options);
3048 
3049   int32_t value_size = 10 * 1024;  // 10 KB
3050 
3051   Random rnd(301);
3052   std::vector<std::string> values;
3053   // File with keys [ 0 => 99 ]
3054   for (int i = 0; i < 100; i++) {
3055     values.push_back(RandomString(&rnd, value_size));
3056     ASSERT_OK(Put(ShortKey(i), values[i]));
3057   }
3058   ASSERT_OK(Flush());
3059 
3060   ASSERT_EQ("1", FilesPerLevel(0));
3061   // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
3062   CompactRangeOptions compact_options;
3063   compact_options.change_level = true;
3064   compact_options.target_level = 3;
3065   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3066   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
3067   ASSERT_EQ(trivial_move, 1);
3068   ASSERT_EQ(non_trivial_move, 0);
3069 
3070   // File with keys [ 100 => 199 ]
3071   for (int i = 100; i < 200; i++) {
3072     values.push_back(RandomString(&rnd, value_size));
3073     ASSERT_OK(Put(ShortKey(i), values[i]));
3074   }
3075   ASSERT_OK(Flush());
3076 
3077   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3078   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3079   // then compacte the bottommost level L3=>L3 (non trivial move)
3080   compact_options = CompactRangeOptions();
3081   compact_options.bottommost_level_compaction =
3082       BottommostLevelCompaction::kForceOptimized;
3083   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3084   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
3085   ASSERT_EQ(trivial_move, 4);
3086   ASSERT_EQ(non_trivial_move, 1);
3087 
3088   // File with keys [ 200 => 299 ]
3089   for (int i = 200; i < 300; i++) {
3090     values.push_back(RandomString(&rnd, value_size));
3091     ASSERT_OK(Put(ShortKey(i), values[i]));
3092   }
3093   ASSERT_OK(Flush());
3094 
3095   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3096   trivial_move = 0;
3097   non_trivial_move = 0;
3098   compact_options = CompactRangeOptions();
3099   compact_options.bottommost_level_compaction =
3100       BottommostLevelCompaction::kSkip;
3101   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3102   // and will skip bottommost level compaction
3103   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3104   ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
3105   ASSERT_EQ(trivial_move, 3);
3106   ASSERT_EQ(non_trivial_move, 0);
3107 
3108   for (int i = 0; i < 300; i++) {
3109     ASSERT_EQ(Get(ShortKey(i)), values[i]);
3110   }
3111 
3112   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3113 }
3114 
// Writes ten L0 files while a sync-point dependency holds back the start of
// the first compaction job, then checks the resulting layout: two files of at
// least 2MB remain in L0 and L1 is non-empty.
TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  // {predecessor, successor}: a compaction job may not start running until
  // the picker has reached PickCompactionBySize:0.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0   1   2   3   4   5   6   7   8   9
  // size:  1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
  // score:                     1.5 1.3 1.5 2.0 inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction.
  //
  // Files 6-9 are the longest span of available files for which
  // work-per-deleted-file decreases (see "score" row above).
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
    if (i == 5) {
      // File 5 carries a doubled (2MB) value.
      ASSERT_OK(Put(Key(i + 1), value + value));
    } else {
      ASSERT_OK(Put(Key(i + 1), value));
    }
    ASSERT_OK(Flush());
  }
  // A background compaction error should fail the test here rather than show
  // up as a wrong file layout below.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
  // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
  ASSERT_EQ(2, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0);
  for (int i = 0; i < 2; ++i) {
    ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21);
  }
}
3166 
TEST_P(DBCompactionTestWithParam,IntraL0CompactionDoesNotObsoleteDeletions)3167 TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
3168   // regression test for issue #2722: L0->L0 compaction can resurrect deleted
3169   // keys from older L0 files if L1+ files' key-ranges do not include the key.
3170   Options options = CurrentOptions();
3171   options.compression = kNoCompression;
3172   options.level0_file_num_compaction_trigger = 5;
3173   options.max_background_compactions = 2;
3174   options.max_subcompactions = max_subcompactions_;
3175   DestroyAndReopen(options);
3176 
3177   const size_t kValueSize = 1 << 20;
3178   Random rnd(301);
3179   std::string value(RandomString(&rnd, kValueSize));
3180 
3181   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3182       {{"LevelCompactionPicker::PickCompactionBySize:0",
3183         "CompactionJob::Run():Start"}});
3184   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3185 
3186   // index:   0   1   2   3   4    5    6   7   8   9
3187   // size:  1MB 1MB 1MB 1MB 1MB  1MB  1MB 1MB 1MB 1MB
3188   // score:                     1.25 1.33 1.5 2.0 inf
3189   //
3190   // Files 0-4 will be included in an L0->L1 compaction.
3191   //
3192   // L0->L0 will be triggered since the sync points guarantee compaction to base
3193   // level is still blocked when files 5-9 trigger another compaction. All files
3194   // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
3195   //
3196   // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
3197   // L0->L0 preserves the deletion such that the key remains deleted.
3198   for (int i = 0; i < 10; ++i) {
3199     // key 0 serves both to prevent trivial move and as the key we want to
3200     // verify is not resurrected by L0->L0 compaction.
3201     if (i < 5) {
3202       ASSERT_OK(Put(Key(0), ""));
3203     } else {
3204       ASSERT_OK(Delete(Key(0)));
3205     }
3206     ASSERT_OK(Put(Key(i + 1), value));
3207     ASSERT_OK(Flush());
3208   }
3209   dbfull()->TEST_WaitForCompact();
3210   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3211 
3212   std::vector<std::vector<FileMetaData>> level_to_files;
3213   dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3214                                   &level_to_files);
3215   ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
3216   // L0 has a single output file from L0->L0
3217   ASSERT_EQ(1, level_to_files[0].size());
3218   ASSERT_GT(level_to_files[1].size(), 0);
3219   ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);
3220 
3221   ReadOptions roptions;
3222   std::string result;
3223   ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
3224 }
3225 
// With one thread in the BOTTOM priority pool, generates enough files to
// trigger a compaction and checks, for both level and universal compaction
// styles, that exactly one compaction reached the
// "DBImpl::BGWorkBottomCompaction" sync point and that a single sorted run
// remains.
TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  for (bool use_universal_compaction : {false, true}) {
    Options options = CurrentOptions();
    if (use_universal_compaction) {
      options.compaction_style = kCompactionStyleUniversal;
    } else {
      options.compaction_style = kCompactionStyleLevel;
      options.level_compaction_dynamic_level_bytes = true;
    }
    options.num_levels = 4;
    options.write_buffer_size = 100 << 10;     // 100KB
    options.target_file_size_base = 32 << 10;  // 32KB
    options.level0_file_num_compaction_trigger = kNumFilesTrigger;
    // Trigger compaction if size amplification exceeds 110%
    options.compaction_options_universal.max_size_amplification_percent = 110;
    DestroyAndReopen(options);

    // Incremented each time the bottom-priority compaction sync point fires.
    int num_bottom_pri_compactions = 0;
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::BGWorkBottomCompaction",
        [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    for (int num = 0; num < kNumFilesTrigger; num++) {
      ASSERT_EQ(NumSortedRuns(), num);
      // key_idx restarts at 0 for every file.
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx);
    }
    // A background compaction error should fail the test here rather than
    // show up as a wrong counter value below.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    ASSERT_EQ(1, num_bottom_pri_compactions);

    // Verify that size amplification did occur
    ASSERT_EQ(NumSortedRuns(), 1);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  // Set the BOTTOM pool back to zero threads.
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
3267 
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
  // Deletions can be dropped when compacted to non-last level if they fall
  // outside the lower-level files' key-ranges.
  const int kNumL0Files = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // put key 1 and 3 in separate L1, L2 files.
  // So key 0, 2, and 4+ fall outside these levels' key-ranges.
  for (int level = 2; level >= 1; --level) {
    for (int i = 0; i < 2; ++i) {
      ASSERT_OK(Put(Key(2 * i + 1), "val"));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(level);
    ASSERT_EQ(2, NumTableFilesAtLevel(level));
  }

  // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
  // - Tombstones for keys 2 and 4 can be dropped early.
  // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
  for (int i = 0; i < kNumL0Files; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Delete(Key(i + 1)));
    ASSERT_OK(Flush());
  }
  // Check the wait result so a background error is reported directly rather
  // than as a ticker-count mismatch below.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Every deleted key must stay deleted after the compaction.
  for (int i = 0; i < kNumL0Files; ++i) {
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
  }
  // Two tombstones are expected to be dropped through the optimized path, and
  // two keys are expected to be counted as dropped-obsolete.
  ASSERT_EQ(2, options.statistics->getTickerCount(
                   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
  ASSERT_EQ(2,
            options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}
3307 
TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
  // Regression test for
  // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
  // When an L0 compaction was already in flight, CompactFiles() could fail to
  // pick a compaction yet still mark one as scheduled. The mark was never
  // cleared, so later compactions or closing the DB could hang.
  const int kNumL0Files = 5;
  const int kAutoCompactionTrigger = kNumL0Files - 1;

  Options opts = CurrentOptions();
  opts.max_background_compactions = 2;
  opts.level0_file_num_compaction_trigger = kAutoCompactionTrigger;
  DestroyAndReopen(opts);

  // Ordering enforced by the sync points:
  //   1. the automatic compaction is picked before the test flushes file 4;
  //   2. the automatic compaction does not pass its "AfterRun" point until
  //      the manual CompactFiles() call below has returned.
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"LevelCompactionPicker::PickCompaction:Return",
        "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
       {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
  sync_point->EnableProcessing();

  // Held for the remainder of the test body.
  auto pressure_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  // Flush files 0-3; reaching the trigger starts the automatic L0->L1
  // compaction. Key 0 appears in every file so no trivial move is possible.
  for (int key = 1; key <= kAutoCompactionTrigger; ++key) {
    ASSERT_OK(Put(Key(0), "val"));
    ASSERT_OK(Put(Key(key), "val"));
    ASSERT_OK(Flush());
  }
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");

  // File 4 is written only after files 0-3 have been picked.
  ASSERT_OK(Put(Key(kNumL0Files), "val"));
  ASSERT_OK(Flush());

  // Manually compact the first listed L0 file while the automatic compaction
  // is still running. Before the fix this left the scheduled-compaction count
  // permanently non-zero and DB close would hang forever.
  ColumnFamilyMetaData meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &meta);
  ASSERT_EQ(kNumL0Files, meta.levels[0].files.size());
  std::vector<std::string> manual_inputs{meta.levels[0].files.front().name};
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), manual_inputs,
                                   0 /* output_level */));
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
  sync_point->DisableProcessing();
}
3356 
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
  // Regression test for bug of not pulling in L0 files that overlap the user-
  // specified input files in time- and key-ranges.
  //
  // Two L0 files are created, each holding a different value for the same key.
  ASSERT_OK(Put(Key(0), "old_val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(0), "new_val"));
  ASSERT_OK(Flush());

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_GE(cf_meta.levels.size(), 2);
  ASSERT_EQ(2, cf_meta.levels[0].files.size());

  // Only the first listed L0 file is named as input (the original comment
  // describes it as the new L0 file -- NOTE(review): confirm listing order).
  // Compacting it to L1 should pull in the other L0 file since it overlaps in
  // key-range and time-range; otherwise the stale value could end up shadowing
  // the fresh one.
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
                                   1 /* output_level */));
  ASSERT_EQ("new_val", Get(Key(0)));
}
3378 
TEST_F(DBCompactionTest,CompactBottomLevelFilesWithDeletions)3379 TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
3380   // bottom-level files may contain deletions due to snapshots protecting the
3381   // deleted keys. Once the snapshot is released, we should see files with many
3382   // such deletions undergo single-file compactions.
3383   const int kNumKeysPerFile = 1024;
3384   const int kNumLevelFiles = 4;
3385   const int kValueSize = 128;
3386   Options options = CurrentOptions();
3387   options.compression = kNoCompression;
3388   options.level0_file_num_compaction_trigger = kNumLevelFiles;
3389   // inflate it a bit to account for key/metadata overhead
3390   options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
3391   CreateAndReopenWithCF({"one"}, options);
3392 
3393   Random rnd(301);
3394   const Snapshot* snapshot = nullptr;
3395   for (int i = 0; i < kNumLevelFiles; ++i) {
3396     for (int j = 0; j < kNumKeysPerFile; ++j) {
3397       ASSERT_OK(
3398           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3399     }
3400     if (i == kNumLevelFiles - 1) {
3401       snapshot = db_->GetSnapshot();
3402       // delete every other key after grabbing a snapshot, so these deletions
3403       // and the keys they cover can't be dropped until after the snapshot is
3404       // released.
3405       for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
3406         ASSERT_OK(Delete(Key(j)));
3407       }
3408     }
3409     Flush();
3410     if (i < kNumLevelFiles - 1) {
3411       ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
3412     }
3413   }
3414   dbfull()->TEST_WaitForCompact();
3415   ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
3416 
3417   std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
3418   db_->GetLiveFilesMetaData(&pre_release_metadata);
3419   // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
3420   // files does not need to be preserved in case of a future snapshot.
3421   ASSERT_OK(Put(Key(0), "val"));
3422   ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
3423   // release snapshot and wait for compactions to finish. Single-file
3424   // compactions should be triggered, which reduce the size of each bottom-level
3425   // file without changing file count.
3426   db_->ReleaseSnapshot(snapshot);
3427   ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
3428   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3429       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3430         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3431         ASSERT_TRUE(compaction->compaction_reason() ==
3432                     CompactionReason::kBottommostFiles);
3433       });
3434   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3435   dbfull()->TEST_WaitForCompact();
3436   db_->GetLiveFilesMetaData(&post_release_metadata);
3437   ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
3438 
3439   for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
3440     const auto& pre_file = pre_release_metadata[i];
3441     const auto& post_file = post_release_metadata[i];
3442     ASSERT_EQ(1, pre_file.level);
3443     ASSERT_EQ(1, post_file.level);
3444     // each file is smaller than it was before as it was rewritten without
3445     // deletion markers/deleted keys.
3446     ASSERT_LT(post_file.size, pre_file.size);
3447   }
3448   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3449 }
3450 
TEST_F(DBCompactionTest,LevelCompactExpiredTtlFiles)3451 TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
3452   const int kNumKeysPerFile = 32;
3453   const int kNumLevelFiles = 2;
3454   const int kValueSize = 1024;
3455 
3456   Options options = CurrentOptions();
3457   options.compression = kNoCompression;
3458   options.ttl = 24 * 60 * 60;  // 24 hours
3459   options.max_open_files = -1;
3460   env_->time_elapse_only_sleep_ = false;
3461   options.env = env_;
3462 
3463   env_->addon_time_.store(0);
3464   DestroyAndReopen(options);
3465 
3466   Random rnd(301);
3467   for (int i = 0; i < kNumLevelFiles; ++i) {
3468     for (int j = 0; j < kNumKeysPerFile; ++j) {
3469       ASSERT_OK(
3470           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3471     }
3472     Flush();
3473   }
3474   dbfull()->TEST_WaitForCompact();
3475   MoveFilesToLevel(3);
3476   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3477 
3478   // Delete previously written keys.
3479   for (int i = 0; i < kNumLevelFiles; ++i) {
3480     for (int j = 0; j < kNumKeysPerFile; ++j) {
3481       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3482     }
3483     Flush();
3484   }
3485   dbfull()->TEST_WaitForCompact();
3486   ASSERT_EQ("2,0,0,2", FilesPerLevel());
3487   MoveFilesToLevel(1);
3488   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3489 
3490   env_->addon_time_.fetch_add(36 * 60 * 60);  // 36 hours
3491   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3492 
3493   // Just do a simple write + flush so that the Ttl expired files get
3494   // compacted.
3495   ASSERT_OK(Put("a", "1"));
3496   Flush();
3497   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3498       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3499         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3500         ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3501       });
3502   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3503   dbfull()->TEST_WaitForCompact();
3504   // All non-L0 files are deleted, as they contained only deleted data.
3505   ASSERT_EQ("1", FilesPerLevel());
3506   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3507 
3508   // Test dynamically changing ttl.
3509 
3510   env_->addon_time_.store(0);
3511   DestroyAndReopen(options);
3512 
3513   for (int i = 0; i < kNumLevelFiles; ++i) {
3514     for (int j = 0; j < kNumKeysPerFile; ++j) {
3515       ASSERT_OK(
3516           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3517     }
3518     Flush();
3519   }
3520   dbfull()->TEST_WaitForCompact();
3521   MoveFilesToLevel(3);
3522   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3523 
3524   // Delete previously written keys.
3525   for (int i = 0; i < kNumLevelFiles; ++i) {
3526     for (int j = 0; j < kNumKeysPerFile; ++j) {
3527       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3528     }
3529     Flush();
3530   }
3531   dbfull()->TEST_WaitForCompact();
3532   ASSERT_EQ("2,0,0,2", FilesPerLevel());
3533   MoveFilesToLevel(1);
3534   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3535 
3536   // Move time forward by 12 hours, and make sure that compaction still doesn't
3537   // trigger as ttl is set to 24 hours.
3538   env_->addon_time_.fetch_add(12 * 60 * 60);
3539   ASSERT_OK(Put("a", "1"));
3540   Flush();
3541   dbfull()->TEST_WaitForCompact();
3542   ASSERT_EQ("1,2,0,2", FilesPerLevel());
3543 
3544   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3545       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3546         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3547         ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3548       });
3549   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3550 
3551   // Dynamically change ttl to 10 hours.
3552   // This should trigger a ttl compaction, as 12 hours have already passed.
3553   ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
3554   dbfull()->TEST_WaitForCompact();
3555   // All non-L0 files are deleted, as they contained only deleted data.
3556   ASSERT_EQ("1", FilesPerLevel());
3557   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3558 }
3559 
TEST_F(DBCompactionTest,LevelTtlCascadingCompactions)3560 TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
3561   const int kValueSize = 100;
3562 
3563   for (bool if_restart : {false, true}) {
3564     for (bool if_open_all_files : {false, true}) {
3565       Options options = CurrentOptions();
3566       options.compression = kNoCompression;
3567       options.ttl = 24 * 60 * 60;  // 24 hours
3568       if (if_open_all_files) {
3569         options.max_open_files = -1;
3570       } else {
3571         options.max_open_files = 20;
3572       }
3573       // RocksDB sanitize max open files to at least 20. Modify it back.
3574       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3575           "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3576             int* max_open_files = static_cast<int*>(arg);
3577             *max_open_files = 2;
3578           });
3579       // In the case where all files are opened and doing DB restart
3580       // forcing the oldest ancester time in manifest file to be 0 to
3581       // simulate the case of reading from an old version.
3582       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3583           "VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
3584             if (if_restart && if_open_all_files) {
3585               std::string* encoded_fieled = static_cast<std::string*>(arg);
3586               *encoded_fieled = "";
3587               PutVarint64(encoded_fieled, 0);
3588             }
3589           });
3590 
3591       env_->time_elapse_only_sleep_ = false;
3592       options.env = env_;
3593 
3594       env_->addon_time_.store(0);
3595       DestroyAndReopen(options);
3596 
3597       int ttl_compactions = 0;
3598       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3599           "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3600             Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3601             auto compaction_reason = compaction->compaction_reason();
3602             if (compaction_reason == CompactionReason::kTtl) {
3603               ttl_compactions++;
3604             }
3605           });
3606       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3607 
3608       // Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
3609       Random rnd(301);
3610       for (int i = 1; i <= 100; ++i) {
3611         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3612       }
3613       Flush();
3614       // Get the first file's creation time. This will be the oldest file in the
3615       // DB. Compactions inolving this file's descendents should keep getting
3616       // this time.
3617       std::vector<std::vector<FileMetaData>> level_to_files;
3618       dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3619                                       &level_to_files);
3620       uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time;
3621       // Add 1 hour and do another flush.
3622       env_->addon_time_.fetch_add(1 * 60 * 60);
3623       for (int i = 101; i <= 200; ++i) {
3624         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3625       }
3626       Flush();
3627       MoveFilesToLevel(6);
3628       ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
3629 
3630       env_->addon_time_.fetch_add(1 * 60 * 60);
3631       // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
3632       for (int i = 1; i <= 50; ++i) {
3633         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3634       }
3635       Flush();
3636       env_->addon_time_.fetch_add(1 * 60 * 60);
3637       for (int i = 51; i <= 150; ++i) {
3638         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3639       }
3640       Flush();
3641       MoveFilesToLevel(4);
3642       ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
3643 
3644       env_->addon_time_.fetch_add(1 * 60 * 60);
3645       // Add one L1 file with key range: [26, 75].
3646       for (int i = 26; i <= 75; ++i) {
3647         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3648       }
3649       Flush();
3650       dbfull()->TEST_WaitForCompact();
3651       MoveFilesToLevel(1);
3652       ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
3653 
3654       // LSM tree:
3655       // L1:         [26 .. 75]
3656       // L4:     [1 .. 50][51 ..... 150]
3657       // L6:     [1 ........ 100][101 .... 200]
3658       //
3659       // On TTL expiry, TTL compaction should be initiated on L1 file, and the
3660       // compactions should keep going on until the key range hits bottom level.
3661       // In other words: the compaction on this data range "cascasdes" until
3662       // reaching the bottom level.
3663       //
3664       // Order of events on TTL expiry:
3665       // 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
3666       // ttl
3667       //    compaction.
3668       // 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
3669       // 3. The new output file from L4 falls to L5 via 1 trival move initiated
3670       //    by the ttl compaction.
3671       // 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
3672 
3673       // Add 25 hours and do a write
3674       env_->addon_time_.fetch_add(25 * 60 * 60);
3675 
3676       ASSERT_OK(Put(Key(1), "1"));
3677       if (if_restart) {
3678         Reopen(options);
3679       } else {
3680         Flush();
3681       }
3682       dbfull()->TEST_WaitForCompact();
3683       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
3684       ASSERT_EQ(5, ttl_compactions);
3685 
3686       dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3687                                       &level_to_files);
3688       ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time);
3689 
3690       env_->addon_time_.fetch_add(25 * 60 * 60);
3691       ASSERT_OK(Put(Key(2), "1"));
3692       if (if_restart) {
3693         Reopen(options);
3694       } else {
3695         Flush();
3696       }
3697       dbfull()->TEST_WaitForCompact();
3698       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
3699       ASSERT_GE(ttl_compactions, 6);
3700 
3701       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3702     }
3703   }
3704 }
3705 
TEST_F(DBCompactionTest,LevelPeriodicCompaction)3706 TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
3707   const int kNumKeysPerFile = 32;
3708   const int kNumLevelFiles = 2;
3709   const int kValueSize = 100;
3710 
3711   for (bool if_restart : {false, true}) {
3712     for (bool if_open_all_files : {false, true}) {
3713       Options options = CurrentOptions();
3714       options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
3715       if (if_open_all_files) {
3716         options.max_open_files = -1;  // needed for ttl compaction
3717       } else {
3718         options.max_open_files = 20;
3719       }
3720       // RocksDB sanitize max open files to at least 20. Modify it back.
3721       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3722           "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3723             int* max_open_files = static_cast<int*>(arg);
3724             *max_open_files = 0;
3725           });
3726       // In the case where all files are opened and doing DB restart
3727       // forcing the file creation time in manifest file to be 0 to
3728       // simulate the case of reading from an old version.
3729       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3730           "VersionEdit::EncodeTo:VarintFileCreationTime", [&](void* arg) {
3731             if (if_restart && if_open_all_files) {
3732               std::string* encoded_fieled = static_cast<std::string*>(arg);
3733               *encoded_fieled = "";
3734               PutVarint64(encoded_fieled, 0);
3735             }
3736           });
3737 
3738       env_->time_elapse_only_sleep_ = false;
3739       options.env = env_;
3740 
3741       env_->addon_time_.store(0);
3742       DestroyAndReopen(options);
3743 
3744       int periodic_compactions = 0;
3745       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3746           "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3747             Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3748             auto compaction_reason = compaction->compaction_reason();
3749             if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3750               periodic_compactions++;
3751             }
3752           });
3753       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3754 
3755       Random rnd(301);
3756       for (int i = 0; i < kNumLevelFiles; ++i) {
3757         for (int j = 0; j < kNumKeysPerFile; ++j) {
3758           ASSERT_OK(Put(Key(i * kNumKeysPerFile + j),
3759                         RandomString(&rnd, kValueSize)));
3760         }
3761         Flush();
3762       }
3763       dbfull()->TEST_WaitForCompact();
3764 
3765       ASSERT_EQ("2", FilesPerLevel());
3766       ASSERT_EQ(0, periodic_compactions);
3767 
3768       // Add 50 hours and do a write
3769       env_->addon_time_.fetch_add(50 * 60 * 60);
3770       ASSERT_OK(Put("a", "1"));
3771       Flush();
3772       dbfull()->TEST_WaitForCompact();
3773       // Assert that the files stay in the same level
3774       ASSERT_EQ("3", FilesPerLevel());
3775       // The two old files go through the periodic compaction process
3776       ASSERT_EQ(2, periodic_compactions);
3777 
3778       MoveFilesToLevel(1);
3779       ASSERT_EQ("0,3", FilesPerLevel());
3780 
3781       // Add another 50 hours and do another write
3782       env_->addon_time_.fetch_add(50 * 60 * 60);
3783       ASSERT_OK(Put("b", "2"));
3784       if (if_restart) {
3785         Reopen(options);
3786       } else {
3787         Flush();
3788       }
3789       dbfull()->TEST_WaitForCompact();
3790       ASSERT_EQ("1,3", FilesPerLevel());
3791       // The three old files now go through the periodic compaction process. 2
3792       // + 3.
3793       ASSERT_EQ(5, periodic_compactions);
3794 
3795       // Add another 50 hours and do another write
3796       env_->addon_time_.fetch_add(50 * 60 * 60);
3797       ASSERT_OK(Put("c", "3"));
3798       Flush();
3799       dbfull()->TEST_WaitForCompact();
3800       ASSERT_EQ("2,3", FilesPerLevel());
3801       // The four old files now go through the periodic compaction process. 5
3802       // + 4.
3803       ASSERT_EQ(9, periodic_compactions);
3804 
3805       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3806     }
3807   }
3808 }
3809 
TEST_F(DBCompactionTest,LevelPeriodicCompactionWithOldDB)3810 TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
3811   // This test makes sure that periodic compactions are working with a DB
3812   // where file_creation_time of some files is 0.
3813   // After compactions the new files are created with a valid file_creation_time
3814 
3815   const int kNumKeysPerFile = 32;
3816   const int kNumFiles = 4;
3817   const int kValueSize = 100;
3818 
3819   Options options = CurrentOptions();
3820   env_->time_elapse_only_sleep_ = false;
3821   options.env = env_;
3822 
3823   env_->addon_time_.store(0);
3824   DestroyAndReopen(options);
3825 
3826   int periodic_compactions = 0;
3827   bool set_file_creation_time_to_zero = true;
3828   bool set_creation_time_to_zero = true;
3829   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3830       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3831         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3832         auto compaction_reason = compaction->compaction_reason();
3833         if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3834           periodic_compactions++;
3835         }
3836       });
3837   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3838       "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
3839         TableProperties* props = reinterpret_cast<TableProperties*>(arg);
3840         if (set_file_creation_time_to_zero) {
3841           props->file_creation_time = 0;
3842         }
3843         if (set_creation_time_to_zero) {
3844           props->creation_time = 0;
3845         }
3846       });
3847   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3848 
3849   Random rnd(301);
3850   for (int i = 0; i < kNumFiles; ++i) {
3851     for (int j = 0; j < kNumKeysPerFile; ++j) {
3852       ASSERT_OK(
3853           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3854     }
3855     Flush();
3856     // Move the first two files to L2.
3857     if (i == 1) {
3858       MoveFilesToLevel(2);
3859       set_creation_time_to_zero = false;
3860     }
3861   }
3862   ASSERT_OK(dbfull()->TEST_WaitForCompact());
3863 
3864   ASSERT_EQ("2,0,2", FilesPerLevel());
3865   ASSERT_EQ(0, periodic_compactions);
3866 
3867   Close();
3868 
3869   set_file_creation_time_to_zero = false;
3870   // Forward the clock by 2 days.
3871   env_->addon_time_.fetch_add(2 * 24 * 60 * 60);
3872   options.periodic_compaction_seconds = 1 * 24 * 60 * 60;  // 1 day
3873 
3874   Reopen(options);
3875   ASSERT_OK(dbfull()->TEST_WaitForCompact());
3876   ASSERT_EQ("2,0,2", FilesPerLevel());
3877   // Make sure that all files go through periodic compaction.
3878   ASSERT_EQ(kNumFiles, periodic_compactions);
3879 
3880   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3881 }
3882 
TEST_F(DBCompactionTest,LevelPeriodicAndTtlCompaction)3883 TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
3884   const int kNumKeysPerFile = 32;
3885   const int kNumLevelFiles = 2;
3886   const int kValueSize = 100;
3887 
3888   Options options = CurrentOptions();
3889   options.ttl = 10 * 60 * 60;  // 10 hours
3890   options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
3891   options.max_open_files = -1;   // needed for both periodic and ttl compactions
3892   env_->time_elapse_only_sleep_ = false;
3893   options.env = env_;
3894 
3895   env_->addon_time_.store(0);
3896   DestroyAndReopen(options);
3897 
3898   int periodic_compactions = 0;
3899   int ttl_compactions = 0;
3900   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3901       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3902         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3903         auto compaction_reason = compaction->compaction_reason();
3904         if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3905           periodic_compactions++;
3906         } else if (compaction_reason == CompactionReason::kTtl) {
3907           ttl_compactions++;
3908         }
3909       });
3910   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3911 
3912   Random rnd(301);
3913   for (int i = 0; i < kNumLevelFiles; ++i) {
3914     for (int j = 0; j < kNumKeysPerFile; ++j) {
3915       ASSERT_OK(
3916           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3917     }
3918     Flush();
3919   }
3920   dbfull()->TEST_WaitForCompact();
3921 
3922   MoveFilesToLevel(3);
3923 
3924   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3925   ASSERT_EQ(0, periodic_compactions);
3926   ASSERT_EQ(0, ttl_compactions);
3927 
3928   // Add some time greater than periodic_compaction_time.
3929   env_->addon_time_.fetch_add(50 * 60 * 60);
3930   ASSERT_OK(Put("a", "1"));
3931   Flush();
3932   dbfull()->TEST_WaitForCompact();
3933   // Files in the bottom level go through periodic compactions.
3934   ASSERT_EQ("1,0,0,2", FilesPerLevel());
3935   ASSERT_EQ(2, periodic_compactions);
3936   ASSERT_EQ(0, ttl_compactions);
3937 
3938   // Add a little more time than ttl
3939   env_->addon_time_.fetch_add(11 * 60 * 60);
3940   ASSERT_OK(Put("b", "1"));
3941   Flush();
3942   dbfull()->TEST_WaitForCompact();
3943   // Notice that the previous file in level 1 falls down to the bottom level
3944   // due to ttl compactions, one level at a time.
3945   // And bottom level files don't get picked up for ttl compactions.
3946   ASSERT_EQ("1,0,0,3", FilesPerLevel());
3947   ASSERT_EQ(2, periodic_compactions);
3948   ASSERT_EQ(3, ttl_compactions);
3949 
3950   // Add some time greater than periodic_compaction_time.
3951   env_->addon_time_.fetch_add(50 * 60 * 60);
3952   ASSERT_OK(Put("c", "1"));
3953   Flush();
3954   dbfull()->TEST_WaitForCompact();
3955   // Previous L0 file falls one level at a time to bottom level due to ttl.
3956   // And all 4 bottom files go through periodic compactions.
3957   ASSERT_EQ("1,0,0,4", FilesPerLevel());
3958   ASSERT_EQ(6, periodic_compactions);
3959   ASSERT_EQ(6, ttl_compactions);
3960 
3961   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3962 }
3963 
TEST_F(DBCompactionTest,LevelPeriodicCompactionWithCompactionFilters)3964 TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
3965   class TestCompactionFilter : public CompactionFilter {
3966     const char* Name() const override { return "TestCompactionFilter"; }
3967   };
3968   class TestCompactionFilterFactory : public CompactionFilterFactory {
3969     const char* Name() const override { return "TestCompactionFilterFactory"; }
3970     std::unique_ptr<CompactionFilter> CreateCompactionFilter(
3971         const CompactionFilter::Context& /*context*/) override {
3972       return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
3973     }
3974   };
3975 
3976   const int kNumKeysPerFile = 32;
3977   const int kNumLevelFiles = 2;
3978   const int kValueSize = 100;
3979 
3980   Random rnd(301);
3981 
3982   Options options = CurrentOptions();
3983   TestCompactionFilter test_compaction_filter;
3984   env_->time_elapse_only_sleep_ = false;
3985   options.env = env_;
3986   env_->addon_time_.store(0);
3987 
3988   enum CompactionFilterType {
3989     kUseCompactionFilter,
3990     kUseCompactionFilterFactory
3991   };
3992 
3993   for (CompactionFilterType comp_filter_type :
3994        {kUseCompactionFilter, kUseCompactionFilterFactory}) {
3995     // Assert that periodic compactions are not enabled.
3996     ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);
3997 
3998     if (comp_filter_type == kUseCompactionFilter) {
3999       options.compaction_filter = &test_compaction_filter;
4000       options.compaction_filter_factory.reset();
4001     } else if (comp_filter_type == kUseCompactionFilterFactory) {
4002       options.compaction_filter = nullptr;
4003       options.compaction_filter_factory.reset(
4004           new TestCompactionFilterFactory());
4005     }
4006     DestroyAndReopen(options);
4007 
4008     // periodic_compaction_seconds should be set to the sanitized value when
4009     // a compaction filter or a compaction filter factory is used.
4010     ASSERT_EQ(30 * 24 * 60 * 60,
4011               dbfull()->GetOptions().periodic_compaction_seconds);
4012 
4013     int periodic_compactions = 0;
4014     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4015         "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
4016           Compaction* compaction = reinterpret_cast<Compaction*>(arg);
4017           auto compaction_reason = compaction->compaction_reason();
4018           if (compaction_reason == CompactionReason::kPeriodicCompaction) {
4019             periodic_compactions++;
4020           }
4021         });
4022     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4023 
4024     for (int i = 0; i < kNumLevelFiles; ++i) {
4025       for (int j = 0; j < kNumKeysPerFile; ++j) {
4026         ASSERT_OK(
4027             Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
4028       }
4029       Flush();
4030     }
4031     dbfull()->TEST_WaitForCompact();
4032 
4033     ASSERT_EQ("2", FilesPerLevel());
4034     ASSERT_EQ(0, periodic_compactions);
4035 
4036     // Add 31 days and do a write
4037     env_->addon_time_.fetch_add(31 * 24 * 60 * 60);
4038     ASSERT_OK(Put("a", "1"));
4039     Flush();
4040     dbfull()->TEST_WaitForCompact();
4041     // Assert that the files stay in the same level
4042     ASSERT_EQ("3", FilesPerLevel());
4043     // The two old files go through the periodic compaction process
4044     ASSERT_EQ(2, periodic_compactions);
4045 
4046     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4047   }
4048 }
4049 
// With `CompactRangeOptions::allow_write_stall == false`, a manual compaction
// should only trigger its flush once doing so cannot cause a write stall due
// to the L0 file count growing too large.
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  // i == 0: normal case, the stall is avoided by delaying.
  // i == 1: edge case where the stall trigger equals the compaction trigger,
  //         so the stall cannot be avoided and no delay should happen.
  for (int i = 0; i < 2; ++i) {
    const bool stall_avoidable = (i == 0);

    Options options = CurrentOptions();
    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
    options.level0_file_num_compaction_trigger =
        stall_avoidable ? kNumL0FilesTrigger : kNumL0FilesLimit;
    Reopen(options);

    // Hold the auto-compaction's end until the manual compaction has either
    // had a chance to be delayed (i == 0) or has continued without delay
    // (i == 1).
    const char* const manual_compaction_marker =
        stall_avoidable ? "DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait"
                        : "DBImpl::FlushMemTable:StallWaitDone";
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{manual_compaction_marker, "CompactionJob::Run():End"}});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Create one L0 file fewer than the slowdown limit; every file holds the
    // same two keys.
    Random rnd(301);
    for (int file = 0; file < kNumL0FilesLimit - 1; ++file) {
      for (int key = 0; key < 2; ++key) {
        ASSERT_OK(Put(Key(key), RandomString(&rnd, 1024)));
      }
      Flush();
    }

    port::Thread manual_compaction_thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      db_->CompactRange(cro, nullptr, nullptr);
    });

    manual_compaction_thread.join();
    dbfull()->TEST_WaitForCompact();
    // Everything must have left L0 and landed in L1.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4104 
TEST_F(DBCompactionTest,CompactRangeDelayedByImmMemTableCount)4105 TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
4106   // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
4107   // compaction only triggers flush after it's sure stall won't be triggered for
4108   // immutable memtable count going too high.
4109   const int kNumImmMemTableLimit = 8;
4110   // i == 0: verifies normal case where stall is avoided by delay
4111   // i == 1: verifies no delay in edge case where stall trigger is same as flush
4112   //         trigger, so stall can't be avoided
4113   for (int i = 0; i < 2; ++i) {
4114     Options options = CurrentOptions();
4115     options.disable_auto_compactions = true;
4116     // the delay limit is one less than the stop limit. This test focuses on
4117     // avoiding delay limit, but this option sets stop limit, so add one.
4118     options.max_write_buffer_number = kNumImmMemTableLimit + 1;
4119     if (i == 1) {
4120       options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
4121     }
4122     Reopen(options);
4123 
4124     if (i == 0) {
4125       // ensure the flush doesn't finish until manual compaction has had a
4126       // chance to be delayed.
4127       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4128           {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
4129             "FlushJob::WriteLevel0Table"}});
4130     } else {
4131       // ensure the flush doesn't finish until manual compaction has continued
4132       // without delay.
4133       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4134           {{"DBImpl::FlushMemTable:StallWaitDone",
4135             "FlushJob::WriteLevel0Table"}});
4136     }
4137     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4138 
4139     Random rnd(301);
4140     for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
4141       ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
4142       FlushOptions flush_opts;
4143       flush_opts.wait = false;
4144       flush_opts.allow_write_stall = true;
4145       dbfull()->Flush(flush_opts);
4146     }
4147 
4148     auto manual_compaction_thread = port::Thread([this]() {
4149       CompactRangeOptions cro;
4150       cro.allow_write_stall = false;
4151       db_->CompactRange(cro, nullptr, nullptr);
4152     });
4153 
4154     manual_compaction_thread.join();
4155     dbfull()->TEST_WaitForFlushMemTable();
4156     ASSERT_EQ(0, NumTableFilesAtLevel(0));
4157     ASSERT_GT(NumTableFilesAtLevel(1), 0);
4158     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4159   }
4160 }
4161 
TEST_F(DBCompactionTest,CompactRangeShutdownWhileDelayed)4162 TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
4163   // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
4164   // does not hang if CF is dropped or DB is closed
4165   const int kNumL0FilesTrigger = 4;
4166   const int kNumL0FilesLimit = 8;
4167   Options options = CurrentOptions();
4168   options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
4169   options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
4170   // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
4171   // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
4172   //         simulate what happens during Close as we can't call Close (it
4173   //         blocks on the auto-compaction, making a cycle).
4174   for (int i = 0; i < 2; ++i) {
4175     CreateAndReopenWithCF({"one"}, options);
4176     // The calls to close CF/DB wait until the manual compaction stalls.
4177     // The auto-compaction waits until the manual compaction finishes to ensure
4178     // the signal comes from closing CF/DB, not from compaction making progress.
4179     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4180         {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
4181           "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
4182          {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
4183           "CompactionJob::Run():End"}});
4184     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4185 
4186     Random rnd(301);
4187     for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
4188       for (int k = 0; k < 2; ++k) {
4189         ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
4190       }
4191       Flush(1);
4192     }
4193     auto manual_compaction_thread = port::Thread([this, i]() {
4194       CompactRangeOptions cro;
4195       cro.allow_write_stall = false;
4196       Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
4197       if (i == 0) {
4198         ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
4199                         .IsColumnFamilyDropped());
4200       } else {
4201         ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
4202                         .IsShutdownInProgress());
4203       }
4204     });
4205 
4206     TEST_SYNC_POINT(
4207         "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
4208     if (i == 0) {
4209       ASSERT_OK(db_->DropColumnFamily(handles_[1]));
4210     } else {
4211       dbfull()->CancelAllBackgroundWork(false /* wait */);
4212     }
4213     manual_compaction_thread.join();
4214     TEST_SYNC_POINT(
4215         "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
4216     dbfull()->TEST_WaitForCompact();
4217     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4218   }
4219 }
4220 
// Verifies that, when `CompactRangeOptions::allow_write_stall == false`,
// CompactRange skips its flush if the delay is long enough that the memtables
// existing at the beginning of the call have already been flushed.
TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  Reopen(options);

  Random rnd(301);
  // The manual flush includes the memtable that was active when CompactRange
  // began. So it unblocks CompactRange and precludes its flush. Throughout the
  // test, stall conditions are upheld via high L0 file count.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
        "DBImpl::FlushMemTable:StallWaitDone"},
       {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Used for the delayable flushes.
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(dbfull()->Flush(flush_opts));
  }
  auto manual_compaction_thread = port::Thread([this]() {
    CompactRangeOptions cro;
    cro.allow_write_stall = false;
    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  });

  // Per the dependency above, this returns only after CompactRange has reached
  // its stall-wait sync point.
  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
  // Every write and flush status is checked so that a failed setup step cannot
  // be mistaken for a "flush was skipped" result below.
  ASSERT_OK(Put(ToString(0), RandomString(&rnd, 1024)));
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_OK(Put(ToString(0), RandomString(&rnd, 1024)));
  TEST_SYNC_POINT(
      "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
  manual_compaction_thread.join();

  // If CompactRange's flush was skipped, the final Put above will still be
  // in the active memtable.
  std::string num_keys_in_memtable;
  ASSERT_TRUE(db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable,
                               &num_keys_in_memtable));
  ASSERT_EQ(ToString(1), num_keys_in_memtable);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4274 
// Exercises `CompactRange` over every [begin, end] pair drawn from the keys
// "a".."e" (plus unbounded ends) while the memtable holds "b" and "d", and
// checks via the memtable entry-count properties whether a flush happened.
TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
  // Verify memtable only gets flushed if it contains data overlapping the range
  // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
  const int kNumEndpointKeys = 5;
  std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // One extra iteration for nullptr, which means left side of interval is
  // unbounded. For i > 0 the left endpoint is keys[i - 1].
  for (int i = 0; i <= kNumEndpointKeys; ++i) {
    Slice begin;
    Slice* begin_ptr;
    if (i == 0) {
      begin_ptr = nullptr;
    } else {
      begin = keys[i - 1];
      begin_ptr = &begin;
    }
    // Start `j` at the index of the left endpoint (`i - 1`, clamped to 0) so
    // the right endpoint never precedes the left one. One extra iteration for
    // nullptr, which means right side of interval is unbounded.
    for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
      Slice end;
      Slice* end_ptr;
      if (j == kNumEndpointKeys) {
        end_ptr = nullptr;
      } else {
        end = keys[j];
        end_ptr = &end;
      }
      // Memtable contents for this iteration: exactly the keys "b" and "d".
      ASSERT_OK(Put("b", "val"));
      ASSERT_OK(Put("d", "val"));
      CompactRangeOptions compact_range_opts;
      ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));

      // Sum the entry counts of the immutable and the active memtables;
      // `get_prop_tmp` is written by each GetIntProperty call before use.
      uint64_t get_prop_tmp, num_memtable_entries = 0;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      // A flush is expected when either side is unbounded, or when the left
      // endpoint is at most "d" (i <= 4), the right endpoint is at least "b"
      // (j >= 1), and the range is not the single point ["c", "c"], which
      // contains neither of the two memtable keys.
      if (begin_ptr == nullptr || end_ptr == nullptr ||
          (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
        // In this case `CompactRange`'s range overlapped in some way with the
        // memtable's range, so flush should've happened. Then "b" and "d" won't
        // be in the memtable.
        ASSERT_EQ(0, num_memtable_entries);
      } else {
        // No overlap: both keys must still be sitting in the memtable.
        ASSERT_EQ(2, num_memtable_entries);
        // flush anyways to prepare for next iteration
        // NOTE(review): the returned status is not checked here.
        db_->Flush(FlushOptions());
      }
    }
  }
}
4332 
// Runs a write/flush workload that triggers automatic compactions and then
// checks, via VerifyCompactionStats(), the default column family's data
// against what the CompactionStatsCollector listener gathered.
TEST_F(DBCompactionTest, CompactionStatsTest) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  // NOTE(review): presumably `options.listeners` takes ownership of the
  // collector; the raw pointer is kept only for the verification below.
  CompactionStatsCollector* collector = new CompactionStatsCollector();
  options.listeners.emplace_back(collector);
  DestroyAndReopen(options);

  // 32 flushes, each rewriting the same 5000 keys with a one-byte value.
  const std::string value(1, 'A');
  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      // Check every write so a failed Put cannot silently shrink the workload
      // the statistics are verified against.
      ASSERT_OK(Put(std::to_string(j), value));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  // Surface any error reported while waiting for compactions before the
  // collected statistics are inspected.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();

  VerifyCompactionStats(*cfd, *collector);
}
4354 
TEST_F(DBCompactionTest,CompactFilesOutputRangeConflict)4355 TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
4356   // LSM setup:
4357   // L1:      [ba bz]
4358   // L2: [a b]       [c d]
4359   // L3: [a b]       [c d]
4360   //
4361   // Thread 1:                        Thread 2:
4362   // Begin compacting all L2->L3
4363   //                                  Compact [ba bz] L1->L3
4364   // End compacting all L2->L3
4365   //
4366   // The compaction operation in thread 2 should be disallowed because the range
4367   // overlaps with the compaction in thread 1, which also covers that range in
4368   // L3.
4369   Options options = CurrentOptions();
4370   FlushedFileCollector* collector = new FlushedFileCollector();
4371   options.listeners.emplace_back(collector);
4372   Reopen(options);
4373 
4374   for (int level = 3; level >= 2; --level) {
4375     ASSERT_OK(Put("a", "val"));
4376     ASSERT_OK(Put("b", "val"));
4377     ASSERT_OK(Flush());
4378     ASSERT_OK(Put("c", "val"));
4379     ASSERT_OK(Put("d", "val"));
4380     ASSERT_OK(Flush());
4381     MoveFilesToLevel(level);
4382   }
4383   ASSERT_OK(Put("ba", "val"));
4384   ASSERT_OK(Put("bz", "val"));
4385   ASSERT_OK(Flush());
4386   MoveFilesToLevel(1);
4387 
4388   SyncPoint::GetInstance()->LoadDependency({
4389       {"CompactFilesImpl:0",
4390        "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
4391       {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
4392        "CompactFilesImpl:1"},
4393   });
4394   SyncPoint::GetInstance()->EnableProcessing();
4395 
4396   auto bg_thread = port::Thread([&]() {
4397     // Thread 1
4398     std::vector<std::string> filenames = collector->GetFlushedFiles();
4399     filenames.pop_back();
4400     ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
4401                                 3 /* output_level */));
4402   });
4403 
4404   // Thread 2
4405   TEST_SYNC_POINT(
4406       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
4407   std::string filename = collector->GetFlushedFiles().back();
4408   ASSERT_FALSE(
4409       db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
4410           .ok());
4411   TEST_SYNC_POINT(
4412       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");
4413 
4414   bg_thread.join();
4415 }
4416 
// A compaction whose inputs cancel each other out completely (puts followed by
// deletes of the same keys) must leave no files behind and must not start
// creating an output SST.
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
  Options options = CurrentOptions();
  SstStatsCollector* sst_stats = new SstStatsCollector();
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(sst_stats);
  Reopen(options);

  // Both L0 files cover the same keys, so they overlap and the compaction
  // cannot be a trivial move.
  for (const char* key : {"a", "b"}) {
    ASSERT_OK(Put(key, "val"));
  }
  ASSERT_OK(Flush());
  for (const char* key : {"a", "b"}) {
    ASSERT_OK(Delete(key));
  }
  ASSERT_OK(Flush());

  dbfull()->TEST_WaitForCompact();
  for (int level : {0, 1}) {
    ASSERT_EQ(NumTableFilesAtLevel(level), 0);
  }

  // One file creation is expected to start per flush, and none for the
  // compaction because it writes no keys.
  ASSERT_EQ(2, sst_stats->num_ssts_creation_started());
}
4440 
// Spreads 17 column families over several ConcurrentTaskLimiters, queues one
// compaction per column family behind blocked LOW-priority threads, releases
// them, and checks through sync-point callbacks that no limiter ever has more
// compactions in flight than its configured limit. Finishes with a manual
// compaction on a column family whose limiter is set to 0 outstanding tasks.
TEST_F(DBCompactionTest, CompactionLimiter) {
  const int kNumKeysPerFile = 10;
  const int kMaxBackgroundThreads = 64;

  // Bookkeeping for one shared limiter.
  struct CompactionLimiter {
    std::string name;
    int limit_tasks;  // configured maximum of outstanding tasks
    int max_tasks;    // high-water mark recorded by the BeforeCompaction hook
    int tasks;        // compactions currently between Before/AfterCompaction
    std::shared_ptr<ConcurrentTaskLimiter> limiter;
  };

  std::vector<CompactionLimiter> limiter_settings;
  limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});

  for (auto& ls : limiter_settings) {
    ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
  }

  // Separate limiter created with a limit of -1; used by one column family
  // only (see below).
  std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
    NewConcurrentTaskLimiter("unique_limiter", -1));

  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
    "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
  const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];

  // Maps a column family name to the bookkeeping entry of its limiter. Only
  // column families that use one of the `limiter_settings` entries appear.
  std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 * 1024;  // 110KB
  options.arena_block_size = 4096;
  options.num_levels = 3;
  options.level0_file_num_compaction_trigger = 4;
  options.level0_slowdown_writes_trigger = 64;
  options.level0_stop_writes_trigger = 64;
  options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  options.max_write_buffer_number = 10; // Enough memtables
  DestroyAndReopen(options);

  std::vector<Options> option_vector;
  option_vector.reserve(cf_count);

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ColumnFamilyOptions cf_opt(options);
    if (cf == 0) {
      // "default" CF doesn't use a compaction limiter
      cf_opt.compaction_thread_limiter = nullptr;
    } else if (cf == 1) {
      // The CF at index 1 (named "0") uses the bypass compaction limiter
      unique_limiter->SetMaxOutstandingTask(-1);
      cf_opt.compaction_thread_limiter = unique_limiter;
    } else {
      // Assign limiter by mod: index cf uses limiter_settings[cf % 3]
      auto& ls = limiter_settings[cf % 3];
      cf_opt.compaction_thread_limiter = ls.limiter;
      cf_to_limiter[cf_names[cf]] = &ls;
    }
    option_vector.emplace_back(DBOptions(options), cf_opt);
  }

  // Index 0 is the default column family, which already exists.
  for (unsigned int cf = 1; cf < cf_count; cf++) {
    CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
  }

  ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
                                                    cf_names + cf_count),
                           option_vector);

  // Guards the `tasks` / `max_tasks` counters updated by the callbacks below.
  port::Mutex mutex;

  // On compaction start: bump the in-flight count of the CF's limiter and
  // assert it does not exceed the configured limit.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
          // NOTE(review): the high-water mark is updated from `limit_tasks`,
          // not from `tasks`, so `max_tasks` becomes `limit_tasks` on the
          // first call and the later ASSERT_EQ(ls.limit_tasks, ls.max_tasks)
          // cannot detect a limiter that was never saturated. Looks like
          // `tasks` was intended - confirm before changing, as that would make
          // the check depend on actual concurrency.
          iter->second->max_tasks =
              std::max(iter->second->max_tasks, iter->second->limit_tasks);
        }
      });

  // On compaction end: drop the in-flight count; it must never go negative.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          ASSERT_GE(--iter->second->tasks, 0);
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Split the background threads: a quarter for flushes (HIGH), the rest for
  // compactions (LOW).
  const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
  const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
  env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH);
  env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW);

  test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];

  // Block all compaction threads in thread pool.
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                   &sleeping_compact_tasks[i], Env::LOW);
    sleeping_compact_tasks[i].WaitUntilSleeping();
  }

  int keyIndex = 0;

  // Produce `level0_file_num_compaction_trigger` L0 files in every CF.
  for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
    for (unsigned int cf = 0; cf < cf_count; cf++) {
      for (int i = 0; i < kNumKeysPerFile; i++) {
        ASSERT_OK(Put(cf, Key(keyIndex++), ""));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put(cf, "", ""));
    }

    for (unsigned int cf = 0; cf < cf_count; cf++) {
      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
    }
  }

  // Enough L0 files to trigger compaction
  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ASSERT_EQ(NumTableFilesAtLevel(0, cf),
      options.level0_file_num_compaction_trigger);
  }

  // Create more files for one column family, which triggers speed up
  // condition, all compactions will be scheduled.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    for (int i = 0; i < kNumKeysPerFile; i++) {
      ASSERT_OK(Put(0, Key(i), ""));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(0, "", ""));
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
              NumTableFilesAtLevel(0, 0));
  }

  // All CFs are pending compaction: one queued LOW-priority job per CF.
  ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));

  // Unblock all compaction threads
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    sleeping_compact_tasks[i].WakeUp();
    sleeping_compact_tasks[i].WaitUntilDone();
  }

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Max outstanding compact tasks reached limit, and nothing is left
  // outstanding on any limiter.
  for (auto& ls : limiter_settings) {
    ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
    ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
  }

  // test manual compaction under a fully throttled limiter
  int cf_test = 1;
  unique_limiter->SetMaxOutstandingTask(0);

  // flush one more file to cf 1
  for (int i = 0; i < kNumKeysPerFile; i++) {
      ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
  }
  // put extra key to trigger flush
  ASSERT_OK(Put(cf_test, "", ""));

  dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
  ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));

  Compact(cf_test, Key(0), Key(keyIndex));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
4626 
// Runs every DBCompactionTestWithParam test with each tuple below. The fixture
// reads element 0 as `max_subcompactions_` and element 1 as
// `exclusive_manual_compaction_`.
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                        ::testing::Values(std::make_tuple(1, true),
                                          std::make_tuple(1, false),
                                          std::make_tuple(4, true),
                                          std::make_tuple(4, false)));
4632 
// Checks that compaction output files are opened with direct writes exactly
// when `use_direct_io_for_flush_and_compaction` (the test parameter) is set,
// and that the "SanitizeOptions:direct_io" sync point fires consistently with
// `options.use_direct_reads`.
TEST_P(DBCompactionDirectIOTest, DirectIO) {
  Options options = CurrentOptions();
  Destroy(options);
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  // The env is owned by this test and released manually at the very end.
  // NOTE(review): if an ASSERT_* below fails, the test body returns early and
  // the `delete` is skipped, so the env leaks in that case - presumably
  // accepted because the DB may still reference it; confirm before changing.
  options.env = new MockEnv(Env::Default());
  Reopen(options);
  // Set to true only if the "SanitizeOptions:direct_io" callback runs.
  bool readahead = false;
  // Every compaction output file must be opened with the direct-write mode
  // requested through the options.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });
  if (options.use_direct_io_for_flush_and_compaction) {
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
          readahead = true;
        });
  }
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  ASSERT_EQ("1,1,1", FilesPerLevel(1));
  Compact(1, "p1", "p9");
  ASSERT_EQ(readahead, options.use_direct_reads);
  ASSERT_EQ("0,0,1", FilesPerLevel(1));
  // Destroy the DB before releasing the env it was opened with.
  Destroy(options);
  delete options.env;
}
4664 
// Runs the DBCompactionDirectIOTest tests with both boolean values; the DirectIO
// test uses the parameter as `use_direct_io_for_flush_and_compaction`.
INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
                        testing::Bool());
4667 
4668 class CompactionPriTest : public DBTestBase,
4669                           public testing::WithParamInterface<uint32_t> {
4670  public:
CompactionPriTest()4671   CompactionPriTest() : DBTestBase("/compaction_pri_test") {
4672     compaction_pri_ = GetParam();
4673   }
4674 
4675   // Required if inheriting from testing::WithParamInterface<>
SetUpTestCase()4676   static void SetUpTestCase() {}
TearDownTestCase()4677   static void TearDownTestCase() {}
4678 
4679   uint32_t compaction_pri_;
4680 };
4681 
// Writes 5000 keys in shuffled order under the parameterized compaction
// priority with small level size limits, waits for compactions to settle, and
// checks that every key can still be read.
TEST_P(CompactionPriTest, Test) {
  Options options = CurrentOptions();
  options.write_buffer_size = 16 * 1024;
  options.compaction_pri = static_cast<CompactionPri>(compaction_pri_);
  options.hard_pending_compaction_bytes_limit = 256 * 1024;
  options.max_bytes_for_level_base = 64 * 1024;
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;

  DestroyAndReopen(options);

  Random rnd(301);
  const int kNKeys = 5000;

  // Key indices 0..kNKeys-1, inserted in a shuffled order.
  int keys[kNKeys];
  for (int idx = 0; idx < kNKeys; ++idx) {
    keys[idx] = idx;
  }
  // NOTE(review): std::random_shuffle is deprecated in C++14 and removed in
  // C++17; replacing it needs a different shuffle utility and its include.
  std::random_shuffle(std::begin(keys), std::end(keys));

  for (int key_index : keys) {
    ASSERT_OK(Put(Key(key_index), RandomString(&rnd, 102)));
  }

  dbfull()->TEST_WaitForCompact();

  // Every key written above must still be readable.
  for (int idx = 0; idx < kNKeys; ++idx) {
    ASSERT_NE("NOT_FOUND", Get(Key(idx)));
  }
}
4710 
// Instantiate CompactionPriTest once for each of the CompactionPri values
// listed below; the fixture receives the value as a uint32_t parameter.
INSTANTIATE_TEST_CASE_P(
    CompactionPriTest, CompactionPriTest,
    ::testing::Values(CompactionPri::kByCompensatedSize,
                      CompactionPri::kOldestLargestSeqFirst,
                      CompactionPri::kOldestSmallestSeqFirst,
                      CompactionPri::kMinOverlappingRatio));
4717 
4718 class NoopMergeOperator : public MergeOperator {
4719  public:
NoopMergeOperator()4720   NoopMergeOperator() {}
4721 
FullMergeV2(const MergeOperationInput &,MergeOperationOutput * merge_out) const4722   bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
4723                    MergeOperationOutput* merge_out) const override {
4724     std::string val("bar");
4725     merge_out->new_value = val;
4726     return true;
4727   }
4728 
Name() const4729   const char* Name() const override { return "Noop"; }
4730 };
4731 
TEST_F(DBCompactionTest,PartialManualCompaction)4732 TEST_F(DBCompactionTest, PartialManualCompaction) {
4733   Options opts = CurrentOptions();
4734   opts.num_levels = 3;
4735   opts.level0_file_num_compaction_trigger = 10;
4736   opts.compression = kNoCompression;
4737   opts.merge_operator.reset(new NoopMergeOperator());
4738   opts.target_file_size_base = 10240;
4739   DestroyAndReopen(opts);
4740 
4741   Random rnd(301);
4742   for (auto i = 0; i < 8; ++i) {
4743     for (auto j = 0; j < 10; ++j) {
4744       Merge("foo", RandomString(&rnd, 1024));
4745     }
4746     Flush();
4747   }
4748 
4749   MoveFilesToLevel(2);
4750 
4751   std::string prop;
4752   EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
4753   uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
4754   ASSERT_OK(dbfull()->SetOptions(
4755       {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
4756 
4757   CompactRangeOptions cro;
4758   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
4759   dbfull()->CompactRange(cro, nullptr, nullptr);
4760 }
4761 
TEST_F(DBCompactionTest,ManualCompactionFailsInReadOnlyMode)4762 TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
4763   // Regression test for bug where manual compaction hangs forever when the DB
4764   // is in read-only mode. Verify it now at least returns, despite failing.
4765   const int kNumL0Files = 4;
4766   std::unique_ptr<FaultInjectionTestEnv> mock_env(
4767       new FaultInjectionTestEnv(Env::Default()));
4768   Options opts = CurrentOptions();
4769   opts.disable_auto_compactions = true;
4770   opts.env = mock_env.get();
4771   DestroyAndReopen(opts);
4772 
4773   Random rnd(301);
4774   for (int i = 0; i < kNumL0Files; ++i) {
4775     // Make sure files are overlapping in key-range to prevent trivial move.
4776     Put("key1", RandomString(&rnd, 1024));
4777     Put("key2", RandomString(&rnd, 1024));
4778     Flush();
4779   }
4780   ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
4781 
4782   // Enter read-only mode by failing a write.
4783   mock_env->SetFilesystemActive(false);
4784   // Make sure this is outside `CompactRange`'s range so that it doesn't fail
4785   // early trying to flush memtable.
4786   ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));
4787 
4788   // In the bug scenario, the first manual compaction would fail and forget to
4789   // unregister itself, causing the second one to hang forever due to conflict
4790   // with a non-running compaction.
4791   CompactRangeOptions cro;
4792   cro.exclusive_manual_compaction = false;
4793   Slice begin_key("key1");
4794   Slice end_key("key2");
4795   ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
4796   ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
4797 
4798   // Close before mock_env destruct.
4799   Close();
4800 }
4801 
4802 // ManualCompactionBottomLevelOptimization tests the bottom level manual
4803 // compaction optimization to skip recompacting files created by Ln-1 to Ln
4804 // compaction
TEST_F(DBCompactionTest,ManualCompactionBottomLevelOptimized)4805 TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
4806   Options opts = CurrentOptions();
4807   opts.num_levels = 3;
4808   opts.level0_file_num_compaction_trigger = 5;
4809   opts.compression = kNoCompression;
4810   opts.merge_operator.reset(new NoopMergeOperator());
4811   opts.target_file_size_base = 1024;
4812   opts.max_bytes_for_level_multiplier = 2;
4813   opts.disable_auto_compactions = true;
4814   DestroyAndReopen(opts);
4815   ColumnFamilyHandleImpl* cfh =
4816       static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
4817   ColumnFamilyData* cfd = cfh->cfd();
4818   InternalStats* internal_stats_ptr = cfd->internal_stats();
4819   ASSERT_NE(internal_stats_ptr, nullptr);
4820 
4821   Random rnd(301);
4822   for (auto i = 0; i < 8; ++i) {
4823     for (auto j = 0; j < 10; ++j) {
4824       ASSERT_OK(
4825           Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
4826     }
4827     Flush();
4828   }
4829 
4830   MoveFilesToLevel(2);
4831 
4832   for (auto i = 0; i < 8; ++i) {
4833     for (auto j = 0; j < 10; ++j) {
4834       ASSERT_OK(
4835           Put("bar" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
4836     }
4837     Flush();
4838   }
4839   const std::vector<InternalStats::CompactionStats>& comp_stats =
4840       internal_stats_ptr->TEST_GetCompactionStats();
4841   int num = comp_stats[2].num_input_files_in_output_level;
4842   ASSERT_EQ(num, 0);
4843 
4844   CompactRangeOptions cro;
4845   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
4846   dbfull()->CompactRange(cro, nullptr, nullptr);
4847 
4848   const std::vector<InternalStats::CompactionStats>& comp_stats2 =
4849       internal_stats_ptr->TEST_GetCompactionStats();
4850   num = comp_stats2[2].num_input_files_in_output_level;
4851   ASSERT_EQ(num, 0);
4852 }
4853 
TEST_F(DBCompactionTest,CompactionDuringShutdown)4854 TEST_F(DBCompactionTest, CompactionDuringShutdown) {
4855   Options opts = CurrentOptions();
4856   opts.level0_file_num_compaction_trigger = 2;
4857   opts.disable_auto_compactions = true;
4858   DestroyAndReopen(opts);
4859   ColumnFamilyHandleImpl* cfh =
4860       static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
4861   ColumnFamilyData* cfd = cfh->cfd();
4862   InternalStats* internal_stats_ptr = cfd->internal_stats();
4863   ASSERT_NE(internal_stats_ptr, nullptr);
4864 
4865   Random rnd(301);
4866   for (auto i = 0; i < 2; ++i) {
4867     for (auto j = 0; j < 10; ++j) {
4868       ASSERT_OK(
4869           Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
4870     }
4871     Flush();
4872   }
4873 
4874   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4875       "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
4876       [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
4877   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4878   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
4879   ASSERT_OK(dbfull()->error_handler_.GetBGError());
4880 }
4881 
4882 // FixFileIngestionCompactionDeadlock tests and verifies that compaction and
4883 // file ingestion do not cause deadlock in the event of write stall triggered
4884 // by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam,FixFileIngestionCompactionDeadlock)4885 TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
4886   const int kNumKeysPerFile = 100;
4887   // Generate SST files.
4888   Options options = CurrentOptions();
4889 
4890   // Generate an external SST file containing a single key, i.e. 99
4891   std::string sst_files_dir = dbname_ + "/sst_files/";
4892   test::DestroyDir(env_, sst_files_dir);
4893   ASSERT_OK(env_->CreateDir(sst_files_dir));
4894   SstFileWriter sst_writer(EnvOptions(), options);
4895   const std::string sst_file_path = sst_files_dir + "test.sst";
4896   ASSERT_OK(sst_writer.Open(sst_file_path));
4897   ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
4898   ASSERT_OK(sst_writer.Finish());
4899 
4900   SyncPoint::GetInstance()->DisableProcessing();
4901   SyncPoint::GetInstance()->ClearAllCallBacks();
4902   SyncPoint::GetInstance()->LoadDependency({
4903       {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
4904        "BackgroundCallCompaction:0"},
4905   });
4906   SyncPoint::GetInstance()->EnableProcessing();
4907 
4908   options.write_buffer_size = 110 << 10;  // 110KB
4909   options.level0_file_num_compaction_trigger =
4910       options.level0_stop_writes_trigger;
4911   options.max_subcompactions = max_subcompactions_;
4912   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
4913   DestroyAndReopen(options);
4914   Random rnd(301);
4915 
4916   // Generate level0_stop_writes_trigger L0 files to trigger write stop
4917   for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
4918     for (int j = 0; j != kNumKeysPerFile; ++j) {
4919       ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
4920     }
4921     if (0 == i) {
4922       // When we reach here, the memtables have kNumKeysPerFile keys. Note that
4923       // flush is not yet triggered. We need to write an extra key so that the
4924       // write path will call PreprocessWrite and flush the previous key-value
4925       // pairs to e flushed. After that, there will be the newest key in the
4926       // memtable, and a bunch of L0 files. Since there is already one key in
4927       // the memtable, then for i = 1, 2, ..., we do not have to write this
4928       // extra key to trigger flush.
4929       ASSERT_OK(Put("", ""));
4930     }
4931     dbfull()->TEST_WaitForFlushMemTable();
4932     ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
4933   }
4934   // When we reach this point, there will be level0_stop_writes_trigger L0
4935   // files and one extra key (99) in memory, which overlaps with the external
4936   // SST file. Write stall triggers, and can be cleared only after compaction
4937   // reduces the number of L0 files.
4938 
4939   // Compaction will also be triggered since we have reached the threshold for
4940   // auto compaction. Note that compaction may begin after the following file
4941   // ingestion thread and waits for ingestion to finish.
4942 
4943   // Thread to ingest file with overlapping key range with the current
4944   // memtable. Consequently ingestion will trigger a flush. The flush MUST
4945   // proceed without waiting for the write stall condition to clear, otherwise
4946   // deadlock can happen.
4947   port::Thread ingestion_thr([&]() {
4948     IngestExternalFileOptions ifo;
4949     Status s = db_->IngestExternalFile({sst_file_path}, ifo);
4950     ASSERT_OK(s);
4951   });
4952 
4953   // More write to trigger write stop
4954   ingestion_thr.join();
4955   ASSERT_OK(dbfull()->TEST_WaitForCompact());
4956   Close();
4957 }
4958 
TEST_F(DBCompactionTest,ConsistencyFailTest)4959 TEST_F(DBCompactionTest, ConsistencyFailTest) {
4960   Options options = CurrentOptions();
4961   DestroyAndReopen(options);
4962 
4963   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4964       "VersionBuilder::CheckConsistency", [&](void* arg) {
4965         auto p =
4966             reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
4967         // just swap the two FileMetaData so that we hit error
4968         // in CheckConsistency funcion
4969         FileMetaData* temp = *(p->first);
4970         *(p->first) = *(p->second);
4971         *(p->second) = temp;
4972       });
4973 
4974   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4975 
4976   for (int k = 0; k < 2; ++k) {
4977     ASSERT_OK(Put("foo", "bar"));
4978     Flush();
4979   }
4980 
4981   ASSERT_NOK(Put("foo", "bar"));
4982   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4983 }
4984 
IngestOneKeyValue(DBImpl * db,const std::string & key,const std::string & value,const Options & options)4985 void IngestOneKeyValue(DBImpl* db, const std::string& key,
4986                        const std::string& value, const Options& options) {
4987   ExternalSstFileInfo info;
4988   std::string f = test::PerThreadDBPath("sst_file" + key);
4989   EnvOptions env;
4990   ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
4991   auto s = writer.Open(f);
4992   ASSERT_OK(s);
4993   // ASSERT_OK(writer.Put(Key(), ""));
4994   ASSERT_OK(writer.Put(key, value));
4995 
4996   ASSERT_OK(writer.Finish(&info));
4997   IngestExternalFileOptions ingest_opt;
4998 
4999   ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
5000 }
5001 
TEST_P(DBCompactionTestWithParam,FlushAfterIntraL0CompactionCheckConsistencyFail)5002 TEST_P(DBCompactionTestWithParam,
5003        FlushAfterIntraL0CompactionCheckConsistencyFail) {
5004   Options options = CurrentOptions();
5005   options.force_consistency_checks = true;
5006   options.compression = kNoCompression;
5007   options.level0_file_num_compaction_trigger = 5;
5008   options.max_background_compactions = 2;
5009   options.max_subcompactions = max_subcompactions_;
5010   DestroyAndReopen(options);
5011 
5012   const size_t kValueSize = 1 << 20;
5013   Random rnd(301);
5014   std::atomic<int> pick_intra_l0_count(0);
5015   std::string value(RandomString(&rnd, kValueSize));
5016 
5017   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5018       {{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
5019         "CompactionJob::Run():Start"}});
5020   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5021       "FindIntraL0Compaction",
5022       [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
5023 
5024   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5025 
5026   // prevents trivial move
5027   for (int i = 0; i < 10; ++i) {
5028     ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
5029   }
5030   ASSERT_OK(Flush());
5031   Compact("", Key(99));
5032   ASSERT_EQ(0, NumTableFilesAtLevel(0));
5033 
5034   // Flush 5 L0 sst.
5035   for (int i = 0; i < 5; ++i) {
5036     ASSERT_OK(Put(Key(i + 1), value));
5037     ASSERT_OK(Flush());
5038   }
5039   ASSERT_EQ(5, NumTableFilesAtLevel(0));
5040 
5041   // Put one key, to make smallest log sequence number in this memtable is less
5042   // than sst which would be ingested in next step.
5043   ASSERT_OK(Put(Key(0), "a"));
5044 
5045   ASSERT_EQ(5, NumTableFilesAtLevel(0));
5046 
5047   // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
5048   for (int i = 5; i < 10; i++) {
5049     IngestOneKeyValue(dbfull(), Key(i), value, options);
5050     ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
5051   }
5052 
5053   TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
5054   // Put one key, to make biggest log sequence number in this memtable is bigger
5055   // than sst which would be ingested in next step.
5056   ASSERT_OK(Put(Key(2), "b"));
5057   ASSERT_EQ(10, NumTableFilesAtLevel(0));
5058   dbfull()->TEST_WaitForCompact();
5059   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5060   std::vector<std::vector<FileMetaData>> level_to_files;
5061   dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
5062                                   &level_to_files);
5063   ASSERT_GT(level_to_files[0].size(), 0);
5064   ASSERT_GT(pick_intra_l0_count.load(), 0);
5065 
5066   ASSERT_OK(Flush());
5067 }
5068 
TEST_P(DBCompactionTestWithParam,IntraL0CompactionAfterFlushCheckConsistencyFail)5069 TEST_P(DBCompactionTestWithParam,
5070        IntraL0CompactionAfterFlushCheckConsistencyFail) {
5071   Options options = CurrentOptions();
5072   options.force_consistency_checks = true;
5073   options.compression = kNoCompression;
5074   options.level0_file_num_compaction_trigger = 5;
5075   options.max_background_compactions = 2;
5076   options.max_subcompactions = max_subcompactions_;
5077   options.write_buffer_size = 2 << 20;
5078   options.max_write_buffer_number = 6;
5079   DestroyAndReopen(options);
5080 
5081   const size_t kValueSize = 1 << 20;
5082   Random rnd(301);
5083   std::string value(RandomString(&rnd, kValueSize));
5084   std::string value2(RandomString(&rnd, kValueSize));
5085   std::string bigvalue = value + value;
5086 
5087   // prevents trivial move
5088   for (int i = 0; i < 10; ++i) {
5089     ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
5090   }
5091   ASSERT_OK(Flush());
5092   Compact("", Key(99));
5093   ASSERT_EQ(0, NumTableFilesAtLevel(0));
5094 
5095   std::atomic<int> pick_intra_l0_count(0);
5096   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5097       {{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
5098         "CompactionJob::Run():Start"}});
5099   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5100       "FindIntraL0Compaction",
5101       [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
5102   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5103   // Make 6 L0 sst.
5104   for (int i = 0; i < 6; ++i) {
5105     if (i % 2 == 0) {
5106       IngestOneKeyValue(dbfull(), Key(i), value, options);
5107     } else {
5108       ASSERT_OK(Put(Key(i), value));
5109       ASSERT_OK(Flush());
5110     }
5111   }
5112 
5113   ASSERT_EQ(6, NumTableFilesAtLevel(0));
5114 
5115   // Stop run flush job
5116   env_->SetBackgroundThreads(1, Env::HIGH);
5117   test::SleepingBackgroundTask sleeping_tasks;
5118   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
5119                  Env::Priority::HIGH);
5120   sleeping_tasks.WaitUntilSleeping();
5121 
5122   // Put many keys to make memtable request to flush
5123   for (int i = 0; i < 6; ++i) {
5124     ASSERT_OK(Put(Key(i), bigvalue));
5125   }
5126 
5127   ASSERT_EQ(6, NumTableFilesAtLevel(0));
5128   // ingest file to trigger IntraL0Compaction
5129   for (int i = 6; i < 10; ++i) {
5130     ASSERT_EQ(i, NumTableFilesAtLevel(0));
5131     IngestOneKeyValue(dbfull(), Key(i), value2, options);
5132   }
5133   ASSERT_EQ(10, NumTableFilesAtLevel(0));
5134 
5135   // Wake up flush job
5136   sleeping_tasks.WakeUp();
5137   sleeping_tasks.WaitUntilDone();
5138   TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
5139   dbfull()->TEST_WaitForCompact();
5140   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5141 
5142   uint64_t error_count = 0;
5143   db_->GetIntProperty("rocksdb.background-errors", &error_count);
5144   ASSERT_EQ(error_count, 0);
5145   ASSERT_GT(pick_intra_l0_count.load(), 0);
5146   for (int i = 0; i < 6; ++i) {
5147     ASSERT_EQ(bigvalue, Get(Key(i)));
5148   }
5149   for (int i = 6; i < 10; ++i) {
5150     ASSERT_EQ(value2, Get(Key(i)));
5151   }
5152 }
5153 
5154 #endif // !defined(ROCKSDB_LITE)
5155 }  // namespace ROCKSDB_NAMESPACE
5156 
main(int argc,char ** argv)5157 int main(int argc, char** argv) {
5158 #if !defined(ROCKSDB_LITE)
5159   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
5160   ::testing::InitGoogleTest(&argc, argv);
5161   return RUN_ALL_TESTS();
5162 #else
5163   (void) argc;
5164   (void) argv;
5165   return 0;
5166 #endif
5167 }
5168