1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/compaction/compaction_picker.h"
11 
12 #include <cinttypes>
13 #include <limits>
14 #include <queue>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 #include "db/column_family.h"
19 #include "file/filename.h"
20 #include "logging/log_buffer.h"
21 #include "monitoring/statistics.h"
22 #include "test_util/sync_point.h"
23 #include "util/random.h"
24 #include "util/string_util.h"
25 
26 namespace ROCKSDB_NAMESPACE {
27 
28 namespace {
TotalCompensatedFileSize(const std::vector<FileMetaData * > & files)29 uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
30   uint64_t sum = 0;
31   for (size_t i = 0; i < files.size() && files[i]; i++) {
32     sum += files[i]->compensated_file_size;
33   }
34   return sum;
35 }
36 }  // anonymous namespace
37 
FindIntraL0Compaction(const std::vector<FileMetaData * > & level_files,size_t min_files_to_compact,uint64_t max_compact_bytes_per_del_file,uint64_t max_compaction_bytes,CompactionInputFiles * comp_inputs,SequenceNumber earliest_mem_seqno)38 bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
39                            size_t min_files_to_compact,
40                            uint64_t max_compact_bytes_per_del_file,
41                            uint64_t max_compaction_bytes,
42                            CompactionInputFiles* comp_inputs,
43                            SequenceNumber earliest_mem_seqno) {
44   // Do not pick ingested file when there is at least one memtable not flushed
45   // which of seqno is overlap with the sst.
46   TEST_SYNC_POINT("FindIntraL0Compaction");
47   size_t start = 0;
48   for (; start < level_files.size(); start++) {
49     if (level_files[start]->being_compacted) {
50       return false;
51     }
52     // If there is no data in memtable, the earliest sequence number would the
53     // largest sequence number in last memtable.
54     // Because all files are sorted in descending order by largest_seqno, so we
55     // only need to check the first one.
56     if (level_files[start]->fd.largest_seqno <= earliest_mem_seqno) {
57       break;
58     }
59   }
60   if (start >= level_files.size()) {
61     return false;
62   }
63   size_t compact_bytes = static_cast<size_t>(level_files[start]->fd.file_size);
64   uint64_t compensated_compact_bytes =
65       level_files[start]->compensated_file_size;
66   size_t compact_bytes_per_del_file = port::kMaxSizet;
67   // Compaction range will be [start, limit).
68   size_t limit;
69   // Pull in files until the amount of compaction work per deleted file begins
70   // increasing or maximum total compaction size is reached.
71   size_t new_compact_bytes_per_del_file = 0;
72   for (limit = start + 1; limit < level_files.size(); ++limit) {
73     compact_bytes += static_cast<size_t>(level_files[limit]->fd.file_size);
74     compensated_compact_bytes += level_files[limit]->compensated_file_size;
75     new_compact_bytes_per_del_file = compact_bytes / (limit - start);
76     if (level_files[limit]->being_compacted ||
77         new_compact_bytes_per_del_file > compact_bytes_per_del_file ||
78         compensated_compact_bytes > max_compaction_bytes) {
79       break;
80     }
81     compact_bytes_per_del_file = new_compact_bytes_per_del_file;
82   }
83 
84   if ((limit - start) >= min_files_to_compact &&
85       compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
86     assert(comp_inputs != nullptr);
87     comp_inputs->level = 0;
88     for (size_t i = start; i < limit; ++i) {
89       comp_inputs->files.push_back(level_files[i]);
90     }
91     return true;
92   }
93   return false;
94 }
95 
96 // Determine compression type, based on user options, level of the output
97 // file and whether compression is disabled.
98 // If enable_compression is false, then compression is always disabled no
99 // matter what the values of the other two parameters are.
100 // Otherwise, the compression type is determined based on options and level.
GetCompressionType(const ImmutableCFOptions & ioptions,const VersionStorageInfo * vstorage,const MutableCFOptions & mutable_cf_options,int level,int base_level,const bool enable_compression)101 CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
102                                    const VersionStorageInfo* vstorage,
103                                    const MutableCFOptions& mutable_cf_options,
104                                    int level, int base_level,
105                                    const bool enable_compression) {
106   if (!enable_compression) {
107     // disable compression
108     return kNoCompression;
109   }
110 
111   // If bottommost_compression is set and we are compacting to the
112   // bottommost level then we should use it.
113   if (ioptions.bottommost_compression != kDisableCompressionOption &&
114       level >= (vstorage->num_non_empty_levels() - 1)) {
115     return ioptions.bottommost_compression;
116   }
117   // If the user has specified a different compression level for each level,
118   // then pick the compression for that level.
119   if (!ioptions.compression_per_level.empty()) {
120     assert(level == 0 || level >= base_level);
121     int idx = (level == 0) ? 0 : level - base_level + 1;
122 
123     const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
124     // It is possible for level_ to be -1; in that case, we use level
125     // 0's compression.  This occurs mostly in backwards compatibility
126     // situations when the builder doesn't know what level the file
127     // belongs to.  Likewise, if level is beyond the end of the
128     // specified compression levels, use the last value.
129     return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
130   } else {
131     return mutable_cf_options.compression;
132   }
133 }
134 
GetCompressionOptions(const ImmutableCFOptions & ioptions,const VersionStorageInfo * vstorage,int level,const bool enable_compression)135 CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
136                                          const VersionStorageInfo* vstorage,
137                                          int level,
138                                          const bool enable_compression) {
139   if (!enable_compression) {
140     return ioptions.compression_opts;
141   }
142   // If bottommost_compression is set and we are compacting to the
143   // bottommost level then we should use the specified compression options
144   // for the bottmomost_compression.
145   if (ioptions.bottommost_compression != kDisableCompressionOption &&
146       level >= (vstorage->num_non_empty_levels() - 1) &&
147       ioptions.bottommost_compression_opts.enabled) {
148     return ioptions.bottommost_compression_opts;
149   }
150   return ioptions.compression_opts;
151 }
152 
CompactionPicker(const ImmutableCFOptions & ioptions,const InternalKeyComparator * icmp)153 CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
154                                    const InternalKeyComparator* icmp)
155     : ioptions_(ioptions), icmp_(icmp) {}
156 
~CompactionPicker()157 CompactionPicker::~CompactionPicker() {}
158 
159 // Delete this compaction from the list of running compactions.
ReleaseCompactionFiles(Compaction * c,Status status)160 void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
161   UnregisterCompaction(c);
162   if (!status.ok()) {
163     c->ResetNextCompactionIndex();
164   }
165 }
166 
GetRange(const CompactionInputFiles & inputs,InternalKey * smallest,InternalKey * largest) const167 void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
168                                 InternalKey* smallest,
169                                 InternalKey* largest) const {
170   const int level = inputs.level;
171   assert(!inputs.empty());
172   smallest->Clear();
173   largest->Clear();
174 
175   if (level == 0) {
176     for (size_t i = 0; i < inputs.size(); i++) {
177       FileMetaData* f = inputs[i];
178       if (i == 0) {
179         *smallest = f->smallest;
180         *largest = f->largest;
181       } else {
182         if (icmp_->Compare(f->smallest, *smallest) < 0) {
183           *smallest = f->smallest;
184         }
185         if (icmp_->Compare(f->largest, *largest) > 0) {
186           *largest = f->largest;
187         }
188       }
189     }
190   } else {
191     *smallest = inputs[0]->smallest;
192     *largest = inputs[inputs.size() - 1]->largest;
193   }
194 }
195 
GetRange(const CompactionInputFiles & inputs1,const CompactionInputFiles & inputs2,InternalKey * smallest,InternalKey * largest) const196 void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
197                                 const CompactionInputFiles& inputs2,
198                                 InternalKey* smallest,
199                                 InternalKey* largest) const {
200   assert(!inputs1.empty() || !inputs2.empty());
201   if (inputs1.empty()) {
202     GetRange(inputs2, smallest, largest);
203   } else if (inputs2.empty()) {
204     GetRange(inputs1, smallest, largest);
205   } else {
206     InternalKey smallest1, smallest2, largest1, largest2;
207     GetRange(inputs1, &smallest1, &largest1);
208     GetRange(inputs2, &smallest2, &largest2);
209     *smallest =
210         icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2;
211     *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1;
212   }
213 }
214 
GetRange(const std::vector<CompactionInputFiles> & inputs,InternalKey * smallest,InternalKey * largest) const215 void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
216                                 InternalKey* smallest,
217                                 InternalKey* largest) const {
218   InternalKey current_smallest;
219   InternalKey current_largest;
220   bool initialized = false;
221   for (const auto& in : inputs) {
222     if (in.empty()) {
223       continue;
224     }
225     GetRange(in, &current_smallest, &current_largest);
226     if (!initialized) {
227       *smallest = current_smallest;
228       *largest = current_largest;
229       initialized = true;
230     } else {
231       if (icmp_->Compare(current_smallest, *smallest) < 0) {
232         *smallest = current_smallest;
233       }
234       if (icmp_->Compare(current_largest, *largest) > 0) {
235         *largest = current_largest;
236       }
237     }
238   }
239   assert(initialized);
240 }
241 
ExpandInputsToCleanCut(const std::string &,VersionStorageInfo * vstorage,CompactionInputFiles * inputs,InternalKey ** next_smallest)242 bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
243                                               VersionStorageInfo* vstorage,
244                                               CompactionInputFiles* inputs,
245                                               InternalKey** next_smallest) {
246   // This isn't good compaction
247   assert(!inputs->empty());
248 
249   const int level = inputs->level;
250   // GetOverlappingInputs will always do the right thing for level-0.
251   // So we don't need to do any expansion if level == 0.
252   if (level == 0) {
253     return true;
254   }
255 
256   InternalKey smallest, largest;
257 
258   // Keep expanding inputs until we are sure that there is a "clean cut"
259   // boundary between the files in input and the surrounding files.
260   // This will ensure that no parts of a key are lost during compaction.
261   int hint_index = -1;
262   size_t old_size;
263   do {
264     old_size = inputs->size();
265     GetRange(*inputs, &smallest, &largest);
266     inputs->clear();
267     vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
268                                    hint_index, &hint_index, true,
269                                    next_smallest);
270   } while (inputs->size() > old_size);
271 
272   // we started off with inputs non-empty and the previous loop only grew
273   // inputs. thus, inputs should be non-empty here
274   assert(!inputs->empty());
275 
276   // If, after the expansion, there are files that are already under
277   // compaction, then we must drop/cancel this compaction.
278   if (AreFilesInCompaction(inputs->files)) {
279     return false;
280   }
281   return true;
282 }
283 
RangeOverlapWithCompaction(const Slice & smallest_user_key,const Slice & largest_user_key,int level) const284 bool CompactionPicker::RangeOverlapWithCompaction(
285     const Slice& smallest_user_key, const Slice& largest_user_key,
286     int level) const {
287   const Comparator* ucmp = icmp_->user_comparator();
288   for (Compaction* c : compactions_in_progress_) {
289     if (c->output_level() == level &&
290         ucmp->Compare(smallest_user_key, c->GetLargestUserKey()) <= 0 &&
291         ucmp->Compare(largest_user_key, c->GetSmallestUserKey()) >= 0) {
292       // Overlap
293       return true;
294     }
295   }
296   // Did not overlap with any running compaction in level `level`
297   return false;
298 }
299 
FilesRangeOverlapWithCompaction(const std::vector<CompactionInputFiles> & inputs,int level) const300 bool CompactionPicker::FilesRangeOverlapWithCompaction(
301     const std::vector<CompactionInputFiles>& inputs, int level) const {
302   bool is_empty = true;
303   for (auto& in : inputs) {
304     if (!in.empty()) {
305       is_empty = false;
306       break;
307     }
308   }
309   if (is_empty) {
310     // No files in inputs
311     return false;
312   }
313 
314   InternalKey smallest, largest;
315   GetRange(inputs, &smallest, &largest);
316   return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
317                                     level);
318 }
319 
320 // Returns true if any one of specified files are being compacted
AreFilesInCompaction(const std::vector<FileMetaData * > & files)321 bool CompactionPicker::AreFilesInCompaction(
322     const std::vector<FileMetaData*>& files) {
323   for (size_t i = 0; i < files.size(); i++) {
324     if (files[i]->being_compacted) {
325       return true;
326     }
327   }
328   return false;
329 }
330 
CompactFiles(const CompactionOptions & compact_options,const std::vector<CompactionInputFiles> & input_files,int output_level,VersionStorageInfo * vstorage,const MutableCFOptions & mutable_cf_options,uint32_t output_path_id)331 Compaction* CompactionPicker::CompactFiles(
332     const CompactionOptions& compact_options,
333     const std::vector<CompactionInputFiles>& input_files, int output_level,
334     VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
335     uint32_t output_path_id) {
336   assert(input_files.size());
337   // This compaction output should not overlap with a running compaction as
338   // `SanitizeCompactionInputFiles` should've checked earlier and db mutex
339   // shouldn't have been released since.
340   assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
341 
342   CompressionType compression_type;
343   if (compact_options.compression == kDisableCompressionOption) {
344     int base_level;
345     if (ioptions_.compaction_style == kCompactionStyleLevel) {
346       base_level = vstorage->base_level();
347     } else {
348       base_level = 1;
349     }
350     compression_type =
351         GetCompressionType(ioptions_, vstorage, mutable_cf_options,
352                            output_level, base_level);
353   } else {
354     // TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
355     // without configurable `CompressionOptions`, which is inconsistent.
356     compression_type = compact_options.compression;
357   }
358   auto c = new Compaction(
359       vstorage, ioptions_, mutable_cf_options, input_files, output_level,
360       compact_options.output_file_size_limit,
361       mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
362       GetCompressionOptions(ioptions_, vstorage, output_level),
363       compact_options.max_subcompactions,
364       /* grandparents */ {}, true);
365   RegisterCompaction(c);
366   return c;
367 }
368 
GetCompactionInputsFromFileNumbers(std::vector<CompactionInputFiles> * input_files,std::unordered_set<uint64_t> * input_set,const VersionStorageInfo * vstorage,const CompactionOptions &) const369 Status CompactionPicker::GetCompactionInputsFromFileNumbers(
370     std::vector<CompactionInputFiles>* input_files,
371     std::unordered_set<uint64_t>* input_set, const VersionStorageInfo* vstorage,
372     const CompactionOptions& /*compact_options*/) const {
373   if (input_set->size() == 0U) {
374     return Status::InvalidArgument(
375         "Compaction must include at least one file.");
376   }
377   assert(input_files);
378 
379   std::vector<CompactionInputFiles> matched_input_files;
380   matched_input_files.resize(vstorage->num_levels());
381   int first_non_empty_level = -1;
382   int last_non_empty_level = -1;
383   // TODO(yhchiang): use a lazy-initialized mapping from
384   //                 file_number to FileMetaData in Version.
385   for (int level = 0; level < vstorage->num_levels(); ++level) {
386     for (auto file : vstorage->LevelFiles(level)) {
387       auto iter = input_set->find(file->fd.GetNumber());
388       if (iter != input_set->end()) {
389         matched_input_files[level].files.push_back(file);
390         input_set->erase(iter);
391         last_non_empty_level = level;
392         if (first_non_empty_level == -1) {
393           first_non_empty_level = level;
394         }
395       }
396     }
397   }
398 
399   if (!input_set->empty()) {
400     std::string message(
401         "Cannot find matched SST files for the following file numbers:");
402     for (auto fn : *input_set) {
403       message += " ";
404       message += ToString(fn);
405     }
406     return Status::InvalidArgument(message);
407   }
408 
409   for (int level = first_non_empty_level; level <= last_non_empty_level;
410        ++level) {
411     matched_input_files[level].level = level;
412     input_files->emplace_back(std::move(matched_input_files[level]));
413   }
414 
415   return Status::OK();
416 }
417 
418 // Returns true if any one of the parent files are being compacted
IsRangeInCompaction(VersionStorageInfo * vstorage,const InternalKey * smallest,const InternalKey * largest,int level,int * level_index)419 bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage,
420                                            const InternalKey* smallest,
421                                            const InternalKey* largest,
422                                            int level, int* level_index) {
423   std::vector<FileMetaData*> inputs;
424   assert(level < NumberLevels());
425 
426   vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
427                                  level_index ? *level_index : 0, level_index);
428   return AreFilesInCompaction(inputs);
429 }
430 
// Populates `output_level_inputs` with the files of the output level that
// overlap the key range of `inputs` (the start level), then tries to widen
// `inputs` without changing the set of output-level files.
//
// Now we assume all levels except start level and output level are empty
// (asserted below).
// The start level is only expanded if that doesn't expand the output level
// or cause the start level to include a file for compaction that has an
// overlapping user-key with another file.
//
// Parameters:
//   cf_name              column family name; used in the log message and
//                        forwarded to ExpandInputsToCleanCut.
//   mutable_cf_options   supplies max_compaction_bytes, the bound applied to
//                        the combined compensated size when expanding.
//   vstorage             version that is queried for overlapping files.
//   inputs               in/out: start-level files; replaced by the expanded
//                        set when an expansion is accepted.
//   output_level_inputs  in/out: `level` must already be set and `files`
//                        must be empty on entry; filled by this function.
//   parent_index         in/out: dereferenced unconditionally (must not be
//                        null) and passed as hint/output index to
//                        GetOverlappingInputs for the output level.
//   base_index           hint passed to the start-level lookups.
//
// REQUIRES: inputs->empty() == false
// REQUIRES: output_level_inputs->empty() == true
// If input level and output level are the same, the function returns true
// right away without populating anything.
// Returns false if files on the output level are currently in compaction (or
// their clean-cut expansion fails), which means that we can't compact them;
// returns true otherwise.
bool CompactionPicker::SetupOtherInputs(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
    CompactionInputFiles* output_level_inputs, int* parent_index,
    int base_index) {
  assert(!inputs->empty());
  assert(output_level_inputs->empty());
  const int input_level = inputs->level;
  const int output_level = output_level_inputs->level;
  if (input_level == output_level) {
    // no possibility of conflict
    return true;
  }

  // For now, we only support merging two levels, start level and output level.
  // We need to assert other levels are empty.
  for (int l = input_level + 1; l < output_level; l++) {
    assert(vstorage->NumLevelFiles(l) == 0);
  }

  InternalKey smallest, largest;

  // Get the range one last time.
  GetRange(*inputs, &smallest, &largest);

  // Populate the set of output-level files (output_level_inputs) to
  // include in compaction
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
                                 &output_level_inputs->files, *parent_index,
                                 parent_index);
  if (AreFilesInCompaction(output_level_inputs->files)) {
    return false;
  }
  if (!output_level_inputs->empty()) {
    // Also fails if the clean-cut expansion pulls in a file under compaction.
    if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
      return false;
    }
  }

  // See if we can further grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up. We also choose NOT
  // to expand if this would cause "level" to include some entries for some
  // user key, while excluding other entries for the same user key. This
  // can happen when one user key spans multiple files.
  if (!output_level_inputs->empty()) {
    const uint64_t limit = mutable_cf_options.max_compaction_bytes;
    const uint64_t output_level_inputs_size =
        TotalCompensatedFileSize(output_level_inputs->files);
    // Only used for the log message below.
    const uint64_t inputs_size = TotalCompensatedFileSize(inputs->files);
    bool expand_inputs = false;

    CompactionInputFiles expanded_inputs;
    expanded_inputs.level = input_level;
    // Get closed interval of output level
    InternalKey all_start, all_limit;
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
    bool try_overlapping_inputs = true;
    // First attempt: every start-level file overlapping the combined range,
    // expanded to a clean cut.
    vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
                                   &expanded_inputs.files, base_index, nullptr);
    // NOTE(review): this size is measured before ExpandInputsToCleanCut runs,
    // so files added by the clean-cut expansion are not included in the limit
    // check below -- confirm this is intended.
    uint64_t expanded_inputs_size =
        TotalCompensatedFileSize(expanded_inputs.files);
    if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
      try_overlapping_inputs = false;
    }
    if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
        output_level_inputs_size + expanded_inputs_size < limit &&
        !AreFilesInCompaction(expanded_inputs.files)) {
      // The candidate is larger and within the size limit; accept it only if
      // it does not pull additional output-level files into the compaction.
      InternalKey new_start, new_limit;
      GetRange(expanded_inputs, &new_start, &new_limit);
      CompactionInputFiles expanded_output_level_inputs;
      expanded_output_level_inputs.level = output_level;
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
                                     &expanded_output_level_inputs.files,
                                     *parent_index, parent_index);
      assert(!expanded_output_level_inputs.empty());
      if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
          ExpandInputsToCleanCut(cf_name, vstorage,
                                 &expanded_output_level_inputs) &&
          expanded_output_level_inputs.size() == output_level_inputs->size()) {
        expand_inputs = true;
      }
    }
    if (!expand_inputs) {
      // Second attempt: ask vstorage for a clean set of start-level files
      // within [all_start, all_limit] and apply the same size and
      // in-compaction checks to it.
      vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
                                             &all_limit, &expanded_inputs.files,
                                             base_index, nullptr);
      expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files);
      if (expanded_inputs.size() > inputs->size() &&
          output_level_inputs_size + expanded_inputs_size < limit &&
          !AreFilesInCompaction(expanded_inputs.files)) {
        expand_inputs = true;
      }
    }
    if (expand_inputs) {
      // Log "files (bytes)" before and after; the output-level figures are
      // identical on both sides because that set was not changed.
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
                     "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt
                     "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 " bytes)\n",
                     cf_name.c_str(), input_level, inputs->size(),
                     output_level_inputs->size(), inputs_size,
                     output_level_inputs_size, expanded_inputs.size(),
                     output_level_inputs->size(), expanded_inputs_size,
                     output_level_inputs_size);
      inputs->files = expanded_inputs.files;
    }
  }
  return true;
}
549 
GetGrandparents(VersionStorageInfo * vstorage,const CompactionInputFiles & inputs,const CompactionInputFiles & output_level_inputs,std::vector<FileMetaData * > * grandparents)550 void CompactionPicker::GetGrandparents(
551     VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
552     const CompactionInputFiles& output_level_inputs,
553     std::vector<FileMetaData*>* grandparents) {
554   InternalKey start, limit;
555   GetRange(inputs, output_level_inputs, &start, &limit);
556   // Compute the set of grandparent files that overlap this compaction
557   // (parent == level+1; grandparent == level+2)
558   if (output_level_inputs.level + 1 < NumberLevels()) {
559     vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
560                                    &limit, grandparents);
561   }
562 }
563 
CompactRange(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,int input_level,int output_level,const CompactRangeOptions & compact_range_options,const InternalKey * begin,const InternalKey * end,InternalKey ** compaction_end,bool * manual_conflict,uint64_t max_file_num_to_ignore)564 Compaction* CompactionPicker::CompactRange(
565     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
566     VersionStorageInfo* vstorage, int input_level, int output_level,
567     const CompactRangeOptions& compact_range_options, const InternalKey* begin,
568     const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
569     uint64_t max_file_num_to_ignore) {
570   // CompactionPickerFIFO has its own implementation of compact range
571   assert(ioptions_.compaction_style != kCompactionStyleFIFO);
572 
573   if (input_level == ColumnFamilyData::kCompactAllLevels) {
574     assert(ioptions_.compaction_style == kCompactionStyleUniversal);
575 
576     // Universal compaction with more than one level always compacts all the
577     // files together to the last level.
578     assert(vstorage->num_levels() > 1);
579     // DBImpl::CompactRange() set output level to be the last level
580     if (ioptions_.allow_ingest_behind) {
581       assert(output_level == vstorage->num_levels() - 2);
582     } else {
583       assert(output_level == vstorage->num_levels() - 1);
584     }
585     // DBImpl::RunManualCompaction will make full range for universal compaction
586     assert(begin == nullptr);
587     assert(end == nullptr);
588     *compaction_end = nullptr;
589 
590     int start_level = 0;
591     for (; start_level < vstorage->num_levels() &&
592            vstorage->NumLevelFiles(start_level) == 0;
593          start_level++) {
594     }
595     if (start_level == vstorage->num_levels()) {
596       return nullptr;
597     }
598 
599     if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
600       *manual_conflict = true;
601       // Only one level 0 compaction allowed
602       return nullptr;
603     }
604 
605     std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
606                                              start_level);
607     for (int level = start_level; level < vstorage->num_levels(); level++) {
608       inputs[level - start_level].level = level;
609       auto& files = inputs[level - start_level].files;
610       for (FileMetaData* f : vstorage->LevelFiles(level)) {
611         files.push_back(f);
612       }
613       if (AreFilesInCompaction(files)) {
614         *manual_conflict = true;
615         return nullptr;
616       }
617     }
618 
619     // 2 non-exclusive manual compactions could run at the same time producing
620     // overlaping outputs in the same level.
621     if (FilesRangeOverlapWithCompaction(inputs, output_level)) {
622       // This compaction output could potentially conflict with the output
623       // of a currently running compaction, we cannot run it.
624       *manual_conflict = true;
625       return nullptr;
626     }
627 
628     Compaction* c = new Compaction(
629         vstorage, ioptions_, mutable_cf_options, std::move(inputs),
630         output_level,
631         MaxFileSizeForLevel(mutable_cf_options, output_level,
632                             ioptions_.compaction_style),
633         /* max_compaction_bytes */ LLONG_MAX,
634         compact_range_options.target_path_id,
635         GetCompressionType(ioptions_, vstorage, mutable_cf_options,
636                            output_level, 1),
637         GetCompressionOptions(ioptions_, vstorage, output_level),
638         compact_range_options.max_subcompactions, /* grandparents */ {},
639         /* is manual */ true);
640     RegisterCompaction(c);
641     return c;
642   }
643 
644   CompactionInputFiles inputs;
645   inputs.level = input_level;
646   bool covering_the_whole_range = true;
647 
648   // All files are 'overlapping' in universal style compaction.
649   // We have to compact the entire range in one shot.
650   if (ioptions_.compaction_style == kCompactionStyleUniversal) {
651     begin = nullptr;
652     end = nullptr;
653   }
654 
655   vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
656   if (inputs.empty()) {
657     return nullptr;
658   }
659 
660   if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
661     // Only one level 0 compaction allowed
662     TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
663     *manual_conflict = true;
664     return nullptr;
665   }
666 
667   // Avoid compacting too much in one shot in case the range is large.
668   // But we cannot do this for level-0 since level-0 files can overlap
669   // and we must not pick one file and drop another older file if the
670   // two files overlap.
671   if (input_level > 0) {
672     const uint64_t limit = mutable_cf_options.max_compaction_bytes;
673     uint64_t total = 0;
674     for (size_t i = 0; i + 1 < inputs.size(); ++i) {
675       uint64_t s = inputs[i]->compensated_file_size;
676       total += s;
677       if (total >= limit) {
678         covering_the_whole_range = false;
679         inputs.files.resize(i + 1);
680         break;
681       }
682     }
683   }
684   assert(compact_range_options.target_path_id <
685          static_cast<uint32_t>(ioptions_.cf_paths.size()));
686 
687   // for BOTTOM LEVEL compaction only, use max_file_num_to_ignore to filter out
688   // files that are created during the current compaction.
689   if (compact_range_options.bottommost_level_compaction ==
690           BottommostLevelCompaction::kForceOptimized &&
691       max_file_num_to_ignore != port::kMaxUint64) {
692     assert(input_level == output_level);
693     // inputs_shrunk holds a continuous subset of input files which were all
694     // created before the current manual compaction
695     std::vector<FileMetaData*> inputs_shrunk;
696     size_t skip_input_index = inputs.size();
697     for (size_t i = 0; i < inputs.size(); ++i) {
698       if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
699         inputs_shrunk.push_back(inputs[i]);
700       } else if (!inputs_shrunk.empty()) {
701         // inputs[i] was created during the current manual compaction and
702         // need to be skipped
703         skip_input_index = i;
704         break;
705       }
706     }
707     if (inputs_shrunk.empty()) {
708       return nullptr;
709     }
710     if (inputs.size() != inputs_shrunk.size()) {
711       inputs.files.swap(inputs_shrunk);
712     }
713     // set covering_the_whole_range to false if there is any file that need to
714     // be compacted in the range of inputs[skip_input_index+1, inputs.size())
715     for (size_t i = skip_input_index + 1; i < inputs.size(); ++i) {
716       if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
717         covering_the_whole_range = false;
718       }
719     }
720   }
721 
722   InternalKey key_storage;
723   InternalKey* next_smallest = &key_storage;
724   if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs, &next_smallest) ==
725       false) {
726     // manual compaction is now multi-threaded, so it can
727     // happen that ExpandWhileOverlapping fails
728     // we handle it higher in RunManualCompaction
729     *manual_conflict = true;
730     return nullptr;
731   }
732 
733   if (covering_the_whole_range || !next_smallest) {
734     *compaction_end = nullptr;
735   } else {
736     **compaction_end = *next_smallest;
737   }
738 
739   CompactionInputFiles output_level_inputs;
740   if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
741     assert(input_level == 0);
742     output_level = vstorage->base_level();
743     assert(output_level > 0);
744   }
745   output_level_inputs.level = output_level;
746   if (input_level != output_level) {
747     int parent_index = -1;
748     if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
749                           &output_level_inputs, &parent_index, -1)) {
750       // manual compaction is now multi-threaded, so it can
751       // happen that SetupOtherInputs fails
752       // we handle it higher in RunManualCompaction
753       *manual_conflict = true;
754       return nullptr;
755     }
756   }
757 
758   std::vector<CompactionInputFiles> compaction_inputs({inputs});
759   if (!output_level_inputs.empty()) {
760     compaction_inputs.push_back(output_level_inputs);
761   }
762   for (size_t i = 0; i < compaction_inputs.size(); i++) {
763     if (AreFilesInCompaction(compaction_inputs[i].files)) {
764       *manual_conflict = true;
765       return nullptr;
766     }
767   }
768 
769   // 2 non-exclusive manual compactions could run at the same time producing
770   // overlapping outputs in the same level.
771   if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) {
772     // This compaction output could potentially conflict with the output
773     // of a currently running compaction, we cannot run it.
774     *manual_conflict = true;
775     return nullptr;
776   }
777 
778   std::vector<FileMetaData*> grandparents;
779   GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
780   Compaction* compaction = new Compaction(
781       vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
782       output_level,
783       MaxFileSizeForLevel(mutable_cf_options, output_level,
784                           ioptions_.compaction_style, vstorage->base_level(),
785                           ioptions_.level_compaction_dynamic_level_bytes),
786       mutable_cf_options.max_compaction_bytes,
787       compact_range_options.target_path_id,
788       GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
789                          vstorage->base_level()),
790       GetCompressionOptions(ioptions_, vstorage, output_level),
791       compact_range_options.max_subcompactions, std::move(grandparents),
792       /* is manual compaction */ true);
793 
794   TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
795   RegisterCompaction(compaction);
796 
797   // Creating a compaction influences the compaction score because the score
798   // takes running compactions into account (by skipping files that are already
799   // being compacted). Since we just changed compaction score, we recalculate it
800   // here
801   vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
802 
803   return compaction;
804 }
805 
806 #ifndef ROCKSDB_LITE
807 namespace {
808 // Test whether two files have overlapping key-ranges.
HaveOverlappingKeyRanges(const Comparator * c,const SstFileMetaData & a,const SstFileMetaData & b)809 bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
810                               const SstFileMetaData& b) {
811   if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
812     if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
813       // b.smallestkey <= a.smallestkey <= b.largestkey
814       return true;
815     }
816   } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
817     // a.smallestkey < b.smallestkey <= a.largestkey
818     return true;
819   }
820   if (c->Compare(a.largestkey, b.largestkey) <= 0) {
821     if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
822       // b.smallestkey <= a.largestkey <= b.largestkey
823       return true;
824     }
825   } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
826     // a.smallestkey <= b.largestkey < a.largestkey
827     return true;
828   }
829   return false;
830 }
831 }  // namespace
832 
SanitizeCompactionInputFilesForAllLevels(std::unordered_set<uint64_t> * input_files,const ColumnFamilyMetaData & cf_meta,const int output_level) const833 Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
834     std::unordered_set<uint64_t>* input_files,
835     const ColumnFamilyMetaData& cf_meta, const int output_level) const {
836   auto& levels = cf_meta.levels;
837   auto comparator = icmp_->user_comparator();
838 
839   // TODO(yhchiang): add is_adjustable to CompactionOptions
840 
841   // the smallest and largest key of the current compaction input
842   std::string smallestkey;
843   std::string largestkey;
844   // a flag for initializing smallest and largest key
845   bool is_first = false;
846   const int kNotFound = -1;
847 
848   // For each level, it does the following things:
849   // 1. Find the first and the last compaction input files
850   //    in the current level.
851   // 2. Include all files between the first and the last
852   //    compaction input files.
853   // 3. Update the compaction key-range.
854   // 4. For all remaining levels, include files that have
855   //    overlapping key-range with the compaction key-range.
856   for (int l = 0; l <= output_level; ++l) {
857     auto& current_files = levels[l].files;
858     int first_included = static_cast<int>(current_files.size());
859     int last_included = kNotFound;
860 
861     // identify the first and the last compaction input files
862     // in the current level.
863     for (size_t f = 0; f < current_files.size(); ++f) {
864       if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
865           input_files->end()) {
866         first_included = std::min(first_included, static_cast<int>(f));
867         last_included = std::max(last_included, static_cast<int>(f));
868         if (is_first == false) {
869           smallestkey = current_files[f].smallestkey;
870           largestkey = current_files[f].largestkey;
871           is_first = true;
872         }
873       }
874     }
875     if (last_included == kNotFound) {
876       continue;
877     }
878 
879     if (l != 0) {
880       // expend the compaction input of the current level if it
881       // has overlapping key-range with other non-compaction input
882       // files in the same level.
883       while (first_included > 0) {
884         if (comparator->Compare(current_files[first_included - 1].largestkey,
885                                 current_files[first_included].smallestkey) <
886             0) {
887           break;
888         }
889         first_included--;
890       }
891 
892       while (last_included < static_cast<int>(current_files.size()) - 1) {
893         if (comparator->Compare(current_files[last_included + 1].smallestkey,
894                                 current_files[last_included].largestkey) > 0) {
895           break;
896         }
897         last_included++;
898       }
899     } else if (output_level > 0) {
900       last_included = static_cast<int>(current_files.size() - 1);
901     }
902 
903     // include all files between the first and the last compaction input files.
904     for (int f = first_included; f <= last_included; ++f) {
905       if (current_files[f].being_compacted) {
906         return Status::Aborted("Necessary compaction input file " +
907                                current_files[f].name +
908                                " is currently being compacted.");
909       }
910       input_files->insert(TableFileNameToNumber(current_files[f].name));
911     }
912 
913     // update smallest and largest key
914     if (l == 0) {
915       for (int f = first_included; f <= last_included; ++f) {
916         if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
917             0) {
918           smallestkey = current_files[f].smallestkey;
919         }
920         if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
921           largestkey = current_files[f].largestkey;
922         }
923       }
924     } else {
925       if (comparator->Compare(smallestkey,
926                               current_files[first_included].smallestkey) > 0) {
927         smallestkey = current_files[first_included].smallestkey;
928       }
929       if (comparator->Compare(largestkey,
930                               current_files[last_included].largestkey) < 0) {
931         largestkey = current_files[last_included].largestkey;
932       }
933     }
934 
935     SstFileMetaData aggregated_file_meta;
936     aggregated_file_meta.smallestkey = smallestkey;
937     aggregated_file_meta.largestkey = largestkey;
938 
939     // For all lower levels, include all overlapping files.
940     // We need to add overlapping files from the current level too because even
941     // if there no input_files in level l, we would still need to add files
942     // which overlap with the range containing the input_files in levels 0 to l
943     // Level 0 doesn't need to be handled this way because files are sorted by
944     // time and not by key
945     for (int m = std::max(l, 1); m <= output_level; ++m) {
946       for (auto& next_lv_file : levels[m].files) {
947         if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta,
948                                      next_lv_file)) {
949           if (next_lv_file.being_compacted) {
950             return Status::Aborted(
951                 "File " + next_lv_file.name +
952                 " that has overlapping key range with one of the compaction "
953                 " input file is currently being compacted.");
954           }
955           input_files->insert(TableFileNameToNumber(next_lv_file.name));
956         }
957       }
958     }
959   }
960   if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
961     return Status::Aborted(
962         "A running compaction is writing to the same output level in an "
963         "overlapping key range");
964   }
965   return Status::OK();
966 }
967 
SanitizeCompactionInputFiles(std::unordered_set<uint64_t> * input_files,const ColumnFamilyMetaData & cf_meta,const int output_level) const968 Status CompactionPicker::SanitizeCompactionInputFiles(
969     std::unordered_set<uint64_t>* input_files,
970     const ColumnFamilyMetaData& cf_meta, const int output_level) const {
971   assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
972          cf_meta.levels[cf_meta.levels.size() - 1].level);
973   if (output_level >= static_cast<int>(cf_meta.levels.size())) {
974     return Status::InvalidArgument(
975         "Output level for column family " + cf_meta.name +
976         " must between [0, " +
977         ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "].");
978   }
979 
980   if (output_level > MaxOutputLevel()) {
981     return Status::InvalidArgument(
982         "Exceed the maximum output level defined by "
983         "the current compaction algorithm --- " +
984         ToString(MaxOutputLevel()));
985   }
986 
987   if (output_level < 0) {
988     return Status::InvalidArgument("Output level cannot be negative.");
989   }
990 
991   if (input_files->size() == 0) {
992     return Status::InvalidArgument(
993         "A compaction must contain at least one file.");
994   }
995 
996   Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
997                                                       output_level);
998 
999   if (!s.ok()) {
1000     return s;
1001   }
1002 
1003   // for all input files, check whether the file number matches
1004   // any currently-existing files.
1005   for (auto file_num : *input_files) {
1006     bool found = false;
1007     for (const auto& level_meta : cf_meta.levels) {
1008       for (const auto& file_meta : level_meta.files) {
1009         if (file_num == TableFileNameToNumber(file_meta.name)) {
1010           if (file_meta.being_compacted) {
1011             return Status::Aborted("Specified compaction input file " +
1012                                    MakeTableFileName("", file_num) +
1013                                    " is already being compacted.");
1014           }
1015           found = true;
1016           break;
1017         }
1018       }
1019       if (found) {
1020         break;
1021       }
1022     }
1023     if (!found) {
1024       return Status::InvalidArgument(
1025           "Specified compaction input file " + MakeTableFileName("", file_num) +
1026           " does not exist in column family " + cf_meta.name + ".");
1027     }
1028   }
1029 
1030   return Status::OK();
1031 }
1032 #endif  // !ROCKSDB_LITE
1033 
RegisterCompaction(Compaction * c)1034 void CompactionPicker::RegisterCompaction(Compaction* c) {
1035   if (c == nullptr) {
1036     return;
1037   }
1038   assert(ioptions_.compaction_style != kCompactionStyleLevel ||
1039          c->output_level() == 0 ||
1040          !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level()));
1041   if (c->start_level() == 0 ||
1042       ioptions_.compaction_style == kCompactionStyleUniversal) {
1043     level0_compactions_in_progress_.insert(c);
1044   }
1045   compactions_in_progress_.insert(c);
1046 }
1047 
UnregisterCompaction(Compaction * c)1048 void CompactionPicker::UnregisterCompaction(Compaction* c) {
1049   if (c == nullptr) {
1050     return;
1051   }
1052   if (c->start_level() == 0 ||
1053       ioptions_.compaction_style == kCompactionStyleUniversal) {
1054     level0_compactions_in_progress_.erase(c);
1055   }
1056   compactions_in_progress_.erase(c);
1057 }
1058 
PickFilesMarkedForCompaction(const std::string & cf_name,VersionStorageInfo * vstorage,int * start_level,int * output_level,CompactionInputFiles * start_level_inputs)1059 void CompactionPicker::PickFilesMarkedForCompaction(
1060     const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
1061     int* output_level, CompactionInputFiles* start_level_inputs) {
1062   if (vstorage->FilesMarkedForCompaction().empty()) {
1063     return;
1064   }
1065 
1066   auto continuation = [&, cf_name](std::pair<int, FileMetaData*> level_file) {
1067     // If it's being compacted it has nothing to do here.
1068     // If this assert() fails that means that some function marked some
1069     // files as being_compacted, but didn't call ComputeCompactionScore()
1070     assert(!level_file.second->being_compacted);
1071     *start_level = level_file.first;
1072     *output_level =
1073         (*start_level == 0) ? vstorage->base_level() : *start_level + 1;
1074 
1075     if (*start_level == 0 && !level0_compactions_in_progress()->empty()) {
1076       return false;
1077     }
1078 
1079     start_level_inputs->files = {level_file.second};
1080     start_level_inputs->level = *start_level;
1081     return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs);
1082   };
1083 
1084   // take a chance on a random file first
1085   Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
1086   size_t random_file_index = static_cast<size_t>(rnd.Uniform(
1087       static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
1088 
1089   if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
1090     // found the compaction!
1091     return;
1092   }
1093 
1094   for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
1095     if (continuation(level_file)) {
1096       // found the compaction!
1097       return;
1098     }
1099   }
1100   start_level_inputs->files.clear();
1101 }
1102 
GetOverlappingL0Files(VersionStorageInfo * vstorage,CompactionInputFiles * start_level_inputs,int output_level,int * parent_index)1103 bool CompactionPicker::GetOverlappingL0Files(
1104     VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs,
1105     int output_level, int* parent_index) {
1106   // Two level 0 compaction won't run at the same time, so don't need to worry
1107   // about files on level 0 being compacted.
1108   assert(level0_compactions_in_progress()->empty());
1109   InternalKey smallest, largest;
1110   GetRange(*start_level_inputs, &smallest, &largest);
1111   // Note that the next call will discard the file we placed in
1112   // c->inputs_[0] earlier and replace it with an overlapping set
1113   // which will include the picked file.
1114   start_level_inputs->files.clear();
1115   vstorage->GetOverlappingInputs(0, &smallest, &largest,
1116                                  &(start_level_inputs->files));
1117 
1118   // If we include more L0 files in the same compaction run it can
1119   // cause the 'smallest' and 'largest' key to get extended to a
1120   // larger range. So, re-invoke GetRange to get the new key range
1121   GetRange(*start_level_inputs, &smallest, &largest);
1122   if (IsRangeInCompaction(vstorage, &smallest, &largest, output_level,
1123                           parent_index)) {
1124     return false;
1125   }
1126   assert(!start_level_inputs->files.empty());
1127 
1128   return true;
1129 }
1130 
1131 }  // namespace ROCKSDB_NAMESPACE
1132