1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include <algorithm>
11 #include <cinttypes>
12 #include <functional>
13 #include <list>
14 #include <memory>
15 #include <random>
16 #include <set>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20 
21 #include "db/builder.h"
22 #include "db/compaction/compaction_job.h"
23 #include "db/db_impl/db_impl.h"
24 #include "db/db_iter.h"
25 #include "db/dbformat.h"
26 #include "db/error_handler.h"
27 #include "db/event_helpers.h"
28 #include "db/log_reader.h"
29 #include "db/log_writer.h"
30 #include "db/memtable.h"
31 #include "db/memtable_list.h"
32 #include "db/merge_context.h"
33 #include "db/merge_helper.h"
34 #include "db/range_del_aggregator.h"
35 #include "db/version_set.h"
36 #include "file/filename.h"
37 #include "file/read_write_util.h"
38 #include "file/sst_file_manager_impl.h"
39 #include "file/writable_file_writer.h"
40 #include "logging/log_buffer.h"
41 #include "logging/logging.h"
42 #include "monitoring/iostats_context_imp.h"
43 #include "monitoring/perf_context_imp.h"
44 #include "monitoring/thread_status_util.h"
45 #include "port/port.h"
46 #include "rocksdb/db.h"
47 #include "rocksdb/env.h"
48 #include "rocksdb/statistics.h"
49 #include "rocksdb/status.h"
50 #include "rocksdb/table.h"
51 #include "table/block_based/block.h"
52 #include "table/block_based/block_based_table_factory.h"
53 #include "table/merging_iterator.h"
54 #include "table/table_builder.h"
55 #include "test_util/sync_point.h"
56 #include "util/coding.h"
57 #include "util/mutexlock.h"
58 #include "util/random.h"
59 #include "util/stop_watch.h"
60 #include "util/string_util.h"
61 
62 namespace ROCKSDB_NAMESPACE {
63 
64 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
65   switch (compaction_reason) {
66     case CompactionReason::kUnknown:
67       return "Unknown";
68     case CompactionReason::kLevelL0FilesNum:
69       return "LevelL0FilesNum";
70     case CompactionReason::kLevelMaxLevelSize:
71       return "LevelMaxLevelSize";
72     case CompactionReason::kUniversalSizeAmplification:
73       return "UniversalSizeAmplification";
74     case CompactionReason::kUniversalSizeRatio:
75       return "UniversalSizeRatio";
76     case CompactionReason::kUniversalSortedRunNum:
77       return "UniversalSortedRunNum";
78     case CompactionReason::kFIFOMaxSize:
79       return "FIFOMaxSize";
80     case CompactionReason::kFIFOReduceNumFiles:
81       return "FIFOReduceNumFiles";
82     case CompactionReason::kFIFOTtl:
83       return "FIFOTtl";
84     case CompactionReason::kManualCompaction:
85       return "ManualCompaction";
86     case CompactionReason::kFilesMarkedForCompaction:
87       return "FilesMarkedForCompaction";
88     case CompactionReason::kBottommostFiles:
89       return "BottommostFiles";
90     case CompactionReason::kTtl:
91       return "Ttl";
92     case CompactionReason::kFlush:
93       return "Flush";
94     case CompactionReason::kExternalSstIngestion:
95       return "ExternalSstIngestion";
96     case CompactionReason::kPeriodicCompaction:
97       return "PeriodicCompaction";
98     case CompactionReason::kNumOfReasons:
99       // fall through
100     default:
101       assert(false);
102       return "Invalid";
103   }
104 }
105 
106 // Maintains state for each sub-compaction
107 struct CompactionJob::SubcompactionState {
108   const Compaction* compaction;
109   std::unique_ptr<CompactionIterator> c_iter;
110 
111   // The boundaries of the key-range this compaction is interested in. No two
112   // subcompactions may have overlapping key-ranges.
113   // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
114   Slice *start, *end;
115 
116   // The return status of this subcompaction
117   Status status;
118 
119   // Files produced by this subcompaction
120   struct Output {
121     FileMetaData meta;
122     bool finished;
123     std::shared_ptr<const TableProperties> table_properties;
124   };
125 
126   // State kept for output being generated
127   std::vector<Output> outputs;
128   std::unique_ptr<WritableFileWriter> outfile;
129   std::unique_ptr<TableBuilder> builder;
130   Output* current_output() {
131     if (outputs.empty()) {
132       // This subcompaction's output could be empty if compaction was aborted
133       // before this subcompaction had a chance to generate any output files.
134       // When subcompactions are executed sequentially this is more likely,
135       // particularly for the later subcompactions. Once they are run in
136       // parallel, however, it should be much rarer.
137       return nullptr;
138     } else {
139       return &outputs.back();
140     }
141   }
142 
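  // Size in bytes of the output file currently being built, as reported by
  // the table builder.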
143   uint64_t current_output_file_size;
144 
145   // State during the subcompaction
146   uint64_t total_bytes;
147   uint64_t num_output_records;
148   CompactionJobStats compaction_job_stats;
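  // Approximate number of bytes of input data covered by this subcompaction's
  // key range, as estimated in GenSubcompactionBoundaries().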
149   uint64_t approx_size;
150   // An index used to speed up ShouldStopBefore().
151   size_t grandparent_index = 0;
152   // The number of bytes overlapping between the current output and
153   // grandparent files used in ShouldStopBefore().
154   uint64_t overlapped_bytes = 0;
155   // A flag indicating whether a key has been seen in ShouldStopBefore()
156   bool seen_key = false;
157 
158   SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
159                      uint64_t size = 0)
160       : compaction(c),
161         start(_start),
162         end(_end),
163         outfile(nullptr),
164         builder(nullptr),
165         current_output_file_size(0),
166         total_bytes(0),
167         num_output_records(0),
168         approx_size(size),
169         grandparent_index(0),
170         overlapped_bytes(0),
171         seen_key(false) {
172     assert(compaction != nullptr);
173   }
174 
175   SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
176 
177   SubcompactionState& operator=(SubcompactionState&& o) {
178     compaction = std::move(o.compaction);
179     start = std::move(o.start);
180     end = std::move(o.end);
181     status = std::move(o.status);
182     outputs = std::move(o.outputs);
183     outfile = std::move(o.outfile);
184     builder = std::move(o.builder);
185     current_output_file_size = std::move(o.current_output_file_size);
186     total_bytes = std::move(o.total_bytes);
187     num_output_records = std::move(o.num_output_records);
188     compaction_job_stats = std::move(o.compaction_job_stats);
189     approx_size = std::move(o.approx_size);
190     grandparent_index = std::move(o.grandparent_index);
191     overlapped_bytes = std::move(o.overlapped_bytes);
192     seen_key = std::move(o.seen_key);
193     return *this;
194   }
195 
196   // Because member std::unique_ptrs do not have these.
197   SubcompactionState(const SubcompactionState&) = delete;
198 
199   SubcompactionState& operator=(const SubcompactionState&) = delete;
200 
201   // Returns true iff we should stop building the current output
202   // before processing "internal_key".
203   bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
204     const InternalKeyComparator* icmp =
205         &compaction->column_family_data()->internal_comparator();
206     const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
207 
208     // Scan to find the earliest grandparent file that contains this key.
209     while (grandparent_index < grandparents.size() &&
210            icmp->Compare(internal_key,
211                          grandparents[grandparent_index]->largest.Encode()) >
212                0) {
213       if (seen_key) {
214         overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
215       }
216       assert(grandparent_index + 1 >= grandparents.size() ||
217              icmp->Compare(
218                  grandparents[grandparent_index]->largest.Encode(),
219                  grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
220       grandparent_index++;
221     }
222     seen_key = true;
223 
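    // If the grandparent bytes overlapped so far plus the current output file
    // size exceed max_compaction_bytes, start a new output file so that a
    // future compaction of this file into the grandparent level stays bounded.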
224     if (overlapped_bytes + curr_file_size >
225         compaction->max_compaction_bytes()) {
226       // Too much overlap for current output; start new output
227       overlapped_bytes = 0;
228       return true;
229     }
230 
231     return false;
232   }
233 };
234 
235 // Maintains state for the entire compaction
236 struct CompactionJob::CompactionState {
237   Compaction* const compaction;
238 
239   // REQUIRED: subcompaction states are stored in order of increasing
240   // key-range
241   std::vector<CompactionJob::SubcompactionState> sub_compact_states;
242   Status status;
243 
244   uint64_t total_bytes;
245   uint64_t num_output_records;
246 
247   explicit CompactionState(Compaction* c)
248       : compaction(c),
249         total_bytes(0),
250         num_output_records(0) {}
251 
252   size_t NumOutputFiles() {
253     size_t total = 0;
254     for (auto& s : sub_compact_states) {
255       total += s.outputs.size();
256     }
257     return total;
258   }
259 
260   Slice SmallestUserKey() {
261     for (const auto& sub_compact_state : sub_compact_states) {
262       if (!sub_compact_state.outputs.empty() &&
263           sub_compact_state.outputs[0].finished) {
264         return sub_compact_state.outputs[0].meta.smallest.user_key();
265       }
266     }
267     // If there is no finished output, return an empty slice.
268     return Slice(nullptr, 0);
269   }
270 
271   Slice LargestUserKey() {
272     for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
273          ++it) {
274       if (!it->outputs.empty() && it->current_output()->finished) {
275         assert(it->current_output() != nullptr);
276         return it->current_output()->meta.largest.user_key();
277       }
278     }
279     // If there is no finished output, return an empty slice.
280     return Slice(nullptr, 0);
281   }
282 };
283 
284 void CompactionJob::AggregateStatistics() {
285   for (SubcompactionState& sc : compact_->sub_compact_states) {
286     compact_->total_bytes += sc.total_bytes;
287     compact_->num_output_records += sc.num_output_records;
288   }
289   if (compaction_job_stats_) {
290     for (SubcompactionState& sc : compact_->sub_compact_states) {
291       compaction_job_stats_->Add(sc.compaction_job_stats);
292     }
293   }
294 }
295 
296 CompactionJob::CompactionJob(
297     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
298     const FileOptions& file_options, VersionSet* versions,
299     const std::atomic<bool>* shutting_down,
300     const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
301     Directory* db_directory, Directory* output_directory, Statistics* stats,
302     InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
303     std::vector<SequenceNumber> existing_snapshots,
304     SequenceNumber earliest_write_conflict_snapshot,
305     const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
306     EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
307     const std::string& dbname, CompactionJobStats* compaction_job_stats,
308     Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
309     : job_id_(job_id),
310       compact_(new CompactionState(compaction)),
311       compaction_job_stats_(compaction_job_stats),
312       compaction_stats_(compaction->compaction_reason(), 1),
313       dbname_(dbname),
314       db_options_(db_options),
315       file_options_(file_options),
316       env_(db_options.env),
317       fs_(db_options.fs.get()),
318       file_options_for_read_(
319           fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
320       versions_(versions),
321       shutting_down_(shutting_down),
322       manual_compaction_paused_(manual_compaction_paused),
323       preserve_deletes_seqnum_(preserve_deletes_seqnum),
324       log_buffer_(log_buffer),
325       db_directory_(db_directory),
326       output_directory_(output_directory),
327       stats_(stats),
328       db_mutex_(db_mutex),
329       db_error_handler_(db_error_handler),
330       existing_snapshots_(std::move(existing_snapshots)),
331       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
332       snapshot_checker_(snapshot_checker),
333       table_cache_(std::move(table_cache)),
334       event_logger_(event_logger),
335       bottommost_level_(false),
336       paranoid_file_checks_(paranoid_file_checks),
337       measure_io_stats_(measure_io_stats),
338       write_hint_(Env::WLTH_NOT_SET),
339       thread_pri_(thread_pri) {
340   assert(log_buffer_ != nullptr);
341   const auto* cfd = compact_->compaction->column_family_data();
342   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
343                                     db_options_.enable_thread_tracking);
344   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
345   ReportStartedCompaction(compaction);
346 }
347 
348 CompactionJob::~CompactionJob() {
349   assert(compact_ == nullptr);
350   ThreadStatusUtil::ResetThreadStatus();
351 }
352 
353 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
354   const auto* cfd = compact_->compaction->column_family_data();
355   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
356                                     db_options_.enable_thread_tracking);
357 
358   ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
359                                                job_id_);
360 
361   ThreadStatusUtil::SetThreadOperationProperty(
362       ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
363       (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
364           compact_->compaction->output_level());
365 
366   // In the current design, a CompactionJob is always created
367   // for a non-trivial compaction.
368   assert(compaction->IsTrivialMove() == false ||
369          compaction->is_manual_compaction() == true);
370 
371   ThreadStatusUtil::SetThreadOperationProperty(
372       ThreadStatus::COMPACTION_PROP_FLAGS,
373       compaction->is_manual_compaction() +
374           (compaction->deletion_compaction() << 1));
375 
376   ThreadStatusUtil::SetThreadOperationProperty(
377       ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
378       compaction->CalculateTotalInputSize());
379 
380   IOSTATS_RESET(bytes_written);
381   IOSTATS_RESET(bytes_read);
382   ThreadStatusUtil::SetThreadOperationProperty(
383       ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
384   ThreadStatusUtil::SetThreadOperationProperty(
385       ThreadStatus::COMPACTION_BYTES_READ, 0);
386 
387   // Set the thread operation after operation properties
388   // to ensure GetThreadList() can always show them all together.
389   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
390 
391   if (compaction_job_stats_) {
392     compaction_job_stats_->is_manual_compaction =
393         compaction->is_manual_compaction();
394   }
395 }
396 
397 void CompactionJob::Prepare() {
398   AutoThreadOperationStageUpdater stage_updater(
399       ThreadStatus::STAGE_COMPACTION_PREPARE);
400 
401   // Generate file_levels_ for compaction before making the iterator
402   auto* c = compact_->compaction;
403   assert(c->column_family_data() != nullptr);
404   assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
405              compact_->compaction->level()) > 0);
406 
407   write_hint_ =
408       c->column_family_data()->CalculateSSTWriteHint(c->output_level());
409   bottommost_level_ = c->bottommost_level();
410 
411   if (c->ShouldFormSubcompactions()) {
412     {
413       StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
414       GenSubcompactionBoundaries();
415     }
416     assert(sizes_.size() == boundaries_.size() + 1);
417 
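    // The boundaries computed above split the key space into
    // boundaries_.size() + 1 contiguous, non-overlapping ranges; e.g.
    // boundaries {b0, b1} yield (-inf, b0), [b0, b1) and [b1, +inf).
    // sizes_[i] holds the approximate amount of data in range i.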
418     for (size_t i = 0; i <= boundaries_.size(); i++) {
419       Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
420       Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
421       compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
422     }
423     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
424                       compact_->sub_compact_states.size());
425   } else {
426     compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
427   }
428 }
429 
430 struct RangeWithSize {
431   Range range;
432   uint64_t size;
433 
434   RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
435       : range(a, b), size(s) {}
436 };
437 
438 void CompactionJob::GenSubcompactionBoundaries() {
439   auto* c = compact_->compaction;
440   auto* cfd = c->column_family_data();
441   const Comparator* cfd_comparator = cfd->user_comparator();
442   std::vector<Slice> bounds;
443   int start_lvl = c->start_level();
444   int out_lvl = c->output_level();
445 
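  // Boundaries are chosen in three steps: collect candidate boundary keys
  // from the input files, estimate the amount of data between consecutive
  // candidates, then greedily group the resulting ranges so that each
  // subcompaction covers roughly the same amount of data.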
446   // Add the starting and/or ending key of certain input files as a potential
447   // boundary
448   for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
449     int lvl = c->level(lvl_idx);
450     if (lvl >= start_lvl && lvl <= out_lvl) {
451       const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
452       size_t num_files = flevel->num_files;
453 
454       if (num_files == 0) {
455         continue;
456       }
457 
458       if (lvl == 0) {
459         // For level 0 add the starting and ending key of each file since the
460         // files may have greatly differing key ranges (not range-partitioned)
461         for (size_t i = 0; i < num_files; i++) {
462           bounds.emplace_back(flevel->files[i].smallest_key);
463           bounds.emplace_back(flevel->files[i].largest_key);
464         }
465       } else {
466         // For all other levels add the smallest/largest key in the level to
467         // encompass the range covered by that level
468         bounds.emplace_back(flevel->files[0].smallest_key);
469         bounds.emplace_back(flevel->files[num_files - 1].largest_key);
470         if (lvl == out_lvl) {
471           // For the last level include the starting keys of all files since
472           // the last level is the largest and probably has the widest key
473           // range. Since it's range partitioned, the ending key of one file
474           // and the starting key of the next are very close (or identical).
475           for (size_t i = 1; i < num_files; i++) {
476             bounds.emplace_back(flevel->files[i].smallest_key);
477           }
478         }
479       }
480     }
481   }
482 
483   std::sort(bounds.begin(), bounds.end(),
484             [cfd_comparator](const Slice& a, const Slice& b) -> bool {
485               return cfd_comparator->Compare(ExtractUserKey(a),
486                                              ExtractUserKey(b)) < 0;
487             });
488   // Remove duplicated entries from bounds
489   bounds.erase(
490       std::unique(bounds.begin(), bounds.end(),
491                   [cfd_comparator](const Slice& a, const Slice& b) -> bool {
492                     return cfd_comparator->Compare(ExtractUserKey(a),
493                                                    ExtractUserKey(b)) == 0;
494                   }),
495       bounds.end());
496 
497   // Combine consecutive pairs of boundaries into ranges with an approximate
498   // size of data covered by keys in that range
499   uint64_t sum = 0;
500   std::vector<RangeWithSize> ranges;
501   // Get the input version from CompactionState since it was already
502   // referenced earlier in Compaction::SetInputVersion and will not change
503   // when db_mutex_ is released below
504   auto* v = compact_->compaction->input_version();
505   for (auto it = bounds.begin();;) {
506     const Slice a = *it;
507     ++it;
508 
509     if (it == bounds.end()) {
510       break;
511     }
512 
513     const Slice b = *it;
514 
515     // ApproximateSize could potentially create table reader iterator to seek
516     // to the index block and may incur I/O cost in the process. Unlock db
517     // mutex to reduce contention
518     db_mutex_->Unlock();
519     uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
520                                                b, start_lvl, out_lvl + 1,
521                                                TableReaderCaller::kCompaction);
522     db_mutex_->Lock();
523     ranges.emplace_back(a, b, size);
524     sum += size;
525   }
526 
527   // Group the ranges into subcompactions
528   const double min_file_fill_percent = 4.0 / 5;
529   int base_level = v->storage_info()->base_level();
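  // Estimate an upper bound on the number of output files this compaction can
  // produce, assuming each file is filled to about min_file_fill_percent of
  // the level's target file size.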
530   uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
531       sum / min_file_fill_percent /
532       MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
533           c->immutable_cf_options()->compaction_style, base_level,
534           c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
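  // Cap the number of subcompactions at the number of ranges, the configured
  // max_subcompactions, and the estimated number of output files; extra
  // subcompactions beyond any of these limits would add no useful parallelism.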
535   uint64_t subcompactions =
536       std::min({static_cast<uint64_t>(ranges.size()),
537                 static_cast<uint64_t>(c->max_subcompactions()),
538                 max_output_files});
539 
540   if (subcompactions > 1) {
541     double mean = sum * 1.0 / subcompactions;
542     // Greedily add ranges to the subcompaction until the sum of the ranges'
543     // sizes becomes >= the expected mean size of a subcompaction
544     sum = 0;
545     for (size_t i = 0; i < ranges.size() - 1; i++) {
546       sum += ranges[i].size;
547       if (subcompactions == 1) {
548         // If there's only one left to schedule then it goes to the end so no
549         // need to put an end boundary
550         continue;
551       }
552       if (sum >= mean) {
553         boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
554         sizes_.emplace_back(sum);
555         subcompactions--;
556         sum = 0;
557       }
558     }
559     sizes_.emplace_back(sum + ranges.back().size);
560   } else {
561     // Only one range so its size is the total sum of sizes computed above
562     sizes_.emplace_back(sum);
563   }
564 }
565 
566 Status CompactionJob::Run() {
567   AutoThreadOperationStageUpdater stage_updater(
568       ThreadStatus::STAGE_COMPACTION_RUN);
569   TEST_SYNC_POINT("CompactionJob::Run():Start");
570   log_buffer_->FlushBufferToLog();
571   LogCompaction();
572 
573   const size_t num_threads = compact_->sub_compact_states.size();
574   assert(num_threads > 0);
575   const uint64_t start_micros = env_->NowMicros();
576 
577   // Launch a thread for each of subcompactions 1...num_threads-1
578   std::vector<port::Thread> thread_pool;
579   thread_pool.reserve(num_threads - 1);
580   for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
581     thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
582                              &compact_->sub_compact_states[i]);
583   }
584 
585   // Always schedule the first subcompaction (whether or not there are also
586   // others) in the current thread to be efficient with resources
587   ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
588 
589   // Wait for all other threads (if there are any) to finish execution
590   for (auto& thread : thread_pool) {
591     thread.join();
592   }
593 
594   compaction_stats_.micros = env_->NowMicros() - start_micros;
595   compaction_stats_.cpu_micros = 0;
596   for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
597     compaction_stats_.cpu_micros +=
598         compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
599   }
600 
601   RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
602   RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
603                         compaction_stats_.cpu_micros);
604 
605   TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
606 
607   // Check if any thread encountered an error during execution
608   Status status;
609   for (const auto& state : compact_->sub_compact_states) {
610     if (!state.status.ok()) {
611       status = state.status;
612       break;
613     }
614   }
615 
616   if (status.ok() && output_directory_) {
617     status = output_directory_->Fsync();
618   }
619 
620   if (status.ok()) {
621     thread_pool.clear();
622     std::vector<const FileMetaData*> files_meta;
623     for (const auto& state : compact_->sub_compact_states) {
624       for (const auto& output : state.outputs) {
625         files_meta.emplace_back(&output.meta);
626       }
627     }
628     ColumnFamilyData* cfd = compact_->compaction->column_family_data();
629     auto prefix_extractor =
630         compact_->compaction->mutable_cf_options()->prefix_extractor.get();
631     std::atomic<size_t> next_file_meta_idx(0);
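    // Each verification worker repeatedly claims the next unverified output
    // file via this shared atomic index until every file has been checked.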
632     auto verify_table = [&](Status& output_status) {
633       while (true) {
634         size_t file_idx = next_file_meta_idx.fetch_add(1);
635         if (file_idx >= files_meta.size()) {
636           break;
637         }
638         // Verify that the table is usable
639         // We set for_compaction to false and don't OptimizeForCompactionTableRead
640         // here because this is a special case after we finish building the table.
641         // No matter whether use_direct_io_for_flush_and_compaction is true,
642         // we will regard this verification as user reads, since the goal is
643         // to cache it here for further user reads.
644         InternalIterator* iter = cfd->table_cache()->NewIterator(
645             ReadOptions(), file_options_, cfd->internal_comparator(),
646             *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
647             /*table_reader_ptr=*/nullptr,
648             cfd->internal_stats()->GetFileReadHist(
649                 compact_->compaction->output_level()),
650             TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
651             /*skip_filters=*/false, compact_->compaction->output_level(),
652             /*smallest_compaction_key=*/nullptr,
653             /*largest_compaction_key=*/nullptr);
654         auto s = iter->status();
655 
656         if (s.ok() && paranoid_file_checks_) {
657           for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
658           s = iter->status();
659         }
660 
661         delete iter;
662 
663         if (!s.ok()) {
664           output_status = s;
665           break;
666         }
667       }
668     };
669     for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
670       thread_pool.emplace_back(verify_table,
671                                std::ref(compact_->sub_compact_states[i].status));
672     }
673     verify_table(compact_->sub_compact_states[0].status);
674     for (auto& thread : thread_pool) {
675       thread.join();
676     }
677     for (const auto& state : compact_->sub_compact_states) {
678       if (!state.status.ok()) {
679         status = state.status;
680         break;
681       }
682     }
683   }
684 
685   TablePropertiesCollection tp;
686   for (const auto& state : compact_->sub_compact_states) {
687     for (const auto& output : state.outputs) {
688       auto fn =
689           TableFileName(state.compaction->immutable_cf_options()->cf_paths,
690                         output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
691       tp[fn] = output.table_properties;
692     }
693   }
694   compact_->compaction->SetOutputTableProperties(std::move(tp));
695 
696   // Finish up all book-keeping to unify the subcompaction results
697   AggregateStatistics();
698   UpdateCompactionStats();
699   RecordCompactionIOStats();
700   LogFlush(db_options_.info_log);
701   TEST_SYNC_POINT("CompactionJob::Run():End");
702 
703   compact_->status = status;
704   return status;
705 }
706 
707 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
708   AutoThreadOperationStageUpdater stage_updater(
709       ThreadStatus::STAGE_COMPACTION_INSTALL);
710   db_mutex_->AssertHeld();
711   Status status = compact_->status;
712   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
713   cfd->internal_stats()->AddCompactionStats(
714       compact_->compaction->output_level(), thread_pri_, compaction_stats_);
715 
716   if (status.ok()) {
717     status = InstallCompactionResults(mutable_cf_options);
718   }
719   VersionStorageInfo::LevelSummaryStorage tmp;
720   auto vstorage = cfd->current()->storage_info();
721   const auto& stats = compaction_stats_;
722 
723   double read_write_amp = 0.0;
724   double write_amp = 0.0;
725   double bytes_read_per_sec = 0;
726   double bytes_written_per_sec = 0;
727 
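  // Amplification is measured relative to the bytes read from the non-output
  // (input) levels: read-write-amplify is (bytes written + all bytes read)
  // divided by that amount, and write-amplify is bytes written divided by
  // that amount.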
728   if (stats.bytes_read_non_output_levels > 0) {
729     read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
730                       stats.bytes_read_non_output_levels) /
731                      static_cast<double>(stats.bytes_read_non_output_levels);
732     write_amp = stats.bytes_written /
733                 static_cast<double>(stats.bytes_read_non_output_levels);
734   }
735   if (stats.micros > 0) {
736     bytes_read_per_sec =
737         (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
738         static_cast<double>(stats.micros);
739     bytes_written_per_sec =
740         stats.bytes_written / static_cast<double>(stats.micros);
741   }
742 
743   ROCKS_LOG_BUFFER(
744       log_buffer_,
745       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
746       "files in(%d, %d) out(%d) "
747       "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
748       "write-amplify(%.1f) %s, records in: %" PRIu64
749       ", records dropped: %" PRIu64 " output_compression: %s\n",
750       cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
751       bytes_written_per_sec, compact_->compaction->output_level(),
752       stats.num_input_files_in_non_output_levels,
753       stats.num_input_files_in_output_level, stats.num_output_files,
754       stats.bytes_read_non_output_levels / 1048576.0,
755       stats.bytes_read_output_level / 1048576.0,
756       stats.bytes_written / 1048576.0, read_write_amp, write_amp,
757       status.ToString().c_str(), stats.num_input_records,
758       stats.num_dropped_records,
759       CompressionTypeToString(compact_->compaction->output_compression())
760           .c_str());
761 
762   UpdateCompactionJobStats(stats);
763 
764   auto stream = event_logger_->LogToBuffer(log_buffer_);
765   stream << "job" << job_id_ << "event"
766          << "compaction_finished"
767          << "compaction_time_micros" << stats.micros
768          << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
769          << compact_->compaction->output_level() << "num_output_files"
770          << compact_->NumOutputFiles() << "total_output_size"
771          << compact_->total_bytes << "num_input_records"
772          << stats.num_input_records << "num_output_records"
773          << compact_->num_output_records << "num_subcompactions"
774          << compact_->sub_compact_states.size() << "output_compression"
775          << CompressionTypeToString(compact_->compaction->output_compression());
776 
777   if (compaction_job_stats_ != nullptr) {
778     stream << "num_single_delete_mismatches"
779            << compaction_job_stats_->num_single_del_mismatch;
780     stream << "num_single_delete_fallthrough"
781            << compaction_job_stats_->num_single_del_fallthru;
782   }
783 
784   if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
785     stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
786     stream << "file_range_sync_nanos"
787            << compaction_job_stats_->file_range_sync_nanos;
788     stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
789     stream << "file_prepare_write_nanos"
790            << compaction_job_stats_->file_prepare_write_nanos;
791   }
792 
793   stream << "lsm_state";
794   stream.StartArray();
795   for (int level = 0; level < vstorage->num_levels(); ++level) {
796     stream << vstorage->NumLevelFiles(level);
797   }
798   stream.EndArray();
799 
800   CleanupCompaction();
801   return status;
802 }
803 
804 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
805   assert(sub_compact != nullptr);
806 
807   uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
808 
809   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
810 
811   // Create compaction filter and fail the compaction if
812   // IgnoreSnapshots() = false because it is not supported anymore
813   const CompactionFilter* compaction_filter =
814       cfd->ioptions()->compaction_filter;
815   std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
816   if (compaction_filter == nullptr) {
817     compaction_filter_from_factory =
818         sub_compact->compaction->CreateCompactionFilter();
819     compaction_filter = compaction_filter_from_factory.get();
820   }
821   if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
822     sub_compact->status = Status::NotSupported(
823         "CompactionFilter::IgnoreSnapshots() = false is not supported "
824         "anymore.");
825     return;
826   }
827 
828   CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
829                                              existing_snapshots_);
830 
831   // Although the v2 aggregator is what the level iterator(s) know about,
832   // the AddTombstones calls will be propagated down to the v1 aggregator.
833   std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
834       sub_compact->compaction, &range_del_agg, file_options_for_read_));
835 
836   AutoThreadOperationStageUpdater stage_updater(
837       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
838 
839   // I/O measurement variables
840   PerfLevel prev_perf_level = PerfLevel::kEnableTime;
841   const uint64_t kRecordStatsEvery = 1000;
842   uint64_t prev_write_nanos = 0;
843   uint64_t prev_fsync_nanos = 0;
844   uint64_t prev_range_sync_nanos = 0;
845   uint64_t prev_prepare_write_nanos = 0;
846   uint64_t prev_cpu_write_nanos = 0;
847   uint64_t prev_cpu_read_nanos = 0;
848   if (measure_io_stats_) {
849     prev_perf_level = GetPerfLevel();
850     SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
851     prev_write_nanos = IOSTATS(write_nanos);
852     prev_fsync_nanos = IOSTATS(fsync_nanos);
853     prev_range_sync_nanos = IOSTATS(range_sync_nanos);
854     prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
855     prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
856     prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
857   }
858 
859   MergeHelper merge(
860       env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
861       compaction_filter, db_options_.info_log.get(),
862       false /* internal key corruption is expected */,
863       existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
864       snapshot_checker_, compact_->compaction->level(),
865       db_options_.statistics.get());
866 
867   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
868   TEST_SYNC_POINT_CALLBACK(
869       "CompactionJob::Run():PausingManualCompaction:1",
870       reinterpret_cast<void*>(
871           const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
872 
873   Slice* start = sub_compact->start;
874   Slice* end = sub_compact->end;
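  // Position the input iterator at the subcompaction's inclusive lower bound.
  // Seeking with kMaxSequenceNumber and kValueTypeForSeek lands on the first
  // internal key whose user key is >= *start.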
875   if (start != nullptr) {
876     IterKey start_iter;
877     start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
878     input->Seek(start_iter.GetInternalKey());
879   } else {
880     input->SeekToFirst();
881   }
882 
883   Status status;
884   sub_compact->c_iter.reset(new CompactionIterator(
885       input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
886       &existing_snapshots_, earliest_write_conflict_snapshot_,
887       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
888       &range_del_agg, sub_compact->compaction, compaction_filter,
889       shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
890       db_options_.info_log));
891   auto c_iter = sub_compact->c_iter.get();
892   c_iter->SeekToFirst();
893   if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
894     // ShouldStopBefore() maintains state based on keys processed so far. The
895     // compaction loop always calls it on the "next" key, thus won't tell it the
896     // first key. So we do that here.
897     sub_compact->ShouldStopBefore(c_iter->key(),
898                                   sub_compact->current_output_file_size);
899   }
900   const auto& c_iter_stats = c_iter->iter_stats();
901 
902   while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
903     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
904     // returns true.
905     const Slice& key = c_iter->key();
906     const Slice& value = c_iter->value();
907 
908     // If an end key (exclusive) is specified, check whether the current key
909     // is >= it and exit if so, because the iterator is then out of its range
910     if (end != nullptr &&
911         cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
912       break;
913     }
914     if (c_iter_stats.num_input_records % kRecordStatsEvery ==
915         kRecordStatsEvery - 1) {
916       RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
917       c_iter->ResetRecordCounts();
918       RecordCompactionIOStats();
919     }
920 
921     // Open output file if necessary
922     if (sub_compact->builder == nullptr) {
923       status = OpenCompactionOutputFile(sub_compact);
924       if (!status.ok()) {
925         break;
926       }
927     }
928     assert(sub_compact->builder != nullptr);
929     assert(sub_compact->current_output() != nullptr);
930     sub_compact->builder->Add(key, value);
931     sub_compact->current_output_file_size = sub_compact->builder->FileSize();
932     const ParsedInternalKey& ikey = c_iter->ikey();
933     sub_compact->current_output()->meta.UpdateBoundaries(
934         key, value, ikey.sequence, ikey.type);
935     sub_compact->num_output_records++;
936 
937     // Close the output file if it is big enough. Two conditions determine
938     // when to close it: (1) the current key should be this file's last key,
939     // (2) the next key should not be in this file.
940     //
941     // TODO(aekmekji): determine if file should be closed earlier than this
942     // during subcompactions (i.e. if output size, estimated by input size, is
943     // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
944     // and 0.6MB instead of 1MB and 0.2MB)
945     bool output_file_ended = false;
946     Status input_status;
947     if (sub_compact->compaction->output_level() != 0 &&
948         sub_compact->current_output_file_size >=
949             sub_compact->compaction->max_output_file_size()) {
950       // (1) this key terminates the file. For historical reasons, the iterator
951       // status before advancing will be given to FinishCompactionOutputFile().
952       input_status = input->status();
953       output_file_ended = true;
954     }
955     TEST_SYNC_POINT_CALLBACK(
956         "CompactionJob::Run():PausingManualCompaction:2",
957         reinterpret_cast<void*>(
958             const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
959     c_iter->Next();
960     if (c_iter->status().IsManualCompactionPaused()) {
961       break;
962     }
963     if (!output_file_ended && c_iter->Valid() &&
964         sub_compact->compaction->output_level() != 0 &&
965         sub_compact->ShouldStopBefore(c_iter->key(),
966                                       sub_compact->current_output_file_size) &&
967         sub_compact->builder != nullptr) {
968       // (2) this key belongs to the next file. For historical reasons, the
969       // iterator status after advancing will be given to
970       // FinishCompactionOutputFile().
971       input_status = input->status();
972       output_file_ended = true;
973     }
974     if (output_file_ended) {
975       const Slice* next_key = nullptr;
976       if (c_iter->Valid()) {
977         next_key = &c_iter->key();
978       }
979       CompactionIterationStats range_del_out_stats;
980       status =
981           FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
982                                      &range_del_out_stats, next_key);
983       RecordDroppedKeys(range_del_out_stats,
984                         &sub_compact->compaction_job_stats);
985     }
986   }
987 
988   sub_compact->compaction_job_stats.num_input_deletion_records =
989       c_iter_stats.num_input_deletion_records;
990   sub_compact->compaction_job_stats.num_corrupt_keys =
991       c_iter_stats.num_input_corrupt_records;
992   sub_compact->compaction_job_stats.num_single_del_fallthru =
993       c_iter_stats.num_single_del_fallthru;
994   sub_compact->compaction_job_stats.num_single_del_mismatch =
995       c_iter_stats.num_single_del_mismatch;
996   sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
997       c_iter_stats.total_input_raw_key_bytes;
998   sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
999       c_iter_stats.total_input_raw_value_bytes;
1000 
1001   RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1002              c_iter_stats.total_filter_time);
1003   RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1004   RecordCompactionIOStats();
1005 
1006   if (status.ok() && cfd->IsDropped()) {
1007     status =
1008         Status::ColumnFamilyDropped("Column family dropped during compaction");
1009   }
1010   if ((status.ok() || status.IsColumnFamilyDropped()) &&
1011       shutting_down_->load(std::memory_order_relaxed)) {
1012     status = Status::ShutdownInProgress("Database shutdown");
1013   }
1014   if ((status.ok() || status.IsColumnFamilyDropped()) &&
1015       (manual_compaction_paused_ &&
1016        manual_compaction_paused_->load(std::memory_order_relaxed))) {
1017     status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1018   }
1019   if (status.ok()) {
1020     status = input->status();
1021   }
1022   if (status.ok()) {
1023     status = c_iter->status();
1024   }
1025 
1026   if (status.ok() && sub_compact->builder == nullptr &&
1027       sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
1028     // handle subcompaction containing only range deletions
1029     status = OpenCompactionOutputFile(sub_compact);
1030   }
1031 
1032   // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1033   // close the output file.
1034   if (sub_compact->builder != nullptr) {
1035     CompactionIterationStats range_del_out_stats;
1036     Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
1037                                           &range_del_out_stats);
1038     if (status.ok()) {
1039       status = s;
1040     }
1041     RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
1042   }
1043 
1044   sub_compact->compaction_job_stats.cpu_micros =
1045       env_->NowCPUNanos() / 1000 - prev_cpu_micros;
1046 
1047   if (measure_io_stats_) {
1048     sub_compact->compaction_job_stats.file_write_nanos +=
1049         IOSTATS(write_nanos) - prev_write_nanos;
1050     sub_compact->compaction_job_stats.file_fsync_nanos +=
1051         IOSTATS(fsync_nanos) - prev_fsync_nanos;
1052     sub_compact->compaction_job_stats.file_range_sync_nanos +=
1053         IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1054     sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1055         IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
1056     sub_compact->compaction_job_stats.cpu_micros -=
1057         (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1058          IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1059         1000;
1060     if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1061       SetPerfLevel(prev_perf_level);
1062     }
1063   }
1064 
1065   sub_compact->c_iter.reset();
1066   input.reset();
1067   sub_compact->status = status;
1068 }
1069 
1070 void CompactionJob::RecordDroppedKeys(
1071     const CompactionIterationStats& c_iter_stats,
1072     CompactionJobStats* compaction_job_stats) {
1073   if (c_iter_stats.num_record_drop_user > 0) {
1074     RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1075                c_iter_stats.num_record_drop_user);
1076   }
1077   if (c_iter_stats.num_record_drop_hidden > 0) {
1078     RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1079                c_iter_stats.num_record_drop_hidden);
1080     if (compaction_job_stats) {
1081       compaction_job_stats->num_records_replaced +=
1082           c_iter_stats.num_record_drop_hidden;
1083     }
1084   }
1085   if (c_iter_stats.num_record_drop_obsolete > 0) {
1086     RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1087                c_iter_stats.num_record_drop_obsolete);
1088     if (compaction_job_stats) {
1089       compaction_job_stats->num_expired_deletion_records +=
1090           c_iter_stats.num_record_drop_obsolete;
1091     }
1092   }
1093   if (c_iter_stats.num_record_drop_range_del > 0) {
1094     RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1095                c_iter_stats.num_record_drop_range_del);
1096   }
1097   if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1098     RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1099                c_iter_stats.num_range_del_drop_obsolete);
1100   }
1101   if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1102     RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1103                c_iter_stats.num_optimized_del_drop_obsolete);
1104   }
1105 }
1106 
1107 Status CompactionJob::FinishCompactionOutputFile(
1108     const Status& input_status, SubcompactionState* sub_compact,
1109     CompactionRangeDelAggregator* range_del_agg,
1110     CompactionIterationStats* range_del_out_stats,
1111     const Slice* next_table_min_key /* = nullptr */) {
1112   AutoThreadOperationStageUpdater stage_updater(
1113       ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
1114   assert(sub_compact != nullptr);
1115   assert(sub_compact->outfile);
1116   assert(sub_compact->builder != nullptr);
1117   assert(sub_compact->current_output() != nullptr);
1118 
1119   uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
1120   assert(output_number != 0);
1121 
1122   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1123   const Comparator* ucmp = cfd->user_comparator();
1124 
1125   // Check for iterator errors
1126   Status s = input_status;
1127   auto meta = &sub_compact->current_output()->meta;
1128   assert(meta != nullptr);
1129   if (s.ok()) {
1130     Slice lower_bound_guard, upper_bound_guard;
1131     std::string smallest_user_key;
1132     const Slice *lower_bound, *upper_bound;
1133     bool lower_bound_from_sub_compact = false;
1134     if (sub_compact->outputs.size() == 1) {
1135       // For the first output table, include range tombstones before the min key
1136       // but after the subcompaction boundary.
1137       lower_bound = sub_compact->start;
1138       lower_bound_from_sub_compact = true;
1139     } else if (meta->smallest.size() > 0) {
1140       // For subsequent output tables, only include range tombstones from min
1141       // key onwards since the previous file was extended to contain range
1142       // tombstones falling before min key.
1143       smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
1144       lower_bound_guard = Slice(smallest_user_key);
1145       lower_bound = &lower_bound_guard;
1146     } else {
1147       lower_bound = nullptr;
1148     }
1149     if (next_table_min_key != nullptr) {
1150       // This may be the last file in the subcompaction in some cases, so we
1151       // need to compare the end key of subcompaction with the next file start
1152       // key. When the end key is chosen by the subcompaction, we know that
1153       // it must be the biggest key in output file. Therefore, it is safe to
1154       // use the smaller key as the upper bound of the output file, to ensure
1155       // that there is no overlapping between different output files.
1156       upper_bound_guard = ExtractUserKey(*next_table_min_key);
1157       if (sub_compact->end != nullptr &&
1158           ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
1159         upper_bound = sub_compact->end;
1160       } else {
1161         upper_bound = &upper_bound_guard;
1162       }
1163     } else {
1164       // This is the last file in the subcompaction, so extend until the
1165       // subcompaction ends.
1166       upper_bound = sub_compact->end;
1167     }
1168     auto earliest_snapshot = kMaxSequenceNumber;
1169     if (existing_snapshots_.size() > 0) {
1170       earliest_snapshot = existing_snapshots_[0];
1171     }
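    // earliest_snapshot is used below to decide whether a range tombstone that
    // is not visible to any snapshot can be dropped at the bottommost level
    // instead of being written out.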
1172     bool has_overlapping_endpoints;
1173     if (upper_bound != nullptr && meta->largest.size() > 0) {
1174       has_overlapping_endpoints =
1175           ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
1176     } else {
1177       has_overlapping_endpoints = false;
1178     }
1179 
1180     // The end key of the subcompaction must be greater than or equal to the
1181     // upper bound. If the end of the subcompaction is null or the upper bound
1182     // is null, it means that this file is the last file in the compaction, so
1183     // there will be no overlap between this file and others.
1184     assert(sub_compact->end == nullptr ||
1185            upper_bound == nullptr ||
1186            ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
1187     auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
1188                                          has_overlapping_endpoints);
1189     // Position the range tombstone output iterator. There may be tombstone
1190     // fragments that are entirely out of range, so make sure that we do not
1191     // include those.
1192     if (lower_bound != nullptr) {
1193       it->Seek(*lower_bound);
1194     } else {
1195       it->SeekToFirst();
1196     }
1197     for (; it->Valid(); it->Next()) {
1198       auto tombstone = it->Tombstone();
1199       if (upper_bound != nullptr) {
1200         int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
1201         if ((has_overlapping_endpoints && cmp < 0) ||
1202             (!has_overlapping_endpoints && cmp <= 0)) {
1203           // Tombstones starting after upper_bound only need to be included in
1204           // the next table. If the current SST ends before upper_bound, i.e.,
1205           // `has_overlapping_endpoints == false`, we can also skip over range
1206           // tombstones that start exactly at upper_bound. Such range tombstones
1207           // will be included in the next file and are not relevant to the point
1208           // keys or endpoints of the current file.
1209           break;
1210         }
1211       }
1212 
1213       if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
1214         // TODO(andrewkr): tombstones that span multiple output files are
1215         // counted for each compaction output file, so lots of double counting.
1216         range_del_out_stats->num_range_del_drop_obsolete++;
1217         range_del_out_stats->num_record_drop_obsolete++;
1218         continue;
1219       }
1220 
1221       auto kv = tombstone.Serialize();
1222       assert(lower_bound == nullptr ||
1223              ucmp->Compare(*lower_bound, kv.second) < 0);
1224       sub_compact->builder->Add(kv.first.Encode(), kv.second);
1225       InternalKey smallest_candidate = std::move(kv.first);
1226       if (lower_bound != nullptr &&
1227           ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
1228         // Pretend the smallest key has the same user key as lower_bound
1229         // (the max key in the previous table or subcompaction) in order for
1230         // files to appear key-space partitioned.
1231         //
1232         // When lower_bound is chosen by a subcompaction, we know that
1233         // subcompactions over smaller keys cannot contain any keys at
1234         // lower_bound. We also know that smaller subcompactions exist, because
1235         // otherwise the subcompaction would be unbounded on the left. As a
1236         // result, we know that no other files on the output level will contain
1237         // actual keys at lower_bound (an output file may have a largest key of
1238         // lower_bound@kMaxSequenceNumber, but this only indicates a large range
1239         // tombstone was truncated). Therefore, it is safe to use the
1240         // tombstone's sequence number, to ensure that keys at lower_bound at
1241         // lower levels are covered by truncated tombstones.
1242         //
1243         // If lower_bound was chosen by the smallest data key in the file,
1244         // choose lowest seqnum so this file's smallest internal key comes after
1245         // the previous file's largest. The fake seqnum is OK because the read
1246         // path's file-picking code only considers user key.
1247         smallest_candidate = InternalKey(
1248             *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
1249             kTypeRangeDeletion);
1250       }
1251       InternalKey largest_candidate = tombstone.SerializeEndKey();
1252       if (upper_bound != nullptr &&
1253           ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
1254         // Pretend the largest key has the same user key as upper_bound (the
1255         // min key in the following table or subcompaction) in order for files
1256         // to appear key-space partitioned.
1257         //
1258         // Choose highest seqnum so this file's largest internal key comes
1259         // before the next file's/subcompaction's smallest. The fake seqnum is
1260         // OK because the read path's file-picking code only considers the user
1261         // key portion.
1262         //
1263         // Note Seek() also creates InternalKey with (user_key,
1264         // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
1265         // kTypeRangeDeletion (0xF), so the range tombstone comes before the
1266         // Seek() key in InternalKey's ordering. So Seek() will look in the
1267         // next file for the user key.
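        // Concretely, internal keys with equal user keys are ordered by
        // decreasing 8-byte footer PackSequenceAndType(seq, type); with both
        // footers carrying kMaxSequenceNumber, the larger type value (0xF vs.
        // 0x7) gives the tombstone the larger footer, so it sorts earlier.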
1268         largest_candidate =
1269             InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
1270       }
1271 #ifndef NDEBUG
1272       SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
1273       if (meta->smallest.size() > 0) {
1274         smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
1275       }
1276 #endif
1277       meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
1278                                      tombstone.seq_,
1279                                      cfd->internal_comparator());
1280 
1281       // The smallest key in a file is used for range tombstone truncation, so
1282       // it cannot have a seqnum of 0 (unless the smallest data key in a file
1283       // has a seqnum of 0). Otherwise, the truncated tombstone may expose
1284       // deleted keys at lower levels.
1285       assert(smallest_ikey_seqnum == 0 ||
1286              ExtractInternalKeyFooter(meta->smallest.Encode()) !=
1287                  PackSequenceAndType(0, kTypeRangeDeletion));
1288     }
1289     meta->marked_for_compaction = sub_compact->builder->NeedCompact();
1290   }
1291   const uint64_t current_entries = sub_compact->builder->NumEntries();
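  // Finalize the table file: Finish() completes the table contents, while
  // Abandon() tells the builder that the partially written file is being
  // discarded because an earlier error occurred.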
1292   if (s.ok()) {
1293     s = sub_compact->builder->Finish();
1294   } else {
1295     sub_compact->builder->Abandon();
1296   }
1297   const uint64_t current_bytes = sub_compact->builder->FileSize();
1298   if (s.ok()) {
1299     // Add the checksum information to file metadata.
1300     meta->file_checksum = sub_compact->builder->GetFileChecksum();
1301     meta->file_checksum_func_name =
1302         sub_compact->builder->GetFileChecksumFuncName();
1303 
1304     meta->fd.file_size = current_bytes;
1305   }
1306   sub_compact->current_output()->finished = true;
1307   sub_compact->total_bytes += current_bytes;
1308 
1309   // Finish and check for file errors
1310   if (s.ok()) {
1311     StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
1312     s = sub_compact->outfile->Sync(db_options_.use_fsync);
1313   }
1314   if (s.ok()) {
1315     s = sub_compact->outfile->Close();
1316   }
1317   sub_compact->outfile.reset();
1318 
1319   TableProperties tp;
1320   if (s.ok()) {
1321     tp = sub_compact->builder->GetTableProperties();
1322   }
1323 
1324   if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
1325     // If there is nothing to output, there is no need to generate an SST
1326     // file. This happens when the output level is the bottom level and, at
1327     // the same time, the subcompaction produced no output.
1328     std::string fname =
1329         TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1330                       meta->fd.GetNumber(), meta->fd.GetPathId());
1331     env_->DeleteFile(fname);
1332 
1333     // Also need to remove the file from outputs, or it will be added to the
1334     // VersionEdit.
1335     assert(!sub_compact->outputs.empty());
1336     sub_compact->outputs.pop_back();
1337     meta = nullptr;
1338   }
1339 
1340   if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
1341     // Output to event logger and fire events.
1342     sub_compact->current_output()->table_properties =
1343         std::make_shared<TableProperties>(tp);
1344     ROCKS_LOG_INFO(db_options_.info_log,
1345                    "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1346                    " keys, %" PRIu64 " bytes%s",
1347                    cfd->GetName().c_str(), job_id_, output_number,
1348                    current_entries, current_bytes,
1349                    meta->marked_for_compaction ? " (need compaction)" : "");
1350   }
1351   std::string fname;
1352   FileDescriptor output_fd;
1353   uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
1354   if (meta != nullptr) {
1355     fname =
1356         TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1357                       meta->fd.GetNumber(), meta->fd.GetPathId());
1358     output_fd = meta->fd;
1359     oldest_blob_file_number = meta->oldest_blob_file_number;
1360   } else {
1361     fname = "(nil)";
1362   }
1363   EventHelpers::LogAndNotifyTableFileCreationFinished(
1364       event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
1365       job_id_, output_fd, oldest_blob_file_number, tp,
1366       TableFileCreationReason::kCompaction, s);
1367 
1368 #ifndef ROCKSDB_LITE
1369   // Report new file to SstFileManagerImpl
1370   auto sfm =
1371       static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
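  // SstFileManager only tracks files placed in the first configured path
  // (path id 0), so skip the notification for outputs written elsewhere.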
1372   if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
1373     sfm->OnAddFile(fname);
1374     if (sfm->IsMaxAllowedSpaceReached()) {
1375       // TODO(ajkr): should we return OK() if max space was reached by the final
1376       // compaction output file (similarly to how flush works when full)?
1377       s = Status::SpaceLimit("Max allowed space was reached");
1378       TEST_SYNC_POINT(
1379           "CompactionJob::FinishCompactionOutputFile:"
1380           "MaxAllowedSpaceReached");
1381       InstrumentedMutexLock l(db_mutex_);
1382       db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
1383     }
1384   }
1385 #endif
1386 
1387   sub_compact->builder.reset();
1388   sub_compact->current_output_file_size = 0;
1389   return s;
1390 }
1391 
1392 Status CompactionJob::InstallCompactionResults(
1393     const MutableCFOptions& mutable_cf_options) {
1394   db_mutex_->AssertHeld();
1395 
1396   auto* compaction = compact_->compaction;
1397   // paranoia: verify that the files that we started with
1398   // still exist in the current version and in the same original level.
1399   // This ensures that a concurrent compaction did not erroneously
1400   // pick the same files to compact_.
1401   if (!versions_->VerifyCompactionFileConsistency(compaction)) {
1402     Compaction::InputLevelSummaryBuffer inputs_summary;
1403 
1404     ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
1405                     compaction->column_family_data()->GetName().c_str(),
1406                     job_id_, compaction->InputLevelSummary(&inputs_summary));
1407     return Status::Corruption("Compaction input files inconsistent");
1408   }
1409 
1410   {
1411     Compaction::InputLevelSummaryBuffer inputs_summary;
1412     ROCKS_LOG_INFO(
1413         db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1414         compaction->column_family_data()->GetName().c_str(), job_id_,
1415         compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
1416   }
1417 
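  // Build the VersionEdit for this compaction: record deletions for every
  // input file and an AddFile for every subcompaction output, then apply the
  // edit through VersionSet::LogAndApply while the DB mutex is held.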
1418   // Add compaction inputs
1419   compaction->AddInputDeletions(compact_->compaction->edit());
1420 
1421   for (const auto& sub_compact : compact_->sub_compact_states) {
1422     for (const auto& out : sub_compact.outputs) {
1423       compaction->edit()->AddFile(compaction->output_level(), out.meta);
1424     }
1425   }
1426   return versions_->LogAndApply(compaction->column_family_data(),
1427                                 mutable_cf_options, compaction->edit(),
1428                                 db_mutex_, db_directory_);
1429 }
1430 
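// Transfer this thread's accumulated IO byte counters to the global
// statistics and to the thread-status operation properties, then reset the
// per-thread counters so that subsequent calls only account for new IO.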
1431 void CompactionJob::RecordCompactionIOStats() {
1432   RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1433   ThreadStatusUtil::IncreaseThreadOperationProperty(
1434       ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1435   IOSTATS_RESET(bytes_read);
1436   RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1437   ThreadStatusUtil::IncreaseThreadOperationProperty(
1438       ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1439   IOSTATS_RESET(bytes_written);
1440 }
1441 
1442 Status CompactionJob::OpenCompactionOutputFile(
1443     SubcompactionState* sub_compact) {
1444   assert(sub_compact != nullptr);
1445   assert(sub_compact->builder == nullptr);
1446   // no need to lock because VersionSet::next_file_number_ is atomic
1447   uint64_t file_number = versions_->NewFileNumber();
1448   std::string fname =
1449       TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1450                     file_number, sub_compact->compaction->output_path_id());
1451   // Fire events.
1452   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1453 #ifndef ROCKSDB_LITE
1454   EventHelpers::NotifyTableFileCreationStarted(
1455       cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
1456       TableFileCreationReason::kCompaction);
1457 #endif  // !ROCKSDB_LITE
1458   // Make the output file
1459   std::unique_ptr<FSWritableFile> writable_file;
1460 #ifndef NDEBUG
1461   bool syncpoint_arg = file_options_.use_direct_writes;
1462   TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1463                            &syncpoint_arg);
1464 #endif
1465   Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
1466   if (!s.ok()) {
1467     ROCKS_LOG_ERROR(
1468         db_options_.info_log,
1469         "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1470         " fails at NewWritableFile with status %s",
1471         sub_compact->compaction->column_family_data()->GetName().c_str(),
1472         job_id_, file_number, s.ToString().c_str());
1473     LogFlush(db_options_.info_log);
1474     EventHelpers::LogAndNotifyTableFileCreationFinished(
1475         event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
1476         fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
1477         TableProperties(), TableFileCreationReason::kCompaction, s);
1478     return s;
1479   }
1480 
1481   // Try to figure out the output file's oldest ancester time.
1482   int64_t temp_current_time = 0;
1483   auto get_time_status = env_->GetCurrentTime(&temp_current_time);
1484   // It is safe to proceed even if GetCurrentTime fails, so just log the error and continue.
1485   if (!get_time_status.ok()) {
1486     ROCKS_LOG_WARN(db_options_.info_log,
1487                    "Failed to get current time. Status: %s",
1488                    get_time_status.ToString().c_str());
1489   }
1490   uint64_t current_time = static_cast<uint64_t>(temp_current_time);
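  // Prefer the earliest oldest-ancester time recorded among the compaction's
  // input files; if none of them has one (kMaxUint64), fall back to the
  // current time.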
1491   uint64_t oldest_ancester_time =
1492       sub_compact->compaction->MinInputFileOldestAncesterTime();
1493   if (oldest_ancester_time == port::kMaxUint64) {
1494     oldest_ancester_time = current_time;
1495   }
1496 
1497   // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1498   {
1499     SubcompactionState::Output out;
1500     out.meta.fd = FileDescriptor(file_number,
1501                                  sub_compact->compaction->output_path_id(), 0);
1502     out.meta.oldest_ancester_time = oldest_ancester_time;
1503     out.meta.file_creation_time = current_time;
1504     out.finished = false;
1505     sub_compact->outputs.push_back(out);
1506   }
1507 
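  // Compaction output is background work: write at low IO priority, pass the
  // write-lifetime hint down to the file system, and preallocate space based
  // on the expected output file size.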
1508   writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
1509   writable_file->SetWriteLifeTimeHint(write_hint_);
1510   writable_file->SetPreallocationBlockSize(static_cast<size_t>(
1511       sub_compact->compaction->OutputFilePreallocationSize()));
1512   const auto& listeners =
1513       sub_compact->compaction->immutable_cf_options()->listeners;
1514   sub_compact->outfile.reset(
1515       new WritableFileWriter(std::move(writable_file), fname, file_options_,
1516                              env_, db_options_.statistics.get(), listeners,
1517                              db_options_.sst_file_checksum_func.get()));
1518 
1519   // If the column family is configured to optimize filters only for hits,
1520   // we can skip creating filters when this is the bottommost level, where
1521   // the data is going to be found.
1522   bool skip_filters =
1523       cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
1524 
1525   sub_compact->builder.reset(NewTableBuilder(
1526       *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
1527       cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
1528       cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
1529       sub_compact->compaction->output_compression(),
1530       0 /*sample_for_compression */,
1531       sub_compact->compaction->output_compression_opts(),
1532       sub_compact->compaction->output_level(), skip_filters,
1533       oldest_ancester_time, 0 /* oldest_key_time */,
1534       sub_compact->compaction->max_output_file_size(), current_time));
1535   LogFlush(db_options_.info_log);
1536   return s;
1537 }
1538 
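// Release per-subcompaction resources once the job is done or aborted:
// abandon any table builder that is still open and, for subcompactions that
// failed, evict their uncommitted output files from the table cache.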
1539 void CompactionJob::CleanupCompaction() {
1540   for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1541     const auto& sub_status = sub_compact.status;
1542 
1543     if (sub_compact.builder != nullptr) {
1544       // May happen if we get a shutdown call in the middle of compaction
1545       sub_compact.builder->Abandon();
1546       sub_compact.builder.reset();
1547     } else {
1548       assert(!sub_status.ok() || sub_compact.outfile == nullptr);
1549     }
1550     for (const auto& out : sub_compact.outputs) {
1551       // If this file was inserted into the table cache then remove
1552       // it here because this compaction was not committed.
1553       if (!sub_status.ok()) {
1554         TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1555       }
1556     }
1557   }
1558   delete compact_;
1559   compact_ = nullptr;
1560 }
1561 
1562 #ifndef ROCKSDB_LITE
1563 namespace {
1564 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1565   assert(prefix_length > 0);
1566   size_t length = src.size() > prefix_length ? prefix_length : src.size();
1567   dst->assign(src.data(), length);
1568 }
1569 }  // namespace
1570 
1571 #endif  // !ROCKSDB_LITE
1572 
1573 void CompactionJob::UpdateCompactionStats() {
1574   Compaction* compaction = compact_->compaction;
1575   compaction_stats_.num_input_files_in_non_output_levels = 0;
1576   compaction_stats_.num_input_files_in_output_level = 0;
1577   for (int input_level = 0;
1578        input_level < static_cast<int>(compaction->num_input_levels());
1579        ++input_level) {
1580     if (compaction->level(input_level) != compaction->output_level()) {
1581       UpdateCompactionInputStatsHelper(
1582           &compaction_stats_.num_input_files_in_non_output_levels,
1583           &compaction_stats_.bytes_read_non_output_levels, input_level);
1584     } else {
1585       UpdateCompactionInputStatsHelper(
1586           &compaction_stats_.num_input_files_in_output_level,
1587           &compaction_stats_.bytes_read_output_level, input_level);
1588     }
1589   }
1590 
1591   uint64_t num_output_records = 0;
1592 
1593   for (const auto& sub_compact : compact_->sub_compact_states) {
1594     size_t num_output_files = sub_compact.outputs.size();
1595     if (sub_compact.builder != nullptr) {
1596       // An error occurred, so ignore the last output.
1597       assert(num_output_files > 0);
1598       --num_output_files;
1599     }
1600     compaction_stats_.num_output_files += static_cast<int>(num_output_files);
1601 
1602     num_output_records += sub_compact.num_output_records;
1603 
1604     for (const auto& out : sub_compact.outputs) {
1605       compaction_stats_.bytes_written += out.meta.fd.file_size;
1606     }
1607   }
1608 
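  // Any input records that did not make it into the outputs were dropped by
  // the compaction (e.g. overwritten, deleted, or merged-away entries).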
1609   if (compaction_stats_.num_input_records > num_output_records) {
1610     compaction_stats_.num_dropped_records =
1611         compaction_stats_.num_input_records - num_output_records;
1612   }
1613 }
1614 
1615 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
1616                                                      uint64_t* bytes_read,
1617                                                      int input_level) {
1618   const Compaction* compaction = compact_->compaction;
1619   auto num_input_files = compaction->num_input_files(input_level);
1620   *num_files += static_cast<int>(num_input_files);
1621 
1622   for (size_t i = 0; i < num_input_files; ++i) {
1623     const auto* file_meta = compaction->input(input_level, i);
1624     *bytes_read += file_meta->fd.GetFileSize();
1625     compaction_stats_.num_input_records +=
1626         static_cast<uint64_t>(file_meta->num_entries);
1627   }
1628 }
1629 
1630 void CompactionJob::UpdateCompactionJobStats(
1631     const InternalStats::CompactionStats& stats) const {
1632 #ifndef ROCKSDB_LITE
1633   if (compaction_job_stats_) {
1634     compaction_job_stats_->elapsed_micros = stats.micros;
1635 
1636     // input information
1637     compaction_job_stats_->total_input_bytes =
1638         stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
1639     compaction_job_stats_->num_input_records = stats.num_input_records;
1640     compaction_job_stats_->num_input_files =
1641         stats.num_input_files_in_non_output_levels +
1642         stats.num_input_files_in_output_level;
1643     compaction_job_stats_->num_input_files_at_output_level =
1644         stats.num_input_files_in_output_level;
1645 
1646     // output information
1647     compaction_job_stats_->total_output_bytes = stats.bytes_written;
1648     compaction_job_stats_->num_output_records = compact_->num_output_records;
1649     compaction_job_stats_->num_output_files = stats.num_output_files;
1650 
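    // Record only the first kMaxPrefixLength bytes of the smallest and
    // largest output user keys in the job stats.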
1651     if (compact_->NumOutputFiles() > 0U) {
1652       CopyPrefix(compact_->SmallestUserKey(),
1653                  CompactionJobStats::kMaxPrefixLength,
1654                  &compaction_job_stats_->smallest_output_key_prefix);
1655       CopyPrefix(compact_->LargestUserKey(),
1656                  CompactionJobStats::kMaxPrefixLength,
1657                  &compaction_job_stats_->largest_output_key_prefix);
1658     }
1659   }
1660 #else
1661   (void)stats;
1662 #endif  // !ROCKSDB_LITE
1663 }
1664 
1665 void CompactionJob::LogCompaction() {
1666   Compaction* compaction = compact_->compaction;
1667   ColumnFamilyData* cfd = compaction->column_family_data();
1668 
1669   // Let's check if anything will get logged. Don't prepare all the info if
1670   // we're not logging
1671   if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
1672     Compaction::InputLevelSummaryBuffer inputs_summary;
1673     ROCKS_LOG_INFO(
1674         db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
1675         cfd->GetName().c_str(), job_id_,
1676         compaction->InputLevelSummary(&inputs_summary), compaction->score());
1677     char scratch[2345];
1678     compaction->Summary(scratch, sizeof(scratch));
1679     ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
1680                    cfd->GetName().c_str(), scratch);
1681     // build event logger report
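    // The "compaction_started" event records the job id, the compaction
    // reason, the per-level input file numbers, the compaction score, and the
    // total input data size.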
1682     auto stream = event_logger_->Log();
1683     stream << "job" << job_id_ << "event"
1684            << "compaction_started"
1685            << "compaction_reason"
1686            << GetCompactionReasonString(compaction->compaction_reason());
1687     for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
1688       stream << ("files_L" + ToString(compaction->level(i)));
1689       stream.StartArray();
1690       for (auto f : *compaction->inputs(i)) {
1691         stream << f->fd.GetNumber();
1692       }
1693       stream.EndArray();
1694     }
1695     stream << "score" << compaction->score() << "input_data_size"
1696            << compaction->CalculateTotalInputSize();
1697   }
1698 }
1699 
1700 }  // namespace ROCKSDB_NAMESPACE
1701