//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <cinttypes>
#include <vector>

#include "db/column_family.h"
#include "db/compaction/compaction.h"
#include "rocksdb/compaction_filter.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const uint64_t kRangeTombstoneSentinel =
    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

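// Orders SST file boundary keys: keys compare by user key first; when the
// user keys are equal, a boundary whose footer is the range tombstone
// sentinel above sorts before any regular entry for that user key. A sketch
// of the resulting ordering (assuming a bytewise user comparator, where
// sentinel@"k" denotes an end key artificially extended by a range
// tombstone):
//   sstableKeyCompare(cmp, sentinel@"k", regular@"k") < 0
//   sstableKeyCompare(cmp, regular@"k", regular@"k") == 0
// so a file ending at sentinel@"k" does not overlap a file starting at "k".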
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey& b) {
  auto c = user_cmp->Compare(a.user_key(), b.user_key());
  if (c != 0) {
    return c;
  }
  auto a_footer = ExtractInternalKeyFooter(a.Encode());
  auto b_footer = ExtractInternalKeyFooter(b.Encode());
  if (a_footer == kRangeTombstoneSentinel) {
    if (b_footer != kRangeTombstoneSentinel) {
      return -1;
    }
  } else if (b_footer == kRangeTombstoneSentinel) {
    return 1;
  }
  return 0;
}

int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
                      const InternalKey& b) {
  if (a == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, *a, b);
}

int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
                      const InternalKey* b) {
  if (b == nullptr) {
    return -1;
  }
  return sstableKeyCompare(user_cmp, a, *b);
}

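// Sums the sizes of the given files. Note that the loop stops at the first
// null entry, so only the leading non-null prefix of the vector is counted.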
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  uint64_t sum = 0;
  for (size_t i = 0; i < files.size() && files[i]; i++) {
    sum += files[i]->fd.GetFileSize();
  }
  return sum;
}

void Compaction::SetInputVersion(Version* _input_version) {
  input_version_ = _input_version;
  cfd_ = input_version_->cfd();

  cfd_->Ref();
  input_version_->Ref();
  edit_.SetColumnFamily(cfd_->GetID());
}

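// Computes the smallest and largest user keys covered by the input files.
// L0 files may overlap arbitrarily, so every L0 file is examined; on the
// other levels files are sorted and non-overlapping, so only the first and
// last file of each level can contribute.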
void Compaction::GetBoundaryKeys(
    VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
    Slice* largest_user_key) {
  bool initialized = false;
  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  for (size_t i = 0; i < inputs.size(); ++i) {
    if (inputs[i].files.empty()) {
      continue;
    }
    if (inputs[i].level == 0) {
      // we need to consider all files on level 0
      for (const auto* f : inputs[i].files) {
        const Slice& start_user_key = f->smallest.user_key();
        if (!initialized ||
            ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
          *smallest_user_key = start_user_key;
        }
        const Slice& end_user_key = f->largest.user_key();
        if (!initialized ||
            ucmp->Compare(end_user_key, *largest_user_key) > 0) {
          *largest_user_key = end_user_key;
        }
        initialized = true;
      }
    } else {
      // we only need to consider the first and last file
      const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
      if (!initialized ||
          ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
        *smallest_user_key = start_user_key;
      }
      const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
      if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
        *largest_user_key = end_user_key;
      }
      initialized = true;
    }
  }
}

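// Tags every file on each non-L0 input level with the boundaries of the
// atomic compaction unit it belongs to. Adjacent files whose end/start
// boundary keys compare equal (they share a user key and the earlier file's
// end key was not a range tombstone sentinel) must be compacted together,
// so they all receive the same {smallest, largest} pair. A sketch, assuming
// three adjacent files on one level with key ranges [a..k], [k..p] and
// [q..z]: the first two files form one unit covering [a..p], while the
// third file is a unit of its own.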
std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
    VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
  const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
  for (size_t i = 0; i < inputs.size(); i++) {
    if (inputs[i].level == 0 || inputs[i].files.empty()) {
      continue;
    }
    inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
    AtomicCompactionUnitBoundary cur_boundary;
    size_t first_atomic_idx = 0;
    auto add_unit_boundary = [&](size_t to) {
      if (first_atomic_idx == to) return;
      for (size_t k = first_atomic_idx; k < to; k++) {
        inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
      }
      first_atomic_idx = to;
    };
    for (size_t j = 0; j < inputs[i].files.size(); j++) {
      const auto* f = inputs[i].files[j];
      if (j == 0) {
        // First file in a level.
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
                 0) {
        // SSTs overlap but the end key of the previous file was not
        // artificially extended by a range tombstone. Extend the current
        // boundary.
        cur_boundary.largest = &f->largest;
      } else {
        // Atomic compaction unit has ended.
        add_unit_boundary(j);
        cur_boundary.smallest = &f->smallest;
        cur_boundary.largest = &f->largest;
      }
    }
    add_unit_boundary(inputs[i].files.size());
    assert(inputs[i].files.size() ==
           inputs[i].atomic_compaction_unit_boundaries.size());
  }
  return inputs;
}

// Helper function to determine whether a compaction is creating files at
// the bottommost level.
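// The output is bottommost iff no sorted run ordered after the output run
// (L0 files following the last input file when outputting to L0, plus all
// levels below the output level) can contain keys within the inputs' key
// range.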
bool Compaction::IsBottommostLevel(
    int output_level, VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  int output_l0_idx;
  if (output_level == 0) {
    output_l0_idx = 0;
    for (const auto* file : vstorage->LevelFiles(0)) {
      if (inputs[0].files.back() == file) {
        break;
      }
      ++output_l0_idx;
    }
    assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
  } else {
    output_l0_idx = -1;
  }
  Slice smallest_key, largest_key;
  GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
  return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
                                                  output_level, output_l0_idx);
}

// Test-only wrapper that exposes IsBottommostLevel(): determines whether a
// compaction with the given inputs and storage is bottommost.
bool Compaction::TEST_IsBottommostLevel(
    int output_level, VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  return IsBottommostLevel(output_level, vstorage, inputs);
}

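// A compaction is "full" when its inputs include every live file in every
// level of the column family.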
bool Compaction::IsFullCompaction(
    VersionStorageInfo* vstorage,
    const std::vector<CompactionInputFiles>& inputs) {
  size_t num_files_in_compaction = 0;
  size_t total_num_files = 0;
  for (int l = 0; l < vstorage->num_levels(); l++) {
    total_num_files += vstorage->NumLevelFiles(l);
  }
  for (size_t i = 0; i < inputs.size(); i++) {
    num_files_in_compaction += inputs[i].size();
  }
  return num_files_in_compaction == total_num_files;
}

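// The constructor marks every input file as being compacted, forces the
// reason to kManualCompaction for manual compactions, disables dictionary
// compression unless the output is the bottommost level, builds a
// LevelFilesBrief for each input level, and caches the overall boundary
// user keys of the inputs.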
Compaction::Compaction(VersionStorageInfo* vstorage,
                       const ImmutableCFOptions& _immutable_cf_options,
                       const MutableCFOptions& _mutable_cf_options,
                       std::vector<CompactionInputFiles> _inputs,
                       int _output_level, uint64_t _target_file_size,
                       uint64_t _max_compaction_bytes, uint32_t _output_path_id,
                       CompressionType _compression,
                       CompressionOptions _compression_opts,
                       uint32_t _max_subcompactions,
                       std::vector<FileMetaData*> _grandparents,
                       bool _manual_compaction, double _score,
                       bool _deletion_compaction,
                       CompactionReason _compaction_reason)
    : input_vstorage_(vstorage),
      start_level_(_inputs[0].level),
      output_level_(_output_level),
      max_output_file_size_(_target_file_size),
      max_compaction_bytes_(_max_compaction_bytes),
      max_subcompactions_(_max_subcompactions),
      immutable_cf_options_(_immutable_cf_options),
      mutable_cf_options_(_mutable_cf_options),
      input_version_(nullptr),
      number_levels_(vstorage->num_levels()),
      cfd_(nullptr),
      output_path_id_(_output_path_id),
      output_compression_(_compression),
      output_compression_opts_(_compression_opts),
      deletion_compaction_(_deletion_compaction),
      inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
      grandparents_(std::move(_grandparents)),
      score_(_score),
      bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
      is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
      is_manual_compaction_(_manual_compaction),
      is_trivial_move_(false),
      compaction_reason_(_compaction_reason) {
  MarkFilesBeingCompacted(true);
  if (is_manual_compaction_) {
    compaction_reason_ = CompactionReason::kManualCompaction;
  }
  if (max_subcompactions_ == 0) {
    max_subcompactions_ = immutable_cf_options_.max_subcompactions;
  }
  if (!bottommost_level_) {
    // Currently we only enable dictionary compression during compaction to the
    // bottommost level.
    output_compression_opts_.max_dict_bytes = 0;
    output_compression_opts_.zstd_max_train_bytes = 0;
  }

#ifndef NDEBUG
  for (size_t i = 1; i < inputs_.size(); ++i) {
    assert(inputs_[i].level > inputs_[i - 1].level);
  }
#endif

  // setup input_levels_
  {
    input_levels_.resize(num_input_levels());
    for (size_t which = 0; which < num_input_levels(); which++) {
      DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
                                &arena_);
    }
  }

  GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
}

Compaction::~Compaction() {
  if (input_version_ != nullptr) {
    input_version_->Unref();
  }
  if (cfd_ != nullptr) {
    cfd_->UnrefAndTryDelete();
  }
}

bool Compaction::InputCompressionMatchesOutput() const {
  int base_level = input_vstorage_->base_level();
  bool matches = (GetCompressionType(immutable_cf_options_, input_vstorage_,
                                     mutable_cf_options_, start_level_,
                                     base_level) == output_compression_);
  if (matches) {
    TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
    return true;
  }
  TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
  return matches;
}

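// A trivial move reassigns an input file to the output level via a version
// edit instead of rewriting its contents, so it is only possible when no
// merging, filtering, or re-compression would be required.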
bool Compaction::IsTrivialMove() const {
  // Avoid a move if there is lots of overlapping grandparent data.
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
  // If start_level_ == output_level_, the purpose is to force the compaction
  // filter to be applied to that level, so this cannot be a trivial move.

  // Check if the start level has files with overlapping ranges.
  if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
    // We cannot move files from L0 to L1 if the files are overlapping
    return false;
  }

  if (is_manual_compaction_ &&
      (immutable_cf_options_.compaction_filter != nullptr ||
       immutable_cf_options_.compaction_filter_factory != nullptr)) {
    // This is a manual compaction and we have a compaction filter that should
    // be executed, so we cannot do a trivial move.
    return false;
  }

  // Used in universal compaction, where a trivial move can be done if the
  // input files are non-overlapping.
  if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
      (output_level_ != 0)) {
    return is_trivial_move_;
  }

  if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
          input(0, 0)->fd.GetPathId() == output_path_id() &&
          InputCompressionMatchesOutput())) {
    return false;
  }

  // assert inputs_.size() == 1

  for (const auto& file : inputs_.front().files) {
    std::vector<FileMetaData*> file_grand_parents;
    if (output_level_ + 1 >= number_levels_) {
      continue;
    }
    input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
                                          &file->largest, &file_grand_parents);
    const auto compaction_size =
        file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
    if (compaction_size > max_compaction_bytes_) {
      return false;
    }
  }

  return true;
}

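// Records a deletion for every input file in *out_edit so that installing
// the edit removes the inputs from the LSM once the compaction result is
// applied.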
void Compaction::AddInputDeletions(VersionEdit* out_edit) {
  for (size_t which = 0; which < num_input_levels(); which++) {
    for (size_t i = 0; i < inputs_[which].size(); i++) {
      out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
    }
  }
}

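// Returns true when it is guaranteed that no entry for user_key exists in
// any level below the output level. level_ptrs holds one file-index cursor
// per level; the cursors only move forward, which relies on callers probing
// keys in ascending user-key order.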
bool Compaction::KeyNotExistsBeyondOutputLevel(
    const Slice& user_key, std::vector<size_t>* level_ptrs) const {
  assert(input_version_ != nullptr);
  assert(level_ptrs != nullptr);
  assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
  if (bottommost_level_) {
    return true;
  } else if (output_level_ != 0 &&
             cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
    // Maybe use binary search instead of linear search to find the entry?
    const Comparator* user_cmp = cfd_->user_comparator();
    for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
      const std::vector<FileMetaData*>& files =
          input_vstorage_->LevelFiles(lvl);
      for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
        auto* f = files[level_ptrs->at(lvl)];
        if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
          // We've advanced far enough
          if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
            // Key falls in this file's range, so it may
            // exist beyond output level
            return false;
          }
          break;
        }
      }
    }
    return true;
  }
  return false;
}

// Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
  for (size_t i = 0; i < num_input_levels(); i++) {
    for (size_t j = 0; j < inputs_[i].size(); j++) {
      assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
                               : inputs_[i][j]->being_compacted);
      inputs_[i][j]->being_compacted = mark_as_compacted;
    }
  }
}

// Sample output:
// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
// print: "3@0 + 2@3 + 1@4 files to L5"
const char* Compaction::InputLevelSummary(
    InputLevelSummaryBuffer* scratch) const {
  int len = 0;
  bool is_first = true;
  for (auto& input_level : inputs_) {
    if (input_level.empty()) {
      continue;
    }
    if (!is_first) {
      len +=
          snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
      len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
    } else {
      is_first = false;
    }
    len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                    "%" ROCKSDB_PRIszt "@%d", input_level.size(),
                    input_level.level);
    len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
           " files to L%d", output_level());

  return scratch->buffer;
}

uint64_t Compaction::CalculateTotalInputSize() const {
  uint64_t size = 0;
  for (auto& input_level : inputs_) {
    for (auto f : input_level.files) {
      size += f->fd.GetFileSize();
    }
  }
  return size;
}

void Compaction::ReleaseCompactionFiles(Status status) {
  MarkFilesBeingCompacted(false);
  cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
}

void Compaction::ResetNextCompactionIndex() {
  assert(input_version_ != nullptr);
  input_vstorage_->ResetNextCompactionIndex(start_level_);
}

namespace {
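// Appends "file_number(human_readable_size) " for each file to output,
// stopping early if the buffer would overflow. Returns the number of bytes
// written, excluding the trailing space when files is non-empty.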
int InputSummary(const std::vector<FileMetaData*>& files, char* output,
                 int len) {
  *output = '\0';
  int write = 0;
  for (size_t i = 0; i < files.size(); i++) {
    int sz = len - write;
    int ret;
    char sztxt[16];
    AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
    ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
                   files.at(i)->fd.GetNumber(), sztxt);
    if (ret < 0 || ret >= sz) break;
    write += ret;
  }
  // if files.size() is non-zero, overwrite the last space
  return write - !!files.size();
}
}  // namespace

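// A sketch of the resulting summary string, assuming two input levels:
//   "Base version 123 Base level 1, inputs: [27(4MB) 30(1MB)], [38(64MB)]"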
void Compaction::Summary(char* output, int len) {
  int write =
      snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
               input_version_->GetVersionNumber(), start_level_);
  if (write < 0 || write >= len) {
    return;
  }

  for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
    if (level_iter > 0) {
      write += snprintf(output + write, len - write, "], [");
      if (write < 0 || write >= len) {
        return;
      }
    }
    write +=
        InputSummary(inputs_[level_iter].files, output + write, len - write);
    if (write < 0 || write >= len) {
      return;
    }
  }

  snprintf(output + write, len - write, "]");
}

uint64_t Compaction::OutputFilePreallocationSize() const {
  uint64_t preallocation_size = 0;

  for (const auto& level_files : inputs_) {
    for (const auto& file : level_files.files) {
      preallocation_size += file->fd.GetFileSize();
    }
  }

  if (max_output_file_size_ != port::kMaxUint64 &&
      (immutable_cf_options_.compaction_style == kCompactionStyleLevel ||
       output_level() > 0)) {
    preallocation_size = std::min(max_output_file_size_, preallocation_size);
  }

  // Over-estimate slightly so we don't end up just barely crossing
  // the threshold.
  // There is no point in preallocating more than 1GB.
  return std::min(uint64_t{1073741824},
                  preallocation_size + (preallocation_size / 10));
}

std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
  if (!cfd_->ioptions()->compaction_filter_factory) {
    return nullptr;
  }

  CompactionFilter::Context context;
  context.is_full_compaction = is_full_compaction_;
  context.is_manual_compaction = is_manual_compaction_;
  context.column_family_id = cfd_->GetID();
  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
      context);
}

bool Compaction::IsOutputLevelEmpty() const {
  return inputs_.back().level != output_level_ || inputs_.back().empty();
}

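// Subcompactions are only formed when the work can be split and run in
// parallel: for level compaction this requires compacting from L0 (or a
// manual compaction) into a non-empty output level above L0; for universal
// compaction it requires more than one level and an output level above L0.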
bool Compaction::ShouldFormSubcompactions() const {
  if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
    return false;
  }
  if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
    return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
           !IsOutputLevelEmpty();
  } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
    return number_levels_ > 1 && output_level_ > 0;
  } else {
    return false;
  }
}

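// Returns the smallest known "oldest ancestor time" among the input files.
// Files without a recorded time (0) are skipped; if no input file has one,
// port::kMaxUint64 is returned.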
uint64_t Compaction::MinInputFileOldestAncesterTime() const {
  uint64_t min_oldest_ancester_time = port::kMaxUint64;
  for (const auto& level_files : inputs_) {
    for (const auto& file : level_files.files) {
      uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
      if (oldest_ancester_time != 0) {
        min_oldest_ancester_time =
            std::min(min_oldest_ancester_time, oldest_ancester_time);
      }
    }
  }
  return min_oldest_ancester_time;
}

int Compaction::GetInputBaseLevel() const {
  return input_vstorage_->base_level();
}

}  // namespace ROCKSDB_NAMESPACE