1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "db/compaction/compaction_job.h"
9 
10 #include <algorithm>
11 #include <array>
12 #include <cinttypes>
13 #include <map>
14 #include <string>
15 #include <tuple>
16 
17 #include "db/blob/blob_index.h"
18 #include "db/column_family.h"
19 #include "db/db_impl/db_impl.h"
20 #include "db/error_handler.h"
21 #include "db/version_set.h"
22 #include "file/writable_file_writer.h"
23 #include "rocksdb/cache.h"
24 #include "rocksdb/db.h"
25 #include "rocksdb/file_system.h"
26 #include "rocksdb/options.h"
27 #include "rocksdb/write_buffer_manager.h"
28 #include "table/mock_table.h"
29 #include "test_util/testharness.h"
30 #include "test_util/testutil.h"
31 #include "util/string_util.h"
32 #include "utilities/merge_operators.h"
33 
34 namespace ROCKSDB_NAMESPACE {
35 
36 namespace {
37 
VerifyInitializationOfCompactionJobStats(const CompactionJobStats & compaction_job_stats)38 void VerifyInitializationOfCompactionJobStats(
39       const CompactionJobStats& compaction_job_stats) {
40 #if !defined(IOS_CROSS_COMPILE)
41   ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
42 
43   ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
44   ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
45   ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
46 
47   ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
48   ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
49 
50   ASSERT_EQ(compaction_job_stats.is_manual_compaction, true);
51 
52   ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
53   ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
54 
55   ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
56   ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
57 
58   ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
59   ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
60 
61   ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
62 
63   ASSERT_EQ(compaction_job_stats.num_input_deletion_records, 0U);
64   ASSERT_EQ(compaction_job_stats.num_expired_deletion_records, 0U);
65 
66   ASSERT_EQ(compaction_job_stats.num_corrupt_keys, 0U);
67 #endif  // !defined(IOS_CROSS_COMPILE)
68 }
69 
70 }  // namespace
71 
72 class CompactionJobTestBase : public testing::Test {
73  protected:
CompactionJobTestBase(std::string dbname,const Comparator * ucmp,std::function<std::string (uint64_t)> encode_u64_ts)74   CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
75                         std::function<std::string(uint64_t)> encode_u64_ts)
76       : env_(Env::Default()),
77         fs_(env_->GetFileSystem()),
78         dbname_(std::move(dbname)),
79         ucmp_(ucmp),
80         db_options_(),
81         mutable_cf_options_(cf_options_),
82         mutable_db_options_(),
83         table_cache_(NewLRUCache(50000, 16)),
84         write_buffer_manager_(db_options_.db_write_buffer_size),
85         versions_(new VersionSet(dbname_, &db_options_, env_options_,
86                                  table_cache_.get(), &write_buffer_manager_,
87                                  &write_controller_,
88                                  /*block_cache_tracer=*/nullptr,
89                                  /*io_tracer=*/nullptr, /*db_session_id*/ "")),
90         shutting_down_(false),
91         preserve_deletes_seqnum_(0),
92         mock_table_factory_(new mock::MockTableFactory()),
93         error_handler_(nullptr, db_options_, &mutex_),
94         encode_u64_ts_(std::move(encode_u64_ts)) {}
95 
SetUp()96   void SetUp() override {
97     EXPECT_OK(env_->CreateDirIfMissing(dbname_));
98     db_options_.env = env_;
99     db_options_.fs = fs_;
100     db_options_.db_paths.emplace_back(dbname_,
101                                       std::numeric_limits<uint64_t>::max());
102     cf_options_.comparator = ucmp_;
103     cf_options_.table_factory = mock_table_factory_;
104   }
105 
GenerateFileName(uint64_t file_number)106   std::string GenerateFileName(uint64_t file_number) {
107     FileMetaData meta;
108     std::vector<DbPath> db_paths;
109     db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
110     meta.fd = FileDescriptor(file_number, 0, 0);
111     return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
112   }
113 
KeyStr(const std::string & user_key,const SequenceNumber seq_num,const ValueType t,uint64_t ts=0)114   std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
115                      const ValueType t, uint64_t ts = 0) {
116     std::string user_key_with_ts = user_key + encode_u64_ts_(ts);
117     return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString();
118   }
119 
BlobStr(uint64_t blob_file_number,uint64_t offset,uint64_t size)120   static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
121                              uint64_t size) {
122     std::string blob_index;
123     BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
124                           kNoCompression);
125     return blob_index;
126   }
127 
BlobStrTTL(uint64_t blob_file_number,uint64_t offset,uint64_t size,uint64_t expiration)128   static std::string BlobStrTTL(uint64_t blob_file_number, uint64_t offset,
129                                 uint64_t size, uint64_t expiration) {
130     std::string blob_index;
131     BlobIndex::EncodeBlobTTL(&blob_index, expiration, blob_file_number, offset,
132                              size, kNoCompression);
133     return blob_index;
134   }
135 
BlobStrInlinedTTL(const Slice & value,uint64_t expiration)136   static std::string BlobStrInlinedTTL(const Slice& value,
137                                        uint64_t expiration) {
138     std::string blob_index;
139     BlobIndex::EncodeInlinedTTL(&blob_index, expiration, value);
140     return blob_index;
141   }
142 
AddMockFile(const mock::KVVector & contents,int level=0)143   void AddMockFile(const mock::KVVector& contents, int level = 0) {
144     assert(contents.size() > 0);
145 
146     bool first_key = true;
147     std::string smallest, largest;
148     InternalKey smallest_key, largest_key;
149     SequenceNumber smallest_seqno = kMaxSequenceNumber;
150     SequenceNumber largest_seqno = 0;
151     uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
152     for (auto kv : contents) {
153       ParsedInternalKey key;
154       std::string skey;
155       std::string value;
156       std::tie(skey, value) = kv;
157       const Status pik_status =
158           ParseInternalKey(skey, &key, true /* log_err_key */);
159 
160       smallest_seqno = std::min(smallest_seqno, key.sequence);
161       largest_seqno = std::max(largest_seqno, key.sequence);
162 
163       if (first_key ||
164           cfd_->user_comparator()->Compare(key.user_key, smallest) < 0) {
165         smallest.assign(key.user_key.data(), key.user_key.size());
166         smallest_key.DecodeFrom(skey);
167       }
168       if (first_key ||
169           cfd_->user_comparator()->Compare(key.user_key, largest) > 0) {
170         largest.assign(key.user_key.data(), key.user_key.size());
171         largest_key.DecodeFrom(skey);
172       }
173 
174       first_key = false;
175 
176       if (pik_status.ok() && key.type == kTypeBlobIndex) {
177         BlobIndex blob_index;
178         const Status s = blob_index.DecodeFrom(value);
179         if (!s.ok()) {
180           continue;
181         }
182 
183         if (blob_index.IsInlined() || blob_index.HasTTL() ||
184             blob_index.file_number() == kInvalidBlobFileNumber) {
185           continue;
186         }
187 
188         if (oldest_blob_file_number == kInvalidBlobFileNumber ||
189             oldest_blob_file_number > blob_index.file_number()) {
190           oldest_blob_file_number = blob_index.file_number();
191         }
192       }
193     }
194 
195     uint64_t file_number = versions_->NewFileNumber();
196     EXPECT_OK(mock_table_factory_->CreateMockTable(
197         env_, GenerateFileName(file_number), std::move(contents)));
198 
199     VersionEdit edit;
200     edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
201                  smallest_seqno, largest_seqno, false, oldest_blob_file_number,
202                  kUnknownOldestAncesterTime, kUnknownFileCreationTime,
203                  kUnknownFileChecksum, kUnknownFileChecksumFuncName);
204 
205     mutex_.Lock();
206     EXPECT_OK(
207         versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
208                                mutable_cf_options_, &edit, &mutex_));
209     mutex_.Unlock();
210   }
211 
SetLastSequence(const SequenceNumber sequence_number)212   void SetLastSequence(const SequenceNumber sequence_number) {
213     versions_->SetLastAllocatedSequence(sequence_number + 1);
214     versions_->SetLastPublishedSequence(sequence_number + 1);
215     versions_->SetLastSequence(sequence_number + 1);
216   }
217 
218   // returns expected result after compaction
CreateTwoFiles(bool gen_corrupted_keys)219   mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
220     stl_wrappers::KVMap expected_results;
221     constexpr int kKeysPerFile = 10000;
222     constexpr int kCorruptKeysPerFile = 200;
223     constexpr int kMatchingKeys = kKeysPerFile / 2;
224     SequenceNumber sequence_number = 0;
225 
226     auto corrupt_id = [&](int id) {
227       return gen_corrupted_keys && id > 0 && id <= kCorruptKeysPerFile;
228     };
229 
230     for (int i = 0; i < 2; ++i) {
231       auto contents = mock::MakeMockFile();
232       for (int k = 0; k < kKeysPerFile; ++k) {
233         auto key = ToString(i * kMatchingKeys + k);
234         auto value = ToString(i * kKeysPerFile + k);
235         InternalKey internal_key(key, ++sequence_number, kTypeValue);
236 
237         // This is how the key will look like once it's written in bottommost
238         // file
239         InternalKey bottommost_internal_key(
240             key, 0, kTypeValue);
241 
242         if (corrupt_id(k)) {
243           test::CorruptKeyType(&internal_key);
244           test::CorruptKeyType(&bottommost_internal_key);
245         }
246         contents.push_back({internal_key.Encode().ToString(), value});
247         if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
248           expected_results.insert(
249               {bottommost_internal_key.Encode().ToString(), value});
250         }
251       }
252       mock::SortKVVector(&contents, ucmp_);
253 
254       AddMockFile(contents);
255     }
256 
257     SetLastSequence(sequence_number);
258 
259     mock::KVVector expected_results_kvvector;
260     for (auto& kv : expected_results) {
261       expected_results_kvvector.push_back({kv.first, kv.second});
262     }
263 
264     return expected_results_kvvector;
265   }
266 
NewDB()267   void NewDB() {
268     EXPECT_OK(DestroyDB(dbname_, Options()));
269     EXPECT_OK(env_->CreateDirIfMissing(dbname_));
270     versions_.reset(
271         new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
272                        &write_buffer_manager_, &write_controller_,
273                        /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
274                        /*db_session_id*/ ""));
275     compaction_job_stats_.Reset();
276     ASSERT_OK(SetIdentityFile(env_, dbname_));
277 
278     VersionEdit new_db;
279     new_db.SetLogNumber(0);
280     new_db.SetNextFile(2);
281     new_db.SetLastSequence(0);
282 
283     const std::string manifest = DescriptorFileName(dbname_, 1);
284     std::unique_ptr<WritableFileWriter> file_writer;
285     const auto& fs = env_->GetFileSystem();
286     Status s = WritableFileWriter::Create(
287         fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer,
288         nullptr);
289 
290     ASSERT_OK(s);
291     {
292       log::Writer log(std::move(file_writer), 0, false);
293       std::string record;
294       new_db.EncodeTo(&record);
295       s = log.AddRecord(record);
296     }
297     ASSERT_OK(s);
298     // Make "CURRENT" file that points to the new manifest file.
299     s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
300 
301     ASSERT_OK(s);
302 
303     cf_options_.merge_operator = merge_op_;
304     cf_options_.compaction_filter = compaction_filter_.get();
305     std::vector<ColumnFamilyDescriptor> column_families;
306     column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
307 
308     ASSERT_OK(versions_->Recover(column_families, false));
309     cfd_ = versions_->GetColumnFamilySet()->GetDefault();
310   }
311 
RunCompaction(const std::vector<std::vector<FileMetaData * >> & input_files,const mock::KVVector & expected_results,const std::vector<SequenceNumber> & snapshots={},SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber,int output_level=1,bool verify=true,uint64_t expected_oldest_blob_file_number=kInvalidBlobFileNumber)312   void RunCompaction(
313       const std::vector<std::vector<FileMetaData*>>& input_files,
314       const mock::KVVector& expected_results,
315       const std::vector<SequenceNumber>& snapshots = {},
316       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
317       int output_level = 1, bool verify = true,
318       uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber) {
319     auto cfd = versions_->GetColumnFamilySet()->GetDefault();
320 
321     size_t num_input_files = 0;
322     std::vector<CompactionInputFiles> compaction_input_files;
323     for (size_t level = 0; level < input_files.size(); level++) {
324       auto level_files = input_files[level];
325       CompactionInputFiles compaction_level;
326       compaction_level.level = static_cast<int>(level);
327       compaction_level.files.insert(compaction_level.files.end(),
328           level_files.begin(), level_files.end());
329       compaction_input_files.push_back(compaction_level);
330       num_input_files += level_files.size();
331     }
332 
333     Compaction compaction(
334         cfd->current()->storage_info(), *cfd->ioptions(),
335         *cfd->GetLatestMutableCFOptions(), mutable_db_options_,
336         compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
337         kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts,
338         Temperature::kUnknown, 0, {}, true);
339     compaction.SetInputVersion(cfd->current());
340 
341     LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
342     mutex_.Lock();
343     EventLogger event_logger(db_options_.info_log.get());
344     // TODO(yiwu) add a mock snapshot checker and add test for it.
345     SnapshotChecker* snapshot_checker = nullptr;
346     ASSERT_TRUE(full_history_ts_low_.empty() ||
347                 ucmp_->timestamp_size() == full_history_ts_low_.size());
348     CompactionJob compaction_job(
349         0, &compaction, db_options_, mutable_db_options_, env_options_,
350         versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer,
351         nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
352         earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
353         &event_logger, false, false, dbname_, &compaction_job_stats_,
354         Env::Priority::USER, nullptr /* IOTracer */,
355         /*manual_compaction_paused=*/nullptr,
356         /*manual_compaction_canceled=*/nullptr, /*db_id=*/"",
357         /*db_session_id=*/"", full_history_ts_low_);
358     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
359 
360     compaction_job.Prepare();
361     mutex_.Unlock();
362     Status s = compaction_job.Run();
363     ASSERT_OK(s);
364     ASSERT_OK(compaction_job.io_status());
365     mutex_.Lock();
366     ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
367     ASSERT_OK(compaction_job.io_status());
368     mutex_.Unlock();
369 
370     if (verify) {
371       ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
372       ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
373 
374       if (expected_results.empty()) {
375         ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
376       } else {
377         ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
378         mock_table_factory_->AssertLatestFile(expected_results);
379 
380         auto output_files =
381             cfd->current()->storage_info()->LevelFiles(output_level);
382         ASSERT_EQ(output_files.size(), 1);
383         ASSERT_EQ(output_files[0]->oldest_blob_file_number,
384                   expected_oldest_blob_file_number);
385       }
386     }
387   }
388 
389   Env* env_;
390   std::shared_ptr<FileSystem> fs_;
391   std::string dbname_;
392   const Comparator* const ucmp_;
393   EnvOptions env_options_;
394   ImmutableDBOptions db_options_;
395   ColumnFamilyOptions cf_options_;
396   MutableCFOptions mutable_cf_options_;
397   MutableDBOptions mutable_db_options_;
398   std::shared_ptr<Cache> table_cache_;
399   WriteController write_controller_;
400   WriteBufferManager write_buffer_manager_;
401   std::unique_ptr<VersionSet> versions_;
402   InstrumentedMutex mutex_;
403   std::atomic<bool> shutting_down_;
404   SequenceNumber preserve_deletes_seqnum_;
405   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
406   CompactionJobStats compaction_job_stats_;
407   ColumnFamilyData* cfd_;
408   std::unique_ptr<CompactionFilter> compaction_filter_;
409   std::shared_ptr<MergeOperator> merge_op_;
410   ErrorHandler error_handler_;
411   std::string full_history_ts_low_;
412   const std::function<std::string(uint64_t)> encode_u64_ts_;
413 };
414 
415 // TODO(icanadi) Make it simpler once we mock out VersionSet
416 class CompactionJobTest : public CompactionJobTestBase {
417  public:
CompactionJobTest()418   CompactionJobTest()
419       : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
420                               BytewiseComparator(),
421                               [](uint64_t /*ts*/) { return ""; }) {}
422 };
423 
TEST_F(CompactionJobTest,Simple)424 TEST_F(CompactionJobTest, Simple) {
425   NewDB();
426 
427   auto expected_results = CreateTwoFiles(false);
428   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
429   auto files = cfd->current()->storage_info()->LevelFiles(0);
430   ASSERT_EQ(2U, files.size());
431   RunCompaction({ files }, expected_results);
432 }
433 
TEST_F(CompactionJobTest,DISABLED_SimpleCorrupted)434 TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
435   NewDB();
436 
437   auto expected_results = CreateTwoFiles(true);
438   auto cfd = versions_->GetColumnFamilySet()->GetDefault();
439   auto files = cfd->current()->storage_info()->LevelFiles(0);
440   RunCompaction({files}, expected_results);
441   ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
442 }
443 
TEST_F(CompactionJobTest,SimpleDeletion)444 TEST_F(CompactionJobTest, SimpleDeletion) {
445   NewDB();
446 
447   auto file1 = mock::MakeMockFile({{KeyStr("c", 4U, kTypeDeletion), ""},
448                                    {KeyStr("c", 3U, kTypeValue), "val"}});
449   AddMockFile(file1);
450 
451   auto file2 = mock::MakeMockFile({{KeyStr("b", 2U, kTypeValue), "val"},
452                                    {KeyStr("b", 1U, kTypeValue), "val"}});
453   AddMockFile(file2);
454 
455   auto expected_results =
456       mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}});
457 
458   SetLastSequence(4U);
459   auto files = cfd_->current()->storage_info()->LevelFiles(0);
460   RunCompaction({files}, expected_results);
461 }
462 
TEST_F(CompactionJobTest,OutputNothing)463 TEST_F(CompactionJobTest, OutputNothing) {
464   NewDB();
465 
466   auto file1 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"}});
467 
468   AddMockFile(file1);
469 
470   auto file2 = mock::MakeMockFile({{KeyStr("a", 2U, kTypeDeletion), ""}});
471 
472   AddMockFile(file2);
473 
474   auto expected_results = mock::MakeMockFile();
475 
476   SetLastSequence(4U);
477   auto files = cfd_->current()->storage_info()->LevelFiles(0);
478   RunCompaction({files}, expected_results);
479 }
480 
TEST_F(CompactionJobTest,SimpleOverwrite)481 TEST_F(CompactionJobTest, SimpleOverwrite) {
482   NewDB();
483 
484   auto file1 = mock::MakeMockFile({
485       {KeyStr("a", 3U, kTypeValue), "val2"},
486       {KeyStr("b", 4U, kTypeValue), "val3"},
487   });
488   AddMockFile(file1);
489 
490   auto file2 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
491                                    {KeyStr("b", 2U, kTypeValue), "val"}});
492   AddMockFile(file2);
493 
494   auto expected_results =
495       mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "val2"},
496                           {KeyStr("b", 0U, kTypeValue), "val3"}});
497 
498   SetLastSequence(4U);
499   auto files = cfd_->current()->storage_info()->LevelFiles(0);
500   RunCompaction({files}, expected_results);
501 }
502 
TEST_F(CompactionJobTest,SimpleNonLastLevel)503 TEST_F(CompactionJobTest, SimpleNonLastLevel) {
504   NewDB();
505 
506   auto file1 = mock::MakeMockFile({
507       {KeyStr("a", 5U, kTypeValue), "val2"},
508       {KeyStr("b", 6U, kTypeValue), "val3"},
509   });
510   AddMockFile(file1);
511 
512   auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
513                                    {KeyStr("b", 4U, kTypeValue), "val"}});
514   AddMockFile(file2, 1);
515 
516   auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
517                                    {KeyStr("b", 2U, kTypeValue), "val"}});
518   AddMockFile(file3, 2);
519 
520   // Because level 1 is not the last level, the sequence numbers of a and b
521   // cannot be set to 0
522   auto expected_results =
523       mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
524                           {KeyStr("b", 6U, kTypeValue), "val3"}});
525 
526   SetLastSequence(6U);
527   auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
528   auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
529   RunCompaction({lvl0_files, lvl1_files}, expected_results);
530 }
531 
TEST_F(CompactionJobTest,SimpleMerge)532 TEST_F(CompactionJobTest, SimpleMerge) {
533   merge_op_ = MergeOperators::CreateStringAppendOperator();
534   NewDB();
535 
536   auto file1 = mock::MakeMockFile({
537       {KeyStr("a", 5U, kTypeMerge), "5"},
538       {KeyStr("a", 4U, kTypeMerge), "4"},
539       {KeyStr("a", 3U, kTypeValue), "3"},
540   });
541   AddMockFile(file1);
542 
543   auto file2 = mock::MakeMockFile(
544       {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeValue), "1"}});
545   AddMockFile(file2);
546 
547   auto expected_results =
548       mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
549                           {KeyStr("b", 0U, kTypeValue), "1,2"}});
550 
551   SetLastSequence(5U);
552   auto files = cfd_->current()->storage_info()->LevelFiles(0);
553   RunCompaction({files}, expected_results);
554 }
555 
TEST_F(CompactionJobTest,NonAssocMerge)556 TEST_F(CompactionJobTest, NonAssocMerge) {
557   merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
558   NewDB();
559 
560   auto file1 = mock::MakeMockFile({
561       {KeyStr("a", 5U, kTypeMerge), "5"},
562       {KeyStr("a", 4U, kTypeMerge), "4"},
563       {KeyStr("a", 3U, kTypeMerge), "3"},
564   });
565   AddMockFile(file1);
566 
567   auto file2 = mock::MakeMockFile(
568       {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeMerge), "1"}});
569   AddMockFile(file2);
570 
571   auto expected_results =
572       mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
573                           {KeyStr("b", 0U, kTypeValue), "1,2"}});
574 
575   SetLastSequence(5U);
576   auto files = cfd_->current()->storage_info()->LevelFiles(0);
577   RunCompaction({files}, expected_results);
578 }
579 
580 // Filters merge operands with value 10.
TEST_F(CompactionJobTest,MergeOperandFilter)581 TEST_F(CompactionJobTest, MergeOperandFilter) {
582   merge_op_ = MergeOperators::CreateUInt64AddOperator();
583   compaction_filter_.reset(new test::FilterNumber(10U));
584   NewDB();
585 
586   auto file1 = mock::MakeMockFile(
587       {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
588        {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)},  // Filtered
589        {KeyStr("a", 3U, kTypeMerge), test::EncodeInt(3U)}});
590   AddMockFile(file1);
591 
592   auto file2 = mock::MakeMockFile({
593       {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)},
594       {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}  // Filtered
595   });
596   AddMockFile(file2);
597 
598   auto expected_results =
599       mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), test::EncodeInt(8U)},
600                           {KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}});
601 
602   SetLastSequence(5U);
603   auto files = cfd_->current()->storage_info()->LevelFiles(0);
604   RunCompaction({files}, expected_results);
605 }
606 
TEST_F(CompactionJobTest,FilterSomeMergeOperands)607 TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
608   merge_op_ = MergeOperators::CreateUInt64AddOperator();
609   compaction_filter_.reset(new test::FilterNumber(10U));
610   NewDB();
611 
612   auto file1 = mock::MakeMockFile(
613       {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
614        {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)},  // Filtered
615        {KeyStr("a", 3U, kTypeValue), test::EncodeInt(5U)},
616        {KeyStr("d", 8U, kTypeMerge), test::EncodeInt(10U)}});
617   AddMockFile(file1);
618 
619   auto file2 =
620       mock::MakeMockFile({{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
621                           {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)},
622                           {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(3U)},
623                           {KeyStr("c", 1U, kTypeValue), test::EncodeInt(7U)},
624                           {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}});
625   AddMockFile(file2);
626 
627   auto file3 =
628       mock::MakeMockFile({{KeyStr("a", 1U, kTypeMerge), test::EncodeInt(3U)}});
629   AddMockFile(file3, 2);
630 
631   auto expected_results = mock::MakeMockFile({
632       {KeyStr("a", 5U, kTypeValue), test::EncodeInt(10U)},
633       {KeyStr("c", 2U, kTypeValue), test::EncodeInt(10U)},
634       {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}
635       // b does not appear because the operands are filtered
636   });
637 
638   SetLastSequence(5U);
639   auto files = cfd_->current()->storage_info()->LevelFiles(0);
640   RunCompaction({files}, expected_results);
641 }
642 
643 // Test where all operands/merge results are filtered out.
TEST_F(CompactionJobTest,FilterAllMergeOperands)644 TEST_F(CompactionJobTest, FilterAllMergeOperands) {
645   merge_op_ = MergeOperators::CreateUInt64AddOperator();
646   compaction_filter_.reset(new test::FilterNumber(10U));
647   NewDB();
648 
649   auto file1 =
650       mock::MakeMockFile({{KeyStr("a", 11U, kTypeMerge), test::EncodeInt(10U)},
651                           {KeyStr("a", 10U, kTypeMerge), test::EncodeInt(10U)},
652                           {KeyStr("a", 9U, kTypeMerge), test::EncodeInt(10U)}});
653   AddMockFile(file1);
654 
655   auto file2 =
656       mock::MakeMockFile({{KeyStr("b", 8U, kTypeMerge), test::EncodeInt(10U)},
657                           {KeyStr("b", 7U, kTypeMerge), test::EncodeInt(10U)},
658                           {KeyStr("b", 6U, kTypeMerge), test::EncodeInt(10U)},
659                           {KeyStr("b", 5U, kTypeMerge), test::EncodeInt(10U)},
660                           {KeyStr("b", 4U, kTypeMerge), test::EncodeInt(10U)},
661                           {KeyStr("b", 3U, kTypeMerge), test::EncodeInt(10U)},
662                           {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
663                           {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(10U)},
664                           {KeyStr("c", 1U, kTypeMerge), test::EncodeInt(10U)}});
665   AddMockFile(file2);
666 
667   auto file3 =
668       mock::MakeMockFile({{KeyStr("a", 2U, kTypeMerge), test::EncodeInt(10U)},
669                           {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}});
670   AddMockFile(file3, 2);
671 
672   SetLastSequence(11U);
673   auto files = cfd_->current()->storage_info()->LevelFiles(0);
674 
675   mock::KVVector empty_map;
676   RunCompaction({files}, empty_map);
677 }
678 
TEST_F(CompactionJobTest,SimpleSingleDelete)679 TEST_F(CompactionJobTest, SimpleSingleDelete) {
680   NewDB();
681 
682   auto file1 = mock::MakeMockFile({
683       {KeyStr("a", 5U, kTypeDeletion), ""},
684       {KeyStr("b", 6U, kTypeSingleDeletion), ""},
685   });
686   AddMockFile(file1);
687 
688   auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
689                                    {KeyStr("b", 4U, kTypeValue), "val"}});
690   AddMockFile(file2);
691 
692   auto file3 = mock::MakeMockFile({
693       {KeyStr("a", 1U, kTypeValue), "val"},
694   });
695   AddMockFile(file3, 2);
696 
697   auto expected_results =
698       mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
699 
700   SetLastSequence(6U);
701   auto files = cfd_->current()->storage_info()->LevelFiles(0);
702   RunCompaction({files}, expected_results);
703 }
704 
TEST_F(CompactionJobTest,SingleDeleteSnapshots)705 TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
706   NewDB();
707 
708   auto file1 = mock::MakeMockFile({
709       {KeyStr("A", 12U, kTypeSingleDeletion), ""},
710       {KeyStr("a", 12U, kTypeSingleDeletion), ""},
711       {KeyStr("b", 21U, kTypeSingleDeletion), ""},
712       {KeyStr("c", 22U, kTypeSingleDeletion), ""},
713       {KeyStr("d", 9U, kTypeSingleDeletion), ""},
714       {KeyStr("f", 21U, kTypeSingleDeletion), ""},
715       {KeyStr("j", 11U, kTypeSingleDeletion), ""},
716       {KeyStr("j", 9U, kTypeSingleDeletion), ""},
717       {KeyStr("k", 12U, kTypeSingleDeletion), ""},
718       {KeyStr("k", 11U, kTypeSingleDeletion), ""},
719       {KeyStr("l", 3U, kTypeSingleDeletion), ""},
720       {KeyStr("l", 2U, kTypeSingleDeletion), ""},
721   });
722   AddMockFile(file1);
723 
724   auto file2 = mock::MakeMockFile({
725       {KeyStr("0", 2U, kTypeSingleDeletion), ""},
726       {KeyStr("a", 11U, kTypeValue), "val1"},
727       {KeyStr("b", 11U, kTypeValue), "val2"},
728       {KeyStr("c", 21U, kTypeValue), "val3"},
729       {KeyStr("d", 8U, kTypeValue), "val4"},
730       {KeyStr("e", 2U, kTypeSingleDeletion), ""},
731       {KeyStr("f", 1U, kTypeValue), "val1"},
732       {KeyStr("g", 11U, kTypeSingleDeletion), ""},
733       {KeyStr("h", 2U, kTypeSingleDeletion), ""},
734       {KeyStr("m", 12U, kTypeValue), "val1"},
735       {KeyStr("m", 11U, kTypeSingleDeletion), ""},
736       {KeyStr("m", 8U, kTypeValue), "val2"},
737   });
738   AddMockFile(file2);
739 
740   auto file3 = mock::MakeMockFile({
741       {KeyStr("A", 1U, kTypeValue), "val"},
742       {KeyStr("e", 1U, kTypeValue), "val"},
743   });
744   AddMockFile(file3, 2);
745 
746   auto expected_results = mock::MakeMockFile({
747       {KeyStr("A", 12U, kTypeSingleDeletion), ""},
748       {KeyStr("a", 12U, kTypeSingleDeletion), ""},
749       {KeyStr("a", 11U, kTypeValue), ""},
750       {KeyStr("b", 21U, kTypeSingleDeletion), ""},
751       {KeyStr("b", 11U, kTypeValue), "val2"},
752       {KeyStr("c", 22U, kTypeSingleDeletion), ""},
753       {KeyStr("c", 21U, kTypeValue), ""},
754       {KeyStr("e", 2U, kTypeSingleDeletion), ""},
755       {KeyStr("f", 21U, kTypeSingleDeletion), ""},
756       {KeyStr("f", 1U, kTypeValue), "val1"},
757       {KeyStr("g", 11U, kTypeSingleDeletion), ""},
758       {KeyStr("j", 11U, kTypeSingleDeletion), ""},
759       {KeyStr("k", 11U, kTypeSingleDeletion), ""},
760       {KeyStr("m", 12U, kTypeValue), "val1"},
761       {KeyStr("m", 11U, kTypeSingleDeletion), ""},
762       {KeyStr("m", 8U, kTypeValue), "val2"},
763   });
764 
765   SetLastSequence(22U);
766   auto files = cfd_->current()->storage_info()->LevelFiles(0);
767   RunCompaction({files}, expected_results, {10U, 20U}, 10U);
768 }
769 
TEST_F(CompactionJobTest,EarliestWriteConflictSnapshot)770 TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
771   NewDB();
772 
773   // Test multiple snapshots where the earliest snapshot is not a
774   // write-conflic-snapshot.
775 
776   auto file1 = mock::MakeMockFile({
777       {KeyStr("A", 24U, kTypeSingleDeletion), ""},
778       {KeyStr("A", 23U, kTypeValue), "val"},
779       {KeyStr("B", 24U, kTypeSingleDeletion), ""},
780       {KeyStr("B", 23U, kTypeValue), "val"},
781       {KeyStr("D", 24U, kTypeSingleDeletion), ""},
782       {KeyStr("G", 32U, kTypeSingleDeletion), ""},
783       {KeyStr("G", 31U, kTypeValue), "val"},
784       {KeyStr("G", 24U, kTypeSingleDeletion), ""},
785       {KeyStr("G", 23U, kTypeValue), "val2"},
786       {KeyStr("H", 31U, kTypeValue), "val"},
787       {KeyStr("H", 24U, kTypeSingleDeletion), ""},
788       {KeyStr("H", 23U, kTypeValue), "val"},
789       {KeyStr("I", 35U, kTypeSingleDeletion), ""},
790       {KeyStr("I", 34U, kTypeValue), "val2"},
791       {KeyStr("I", 33U, kTypeSingleDeletion), ""},
792       {KeyStr("I", 32U, kTypeValue), "val3"},
793       {KeyStr("I", 31U, kTypeSingleDeletion), ""},
794       {KeyStr("J", 34U, kTypeValue), "val"},
795       {KeyStr("J", 33U, kTypeSingleDeletion), ""},
796       {KeyStr("J", 25U, kTypeValue), "val2"},
797       {KeyStr("J", 24U, kTypeSingleDeletion), ""},
798   });
799   AddMockFile(file1);
800 
801   auto file2 = mock::MakeMockFile({
802       {KeyStr("A", 14U, kTypeSingleDeletion), ""},
803       {KeyStr("A", 13U, kTypeValue), "val2"},
804       {KeyStr("C", 14U, kTypeSingleDeletion), ""},
805       {KeyStr("C", 13U, kTypeValue), "val"},
806       {KeyStr("E", 12U, kTypeSingleDeletion), ""},
807       {KeyStr("F", 4U, kTypeSingleDeletion), ""},
808       {KeyStr("F", 3U, kTypeValue), "val"},
809       {KeyStr("G", 14U, kTypeSingleDeletion), ""},
810       {KeyStr("G", 13U, kTypeValue), "val3"},
811       {KeyStr("H", 14U, kTypeSingleDeletion), ""},
812       {KeyStr("H", 13U, kTypeValue), "val2"},
813       {KeyStr("I", 13U, kTypeValue), "val4"},
814       {KeyStr("I", 12U, kTypeSingleDeletion), ""},
815       {KeyStr("I", 11U, kTypeValue), "val5"},
816       {KeyStr("J", 15U, kTypeValue), "val3"},
817       {KeyStr("J", 14U, kTypeSingleDeletion), ""},
818   });
819   AddMockFile(file2);
820 
821   auto expected_results = mock::MakeMockFile({
822       {KeyStr("A", 24U, kTypeSingleDeletion), ""},
823       {KeyStr("A", 23U, kTypeValue), ""},
824       {KeyStr("B", 24U, kTypeSingleDeletion), ""},
825       {KeyStr("B", 23U, kTypeValue), ""},
826       {KeyStr("D", 24U, kTypeSingleDeletion), ""},
827       {KeyStr("E", 12U, kTypeSingleDeletion), ""},
828       {KeyStr("G", 32U, kTypeSingleDeletion), ""},
829       {KeyStr("G", 31U, kTypeValue), ""},
830       {KeyStr("H", 31U, kTypeValue), "val"},
831       {KeyStr("I", 35U, kTypeSingleDeletion), ""},
832       {KeyStr("I", 34U, kTypeValue), ""},
833       {KeyStr("I", 31U, kTypeSingleDeletion), ""},
834       {KeyStr("I", 13U, kTypeValue), "val4"},
835       {KeyStr("J", 34U, kTypeValue), "val"},
836       {KeyStr("J", 33U, kTypeSingleDeletion), ""},
837       {KeyStr("J", 25U, kTypeValue), "val2"},
838       {KeyStr("J", 24U, kTypeSingleDeletion), ""},
839       {KeyStr("J", 15U, kTypeValue), "val3"},
840       {KeyStr("J", 14U, kTypeSingleDeletion), ""},
841   });
842 
843   SetLastSequence(24U);
844   auto files = cfd_->current()->storage_info()->LevelFiles(0);
845   RunCompaction({files}, expected_results, {10U, 20U, 30U}, 20U);
846 }
847 
TEST_F(CompactionJobTest,SingleDeleteZeroSeq)848 TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
849   NewDB();
850 
851   auto file1 = mock::MakeMockFile({
852       {KeyStr("A", 10U, kTypeSingleDeletion), ""},
853       {KeyStr("dummy", 5U, kTypeValue), "val2"},
854   });
855   AddMockFile(file1);
856 
857   auto file2 = mock::MakeMockFile({
858       {KeyStr("A", 0U, kTypeValue), "val"},
859   });
860   AddMockFile(file2);
861 
862   auto expected_results = mock::MakeMockFile({
863       {KeyStr("dummy", 0U, kTypeValue), "val2"},
864   });
865 
866   SetLastSequence(22U);
867   auto files = cfd_->current()->storage_info()->LevelFiles(0);
868   RunCompaction({files}, expected_results, {});
869 }
870 
TEST_F(CompactionJobTest,MultiSingleDelete)871 TEST_F(CompactionJobTest, MultiSingleDelete) {
872   // Tests three scenarios involving multiple single delete/put pairs:
873   //
874   // A: Put Snapshot SDel Put SDel -> Put Snapshot SDel
875   // B: Snapshot Put SDel Put SDel Snapshot -> Snapshot SDel Snapshot
876   // C: SDel Put SDel Snapshot Put -> Snapshot Put
877   // D: (Put) SDel Snapshot Put SDel -> (Put) SDel Snapshot SDel
878   // E: Put SDel Snapshot Put SDel -> Snapshot SDel
879   // F: Put SDel Put Sdel Snapshot -> removed
880   // G: Snapshot SDel Put SDel Put -> Snapshot Put SDel
881   // H: (Put) Put SDel Put Sdel Snapshot -> Removed
882   // I: (Put) Snapshot Put SDel Put SDel -> SDel
883   // J: Put Put SDel Put SDel SDel Snapshot Put Put SDel SDel Put
884   //      -> Snapshot Put
885   // K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel
886   //      -> Snapshot Put Snapshot SDel
887   // L: SDel Put Del Put SDel Snapshot Del Put Del SDel Put SDel
888   //      -> Snapshot SDel
889   // M: (Put) SDel Put Del Put SDel Snapshot Put Del SDel Put SDel Del
890   //      -> SDel Snapshot Del
891   NewDB();
892 
893   auto file1 = mock::MakeMockFile({
894       {KeyStr("A", 14U, kTypeSingleDeletion), ""},
895       {KeyStr("A", 13U, kTypeValue), "val5"},
896       {KeyStr("A", 12U, kTypeSingleDeletion), ""},
897       {KeyStr("B", 14U, kTypeSingleDeletion), ""},
898       {KeyStr("B", 13U, kTypeValue), "val2"},
899       {KeyStr("C", 14U, kTypeValue), "val3"},
900       {KeyStr("D", 12U, kTypeSingleDeletion), ""},
901       {KeyStr("D", 11U, kTypeValue), "val4"},
902       {KeyStr("G", 15U, kTypeValue), "val"},
903       {KeyStr("G", 14U, kTypeSingleDeletion), ""},
904       {KeyStr("G", 13U, kTypeValue), "val"},
905       {KeyStr("I", 14U, kTypeSingleDeletion), ""},
906       {KeyStr("I", 13U, kTypeValue), "val"},
907       {KeyStr("J", 15U, kTypeValue), "val"},
908       {KeyStr("J", 14U, kTypeSingleDeletion), ""},
909       {KeyStr("J", 13U, kTypeSingleDeletion), ""},
910       {KeyStr("J", 12U, kTypeValue), "val"},
911       {KeyStr("J", 11U, kTypeValue), "val"},
912       {KeyStr("K", 16U, kTypeSingleDeletion), ""},
913       {KeyStr("K", 15U, kTypeValue), "val1"},
914       {KeyStr("K", 14U, kTypeSingleDeletion), ""},
915       {KeyStr("K", 13U, kTypeSingleDeletion), ""},
916       {KeyStr("K", 12U, kTypeValue), "val2"},
917       {KeyStr("K", 11U, kTypeSingleDeletion), ""},
918       {KeyStr("L", 16U, kTypeSingleDeletion), ""},
919       {KeyStr("L", 15U, kTypeValue), "val"},
920       {KeyStr("L", 14U, kTypeSingleDeletion), ""},
921       {KeyStr("L", 13U, kTypeDeletion), ""},
922       {KeyStr("L", 12U, kTypeValue), "val"},
923       {KeyStr("L", 11U, kTypeDeletion), ""},
924       {KeyStr("M", 16U, kTypeDeletion), ""},
925       {KeyStr("M", 15U, kTypeSingleDeletion), ""},
926       {KeyStr("M", 14U, kTypeValue), "val"},
927       {KeyStr("M", 13U, kTypeSingleDeletion), ""},
928       {KeyStr("M", 12U, kTypeDeletion), ""},
929       {KeyStr("M", 11U, kTypeValue), "val"},
930   });
931   AddMockFile(file1);
932 
933   auto file2 = mock::MakeMockFile({
934       {KeyStr("A", 10U, kTypeValue), "val"},
935       {KeyStr("B", 12U, kTypeSingleDeletion), ""},
936       {KeyStr("B", 11U, kTypeValue), "val2"},
937       {KeyStr("C", 10U, kTypeSingleDeletion), ""},
938       {KeyStr("C", 9U, kTypeValue), "val6"},
939       {KeyStr("C", 8U, kTypeSingleDeletion), ""},
940       {KeyStr("D", 10U, kTypeSingleDeletion), ""},
941       {KeyStr("E", 12U, kTypeSingleDeletion), ""},
942       {KeyStr("E", 11U, kTypeValue), "val"},
943       {KeyStr("E", 5U, kTypeSingleDeletion), ""},
944       {KeyStr("E", 4U, kTypeValue), "val"},
945       {KeyStr("F", 6U, kTypeSingleDeletion), ""},
946       {KeyStr("F", 5U, kTypeValue), "val"},
947       {KeyStr("F", 4U, kTypeSingleDeletion), ""},
948       {KeyStr("F", 3U, kTypeValue), "val"},
949       {KeyStr("G", 12U, kTypeSingleDeletion), ""},
950       {KeyStr("H", 6U, kTypeSingleDeletion), ""},
951       {KeyStr("H", 5U, kTypeValue), "val"},
952       {KeyStr("H", 4U, kTypeSingleDeletion), ""},
953       {KeyStr("H", 3U, kTypeValue), "val"},
954       {KeyStr("I", 12U, kTypeSingleDeletion), ""},
955       {KeyStr("I", 11U, kTypeValue), "val"},
956       {KeyStr("J", 6U, kTypeSingleDeletion), ""},
957       {KeyStr("J", 5U, kTypeSingleDeletion), ""},
958       {KeyStr("J", 4U, kTypeValue), "val"},
959       {KeyStr("J", 3U, kTypeSingleDeletion), ""},
960       {KeyStr("J", 2U, kTypeValue), "val"},
961       {KeyStr("K", 8U, kTypeValue), "val3"},
962       {KeyStr("K", 7U, kTypeValue), "val4"},
963       {KeyStr("K", 6U, kTypeSingleDeletion), ""},
964       {KeyStr("K", 5U, kTypeValue), "val5"},
965       {KeyStr("K", 2U, kTypeSingleDeletion), ""},
966       {KeyStr("K", 1U, kTypeSingleDeletion), ""},
967       {KeyStr("L", 5U, kTypeSingleDeletion), ""},
968       {KeyStr("L", 4U, kTypeValue), "val"},
969       {KeyStr("L", 3U, kTypeDeletion), ""},
970       {KeyStr("L", 2U, kTypeValue), "val"},
971       {KeyStr("L", 1U, kTypeSingleDeletion), ""},
972       {KeyStr("M", 10U, kTypeSingleDeletion), ""},
973       {KeyStr("M", 7U, kTypeValue), "val"},
974       {KeyStr("M", 5U, kTypeDeletion), ""},
975       {KeyStr("M", 4U, kTypeValue), "val"},
976       {KeyStr("M", 3U, kTypeSingleDeletion), ""},
977   });
978   AddMockFile(file2);
979 
980   auto file3 = mock::MakeMockFile({
981       {KeyStr("D", 1U, kTypeValue), "val"},
982       {KeyStr("H", 1U, kTypeValue), "val"},
983       {KeyStr("I", 2U, kTypeValue), "val"},
984   });
985   AddMockFile(file3, 2);
986 
987   auto file4 = mock::MakeMockFile({
988       {KeyStr("M", 1U, kTypeValue), "val"},
989   });
990   AddMockFile(file4, 2);
991 
992   auto expected_results =
993       mock::MakeMockFile({{KeyStr("A", 14U, kTypeSingleDeletion), ""},
994                           {KeyStr("A", 13U, kTypeValue), ""},
995                           {KeyStr("A", 12U, kTypeSingleDeletion), ""},
996                           {KeyStr("A", 10U, kTypeValue), "val"},
997                           {KeyStr("B", 14U, kTypeSingleDeletion), ""},
998                           {KeyStr("B", 13U, kTypeValue), ""},
999                           {KeyStr("C", 14U, kTypeValue), "val3"},
1000                           {KeyStr("D", 12U, kTypeSingleDeletion), ""},
1001                           {KeyStr("D", 11U, kTypeValue), ""},
1002                           {KeyStr("D", 10U, kTypeSingleDeletion), ""},
1003                           {KeyStr("E", 12U, kTypeSingleDeletion), ""},
1004                           {KeyStr("E", 11U, kTypeValue), ""},
1005                           {KeyStr("G", 15U, kTypeValue), "val"},
1006                           {KeyStr("G", 12U, kTypeSingleDeletion), ""},
1007                           {KeyStr("I", 14U, kTypeSingleDeletion), ""},
1008                           {KeyStr("I", 13U, kTypeValue), ""},
1009                           {KeyStr("J", 15U, kTypeValue), "val"},
1010                           {KeyStr("K", 16U, kTypeSingleDeletion), ""},
1011                           {KeyStr("K", 15U, kTypeValue), ""},
1012                           {KeyStr("K", 11U, kTypeSingleDeletion), ""},
1013                           {KeyStr("K", 8U, kTypeValue), "val3"},
1014                           {KeyStr("L", 16U, kTypeSingleDeletion), ""},
1015                           {KeyStr("L", 15U, kTypeValue), ""},
1016                           {KeyStr("M", 16U, kTypeDeletion), ""},
1017                           {KeyStr("M", 3U, kTypeSingleDeletion), ""}});
1018 
1019   SetLastSequence(22U);
1020   auto files = cfd_->current()->storage_info()->LevelFiles(0);
1021   RunCompaction({files}, expected_results, {10U}, 10U);
1022 }
1023 
1024 // This test documents the behavior where a corrupt key follows a deletion or a
1025 // single deletion and the (single) deletion gets removed while the corrupt key
1026 // gets written out. TODO(noetzli): We probably want a better way to treat
1027 // corrupt keys.
TEST_F(CompactionJobTest,DISABLED_CorruptionAfterDeletion)1028 TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
1029   NewDB();
1030 
1031   auto file1 =
1032       mock::MakeMockFile({{test::KeyStr("A", 6U, kTypeValue), "val3"},
1033                           {test::KeyStr("a", 5U, kTypeDeletion), ""},
1034                           {test::KeyStr("a", 4U, kTypeValue, true), "val"}});
1035   AddMockFile(file1);
1036 
1037   auto file2 =
1038       mock::MakeMockFile({{test::KeyStr("b", 3U, kTypeSingleDeletion), ""},
1039                           {test::KeyStr("b", 2U, kTypeValue, true), "val"},
1040                           {test::KeyStr("c", 1U, kTypeValue), "val2"}});
1041   AddMockFile(file2);
1042 
1043   auto expected_results =
1044       mock::MakeMockFile({{test::KeyStr("A", 0U, kTypeValue), "val3"},
1045                           {test::KeyStr("a", 0U, kTypeValue, true), "val"},
1046                           {test::KeyStr("b", 0U, kTypeValue, true), "val"},
1047                           {test::KeyStr("c", 0U, kTypeValue), "val2"}});
1048 
1049   SetLastSequence(6U);
1050   auto files = cfd_->current()->storage_info()->LevelFiles(0);
1051   RunCompaction({files}, expected_results);
1052 }
1053 
TEST_F(CompactionJobTest,OldestBlobFileNumber)1054 TEST_F(CompactionJobTest, OldestBlobFileNumber) {
1055   NewDB();
1056 
1057   // Note: blob1 is inlined TTL, so it will not be considered for the purposes
1058   // of identifying the oldest referenced blob file. Similarly, blob6 will be
1059   // ignored because it has TTL and hence refers to a TTL blob file.
1060   const stl_wrappers::KVMap::value_type blob1(
1061       KeyStr("a", 1U, kTypeBlobIndex), BlobStrInlinedTTL("foo", 1234567890ULL));
1062   const stl_wrappers::KVMap::value_type blob2(KeyStr("b", 2U, kTypeBlobIndex),
1063                                               BlobStr(59, 123456, 999));
1064   const stl_wrappers::KVMap::value_type blob3(KeyStr("c", 3U, kTypeBlobIndex),
1065                                               BlobStr(138, 1000, 1 << 8));
1066   auto file1 = mock::MakeMockFile({blob1, blob2, blob3});
1067   AddMockFile(file1);
1068 
1069   const stl_wrappers::KVMap::value_type blob4(KeyStr("d", 4U, kTypeBlobIndex),
1070                                               BlobStr(199, 3 << 10, 1 << 20));
1071   const stl_wrappers::KVMap::value_type blob5(KeyStr("e", 5U, kTypeBlobIndex),
1072                                               BlobStr(19, 6789, 333));
1073   const stl_wrappers::KVMap::value_type blob6(
1074       KeyStr("f", 6U, kTypeBlobIndex),
1075       BlobStrTTL(5, 2048, 1 << 7, 1234567890ULL));
1076   auto file2 = mock::MakeMockFile({blob4, blob5, blob6});
1077   AddMockFile(file2);
1078 
1079   const stl_wrappers::KVMap::value_type expected_blob1(
1080       KeyStr("a", 0U, kTypeBlobIndex), blob1.second);
1081   const stl_wrappers::KVMap::value_type expected_blob2(
1082       KeyStr("b", 0U, kTypeBlobIndex), blob2.second);
1083   const stl_wrappers::KVMap::value_type expected_blob3(
1084       KeyStr("c", 0U, kTypeBlobIndex), blob3.second);
1085   const stl_wrappers::KVMap::value_type expected_blob4(
1086       KeyStr("d", 0U, kTypeBlobIndex), blob4.second);
1087   const stl_wrappers::KVMap::value_type expected_blob5(
1088       KeyStr("e", 0U, kTypeBlobIndex), blob5.second);
1089   const stl_wrappers::KVMap::value_type expected_blob6(
1090       KeyStr("f", 0U, kTypeBlobIndex), blob6.second);
1091   auto expected_results =
1092       mock::MakeMockFile({expected_blob1, expected_blob2, expected_blob3,
1093                           expected_blob4, expected_blob5, expected_blob6});
1094 
1095   SetLastSequence(6U);
1096   auto files = cfd_->current()->storage_info()->LevelFiles(0);
1097   RunCompaction({files}, expected_results, std::vector<SequenceNumber>(),
1098                 kMaxSequenceNumber, /* output_level */ 1, /* verify */ true,
1099                 /* expected_oldest_blob_file_number */ 19);
1100 }
1101 
TEST_F(CompactionJobTest,InputSerialization)1102 TEST_F(CompactionJobTest, InputSerialization) {
1103   // Setup a random CompactionServiceInput
1104   CompactionServiceInput input;
1105   const int kStrMaxLen = 1000;
1106   Random rnd(static_cast<uint32_t>(time(nullptr)));
1107   Random64 rnd64(time(nullptr));
1108   input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
1109   input.column_family.options.comparator = ReverseBytewiseComparator();
1110   input.column_family.options.max_bytes_for_level_base =
1111       rnd64.Uniform(UINT64_MAX);
1112   input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
1113   input.column_family.options.compression = kZSTD;
1114   input.column_family.options.compression_opts.level = 4;
1115   input.db_options.max_background_flushes = 10;
1116   input.db_options.paranoid_checks = rnd.OneIn(2);
1117   input.db_options.statistics = CreateDBStatistics();
1118   input.db_options.env = env_;
1119   while (!rnd.OneIn(10)) {
1120     input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
1121   }
1122   while (!rnd.OneIn(10)) {
1123     input.input_files.emplace_back(rnd.RandomString(
1124         rnd.Uniform(kStrMaxLen - 1) +
1125         1));  // input file name should have at least one character
1126   }
1127   input.output_level = 4;
1128   input.has_begin = rnd.OneIn(2);
1129   if (input.has_begin) {
1130     input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
1131   }
1132   input.has_end = rnd.OneIn(2);
1133   if (input.has_end) {
1134     input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
1135   }
1136   input.approx_size = rnd64.Uniform(UINT64_MAX);
1137 
1138   std::string output;
1139   ASSERT_OK(input.Write(&output));
1140 
1141   // Test deserialization
1142   CompactionServiceInput deserialized1;
1143   ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1));
1144   ASSERT_TRUE(deserialized1.TEST_Equals(&input));
1145 
1146   // Test mismatch
1147   deserialized1.db_options.max_background_flushes += 10;
1148   std::string mismatch;
1149   ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
1150   ASSERT_EQ(mismatch, "db_options.max_background_flushes");
1151 
1152   // Test unknown field
1153   CompactionServiceInput deserialized2;
1154   output.clear();
1155   ASSERT_OK(input.Write(&output));
1156   output.append("new_field=123;");
1157 
1158   ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2));
1159   ASSERT_TRUE(deserialized2.TEST_Equals(&input));
1160 
1161   // Test missing field
1162   CompactionServiceInput deserialized3;
1163   deserialized3.output_level = 0;
1164   std::string to_remove = "output_level=4;";
1165   size_t pos = output.find(to_remove);
1166   ASSERT_TRUE(pos != std::string::npos);
1167   output.erase(pos, to_remove.length());
1168   ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3));
1169   mismatch.clear();
1170   ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch));
1171   ASSERT_EQ(mismatch, "output_level");
1172 
1173   // manually set the value back, should match the original structure
1174   deserialized3.output_level = 4;
1175   ASSERT_TRUE(deserialized3.TEST_Equals(&input));
1176 
1177   // Test invalid version
1178   output.clear();
1179   ASSERT_OK(input.Write(&output));
1180 
1181   uint32_t data_version = DecodeFixed32(output.data());
1182   const size_t kDataVersionSize = sizeof(data_version);
1183   ASSERT_EQ(data_version,
1184             1U);  // Update once the default data version is changed
1185   char buf[kDataVersionSize];
1186   EncodeFixed32(buf, data_version + 10);  // make sure it's not valid
1187   output.replace(0, kDataVersionSize, buf, kDataVersionSize);
1188   Status s = CompactionServiceInput::Read(output, &deserialized3);
1189   ASSERT_TRUE(s.IsNotSupported());
1190 }
1191 
TEST_F(CompactionJobTest,ResultSerialization)1192 TEST_F(CompactionJobTest, ResultSerialization) {
1193   // Setup a random CompactionServiceResult
1194   CompactionServiceResult result;
1195   const int kStrMaxLen = 1000;
1196   Random rnd(static_cast<uint32_t>(time(nullptr)));
1197   Random64 rnd64(time(nullptr));
1198   std::vector<Status> status_list = {
1199       Status::OK(),
1200       Status::InvalidArgument("invalid option"),
1201       Status::Aborted("failed to run"),
1202       Status::NotSupported("not supported option"),
1203   };
1204   result.status =
1205       status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
1206   while (!rnd.OneIn(10)) {
1207     result.output_files.emplace_back(
1208         rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
1209         rnd64.Uniform(UINT64_MAX),
1210         rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
1211         rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
1212         rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
1213         rnd64.Uniform(UINT64_MAX), rnd.OneIn(2));
1214   }
1215   result.output_level = rnd.Uniform(10);
1216   result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
1217   result.num_output_records = rnd64.Uniform(UINT64_MAX);
1218   result.total_bytes = rnd64.Uniform(UINT64_MAX);
1219   result.bytes_read = 123;
1220   result.bytes_written = rnd64.Uniform(UINT64_MAX);
1221   result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX);
1222   result.stats.num_output_files = rnd.Uniform(1000);
1223   result.stats.is_full_compaction = rnd.OneIn(2);
1224   result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX);
1225   result.stats.num_input_files = 9;
1226 
1227   std::string output;
1228   ASSERT_OK(result.Write(&output));
1229 
1230   // Test deserialization
1231   CompactionServiceResult deserialized1;
1232   ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1));
1233   ASSERT_TRUE(deserialized1.TEST_Equals(&result));
1234 
1235   // Test mismatch
1236   deserialized1.stats.num_input_files += 10;
1237   std::string mismatch;
1238   ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
1239   ASSERT_EQ(mismatch, "stats.num_input_files");
1240 
1241   // Test unknown field
1242   CompactionServiceResult deserialized2;
1243   output.clear();
1244   ASSERT_OK(result.Write(&output));
1245   output.append("new_field=123;");
1246 
1247   ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2));
1248   ASSERT_TRUE(deserialized2.TEST_Equals(&result));
1249 
1250   // Test missing field
1251   CompactionServiceResult deserialized3;
1252   deserialized3.bytes_read = 0;
1253   std::string to_remove = "bytes_read=123;";
1254   size_t pos = output.find(to_remove);
1255   ASSERT_TRUE(pos != std::string::npos);
1256   output.erase(pos, to_remove.length());
1257   ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3));
1258   mismatch.clear();
1259   ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch));
1260   ASSERT_EQ(mismatch, "bytes_read");
1261 
1262   deserialized3.bytes_read = 123;
1263   ASSERT_TRUE(deserialized3.TEST_Equals(&result));
1264 
1265   // Test invalid version
1266   output.clear();
1267   ASSERT_OK(result.Write(&output));
1268 
1269   uint32_t data_version = DecodeFixed32(output.data());
1270   const size_t kDataVersionSize = sizeof(data_version);
1271   ASSERT_EQ(data_version,
1272             1U);  // Update once the default data version is changed
1273   char buf[kDataVersionSize];
1274   EncodeFixed32(buf, data_version + 10);  // make sure it's not valid
1275   output.replace(0, kDataVersionSize, buf, kDataVersionSize);
1276   Status s = CompactionServiceResult::Read(output, &deserialized3);
1277   ASSERT_TRUE(s.IsNotSupported());
1278   for (const auto& item : status_list) {
1279     item.PermitUncheckedError();
1280   }
1281 }
1282 
1283 class CompactionJobTimestampTest : public CompactionJobTestBase {
1284  public:
CompactionJobTimestampTest()1285   CompactionJobTimestampTest()
1286       : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
1287                               test::ComparatorWithU64Ts(), test::EncodeInt) {}
1288 };
1289 
TEST_F(CompactionJobTimestampTest,GCDisabled)1290 TEST_F(CompactionJobTimestampTest, GCDisabled) {
1291   NewDB();
1292 
1293   auto file1 =
1294       mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
1295                           {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
1296                           {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
1297                           {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}});
1298 
1299   AddMockFile(file1);
1300 
1301   auto file2 = mock::MakeMockFile(
1302       {{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
1303        {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
1304        {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
1305        {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
1306   AddMockFile(file2);
1307 
1308   SetLastSequence(10);
1309 
1310   auto expected_results = mock::MakeMockFile(
1311       {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
1312        {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
1313        {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
1314        {KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
1315        {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
1316        {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
1317        {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
1318        {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
1319   const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
1320   RunCompaction({files}, expected_results);
1321 }
1322 
TEST_F(CompactionJobTimestampTest,NoKeyExpired)1323 TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
1324   NewDB();
1325 
1326   auto file1 =
1327       mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
1328                           {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
1329                           {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}});
1330   AddMockFile(file1);
1331 
1332   auto file2 =
1333       mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
1334                           {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
1335   AddMockFile(file2);
1336 
1337   SetLastSequence(101);
1338 
1339   auto expected_results =
1340       mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
1341                           {KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
1342                           {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
1343                           {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
1344                           {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
1345   const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
1346 
1347   full_history_ts_low_ = encode_u64_ts_(0);
1348   RunCompaction({files}, expected_results);
1349 }
1350 
TEST_F(CompactionJobTimestampTest,AllKeysExpired)1351 TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
1352   NewDB();
1353 
1354   auto file1 = mock::MakeMockFile(
1355       {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
1356        {KeyStr("b", 6, ValueType::kTypeSingleDeletion, 99), ""},
1357        {KeyStr("c", 7, ValueType::kTypeValue, 98), "c7"}});
1358   AddMockFile(file1);
1359 
1360   auto file2 = mock::MakeMockFile(
1361       {{KeyStr("a", 4, ValueType::kTypeValue, 97), "a4"},
1362        {KeyStr("b", 3, ValueType::kTypeValue, 96), "b3"},
1363        {KeyStr("c", 2, ValueType::kTypeDeletionWithTimestamp, 95), ""},
1364        {KeyStr("c", 1, ValueType::kTypeValue, 94), "c1"}});
1365   AddMockFile(file2);
1366 
1367   SetLastSequence(7);
1368 
1369   auto expected_results =
1370       mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
1371   const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
1372 
1373   full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
1374   RunCompaction({files}, expected_results);
1375 }
1376 
TEST_F(CompactionJobTimestampTest,SomeKeysExpired)1377 TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
1378   NewDB();
1379 
1380   auto file1 =
1381       mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
1382                           {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
1383   AddMockFile(file1);
1384 
1385   auto file2 = mock::MakeMockFile(
1386       {{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"},
1387        {KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"},
1388        {KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}});
1389   AddMockFile(file2);
1390 
1391   SetLastSequence(6);
1392 
1393   auto expected_results =
1394       mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
1395                           {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
1396   const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
1397 
1398   full_history_ts_low_ = encode_u64_ts_(49);
1399   RunCompaction({files}, expected_results);
1400 }
1401 
1402 }  // namespace ROCKSDB_NAMESPACE
1403 
main(int argc,char ** argv)1404 int main(int argc, char** argv) {
1405   ::testing::InitGoogleTest(&argc, argv);
1406   return RUN_ALL_TESTS();
1407 }
1408 
1409 #else
1410 #include <stdio.h>
1411 
main(int,char **)1412 int main(int /*argc*/, char** /*argv*/) {
1413   fprintf(stderr,
1414           "SKIPPED as CompactionJobStats is not supported in ROCKSDB_LITE\n");
1415   return 0;
1416 }
1417 
1418 #endif  // ROCKSDB_LITE
1419