1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/compaction/compaction.h"
11 
12 #include <cinttypes>
13 #include <vector>
14 
15 #include "db/column_family.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/sst_partitioner.h"
18 #include "test_util/sync_point.h"
19 #include "util/string_util.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 const uint64_t kRangeTombstoneSentinel =
24     PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
25 
sstableKeyCompare(const Comparator * user_cmp,const InternalKey & a,const InternalKey & b)26 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
27                       const InternalKey& b) {
28   auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
29   if (c != 0) {
30     return c;
31   }
32   auto a_footer = ExtractInternalKeyFooter(a.Encode());
33   auto b_footer = ExtractInternalKeyFooter(b.Encode());
34   if (a_footer == kRangeTombstoneSentinel) {
35     if (b_footer != kRangeTombstoneSentinel) {
36       return -1;
37     }
38   } else if (b_footer == kRangeTombstoneSentinel) {
39     return 1;
40   }
41   return 0;
42 }
43 
sstableKeyCompare(const Comparator * user_cmp,const InternalKey * a,const InternalKey & b)44 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey* a,
45                       const InternalKey& b) {
46   if (a == nullptr) {
47     return -1;
48   }
49   return sstableKeyCompare(user_cmp, *a, b);
50 }
51 
sstableKeyCompare(const Comparator * user_cmp,const InternalKey & a,const InternalKey * b)52 int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
53                       const InternalKey* b) {
54   if (b == nullptr) {
55     return -1;
56   }
57   return sstableKeyCompare(user_cmp, a, *b);
58 }
59 
TotalFileSize(const std::vector<FileMetaData * > & files)60 uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
61   uint64_t sum = 0;
62   for (size_t i = 0; i < files.size() && files[i]; i++) {
63     sum += files[i]->fd.GetFileSize();
64   }
65   return sum;
66 }
67 
SetInputVersion(Version * _input_version)68 void Compaction::SetInputVersion(Version* _input_version) {
69   input_version_ = _input_version;
70   cfd_ = input_version_->cfd();
71 
72   cfd_->Ref();
73   input_version_->Ref();
74   edit_.SetColumnFamily(cfd_->GetID());
75 }
76 
GetBoundaryKeys(VersionStorageInfo * vstorage,const std::vector<CompactionInputFiles> & inputs,Slice * smallest_user_key,Slice * largest_user_key)77 void Compaction::GetBoundaryKeys(
78     VersionStorageInfo* vstorage,
79     const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
80     Slice* largest_user_key) {
81   bool initialized = false;
82   const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
83   for (size_t i = 0; i < inputs.size(); ++i) {
84     if (inputs[i].files.empty()) {
85       continue;
86     }
87     if (inputs[i].level == 0) {
88       // we need to consider all files on level 0
89       for (const auto* f : inputs[i].files) {
90         const Slice& start_user_key = f->smallest.user_key();
91         if (!initialized ||
92             ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
93           *smallest_user_key = start_user_key;
94         }
95         const Slice& end_user_key = f->largest.user_key();
96         if (!initialized ||
97             ucmp->Compare(end_user_key, *largest_user_key) > 0) {
98           *largest_user_key = end_user_key;
99         }
100         initialized = true;
101       }
102     } else {
103       // we only need to consider the first and last file
104       const Slice& start_user_key = inputs[i].files[0]->smallest.user_key();
105       if (!initialized ||
106           ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
107         *smallest_user_key = start_user_key;
108       }
109       const Slice& end_user_key = inputs[i].files.back()->largest.user_key();
110       if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
111         *largest_user_key = end_user_key;
112       }
113       initialized = true;
114     }
115   }
116 }
117 
PopulateWithAtomicBoundaries(VersionStorageInfo * vstorage,std::vector<CompactionInputFiles> inputs)118 std::vector<CompactionInputFiles> Compaction::PopulateWithAtomicBoundaries(
119     VersionStorageInfo* vstorage, std::vector<CompactionInputFiles> inputs) {
120   const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
121   for (size_t i = 0; i < inputs.size(); i++) {
122     if (inputs[i].level == 0 || inputs[i].files.empty()) {
123       continue;
124     }
125     inputs[i].atomic_compaction_unit_boundaries.reserve(inputs[i].files.size());
126     AtomicCompactionUnitBoundary cur_boundary;
127     size_t first_atomic_idx = 0;
128     auto add_unit_boundary = [&](size_t to) {
129       if (first_atomic_idx == to) return;
130       for (size_t k = first_atomic_idx; k < to; k++) {
131         inputs[i].atomic_compaction_unit_boundaries.push_back(cur_boundary);
132       }
133       first_atomic_idx = to;
134     };
135     for (size_t j = 0; j < inputs[i].files.size(); j++) {
136       const auto* f = inputs[i].files[j];
137       if (j == 0) {
138         // First file in a level.
139         cur_boundary.smallest = &f->smallest;
140         cur_boundary.largest = &f->largest;
141       } else if (sstableKeyCompare(ucmp, *cur_boundary.largest, f->smallest) ==
142                  0) {
143         // SSTs overlap but the end key of the previous file was not
144         // artificially extended by a range tombstone. Extend the current
145         // boundary.
146         cur_boundary.largest = &f->largest;
147       } else {
148         // Atomic compaction unit has ended.
149         add_unit_boundary(j);
150         cur_boundary.smallest = &f->smallest;
151         cur_boundary.largest = &f->largest;
152       }
153     }
154     add_unit_boundary(inputs[i].files.size());
155     assert(inputs[i].files.size() ==
156            inputs[i].atomic_compaction_unit_boundaries.size());
157   }
158   return inputs;
159 }
160 
161 // helper function to determine if compaction is creating files at the
162 // bottommost level
IsBottommostLevel(int output_level,VersionStorageInfo * vstorage,const std::vector<CompactionInputFiles> & inputs)163 bool Compaction::IsBottommostLevel(
164     int output_level, VersionStorageInfo* vstorage,
165     const std::vector<CompactionInputFiles>& inputs) {
166   int output_l0_idx;
167   if (output_level == 0) {
168     output_l0_idx = 0;
169     for (const auto* file : vstorage->LevelFiles(0)) {
170       if (inputs[0].files.back() == file) {
171         break;
172       }
173       ++output_l0_idx;
174     }
175     assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
176   } else {
177     output_l0_idx = -1;
178   }
179   Slice smallest_key, largest_key;
180   GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
181   return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
182                                                   output_level, output_l0_idx);
183 }
184 
185 // test function to validate the functionality of IsBottommostLevel()
186 // function -- determines if compaction with inputs and storage is bottommost
TEST_IsBottommostLevel(int output_level,VersionStorageInfo * vstorage,const std::vector<CompactionInputFiles> & inputs)187 bool Compaction::TEST_IsBottommostLevel(
188     int output_level, VersionStorageInfo* vstorage,
189     const std::vector<CompactionInputFiles>& inputs) {
190   return IsBottommostLevel(output_level, vstorage, inputs);
191 }
192 
IsFullCompaction(VersionStorageInfo * vstorage,const std::vector<CompactionInputFiles> & inputs)193 bool Compaction::IsFullCompaction(
194     VersionStorageInfo* vstorage,
195     const std::vector<CompactionInputFiles>& inputs) {
196   size_t num_files_in_compaction = 0;
197   size_t total_num_files = 0;
198   for (int l = 0; l < vstorage->num_levels(); l++) {
199     total_num_files += vstorage->NumLevelFiles(l);
200   }
201   for (size_t i = 0; i < inputs.size(); i++) {
202     num_files_in_compaction += inputs[i].size();
203   }
204   return num_files_in_compaction == total_num_files;
205 }
206 
Compaction(VersionStorageInfo * vstorage,const ImmutableOptions & _immutable_options,const MutableCFOptions & _mutable_cf_options,const MutableDBOptions & _mutable_db_options,std::vector<CompactionInputFiles> _inputs,int _output_level,uint64_t _target_file_size,uint64_t _max_compaction_bytes,uint32_t _output_path_id,CompressionType _compression,CompressionOptions _compression_opts,uint32_t _max_subcompactions,std::vector<FileMetaData * > _grandparents,bool _manual_compaction,double _score,bool _deletion_compaction,CompactionReason _compaction_reason)207 Compaction::Compaction(
208     VersionStorageInfo* vstorage, const ImmutableOptions& _immutable_options,
209     const MutableCFOptions& _mutable_cf_options,
210     const MutableDBOptions& _mutable_db_options,
211     std::vector<CompactionInputFiles> _inputs, int _output_level,
212     uint64_t _target_file_size, uint64_t _max_compaction_bytes,
213     uint32_t _output_path_id, CompressionType _compression,
214     CompressionOptions _compression_opts, uint32_t _max_subcompactions,
215     std::vector<FileMetaData*> _grandparents, bool _manual_compaction,
216     double _score, bool _deletion_compaction,
217     CompactionReason _compaction_reason)
218     : input_vstorage_(vstorage),
219       start_level_(_inputs[0].level),
220       output_level_(_output_level),
221       max_output_file_size_(_target_file_size),
222       max_compaction_bytes_(_max_compaction_bytes),
223       max_subcompactions_(_max_subcompactions),
224       immutable_options_(_immutable_options),
225       mutable_cf_options_(_mutable_cf_options),
226       input_version_(nullptr),
227       number_levels_(vstorage->num_levels()),
228       cfd_(nullptr),
229       output_path_id_(_output_path_id),
230       output_compression_(_compression),
231       output_compression_opts_(_compression_opts),
232       deletion_compaction_(_deletion_compaction),
233       inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
234       grandparents_(std::move(_grandparents)),
235       score_(_score),
236       bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
237       is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
238       is_manual_compaction_(_manual_compaction),
239       is_trivial_move_(false),
240       compaction_reason_(_compaction_reason) {
241   MarkFilesBeingCompacted(true);
242   if (is_manual_compaction_) {
243     compaction_reason_ = CompactionReason::kManualCompaction;
244   }
245   if (max_subcompactions_ == 0) {
246     max_subcompactions_ = _mutable_db_options.max_subcompactions;
247   }
248 
249 #ifndef NDEBUG
250   for (size_t i = 1; i < inputs_.size(); ++i) {
251     assert(inputs_[i].level > inputs_[i - 1].level);
252   }
253 #endif
254 
255   // setup input_levels_
256   {
257     input_levels_.resize(num_input_levels());
258     for (size_t which = 0; which < num_input_levels(); which++) {
259       DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
260                                 &arena_);
261     }
262   }
263 
264   GetBoundaryKeys(vstorage, inputs_, &smallest_user_key_, &largest_user_key_);
265 }
266 
~Compaction()267 Compaction::~Compaction() {
268   if (input_version_ != nullptr) {
269     input_version_->Unref();
270   }
271   if (cfd_ != nullptr) {
272     cfd_->UnrefAndTryDelete();
273   }
274 }
275 
InputCompressionMatchesOutput() const276 bool Compaction::InputCompressionMatchesOutput() const {
277   int base_level = input_vstorage_->base_level();
278   bool matches = (GetCompressionType(immutable_options_, input_vstorage_,
279                                      mutable_cf_options_, start_level_,
280                                      base_level) == output_compression_);
281   if (matches) {
282     TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
283     return true;
284   }
285   TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
286   return matches;
287 }
288 
IsTrivialMove() const289 bool Compaction::IsTrivialMove() const {
290   // Avoid a move if there is lots of overlapping grandparent data.
291   // Otherwise, the move could create a parent file that will require
292   // a very expensive merge later on.
293   // If start_level_== output_level_, the purpose is to force compaction
294   // filter to be applied to that level, and thus cannot be a trivial move.
295 
296   // Check if start level have files with overlapping ranges
297   if (start_level_ == 0 && input_vstorage_->level0_non_overlapping() == false) {
298     // We cannot move files from L0 to L1 if the files are overlapping
299     return false;
300   }
301 
302   if (is_manual_compaction_ &&
303       (immutable_options_.compaction_filter != nullptr ||
304        immutable_options_.compaction_filter_factory != nullptr)) {
305     // This is a manual compaction and we have a compaction filter that should
306     // be executed, we cannot do a trivial move
307     return false;
308   }
309 
310   // Used in universal compaction, where trivial move can be done if the
311   // input files are non overlapping
312   if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) &&
313       (output_level_ != 0)) {
314     return is_trivial_move_;
315   }
316 
317   if (!(start_level_ != output_level_ && num_input_levels() == 1 &&
318           input(0, 0)->fd.GetPathId() == output_path_id() &&
319           InputCompressionMatchesOutput())) {
320     return false;
321   }
322 
323   // assert inputs_.size() == 1
324 
325   std::unique_ptr<SstPartitioner> partitioner = CreateSstPartitioner();
326 
327   for (const auto& file : inputs_.front().files) {
328     std::vector<FileMetaData*> file_grand_parents;
329     if (output_level_ + 1 >= number_levels_) {
330       continue;
331     }
332     input_vstorage_->GetOverlappingInputs(output_level_ + 1, &file->smallest,
333                                           &file->largest, &file_grand_parents);
334     const auto compaction_size =
335         file->fd.GetFileSize() + TotalFileSize(file_grand_parents);
336     if (compaction_size > max_compaction_bytes_) {
337       return false;
338     }
339 
340     if (partitioner.get() != nullptr) {
341       if (!partitioner->CanDoTrivialMove(file->smallest.user_key(),
342                                          file->largest.user_key())) {
343         return false;
344       }
345     }
346   }
347 
348   return true;
349 }
350 
AddInputDeletions(VersionEdit * out_edit)351 void Compaction::AddInputDeletions(VersionEdit* out_edit) {
352   for (size_t which = 0; which < num_input_levels(); which++) {
353     for (size_t i = 0; i < inputs_[which].size(); i++) {
354       out_edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
355     }
356   }
357 }
358 
KeyNotExistsBeyondOutputLevel(const Slice & user_key,std::vector<size_t> * level_ptrs) const359 bool Compaction::KeyNotExistsBeyondOutputLevel(
360     const Slice& user_key, std::vector<size_t>* level_ptrs) const {
361   assert(input_version_ != nullptr);
362   assert(level_ptrs != nullptr);
363   assert(level_ptrs->size() == static_cast<size_t>(number_levels_));
364   if (bottommost_level_) {
365     return true;
366   } else if (output_level_ != 0 &&
367              cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
368     // Maybe use binary search to find right entry instead of linear search?
369     const Comparator* user_cmp = cfd_->user_comparator();
370     for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
371       const std::vector<FileMetaData*>& files =
372           input_vstorage_->LevelFiles(lvl);
373       for (; level_ptrs->at(lvl) < files.size(); level_ptrs->at(lvl)++) {
374         auto* f = files[level_ptrs->at(lvl)];
375         if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
376           // We've advanced far enough
377           // In the presence of user-defined timestamp, we may need to handle
378           // the case in which f->smallest.user_key() (including ts) has the
379           // same user key, but the ts part is smaller. If so,
380           // Compare(user_key, f->smallest.user_key()) returns -1.
381           // That's why we need CompareWithoutTimestamp().
382           if (user_cmp->CompareWithoutTimestamp(user_key,
383                                                 f->smallest.user_key()) >= 0) {
384             // Key falls in this file's range, so it may
385             // exist beyond output level
386             return false;
387           }
388           break;
389         }
390       }
391     }
392     return true;
393   }
394   return false;
395 }
396 
397 // Mark (or clear) each file that is being compacted
MarkFilesBeingCompacted(bool mark_as_compacted)398 void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
399   for (size_t i = 0; i < num_input_levels(); i++) {
400     for (size_t j = 0; j < inputs_[i].size(); j++) {
401       assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
402                                : inputs_[i][j]->being_compacted);
403       inputs_[i][j]->being_compacted = mark_as_compacted;
404     }
405   }
406 }
407 
408 // Sample output:
409 // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
410 // print: "3@0 + 2@3 + 1@4 files to L5"
InputLevelSummary(InputLevelSummaryBuffer * scratch) const411 const char* Compaction::InputLevelSummary(
412     InputLevelSummaryBuffer* scratch) const {
413   int len = 0;
414   bool is_first = true;
415   for (auto& input_level : inputs_) {
416     if (input_level.empty()) {
417       continue;
418     }
419     if (!is_first) {
420       len +=
421           snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + ");
422       len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
423     } else {
424       is_first = false;
425     }
426     len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
427                     "%" ROCKSDB_PRIszt "@%d", input_level.size(),
428                     input_level.level);
429     len = std::min(len, static_cast<int>(sizeof(scratch->buffer)));
430   }
431   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
432            " files to L%d", output_level());
433 
434   return scratch->buffer;
435 }
436 
CalculateTotalInputSize() const437 uint64_t Compaction::CalculateTotalInputSize() const {
438   uint64_t size = 0;
439   for (auto& input_level : inputs_) {
440     for (auto f : input_level.files) {
441       size += f->fd.GetFileSize();
442     }
443   }
444   return size;
445 }
446 
ReleaseCompactionFiles(Status status)447 void Compaction::ReleaseCompactionFiles(Status status) {
448   MarkFilesBeingCompacted(false);
449   cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
450 }
451 
ResetNextCompactionIndex()452 void Compaction::ResetNextCompactionIndex() {
453   assert(input_version_ != nullptr);
454   input_vstorage_->ResetNextCompactionIndex(start_level_);
455 }
456 
457 namespace {
InputSummary(const std::vector<FileMetaData * > & files,char * output,int len)458 int InputSummary(const std::vector<FileMetaData*>& files, char* output,
459                  int len) {
460   *output = '\0';
461   int write = 0;
462   for (size_t i = 0; i < files.size(); i++) {
463     int sz = len - write;
464     int ret;
465     char sztxt[16];
466     AppendHumanBytes(files.at(i)->fd.GetFileSize(), sztxt, 16);
467     ret = snprintf(output + write, sz, "%" PRIu64 "(%s) ",
468                    files.at(i)->fd.GetNumber(), sztxt);
469     if (ret < 0 || ret >= sz) break;
470     write += ret;
471   }
472   // if files.size() is non-zero, overwrite the last space
473   return write - !!files.size();
474 }
475 }  // namespace
476 
Summary(char * output,int len)477 void Compaction::Summary(char* output, int len) {
478   int write =
479       snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
480                input_version_->GetVersionNumber(), start_level_);
481   if (write < 0 || write >= len) {
482     return;
483   }
484 
485   for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
486     if (level_iter > 0) {
487       write += snprintf(output + write, len - write, "], [");
488       if (write < 0 || write >= len) {
489         return;
490       }
491     }
492     write +=
493         InputSummary(inputs_[level_iter].files, output + write, len - write);
494     if (write < 0 || write >= len) {
495       return;
496     }
497   }
498 
499   snprintf(output + write, len - write, "]");
500 }
501 
OutputFilePreallocationSize() const502 uint64_t Compaction::OutputFilePreallocationSize() const {
503   uint64_t preallocation_size = 0;
504 
505   for (const auto& level_files : inputs_) {
506     for (const auto& file : level_files.files) {
507       preallocation_size += file->fd.GetFileSize();
508     }
509   }
510 
511   if (max_output_file_size_ != port::kMaxUint64 &&
512       (immutable_options_.compaction_style == kCompactionStyleLevel ||
513        output_level() > 0)) {
514     preallocation_size = std::min(max_output_file_size_, preallocation_size);
515   }
516 
517   // Over-estimate slightly so we don't end up just barely crossing
518   // the threshold
519   // No point to preallocate more than 1GB.
520   return std::min(uint64_t{1073741824},
521                   preallocation_size + (preallocation_size / 10));
522 }
523 
CreateCompactionFilter() const524 std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
525   if (!cfd_->ioptions()->compaction_filter_factory) {
526     return nullptr;
527   }
528 
529   if (!cfd_->ioptions()
530            ->compaction_filter_factory->ShouldFilterTableFileCreation(
531                TableFileCreationReason::kCompaction)) {
532     return nullptr;
533   }
534 
535   CompactionFilter::Context context;
536   context.is_full_compaction = is_full_compaction_;
537   context.is_manual_compaction = is_manual_compaction_;
538   context.column_family_id = cfd_->GetID();
539   context.reason = TableFileCreationReason::kCompaction;
540   return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
541       context);
542 }
543 
CreateSstPartitioner() const544 std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
545   if (!immutable_options_.sst_partitioner_factory) {
546     return nullptr;
547   }
548 
549   SstPartitioner::Context context;
550   context.is_full_compaction = is_full_compaction_;
551   context.is_manual_compaction = is_manual_compaction_;
552   context.output_level = output_level_;
553   context.smallest_user_key = smallest_user_key_;
554   context.largest_user_key = largest_user_key_;
555   return immutable_options_.sst_partitioner_factory->CreatePartitioner(context);
556 }
557 
IsOutputLevelEmpty() const558 bool Compaction::IsOutputLevelEmpty() const {
559   return inputs_.back().level != output_level_ || inputs_.back().empty();
560 }
561 
ShouldFormSubcompactions() const562 bool Compaction::ShouldFormSubcompactions() const {
563   if (max_subcompactions_ <= 1 || cfd_ == nullptr) {
564     return false;
565   }
566   if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
567     return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
568            !IsOutputLevelEmpty();
569   } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
570     return number_levels_ > 1 && output_level_ > 0;
571   } else {
572     return false;
573   }
574 }
575 
MinInputFileOldestAncesterTime() const576 uint64_t Compaction::MinInputFileOldestAncesterTime() const {
577   uint64_t min_oldest_ancester_time = port::kMaxUint64;
578   for (const auto& level_files : inputs_) {
579     for (const auto& file : level_files.files) {
580       uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
581       if (oldest_ancester_time != 0) {
582         min_oldest_ancester_time =
583             std::min(min_oldest_ancester_time, oldest_ancester_time);
584       }
585     }
586   }
587   return min_oldest_ancester_time;
588 }
589 
GetInputBaseLevel() const590 int Compaction::GetInputBaseLevel() const {
591   return input_vstorage_->base_level();
592 }
593 
594 }  // namespace ROCKSDB_NAMESPACE
595