1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/compaction/compaction_job.h"
11 
12 #include <algorithm>
13 #include <cinttypes>
14 #include <functional>
15 #include <list>
16 #include <memory>
17 #include <random>
18 #include <set>
19 #include <thread>
20 #include <utility>
21 #include <vector>
22 
23 #include "db/blob/blob_file_addition.h"
24 #include "db/blob/blob_file_builder.h"
25 #include "db/builder.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/db_iter.h"
28 #include "db/dbformat.h"
29 #include "db/error_handler.h"
30 #include "db/event_helpers.h"
31 #include "db/log_reader.h"
32 #include "db/log_writer.h"
33 #include "db/memtable.h"
34 #include "db/memtable_list.h"
35 #include "db/merge_context.h"
36 #include "db/merge_helper.h"
37 #include "db/output_validator.h"
38 #include "db/range_del_aggregator.h"
39 #include "db/version_set.h"
40 #include "file/filename.h"
41 #include "file/read_write_util.h"
42 #include "file/sst_file_manager_impl.h"
43 #include "file/writable_file_writer.h"
44 #include "logging/log_buffer.h"
45 #include "logging/logging.h"
46 #include "monitoring/iostats_context_imp.h"
47 #include "monitoring/perf_context_imp.h"
48 #include "monitoring/thread_status_util.h"
49 #include "options/configurable_helper.h"
50 #include "options/options_helper.h"
51 #include "port/port.h"
52 #include "rocksdb/db.h"
53 #include "rocksdb/env.h"
54 #include "rocksdb/sst_partitioner.h"
55 #include "rocksdb/statistics.h"
56 #include "rocksdb/status.h"
57 #include "rocksdb/table.h"
58 #include "rocksdb/utilities/options_type.h"
59 #include "table/block_based/block.h"
60 #include "table/block_based/block_based_table_factory.h"
61 #include "table/merging_iterator.h"
62 #include "table/table_builder.h"
63 #include "test_util/sync_point.h"
64 #include "util/coding.h"
65 #include "util/hash.h"
66 #include "util/mutexlock.h"
67 #include "util/random.h"
68 #include "util/stop_watch.h"
69 #include "util/string_util.h"
70 
71 namespace ROCKSDB_NAMESPACE {
72 
GetCompactionReasonString(CompactionReason compaction_reason)73 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
74   switch (compaction_reason) {
75     case CompactionReason::kUnknown:
76       return "Unknown";
77     case CompactionReason::kLevelL0FilesNum:
78       return "LevelL0FilesNum";
79     case CompactionReason::kLevelMaxLevelSize:
80       return "LevelMaxLevelSize";
81     case CompactionReason::kUniversalSizeAmplification:
82       return "UniversalSizeAmplification";
83     case CompactionReason::kUniversalSizeRatio:
84       return "UniversalSizeRatio";
85     case CompactionReason::kUniversalSortedRunNum:
86       return "UniversalSortedRunNum";
87     case CompactionReason::kFIFOMaxSize:
88       return "FIFOMaxSize";
89     case CompactionReason::kFIFOReduceNumFiles:
90       return "FIFOReduceNumFiles";
91     case CompactionReason::kFIFOTtl:
92       return "FIFOTtl";
93     case CompactionReason::kManualCompaction:
94       return "ManualCompaction";
95     case CompactionReason::kFilesMarkedForCompaction:
96       return "FilesMarkedForCompaction";
97     case CompactionReason::kBottommostFiles:
98       return "BottommostFiles";
99     case CompactionReason::kTtl:
100       return "Ttl";
101     case CompactionReason::kFlush:
102       return "Flush";
103     case CompactionReason::kExternalSstIngestion:
104       return "ExternalSstIngestion";
105     case CompactionReason::kPeriodicCompaction:
106       return "PeriodicCompaction";
107     case CompactionReason::kNumOfReasons:
108       // fall through
109     default:
110       assert(false);
111       return "Invalid";
112   }
113 }
114 
115 // Maintains state for each sub-compaction
116 struct CompactionJob::SubcompactionState {
117   const Compaction* compaction;
118   std::unique_ptr<CompactionIterator> c_iter;
119 
120   // The boundaries of the key-range this compaction is interested in. No two
121   // subcompactions may have overlapping key-ranges.
122   // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
123   Slice *start, *end;
124 
125   // The return status of this subcompaction
126   Status status;
127 
128   // The return IO Status of this subcompaction
129   IOStatus io_status;
130 
131   // Files produced by this subcompaction
132   struct Output {
OutputROCKSDB_NAMESPACE::CompactionJob::SubcompactionState::Output133     Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
134            bool _enable_order_check, bool _enable_hash, bool _finished = false,
135            uint64_t precalculated_hash = 0)
136         : meta(std::move(_meta)),
137           validator(_icmp, _enable_order_check, _enable_hash,
138                     precalculated_hash),
139           finished(_finished) {}
140     FileMetaData meta;
141     OutputValidator validator;
142     bool finished;
143     std::shared_ptr<const TableProperties> table_properties;
144   };
145 
146   // State kept for output being generated
147   std::vector<Output> outputs;
148   std::vector<BlobFileAddition> blob_file_additions;
149   std::unique_ptr<WritableFileWriter> outfile;
150   std::unique_ptr<TableBuilder> builder;
151 
current_outputROCKSDB_NAMESPACE::CompactionJob::SubcompactionState152   Output* current_output() {
153     if (outputs.empty()) {
154       // This subcompaction's output could be empty if compaction was aborted
155       // before this subcompaction had a chance to generate any output files.
156       // When subcompactions are executed sequentially this is more likely and
157       // will be particularly likely for the later subcompactions to be empty.
158       // Once they are run in parallel however it should be much rarer.
159       return nullptr;
160     } else {
161       return &outputs.back();
162     }
163   }
164 
165   uint64_t current_output_file_size = 0;
166 
167   // State during the subcompaction
168   uint64_t total_bytes = 0;
169   uint64_t num_output_records = 0;
170   CompactionJobStats compaction_job_stats;
171   uint64_t approx_size = 0;
172   // An index that used to speed up ShouldStopBefore().
173   size_t grandparent_index = 0;
174   // The number of bytes overlapping between the current output and
175   // grandparent files used in ShouldStopBefore().
176   uint64_t overlapped_bytes = 0;
177   // A flag determine whether the key has been seen in ShouldStopBefore()
178   bool seen_key = false;
179 
SubcompactionStateROCKSDB_NAMESPACE::CompactionJob::SubcompactionState180   SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
181       : compaction(c), start(_start), end(_end), approx_size(size) {
182     assert(compaction != nullptr);
183   }
184 
185   // Adds the key and value to the builder
186   // If paranoid is true, adds the key-value to the paranoid hash
AddToBuilderROCKSDB_NAMESPACE::CompactionJob::SubcompactionState187   Status AddToBuilder(const Slice& key, const Slice& value) {
188     auto curr = current_output();
189     assert(builder != nullptr);
190     assert(curr != nullptr);
191     Status s = curr->validator.Add(key, value);
192     if (!s.ok()) {
193       return s;
194     }
195     builder->Add(key, value);
196     return Status::OK();
197   }
198 
199   // Returns true iff we should stop building the current output
200   // before processing "internal_key".
ShouldStopBeforeROCKSDB_NAMESPACE::CompactionJob::SubcompactionState201   bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
202     const InternalKeyComparator* icmp =
203         &compaction->column_family_data()->internal_comparator();
204     const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
205 
206     // Scan to find earliest grandparent file that contains key.
207     while (grandparent_index < grandparents.size() &&
208            icmp->Compare(internal_key,
209                          grandparents[grandparent_index]->largest.Encode()) >
210                0) {
211       if (seen_key) {
212         overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
213       }
214       assert(grandparent_index + 1 >= grandparents.size() ||
215              icmp->Compare(
216                  grandparents[grandparent_index]->largest.Encode(),
217                  grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
218       grandparent_index++;
219     }
220     seen_key = true;
221 
222     if (overlapped_bytes + curr_file_size >
223         compaction->max_compaction_bytes()) {
224       // Too much overlap for current output; start new output
225       overlapped_bytes = 0;
226       return true;
227     }
228 
229     return false;
230   }
231 };
232 
233 // Maintains state for the entire compaction
234 struct CompactionJob::CompactionState {
235   Compaction* const compaction;
236 
237   // REQUIRED: subcompaction states are stored in order of increasing
238   // key-range
239   std::vector<CompactionJob::SubcompactionState> sub_compact_states;
240   Status status;
241 
242   size_t num_output_files = 0;
243   uint64_t total_bytes = 0;
244   size_t num_blob_output_files = 0;
245   uint64_t total_blob_bytes = 0;
246   uint64_t num_output_records = 0;
247 
CompactionStateROCKSDB_NAMESPACE::CompactionJob::CompactionState248   explicit CompactionState(Compaction* c) : compaction(c) {}
249 
SmallestUserKeyROCKSDB_NAMESPACE::CompactionJob::CompactionState250   Slice SmallestUserKey() {
251     for (const auto& sub_compact_state : sub_compact_states) {
252       if (!sub_compact_state.outputs.empty() &&
253           sub_compact_state.outputs[0].finished) {
254         return sub_compact_state.outputs[0].meta.smallest.user_key();
255       }
256     }
257     // If there is no finished output, return an empty slice.
258     return Slice(nullptr, 0);
259   }
260 
LargestUserKeyROCKSDB_NAMESPACE::CompactionJob::CompactionState261   Slice LargestUserKey() {
262     for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
263          ++it) {
264       if (!it->outputs.empty() && it->current_output()->finished) {
265         assert(it->current_output() != nullptr);
266         return it->current_output()->meta.largest.user_key();
267       }
268     }
269     // If there is no finished output, return an empty slice.
270     return Slice(nullptr, 0);
271   }
272 };
273 
AggregateStatistics()274 void CompactionJob::AggregateStatistics() {
275   assert(compact_);
276 
277   for (SubcompactionState& sc : compact_->sub_compact_states) {
278     auto& outputs = sc.outputs;
279 
280     if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
281       // An error occurred, so ignore the last output.
282       outputs.pop_back();
283     }
284 
285     compact_->num_output_files += outputs.size();
286     compact_->total_bytes += sc.total_bytes;
287 
288     const auto& blobs = sc.blob_file_additions;
289 
290     compact_->num_blob_output_files += blobs.size();
291 
292     for (const auto& blob : blobs) {
293       compact_->total_blob_bytes += blob.GetTotalBlobBytes();
294     }
295 
296     compact_->num_output_records += sc.num_output_records;
297 
298     compaction_job_stats_->Add(sc.compaction_job_stats);
299   }
300 }
301 
// Creates the job that will execute `compaction`.
//
// The constructor allocates the CompactionState wrapping `compaction`, stores
// the supplied collaborators (most of them as raw pointers or references, so
// presumably they must outlive the job -- confirm against callers), and
// registers the compaction with the thread-status machinery.
//
// `existing_snapshots`, `table_cache` and `full_history_ts_low` are taken by
// value and moved into the corresponding members.
//
// REQUIRES: `compaction_job_stats` and `log_buffer` are non-null (asserted).
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
    VersionSet* versions, const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
    FSDirectory* db_directory, FSDirectory* output_directory,
    FSDirectory* blob_output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
    const std::string& dbname, CompactionJobStats* compaction_job_stats,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
    const std::string& db_session_id, std::string full_history_ts_low,
    BlobFileCompletionCallback* blob_callback)
    : compact_(new CompactionState(compaction)),
      compaction_stats_(compaction->compaction_reason(), 1),
      db_options_(db_options),
      mutable_db_options_copy_(mutable_db_options),
      log_buffer_(log_buffer),
      output_directory_(output_directory),
      stats_(stats),
      // Placeholder values; Prepare() computes the real ones.
      bottommost_level_(false),
      write_hint_(Env::WLTH_NOT_SET),
      job_id_(job_id),
      compaction_job_stats_(compaction_job_stats),
      dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      file_options_(file_options),
      env_(db_options.env),
      io_tracer_(io_tracer),
      fs_(db_options.fs, io_tracer),
      // Reads fs_ and db_options_, so both must already be initialized here.
      // NOTE(review): that depends on the member declaration order in the
      // header, which is not visible in this file -- confirm.
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      db_directory_(db_directory),
      blob_output_directory_(blob_output_directory),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      thread_pri_(thread_pri),
      full_history_ts_low_(std::move(full_history_ts_low)),
      blob_callback_(blob_callback) {
  assert(compaction_job_stats_ != nullptr);
  assert(log_buffer_ != nullptr);
  // Tag the current thread as running a compaction on this column family.
  // ReportStartedCompaction() below sets the column family and the operation
  // again, after it has filled in the operation properties.
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}
365 
~CompactionJob()366 CompactionJob::~CompactionJob() {
367   assert(compact_ == nullptr);
368   ThreadStatusUtil::ResetThreadStatus();
369 }
370 
ReportStartedCompaction(Compaction * compaction)371 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
372   const auto* cfd = compact_->compaction->column_family_data();
373   ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
374                                     db_options_.enable_thread_tracking);
375 
376   ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
377                                                job_id_);
378 
379   ThreadStatusUtil::SetThreadOperationProperty(
380       ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
381       (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
382           compact_->compaction->output_level());
383 
384   // In the current design, a CompactionJob is always created
385   // for non-trivial compaction.
386   assert(compaction->IsTrivialMove() == false ||
387          compaction->is_manual_compaction() == true);
388 
389   ThreadStatusUtil::SetThreadOperationProperty(
390       ThreadStatus::COMPACTION_PROP_FLAGS,
391       compaction->is_manual_compaction() +
392           (compaction->deletion_compaction() << 1));
393 
394   ThreadStatusUtil::SetThreadOperationProperty(
395       ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
396       compaction->CalculateTotalInputSize());
397 
398   IOSTATS_RESET(bytes_written);
399   IOSTATS_RESET(bytes_read);
400   ThreadStatusUtil::SetThreadOperationProperty(
401       ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
402   ThreadStatusUtil::SetThreadOperationProperty(
403       ThreadStatus::COMPACTION_BYTES_READ, 0);
404 
405   // Set the thread operation after operation properties
406   // to ensure GetThreadList() can always show them all together.
407   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
408 
409   compaction_job_stats_->is_manual_compaction =
410       compaction->is_manual_compaction();
411   compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
412 }
413 
Prepare()414 void CompactionJob::Prepare() {
415   AutoThreadOperationStageUpdater stage_updater(
416       ThreadStatus::STAGE_COMPACTION_PREPARE);
417 
418   // Generate file_levels_ for compaction before making Iterator
419   auto* c = compact_->compaction;
420   assert(c->column_family_data() != nullptr);
421   assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
422              compact_->compaction->level()) > 0);
423 
424   write_hint_ =
425       c->column_family_data()->CalculateSSTWriteHint(c->output_level());
426   bottommost_level_ = c->bottommost_level();
427 
428   if (c->ShouldFormSubcompactions()) {
429     {
430       StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
431       GenSubcompactionBoundaries();
432     }
433     assert(sizes_.size() == boundaries_.size() + 1);
434 
435     for (size_t i = 0; i <= boundaries_.size(); i++) {
436       Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
437       Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
438       compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
439     }
440     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
441                       compact_->sub_compact_states.size());
442   } else {
443     constexpr Slice* start = nullptr;
444     constexpr Slice* end = nullptr;
445     constexpr uint64_t size = 0;
446 
447     compact_->sub_compact_states.emplace_back(c, start, end, size);
448   }
449 }
450 
451 struct RangeWithSize {
452   Range range;
453   uint64_t size;
454 
RangeWithSizeROCKSDB_NAMESPACE::RangeWithSize455   RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
456       : range(a, b), size(s) {}
457 };
458 
GenSubcompactionBoundaries()459 void CompactionJob::GenSubcompactionBoundaries() {
460   auto* c = compact_->compaction;
461   auto* cfd = c->column_family_data();
462   const Comparator* cfd_comparator = cfd->user_comparator();
463   std::vector<Slice> bounds;
464   int start_lvl = c->start_level();
465   int out_lvl = c->output_level();
466 
467   // Add the starting and/or ending key of certain input files as a potential
468   // boundary
469   for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
470     int lvl = c->level(lvl_idx);
471     if (lvl >= start_lvl && lvl <= out_lvl) {
472       const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
473       size_t num_files = flevel->num_files;
474 
475       if (num_files == 0) {
476         continue;
477       }
478 
479       if (lvl == 0) {
480         // For level 0 add the starting and ending key of each file since the
481         // files may have greatly differing key ranges (not range-partitioned)
482         for (size_t i = 0; i < num_files; i++) {
483           bounds.emplace_back(flevel->files[i].smallest_key);
484           bounds.emplace_back(flevel->files[i].largest_key);
485         }
486       } else {
487         // For all other levels add the smallest/largest key in the level to
488         // encompass the range covered by that level
489         bounds.emplace_back(flevel->files[0].smallest_key);
490         bounds.emplace_back(flevel->files[num_files - 1].largest_key);
491         if (lvl == out_lvl) {
492           // For the last level include the starting keys of all files since
493           // the last level is the largest and probably has the widest key
494           // range. Since it's range partitioned, the ending key of one file
495           // and the starting key of the next are very close (or identical).
496           for (size_t i = 1; i < num_files; i++) {
497             bounds.emplace_back(flevel->files[i].smallest_key);
498           }
499         }
500       }
501     }
502   }
503 
504   std::sort(bounds.begin(), bounds.end(),
505             [cfd_comparator](const Slice& a, const Slice& b) -> bool {
506               return cfd_comparator->Compare(ExtractUserKey(a),
507                                              ExtractUserKey(b)) < 0;
508             });
509   // Remove duplicated entries from bounds
510   bounds.erase(
511       std::unique(bounds.begin(), bounds.end(),
512                   [cfd_comparator](const Slice& a, const Slice& b) -> bool {
513                     return cfd_comparator->Compare(ExtractUserKey(a),
514                                                    ExtractUserKey(b)) == 0;
515                   }),
516       bounds.end());
517 
518   // Combine consecutive pairs of boundaries into ranges with an approximate
519   // size of data covered by keys in that range
520   uint64_t sum = 0;
521   std::vector<RangeWithSize> ranges;
522   // Get input version from CompactionState since it's already referenced
523   // earlier in SetInputVersioCompaction::SetInputVersion and will not change
524   // when db_mutex_ is released below
525   auto* v = compact_->compaction->input_version();
526   for (auto it = bounds.begin();;) {
527     const Slice a = *it;
528     ++it;
529 
530     if (it == bounds.end()) {
531       break;
532     }
533 
534     const Slice b = *it;
535 
536     // ApproximateSize could potentially create table reader iterator to seek
537     // to the index block and may incur I/O cost in the process. Unlock db
538     // mutex to reduce contention
539     db_mutex_->Unlock();
540     uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
541                                                b, start_lvl, out_lvl + 1,
542                                                TableReaderCaller::kCompaction);
543     db_mutex_->Lock();
544     ranges.emplace_back(a, b, size);
545     sum += size;
546   }
547 
548   // Group the ranges into subcompactions
549   const double min_file_fill_percent = 4.0 / 5;
550   int base_level = v->storage_info()->base_level();
551   uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
552       sum / min_file_fill_percent /
553       MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
554           c->immutable_cf_options()->compaction_style, base_level,
555           c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
556   uint64_t subcompactions =
557       std::min({static_cast<uint64_t>(ranges.size()),
558                 static_cast<uint64_t>(c->max_subcompactions()),
559                 max_output_files});
560 
561   if (subcompactions > 1) {
562     double mean = sum * 1.0 / subcompactions;
563     // Greedily add ranges to the subcompaction until the sum of the ranges'
564     // sizes becomes >= the expected mean size of a subcompaction
565     sum = 0;
566     for (size_t i = 0; i + 1 < ranges.size(); i++) {
567       sum += ranges[i].size;
568       if (subcompactions == 1) {
569         // If there's only one left to schedule then it goes to the end so no
570         // need to put an end boundary
571         continue;
572       }
573       if (sum >= mean) {
574         boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
575         sizes_.emplace_back(sum);
576         subcompactions--;
577         sum = 0;
578       }
579     }
580     sizes_.emplace_back(sum + ranges.back().size);
581   } else {
582     // Only one range so its size is the total sum of sizes computed above
583     sizes_.emplace_back(sum);
584   }
585 }
586 
Run()587 Status CompactionJob::Run() {
588   AutoThreadOperationStageUpdater stage_updater(
589       ThreadStatus::STAGE_COMPACTION_RUN);
590   TEST_SYNC_POINT("CompactionJob::Run():Start");
591   log_buffer_->FlushBufferToLog();
592   LogCompaction();
593 
594   const size_t num_threads = compact_->sub_compact_states.size();
595   assert(num_threads > 0);
596   const uint64_t start_micros = db_options_.clock->NowMicros();
597 
598   // Launch a thread for each of subcompactions 1...num_threads-1
599   std::vector<port::Thread> thread_pool;
600   thread_pool.reserve(num_threads - 1);
601   for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
602     thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
603                              &compact_->sub_compact_states[i]);
604   }
605 
606   // Always schedule the first subcompaction (whether or not there are also
607   // others) in the current thread to be efficient with resources
608   ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
609 
610   // Wait for all other threads (if there are any) to finish execution
611   for (auto& thread : thread_pool) {
612     thread.join();
613   }
614 
615   compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
616   compaction_stats_.cpu_micros = 0;
617   for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
618     compaction_stats_.cpu_micros +=
619         compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
620   }
621 
622   RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
623   RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
624                         compaction_stats_.cpu_micros);
625 
626   TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
627 
628   // Check if any thread encountered an error during execution
629   Status status;
630   IOStatus io_s;
631   bool wrote_new_blob_files = false;
632 
633   for (const auto& state : compact_->sub_compact_states) {
634     if (!state.status.ok()) {
635       status = state.status;
636       io_s = state.io_status;
637       break;
638     }
639 
640     if (!state.blob_file_additions.empty()) {
641       wrote_new_blob_files = true;
642     }
643   }
644 
645   if (io_status_.ok()) {
646     io_status_ = io_s;
647   }
648   if (status.ok()) {
649     constexpr IODebugContext* dbg = nullptr;
650 
651     if (output_directory_) {
652       io_s = output_directory_->Fsync(IOOptions(), dbg);
653     }
654 
655     if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
656         blob_output_directory_ != output_directory_) {
657       io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
658     }
659   }
660   if (io_status_.ok()) {
661     io_status_ = io_s;
662   }
663   if (status.ok()) {
664     status = io_s;
665   }
666   if (status.ok()) {
667     thread_pool.clear();
668     std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
669     for (const auto& state : compact_->sub_compact_states) {
670       for (const auto& output : state.outputs) {
671         files_output.emplace_back(&output);
672       }
673     }
674     ColumnFamilyData* cfd = compact_->compaction->column_family_data();
675     auto prefix_extractor =
676         compact_->compaction->mutable_cf_options()->prefix_extractor.get();
677     std::atomic<size_t> next_file_idx(0);
678     auto verify_table = [&](Status& output_status) {
679       while (true) {
680         size_t file_idx = next_file_idx.fetch_add(1);
681         if (file_idx >= files_output.size()) {
682           break;
683         }
684         // Verify that the table is usable
685         // We set for_compaction to false and don't OptimizeForCompactionTableRead
686         // here because this is a special case after we finish the table building
687         // No matter whether use_direct_io_for_flush_and_compaction is true,
688         // we will regard this verification as user reads since the goal is
689         // to cache it here for further user reads
690         ReadOptions read_options;
691         InternalIterator* iter = cfd->table_cache()->NewIterator(
692             read_options, file_options_, cfd->internal_comparator(),
693             files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
694             prefix_extractor,
695             /*table_reader_ptr=*/nullptr,
696             cfd->internal_stats()->GetFileReadHist(
697                 compact_->compaction->output_level()),
698             TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
699             /*skip_filters=*/false, compact_->compaction->output_level(),
700             MaxFileSizeForL0MetaPin(
701                 *compact_->compaction->mutable_cf_options()),
702             /*smallest_compaction_key=*/nullptr,
703             /*largest_compaction_key=*/nullptr,
704             /*allow_unprepared_value=*/false);
705         auto s = iter->status();
706 
707         if (s.ok() && paranoid_file_checks_) {
708           OutputValidator validator(cfd->internal_comparator(),
709                                     /*_enable_order_check=*/true,
710                                     /*_enable_hash=*/true);
711           for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
712             s = validator.Add(iter->key(), iter->value());
713             if (!s.ok()) {
714               break;
715             }
716           }
717           if (s.ok()) {
718             s = iter->status();
719           }
720           if (s.ok() &&
721               !validator.CompareValidator(files_output[file_idx]->validator)) {
722             s = Status::Corruption("Paranoid checksums do not match");
723           }
724         }
725 
726         delete iter;
727 
728         if (!s.ok()) {
729           output_status = s;
730           break;
731         }
732       }
733     };
734     for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
735       thread_pool.emplace_back(verify_table,
736                                std::ref(compact_->sub_compact_states[i].status));
737     }
738     verify_table(compact_->sub_compact_states[0].status);
739     for (auto& thread : thread_pool) {
740       thread.join();
741     }
742     for (const auto& state : compact_->sub_compact_states) {
743       if (!state.status.ok()) {
744         status = state.status;
745         break;
746       }
747     }
748   }
749 
750   TablePropertiesCollection tp;
751   for (const auto& state : compact_->sub_compact_states) {
752     for (const auto& output : state.outputs) {
753       auto fn =
754           TableFileName(state.compaction->immutable_cf_options()->cf_paths,
755                         output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
756       tp[fn] = output.table_properties;
757     }
758   }
759   compact_->compaction->SetOutputTableProperties(std::move(tp));
760 
761   // Finish up all book-keeping to unify the subcompaction results
762   AggregateStatistics();
763   UpdateCompactionStats();
764 
765   RecordCompactionIOStats();
766   LogFlush(db_options_.info_log);
767   TEST_SYNC_POINT("CompactionJob::Run():End");
768 
769   compact_->status = status;
770   return status;
771 }
772 
Install(const MutableCFOptions & mutable_cf_options)773 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
774   assert(compact_);
775 
776   AutoThreadOperationStageUpdater stage_updater(
777       ThreadStatus::STAGE_COMPACTION_INSTALL);
778   db_mutex_->AssertHeld();
779   Status status = compact_->status;
780 
781   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
782   assert(cfd);
783 
784   cfd->internal_stats()->AddCompactionStats(
785       compact_->compaction->output_level(), thread_pri_, compaction_stats_);
786 
787   if (status.ok()) {
788     status = InstallCompactionResults(mutable_cf_options);
789   }
790   if (!versions_->io_status().ok()) {
791     io_status_ = versions_->io_status();
792   }
793 
794   VersionStorageInfo::LevelSummaryStorage tmp;
795   auto vstorage = cfd->current()->storage_info();
796   const auto& stats = compaction_stats_;
797 
798   double read_write_amp = 0.0;
799   double write_amp = 0.0;
800   double bytes_read_per_sec = 0;
801   double bytes_written_per_sec = 0;
802 
803   const uint64_t bytes_read_non_output_and_blob =
804       stats.bytes_read_non_output_levels + stats.bytes_read_blob;
805   const uint64_t bytes_read_all =
806       stats.bytes_read_output_level + bytes_read_non_output_and_blob;
807   const uint64_t bytes_written_all =
808       stats.bytes_written + stats.bytes_written_blob;
809 
810   if (bytes_read_non_output_and_blob > 0) {
811     read_write_amp = (bytes_written_all + bytes_read_all) /
812                      static_cast<double>(bytes_read_non_output_and_blob);
813     write_amp =
814         bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
815   }
816   if (stats.micros > 0) {
817     bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
818     bytes_written_per_sec =
819         bytes_written_all / static_cast<double>(stats.micros);
820   }
821 
822   const std::string& column_family_name = cfd->GetName();
823 
824   constexpr double kMB = 1048576.0;
825 
826   ROCKS_LOG_BUFFER(
827       log_buffer_,
828       "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
829       "files in(%d, %d) out(%d +%d blob) "
830       "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
831       "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
832       ", records dropped: %" PRIu64 " output_compression: %s\n",
833       column_family_name.c_str(), vstorage->LevelSummary(&tmp),
834       bytes_read_per_sec, bytes_written_per_sec,
835       compact_->compaction->output_level(),
836       stats.num_input_files_in_non_output_levels,
837       stats.num_input_files_in_output_level, stats.num_output_files,
838       stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
839       stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
840       stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
841       write_amp, status.ToString().c_str(), stats.num_input_records,
842       stats.num_dropped_records,
843       CompressionTypeToString(compact_->compaction->output_compression())
844           .c_str());
845 
846   const auto& blob_files = vstorage->GetBlobFiles();
847   if (!blob_files.empty()) {
848     ROCKS_LOG_BUFFER(log_buffer_,
849                      "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
850                      "\n",
851                      column_family_name.c_str(), blob_files.begin()->first,
852                      blob_files.rbegin()->first);
853   }
854 
855   UpdateCompactionJobStats(stats);
856 
857   auto stream = event_logger_->LogToBuffer(log_buffer_);
858   stream << "job" << job_id_ << "event"
859          << "compaction_finished"
860          << "compaction_time_micros" << stats.micros
861          << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
862          << compact_->compaction->output_level() << "num_output_files"
863          << compact_->num_output_files << "total_output_size"
864          << compact_->total_bytes;
865 
866   if (compact_->num_blob_output_files > 0) {
867     stream << "num_blob_output_files" << compact_->num_blob_output_files
868            << "total_blob_output_size" << compact_->total_blob_bytes;
869   }
870 
871   stream << "num_input_records" << stats.num_input_records
872          << "num_output_records" << compact_->num_output_records
873          << "num_subcompactions" << compact_->sub_compact_states.size()
874          << "output_compression"
875          << CompressionTypeToString(compact_->compaction->output_compression());
876 
877   stream << "num_single_delete_mismatches"
878          << compaction_job_stats_->num_single_del_mismatch;
879   stream << "num_single_delete_fallthrough"
880          << compaction_job_stats_->num_single_del_fallthru;
881 
882   if (measure_io_stats_) {
883     stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
884     stream << "file_range_sync_nanos"
885            << compaction_job_stats_->file_range_sync_nanos;
886     stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
887     stream << "file_prepare_write_nanos"
888            << compaction_job_stats_->file_prepare_write_nanos;
889   }
890 
891   stream << "lsm_state";
892   stream.StartArray();
893   for (int level = 0; level < vstorage->num_levels(); ++level) {
894     stream << vstorage->NumLevelFiles(level);
895   }
896   stream.EndArray();
897 
898   if (!blob_files.empty()) {
899     stream << "blob_file_head" << blob_files.begin()->first;
900     stream << "blob_file_tail" << blob_files.rbegin()->first;
901   }
902 
903   CleanupCompaction();
904   return status;
905 }
906 
907 #ifndef ROCKSDB_LITE
ProcessKeyValueCompactionWithCompactionService(SubcompactionState * sub_compact)908 void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
909     SubcompactionState* sub_compact) {
910   assert(sub_compact);
911   assert(sub_compact->compaction);
912   assert(db_options_.compaction_service);
913 
914   const Compaction* compaction = sub_compact->compaction;
915   CompactionServiceInput compaction_input;
916   compaction_input.output_level = compaction->output_level();
917 
918   const std::vector<CompactionInputFiles>& inputs =
919       *(compact_->compaction->inputs());
920   for (const auto& files_per_level : inputs) {
921     for (const auto& file : files_per_level.files) {
922       compaction_input.input_files.emplace_back(
923           MakeTableFileName(file->fd.GetNumber()));
924     }
925   }
926   compaction_input.column_family.name =
927       compaction->column_family_data()->GetName();
928   compaction_input.column_family.options =
929       compaction->column_family_data()->GetLatestCFOptions();
930   compaction_input.db_options =
931       BuildDBOptions(db_options_, mutable_db_options_copy_);
932   compaction_input.snapshots = existing_snapshots_;
933   compaction_input.has_begin = sub_compact->start;
934   compaction_input.begin =
935       compaction_input.has_begin ? sub_compact->start->ToString() : "";
936   compaction_input.has_end = sub_compact->end;
937   compaction_input.end =
938       compaction_input.has_end ? sub_compact->end->ToString() : "";
939   compaction_input.approx_size = sub_compact->approx_size;
940 
941   std::string compaction_input_binary;
942   Status s = compaction_input.Write(&compaction_input_binary);
943   if (!s.ok()) {
944     sub_compact->status = s;
945     return;
946   }
947 
948   std::ostringstream input_files_oss;
949   bool is_first_one = true;
950   for (const auto& file : compaction_input.input_files) {
951     input_files_oss << (is_first_one ? "" : ", ") << file;
952     is_first_one = false;
953   }
954 
955   ROCKS_LOG_INFO(
956       db_options_.info_log,
957       "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
958       compaction_input.column_family.name.c_str(), job_id_,
959       compaction_input.output_level, input_files_oss.str().c_str());
960   CompactionServiceJobStatus compaction_status =
961       db_options_.compaction_service->Start(compaction_input_binary, job_id_);
962   if (compaction_status != CompactionServiceJobStatus::kSuccess) {
963     sub_compact->status =
964         Status::Incomplete("CompactionService failed to start compaction job.");
965     return;
966   }
967 
968   std::string compaction_result_binary;
969   compaction_status = db_options_.compaction_service->WaitForComplete(
970       job_id_, &compaction_result_binary);
971 
972   CompactionServiceResult compaction_result;
973   s = CompactionServiceResult::Read(compaction_result_binary,
974                                     &compaction_result);
975   if (compaction_status != CompactionServiceJobStatus::kSuccess) {
976     sub_compact->status =
977         s.ok() ? compaction_result.status
978                : Status::Incomplete(
979                      "CompactionService failed to run compaction job.");
980     compaction_result.status.PermitUncheckedError();
981     ROCKS_LOG_WARN(db_options_.info_log,
982                    "[%s] [JOB %d] Remote compaction failed, status: %s",
983                    compaction_input.column_family.name.c_str(), job_id_,
984                    s.ToString().c_str());
985     return;
986   }
987 
988   if (!s.ok()) {
989     sub_compact->status = s;
990     compaction_result.status.PermitUncheckedError();
991     return;
992   }
993   sub_compact->status = compaction_result.status;
994 
995   std::ostringstream output_files_oss;
996   is_first_one = true;
997   for (const auto& file : compaction_result.output_files) {
998     output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
999     is_first_one = false;
1000   }
1001 
1002   ROCKS_LOG_INFO(db_options_.info_log,
1003                  "[%s] [JOB %d] Receive remote compaction result, output path: "
1004                  "%s, files: %s",
1005                  compaction_input.column_family.name.c_str(), job_id_,
1006                  compaction_result.output_path.c_str(),
1007                  output_files_oss.str().c_str());
1008 
1009   if (!s.ok()) {
1010     sub_compact->status = s;
1011     return;
1012   }
1013 
1014   for (const auto& file : compaction_result.output_files) {
1015     uint64_t file_num = versions_->NewFileNumber();
1016     auto src_file = compaction_result.output_path + "/" + file.file_name;
1017     auto tgt_file = TableFileName(compaction->immutable_cf_options()->cf_paths,
1018                                   file_num, compaction->output_path_id());
1019     s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
1020     if (!s.ok()) {
1021       sub_compact->status = s;
1022       return;
1023     }
1024 
1025     FileMetaData meta;
1026     uint64_t file_size;
1027     s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
1028     if (!s.ok()) {
1029       sub_compact->status = s;
1030       return;
1031     }
1032     meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
1033                              file.smallest_seqno, file.largest_seqno);
1034     meta.smallest.DecodeFrom(file.smallest_internal_key);
1035     meta.largest.DecodeFrom(file.largest_internal_key);
1036     meta.oldest_ancester_time = file.oldest_ancester_time;
1037     meta.file_creation_time = file.file_creation_time;
1038     meta.marked_for_compaction = file.marked_for_compaction;
1039 
1040     auto cfd = compaction->column_family_data();
1041     sub_compact->outputs.emplace_back(std::move(meta),
1042                                       cfd->internal_comparator(), false, false,
1043                                       true, file.paranoid_hash);
1044   }
1045   sub_compact->compaction_job_stats = compaction_result.stats;
1046   sub_compact->num_output_records = compaction_result.num_output_records;
1047   sub_compact->approx_size = compaction_input.approx_size;  // is this used?
1048   sub_compact->total_bytes = compaction_result.total_bytes;
1049   IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
1050   IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
1051 }
1052 #endif  // !ROCKSDB_LITE
1053 
// Processes the key/value pairs of a single subcompaction.
//
// Iterates over the compaction input restricted to the subcompaction's
// [start, end) range, writes every entry produced by the CompactionIterator
// into output table files (opening a new file when none is open and
// finishing the current one when it is large enough or when the partitioner /
// ShouldStopBefore() ask for a cut), collects statistics, and finally stores
// the resulting status in sub_compact->status.
//
// In non-LITE builds, if db_options_.compaction_service is set, the whole
// job is delegated to ProcessKeyValueCompactionWithCompactionService().
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact);
  assert(sub_compact->compaction);

#ifndef ROCKSDB_LITE
  if (db_options_.compaction_service) {
    return ProcessKeyValueCompactionWithCompactionService(sub_compact);
  }
#endif  // !ROCKSDB_LITE

  // CPU-time baseline; the delta is stored in cpu_micros near the end.
  uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000;

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }

  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                             existing_snapshots_);
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  std::unique_ptr<InternalIterator> input(
      versions_->MakeInputIterator(read_options, sub_compact->compaction,
                                   &range_del_agg, file_options_for_read_));

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  // I/O measurement variables
  // The prev_* values are baselines captured only when measure_io_stats_ is
  // set; the deltas are added to the job stats at the end of this function.
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  const uint64_t kRecordStatsEvery = 1000;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
      snapshot_checker_, compact_->compaction->level(), db_options_.stats);

  const MutableCFOptions* mutable_cf_options =
      sub_compact->compaction->mutable_cf_options();
  assert(mutable_cf_options);

  std::vector<std::string> blob_file_paths;

  // A blob file builder is created only when blob files are enabled for this
  // column family; otherwise the pointer stays null.
  std::unique_ptr<BlobFileBuilder> blob_file_builder(
      mutable_cf_options->enable_blob_files
          ? new BlobFileBuilder(versions_, fs_.get(),
                                sub_compact->compaction->immutable_cf_options(),
                                mutable_cf_options, &file_options_, job_id_,
                                cfd->GetID(), cfd->GetName(),
                                Env::IOPriority::IO_LOW, write_hint_,
                                io_tracer_, blob_callback_, &blob_file_paths,
                                &sub_compact->blob_file_additions)
          : nullptr);

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Run():PausingManualCompaction:1",
      reinterpret_cast<void*>(
          const_cast<std::atomic<int>*>(manual_compaction_paused_)));

  // Position the input at the subcompaction's start key, or at the very first
  // key when the subcompaction has no lower bound.
  Slice* start = sub_compact->start;
  Slice* end = sub_compact->end;
  if (start != nullptr) {
    IterKey start_iter;
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
    input->Seek(start_iter.GetInternalKey());
  } else {
    input->SeekToFirst();
  }

  Status status;
  const std::string* const full_history_ts_low =
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
      /*expect_valid_internal_key=*/true, &range_del_agg,
      blob_file_builder.get(), db_options_.allow_data_in_errors,
      sub_compact->compaction, compaction_filter, shutting_down_,
      preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
      full_history_ts_low));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it the
    // first key. So we do that here.
    sub_compact->ShouldStopBefore(c_iter->key(),
                                  sub_compact->current_output_file_size);
  }
  const auto& c_iter_stats = c_iter->iter_stats();

  // No partitioner is created when the output level is 0.
  std::unique_ptr<SstPartitioner> partitioner =
      sub_compact->compaction->output_level() == 0
          ? nullptr
          : sub_compact->compaction->CreateSstPartitioner();
  std::string last_key_for_partitioner;

  // Main loop: one iteration per entry produced by the compaction iterator.
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
    // returns true.
    const Slice& key = c_iter->key();
    const Slice& value = c_iter->value();

    // If an end key (exclusive) is specified, check if the current key is
    // >= than it and exit if it is because the iterator is out of its range
    if (end != nullptr &&
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
      break;
    }
    // Periodically flush the drop counters and I/O stats (once every
    // kRecordStatsEvery input records) and reset the iterator's counters.
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
      c_iter->ResetRecordCounts();
      RecordCompactionIOStats();
    }

    // Open output file if necessary
    if (sub_compact->builder == nullptr) {
      status = OpenCompactionOutputFile(sub_compact);
      if (!status.ok()) {
        break;
      }
    }
    status = sub_compact->AddToBuilder(key, value);
    if (!status.ok()) {
      break;
    }

    sub_compact->current_output_file_size =
        sub_compact->builder->EstimatedFileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
    sub_compact->current_output()->meta.UpdateBoundaries(
        key, value, ikey.sequence, ikey.type);
    sub_compact->num_output_records++;

    // Close output file if it is big enough. Two possibilities determine it's
    // time to close it: (1) the current key should be this file's last key, (2)
    // the next key should not be in this file.
    //
    // TODO(aekmekji): determine if file should be closed earlier than this
    // during subcompactions (i.e. if output size, estimated by input size, is
    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
    // and 0.6MB instead of 1MB and 0.2MB)
    bool output_file_ended = false;
    if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
      // (1) this key terminates the file. For historical reasons, the iterator
      // status before advancing will be given to FinishCompactionOutputFile().
      output_file_ended = true;
    }
    TEST_SYNC_POINT_CALLBACK(
        "CompactionJob::Run():PausingManualCompaction:2",
        reinterpret_cast<void*>(
            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
    // Copy the current user key before advancing: the partitioner is later
    // asked about the (previous key, next key) pair.
    if (partitioner.get()) {
      last_key_for_partitioner.assign(c_iter->user_key().data_,
                                      c_iter->user_key().size_);
    }
    c_iter->Next();
    if (c_iter->status().IsManualCompactionPaused()) {
      break;
    }
    if (!output_file_ended && c_iter->Valid()) {
      if (((partitioner.get() &&
            partitioner->ShouldPartition(PartitionerRequest(
                last_key_for_partitioner, c_iter->user_key(),
                sub_compact->current_output_file_size)) == kRequired) ||
           (sub_compact->compaction->output_level() != 0 &&
            sub_compact->ShouldStopBefore(
                c_iter->key(), sub_compact->current_output_file_size))) &&
          sub_compact->builder != nullptr) {
        // (2) this key belongs to the next file. For historical reasons, the
        // iterator status after advancing will be given to
        // FinishCompactionOutputFile().
        output_file_ended = true;
      }
    }
    if (output_file_ended) {
      // Pass the next key (if any) so the finished file's upper bound can be
      // derived from it.
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();
      }
      CompactionIterationStats range_del_out_stats;
      status = FinishCompactionOutputFile(input->status(), sub_compact,
                                          &range_del_agg, &range_del_out_stats,
                                          next_key);
      RecordDroppedKeys(range_del_out_stats,
                        &sub_compact->compaction_job_stats);
    }
  }

  // Transfer the iterator's counters into this subcompaction's job stats.
  sub_compact->compaction_job_stats.num_blobs_read =
      c_iter_stats.num_blobs_read;
  sub_compact->compaction_job_stats.total_blob_bytes_read =
      c_iter_stats.total_blob_bytes_read;
  sub_compact->compaction_job_stats.num_input_deletion_records =
      c_iter_stats.num_input_deletion_records;
  sub_compact->compaction_job_stats.num_corrupt_keys =
      c_iter_stats.num_input_corrupt_records;
  sub_compact->compaction_job_stats.num_single_del_fallthru =
      c_iter_stats.num_single_del_fallthru;
  sub_compact->compaction_job_stats.num_single_del_mismatch =
      c_iter_stats.num_single_del_mismatch;
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
      c_iter_stats.total_input_raw_key_bytes;
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;

  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
             c_iter_stats.total_filter_time);
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
  RecordCompactionIOStats();

  // Derive the final status. The checks are layered: a dropped column family
  // is reported only if no error occurred; shutdown replaces OK or
  // column-family-dropped; a manual-compaction pause likewise replaces OK or
  // column-family-dropped (but not a shutdown status set just above). Only
  // if the status is still OK are the iterator statuses consulted.
  if (status.ok() && cfd->IsDropped()) {
    status =
        Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_relaxed)) {
    status = Status::ShutdownInProgress("Database shutdown");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      (manual_compaction_paused_ &&
       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  if (status.ok()) {
    status = input->status();
  }
  if (status.ok()) {
    status = c_iter->status();
  }

  if (status.ok() && sub_compact->builder == nullptr &&
      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
    // handle subcompaction containing only range deletions
    status = OpenCompactionOutputFile(sub_compact);
  }

  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  // close the output file.
  if (sub_compact->builder != nullptr) {
    CompactionIterationStats range_del_out_stats;
    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
                                          &range_del_out_stats);
    // Keep the first error: a failure while finishing only matters if
    // everything before it succeeded.
    if (!s.ok() && status.ok()) {
      status = s;
    }
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
  }

  // Finish the blob file on success; abandon it on any failure.
  if (blob_file_builder) {
    if (status.ok()) {
      status = blob_file_builder->Finish();
    } else {
      blob_file_builder->Abandon();
    }
    blob_file_builder.reset();
  }

  sub_compact->compaction_job_stats.cpu_micros =
      db_options_.clock->CPUNanos() / 1000 - prev_cpu_micros;

  if (measure_io_stats_) {
    sub_compact->compaction_job_stats.file_write_nanos +=
        IOSTATS(write_nanos) - prev_write_nanos;
    sub_compact->compaction_job_stats.file_fsync_nanos +=
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
    // Remove the CPU time attributed to file reads and writes (converted from
    // nanoseconds to microseconds) from the total measured above.
    sub_compact->compaction_job_stats.cpu_micros -=
        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
        1000;
    // Restore the perf level that was active on entry, if it was changed.
    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
      SetPerfLevel(prev_perf_level);
    }
  }
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  // On failure the iterator statuses may never have been inspected; mark
  // them as intentionally unchecked.
  if (!status.ok()) {
    if (sub_compact->c_iter) {
      sub_compact->c_iter->status().PermitUncheckedError();
    }
    if (input) {
      input->status().PermitUncheckedError();
    }
  }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED

  // Release the compaction iterator before the input iterator it was built
  // on, then publish the result.
  sub_compact->c_iter.reset();
  input.reset();
  sub_compact->status = status;
}
1393 
RecordDroppedKeys(const CompactionIterationStats & c_iter_stats,CompactionJobStats * compaction_job_stats)1394 void CompactionJob::RecordDroppedKeys(
1395     const CompactionIterationStats& c_iter_stats,
1396     CompactionJobStats* compaction_job_stats) {
1397   if (c_iter_stats.num_record_drop_user > 0) {
1398     RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1399                c_iter_stats.num_record_drop_user);
1400   }
1401   if (c_iter_stats.num_record_drop_hidden > 0) {
1402     RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1403                c_iter_stats.num_record_drop_hidden);
1404     if (compaction_job_stats) {
1405       compaction_job_stats->num_records_replaced +=
1406           c_iter_stats.num_record_drop_hidden;
1407     }
1408   }
1409   if (c_iter_stats.num_record_drop_obsolete > 0) {
1410     RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1411                c_iter_stats.num_record_drop_obsolete);
1412     if (compaction_job_stats) {
1413       compaction_job_stats->num_expired_deletion_records +=
1414           c_iter_stats.num_record_drop_obsolete;
1415     }
1416   }
1417   if (c_iter_stats.num_record_drop_range_del > 0) {
1418     RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1419                c_iter_stats.num_record_drop_range_del);
1420   }
1421   if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1422     RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1423                c_iter_stats.num_range_del_drop_obsolete);
1424   }
1425   if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1426     RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1427                c_iter_stats.num_optimized_del_drop_obsolete);
1428   }
1429 }
1430 
// Finalizes the output table that `sub_compact` is currently building.
//
// When `input_status` is OK, range tombstones obtained from `range_del_agg`
// (restricted by a lower and an upper user-key bound computed below) are
// added to the table builder and the file's metadata boundaries are extended
// to cover them. The builder is then finished (or abandoned if the status is
// not OK), the file is synced and closed, checksum information is copied into
// the file metadata, a table-file-creation-finished event is logged, and (in
// non-LITE builds) the file is reported to the SstFileManager if one is
// configured. If the finished table has neither entries nor range deletions,
// the file is deleted and its entry is removed from `sub_compact->outputs`.
//
// Parameters:
//   input_status        - status of the input so far; seeds the result.
//   sub_compact         - subcompaction whose current output is finalized;
//                         must have a non-null outfile, builder and current
//                         output (asserted).
//   range_del_agg       - source of the range tombstones to emit.
//   range_del_out_stats - receives counts of tombstones dropped as obsolete.
//   next_table_min_key  - key the next output table will start with, or
//                         nullptr; its user-key portion is used when choosing
//                         the upper bound.
//
// Returns the resulting status. `sub_compact->outfile` and
// `sub_compact->builder` are always reset before returning.
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionRangeDelAggregator* range_del_agg,
    CompactionIterationStats* range_del_out_stats,
    const Slice* next_table_min_key /* = nullptr */) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(sub_compact->outfile);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  const Comparator* ucmp = cfd->user_comparator();
  // Reported in the creation-finished event; replaced with the real values
  // only if the file is synced and closed successfully.
  std::string file_checksum = kUnknownFileChecksum;
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;

  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  assert(meta != nullptr);
  if (s.ok()) {
    // The *_guard slices and smallest_user_key provide storage for bounds
    // that are computed here rather than taken from the subcompaction.
    Slice lower_bound_guard, upper_bound_guard;
    std::string smallest_user_key;
    const Slice *lower_bound, *upper_bound;
    bool lower_bound_from_sub_compact = false;
    if (sub_compact->outputs.size() == 1) {
      // For the first output table, include range tombstones before the min key
      // but after the subcompaction boundary.
      lower_bound = sub_compact->start;
      lower_bound_from_sub_compact = true;
    } else if (meta->smallest.size() > 0) {
      // For subsequent output tables, only include range tombstones from min
      // key onwards since the previous file was extended to contain range
      // tombstones falling before min key.
      smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
      lower_bound_guard = Slice(smallest_user_key);
      lower_bound = &lower_bound_guard;
    } else {
      lower_bound = nullptr;
    }
    if (next_table_min_key != nullptr) {
      // This may be the last file in the subcompaction in some cases, so we
      // need to compare the end key of subcompaction with the next file start
      // key. When the end key is chosen by the subcompaction, we know that
      // it must be the biggest key in output file. Therefore, it is safe to
      // use the smaller key as the upper bound of the output file, to ensure
      // that there is no overlapping between different output files.
      upper_bound_guard = ExtractUserKey(*next_table_min_key);
      if (sub_compact->end != nullptr &&
          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
        upper_bound = sub_compact->end;
      } else {
        upper_bound = &upper_bound_guard;
      }
    } else {
      // This is the last file in the subcompaction, so extend until the
      // subcompaction ends.
      upper_bound = sub_compact->end;
    }
    // Take the first entry of existing_snapshots_ as the earliest snapshot;
    // fall back to kMaxSequenceNumber when there are no snapshots.
    auto earliest_snapshot = kMaxSequenceNumber;
    if (existing_snapshots_.size() > 0) {
      earliest_snapshot = existing_snapshots_[0];
    }
    // True when the file's current largest user key equals upper_bound.
    bool has_overlapping_endpoints;
    if (upper_bound != nullptr && meta->largest.size() > 0) {
      has_overlapping_endpoints =
          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
    } else {
      has_overlapping_endpoints = false;
    }

    // The end key of the subcompaction must be bigger or equal to the upper
    // bound. If the end of subcompaction is null or the upper bound is null,
    // it means that this file is the last file in the compaction. So there
    // will be no overlapping between this file and others.
    assert(sub_compact->end == nullptr ||
           upper_bound == nullptr ||
           ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
                                         has_overlapping_endpoints);
    // Position the range tombstone output iterator. There may be tombstone
    // fragments that are entirely out of range, so make sure that we do not
    // include those.
    if (lower_bound != nullptr) {
      it->Seek(*lower_bound);
    } else {
      it->SeekToFirst();
    }
    TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
    for (; it->Valid(); it->Next()) {
      auto tombstone = it->Tombstone();
      if (upper_bound != nullptr) {
        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
        if ((has_overlapping_endpoints && cmp < 0) ||
            (!has_overlapping_endpoints && cmp <= 0)) {
          // Tombstones starting after upper_bound only need to be included in
          // the next table. If the current SST ends before upper_bound, i.e.,
          // `has_overlapping_endpoints == false`, we can also skip over range
          // tombstones that start exactly at upper_bound. Such range tombstones
          // will be included in the next file and are not relevant to the point
          // keys or endpoints of the current file.
          break;
        }
      }

      // At the bottommost level, a tombstone whose sequence number is not
      // above the earliest snapshot is not written out; it is only counted.
      if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
        // TODO(andrewkr): tombstones that span multiple output files are
        // counted for each compaction output file, so lots of double counting.
        range_del_out_stats->num_range_del_drop_obsolete++;
        range_del_out_stats->num_record_drop_obsolete++;
        continue;
      }

      auto kv = tombstone.Serialize();
      assert(lower_bound == nullptr ||
             ucmp->Compare(*lower_bound, kv.second) < 0);
      // Range tombstone is not supported by output validator yet.
      sub_compact->builder->Add(kv.first.Encode(), kv.second);
      InternalKey smallest_candidate = std::move(kv.first);
      if (lower_bound != nullptr &&
          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
        // Pretend the smallest key has the same user key as lower_bound
        // (the max key in the previous table or subcompaction) in order for
        // files to appear key-space partitioned.
        //
        // When lower_bound is chosen by a subcompaction, we know that
        // subcompactions over smaller keys cannot contain any keys at
        // lower_bound. We also know that smaller subcompactions exist, because
        // otherwise the subcompaction would be unbounded on the left. As a
        // result, we know that no other files on the output level will contain
        // actual keys at lower_bound (an output file may have a largest key of
        // lower_bound@kMaxSequenceNumber, but this only indicates a large range
        // tombstone was truncated). Therefore, it is safe to use the
        // tombstone's sequence number, to ensure that keys at lower_bound at
        // lower levels are covered by truncated tombstones.
        //
        // If lower_bound was chosen by the smallest data key in the file,
        // choose lowest seqnum so this file's smallest internal key comes after
        // the previous file's largest. The fake seqnum is OK because the read
        // path's file-picking code only considers user key.
        smallest_candidate = InternalKey(
            *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
            kTypeRangeDeletion);
      }
      InternalKey largest_candidate = tombstone.SerializeEndKey();
      if (upper_bound != nullptr &&
          ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
        // Pretend the largest key has the same user key as upper_bound (the
        // min key in the following table or subcompaction) in order for files
        // to appear key-space partitioned.
        //
        // Choose highest seqnum so this file's largest internal key comes
        // before the next file's/subcompaction's smallest. The fake seqnum is
        // OK because the read path's file-picking code only considers the user
        // key portion.
        //
        // Note Seek() also creates InternalKey with (user_key,
        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
        // Seek() key in InternalKey's ordering. So Seek() will look in the
        // next file for the user key.
        largest_candidate =
            InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
      }
#ifndef NDEBUG
      // Remember the seqnum of the smallest key before the boundary update so
      // the assertion below can tell whether a data key already had seqnum 0.
      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
      if (meta->smallest.size() > 0) {
        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
      }
#endif
      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                     tombstone.seq_,
                                     cfd->internal_comparator());

      // The smallest key in a file is used for range tombstone truncation, so
      // it cannot have a seqnum of 0 (unless the smallest data key in a file
      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
      // deleted keys at lower levels.
      assert(smallest_ikey_seqnum == 0 ||
             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
                 PackSequenceAndType(0, kTypeRangeDeletion));
    }
  }
  // Read the entry count before the builder is finished or abandoned.
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    sub_compact->builder->Abandon();
  }
  // Adopt the builder's IO status only if nothing has failed so far.
  IOStatus io_s = sub_compact->builder->io_status();
  if (s.ok()) {
    s = io_s;
  }
  const uint64_t current_bytes = sub_compact->builder->FileSize();
  if (s.ok()) {
    meta->fd.file_size = current_bytes;
    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  }
  // The output is marked finished and its bytes are counted regardless of s.
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_bytes;

  // Finish and check for file errors
  if (s.ok()) {
    StopWatch sw(db_options_.clock, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
    io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
  }
  if (s.ok() && io_s.ok()) {
    io_s = sub_compact->outfile->Close();
  }
  if (s.ok() && io_s.ok()) {
    // Add the checksum information to file metadata.
    meta->file_checksum = sub_compact->outfile->GetFileChecksum();
    meta->file_checksum_func_name =
        sub_compact->outfile->GetFileChecksumFuncName();
    file_checksum = meta->file_checksum;
    file_checksum_func_name = meta->file_checksum_func_name;
  }
  if (s.ok()) {
    s = io_s;
  }
  // Keep the first non-OK IO status recorded on the subcompaction.
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the
    // "normal" status, it does not also need to be checked
    sub_compact->io_status.PermitUncheckedError();
  }
  sub_compact->outfile.reset();

  // Left default-constructed when s is not OK.
  TableProperties tp;
  if (s.ok()) {
    tp = sub_compact->builder->GetTableProperties();
  }

  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, no necessary to generate a sst file.
    // This happens when the output level is bottom level, at the same time
    // the sub_compact output nothing.
    std::string fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());

    // TODO(AR) it is not clear if there are any larger implications if
    // DeleteFile fails here
    Status ds = env_->DeleteFile(fname);
    if (!ds.ok()) {
      ROCKS_LOG_WARN(
          db_options_.info_log,
          "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
          " at bottom level%s",
          cfd->GetName().c_str(), job_id_, output_number,
          meta->marked_for_compaction ? " (need compaction)" : "");
    }

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    assert(!sub_compact->outputs.empty());
    sub_compact->outputs.pop_back();
    // NOTE(review): meta presumably referred to the element just popped
    // (assuming current_output() is the last entry of outputs); clearing it
    // makes the code below treat this call as having produced no file.
    meta = nullptr;
  }

  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    sub_compact->current_output()->table_properties =
        std::make_shared<TableProperties>(tp);
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, current_bytes,
                   meta->marked_for_compaction ? " (need compaction)" : "");
  }
  // Gather the values reported in the creation-finished event. When the file
  // was removed above, a placeholder name and default descriptor are used.
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  if (meta != nullptr) {
    fname = GetTableFileName(meta->fd.GetNumber());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    fname = "(nil)";
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, s, file_checksum,
      file_checksum_func_name);

#ifndef ROCKSDB_LITE
  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  // Only files with path id 0 are reported.
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    Status add_s = sfm->OnAddFile(fname);
    if (!add_s.ok() && s.ok()) {
      s = add_s;
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the final
      // compaction output file (similarly to how flush works when full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:"
          "MaxAllowedSpaceReached");
      // The background error is set while holding the DB mutex.
      InstrumentedMutexLock l(db_mutex_);
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
    }
  }
#endif

  sub_compact->builder.reset();
  sub_compact->current_output_file_size = 0;
  return s;
}
1748 
InstallCompactionResults(const MutableCFOptions & mutable_cf_options)1749 Status CompactionJob::InstallCompactionResults(
1750     const MutableCFOptions& mutable_cf_options) {
1751   assert(compact_);
1752 
1753   db_mutex_->AssertHeld();
1754 
1755   auto* compaction = compact_->compaction;
1756   assert(compaction);
1757 
1758   // paranoia: verify that the files that we started with
1759   // still exist in the current version and in the same original level.
1760   // This ensures that a concurrent compaction did not erroneously
1761   // pick the same files to compact_.
1762   if (!versions_->VerifyCompactionFileConsistency(compaction)) {
1763     Compaction::InputLevelSummaryBuffer inputs_summary;
1764 
1765     ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
1766                     compaction->column_family_data()->GetName().c_str(),
1767                     job_id_, compaction->InputLevelSummary(&inputs_summary));
1768     return Status::Corruption("Compaction input files inconsistent");
1769   }
1770 
1771   {
1772     Compaction::InputLevelSummaryBuffer inputs_summary;
1773     ROCKS_LOG_INFO(db_options_.info_log,
1774                    "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1775                    compaction->column_family_data()->GetName().c_str(), job_id_,
1776                    compaction->InputLevelSummary(&inputs_summary),
1777                    compact_->total_bytes + compact_->total_blob_bytes);
1778   }
1779 
1780   VersionEdit* const edit = compaction->edit();
1781   assert(edit);
1782 
1783   // Add compaction inputs
1784   compaction->AddInputDeletions(edit);
1785 
1786   for (const auto& sub_compact : compact_->sub_compact_states) {
1787     for (const auto& out : sub_compact.outputs) {
1788       edit->AddFile(compaction->output_level(), out.meta);
1789     }
1790 
1791     for (const auto& blob : sub_compact.blob_file_additions) {
1792       edit->AddBlobFile(blob);
1793     }
1794   }
1795 
1796   return versions_->LogAndApply(compaction->column_family_data(),
1797                                 mutable_cf_options, edit, db_mutex_,
1798                                 db_directory_);
1799 }
1800 
RecordCompactionIOStats()1801 void CompactionJob::RecordCompactionIOStats() {
1802   RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1803   RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1804   CompactionReason compaction_reason =
1805       compact_->compaction->compaction_reason();
1806   if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
1807     RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
1808     RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
1809   } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
1810     RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
1811     RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
1812   } else if (compaction_reason == CompactionReason::kTtl) {
1813     RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
1814     RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
1815   }
1816   ThreadStatusUtil::IncreaseThreadOperationProperty(
1817       ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1818   IOSTATS_RESET(bytes_read);
1819   ThreadStatusUtil::IncreaseThreadOperationProperty(
1820       ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1821   IOSTATS_RESET(bytes_written);
1822 }
1823 
// Creates a new output table file for `sub_compact` and prepares it for
// writing.
//
// Steps, in order: obtain a new file number, fire the creation-started event
// (non-LITE builds), create the writable file, append a new output entry to
// `sub_compact->outputs`, and install `sub_compact->outfile` and
// `sub_compact->builder`.
//
// Preconditions (asserted): `sub_compact` is non-null and currently has no
// builder.
//
// Returns the status of NewWritableFile(). On failure a creation-finished
// event carrying that status is logged, and neither an output entry nor a
// builder is created.
Status CompactionJob::OpenCompactionOutputFile(
    SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  std::string fname = GetTableFileName(file_number);
  // Fire events.
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);
#endif  // !ROCKSDB_LITE
  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  // Debug-only hook: passes a copy of use_direct_writes to the sync point
  // callback.
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif

  // Pass temperature of bottommost files to FileSystem.
  FileOptions fo_copy = file_options_;
  Temperature temperature = Temperature::kUnknown;
  if (bottommost_level_) {
    fo_copy.temperature = temperature =
        sub_compact->compaction->mutable_cf_options()->bottommost_temperature;
  }

  Status s;
  IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
  s = io_s;
  // Keep the first non-OK IO status recorded on the subcompaction.
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the io_s that is checked below as s,
    // it does not also need to be checked.
    sub_compact->io_status.PermitUncheckedError();
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    // Pair the earlier creation-started event with a finished event that
    // carries the failure status and placeholder file information.
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s,
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
    return s;
  }

  // Try to figure out the output file's oldest ancestor time.
  int64_t temp_current_time = 0;
  auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime();
  // port::kMaxUint64 is treated as "no value from the inputs"; use the
  // current time instead.
  if (oldest_ancester_time == port::kMaxUint64) {
    oldest_ancester_time = current_time;
  }

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  {
    FileMetaData meta;
    // The file size is not known yet, so the descriptor starts with 0.
    meta.fd = FileDescriptor(file_number,
                             sub_compact->compaction->output_path_id(), 0);
    meta.oldest_ancester_time = oldest_ancester_time;
    meta.file_creation_time = current_time;
    meta.temperature = temperature;
    sub_compact->outputs.emplace_back(
        std::move(meta), cfd->internal_comparator(),
        /*enable_order_check=*/
        sub_compact->compaction->mutable_cf_options()
            ->check_flush_compaction_key_order,
        /*enable_hash=*/paranoid_file_checks_);
  }

  // Configure the raw file before ownership moves into the writer below.
  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
  writable_file->SetWriteLifeTimeHint(write_hint_);
  FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_cf_options()->listeners;
  // NOTE(review): the writer is constructed with file_options_, not the
  // temperature-adjusted fo_copy used to create the file - confirm intended.
  sub_compact->outfile.reset(new WritableFileWriter(
      std::move(writable_file), fname, file_options_, db_options_.clock,
      io_tracer_, db_options_.stats, listeners,
      db_options_.file_checksum_gen_factory.get(),
      tmp_set.Contains(FileType::kTableFile)));

  TableBuilderOptions tboptions(
      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
      sub_compact->compaction->output_compression(),
      sub_compact->compaction->output_compression_opts(), cfd->GetID(),
      cfd->GetName(), sub_compact->compaction->output_level(),
      bottommost_level_, TableFileCreationReason::kCompaction,
      oldest_ancester_time, 0 /* oldest_key_time */, current_time, db_id_,
      db_session_id_, sub_compact->compaction->max_output_file_size());
  sub_compact->builder.reset(
      NewTableBuilder(tboptions, sub_compact->outfile.get()));
  LogFlush(db_options_.info_log);
  return s;
}
1938 
CleanupCompaction()1939 void CompactionJob::CleanupCompaction() {
1940   for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1941     const auto& sub_status = sub_compact.status;
1942 
1943     if (sub_compact.builder != nullptr) {
1944       // May happen if we get a shutdown call in the middle of compaction
1945       sub_compact.builder->Abandon();
1946       sub_compact.builder.reset();
1947     } else {
1948       assert(!sub_status.ok() || sub_compact.outfile == nullptr);
1949     }
1950     for (const auto& out : sub_compact.outputs) {
1951       // If this file was inserted into the table cache then remove
1952       // them here because this compaction was not committed.
1953       if (!sub_status.ok()) {
1954         TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1955       }
1956     }
1957     // TODO: sub_compact.io_status is not checked like status. Not sure if thats
1958     // intentional. So ignoring the io_status as of now.
1959     sub_compact.io_status.PermitUncheckedError();
1960   }
1961   delete compact_;
1962   compact_ = nullptr;
1963 }
1964 
1965 #ifndef ROCKSDB_LITE
1966 namespace {
CopyPrefix(const Slice & src,size_t prefix_length,std::string * dst)1967 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1968   assert(prefix_length > 0);
1969   size_t length = src.size() > prefix_length ? prefix_length : src.size();
1970   dst->assign(src.data(), length);
1971 }
1972 }  // namespace
1973 
1974 #endif  // !ROCKSDB_LITE
1975 
UpdateCompactionStats()1976 void CompactionJob::UpdateCompactionStats() {
1977   assert(compact_);
1978 
1979   Compaction* compaction = compact_->compaction;
1980   compaction_stats_.num_input_files_in_non_output_levels = 0;
1981   compaction_stats_.num_input_files_in_output_level = 0;
1982   for (int input_level = 0;
1983        input_level < static_cast<int>(compaction->num_input_levels());
1984        ++input_level) {
1985     if (compaction->level(input_level) != compaction->output_level()) {
1986       UpdateCompactionInputStatsHelper(
1987           &compaction_stats_.num_input_files_in_non_output_levels,
1988           &compaction_stats_.bytes_read_non_output_levels, input_level);
1989     } else {
1990       UpdateCompactionInputStatsHelper(
1991           &compaction_stats_.num_input_files_in_output_level,
1992           &compaction_stats_.bytes_read_output_level, input_level);
1993     }
1994   }
1995 
1996   assert(compaction_job_stats_);
1997   compaction_stats_.bytes_read_blob =
1998       compaction_job_stats_->total_blob_bytes_read;
1999 
2000   compaction_stats_.num_output_files =
2001       static_cast<int>(compact_->num_output_files);
2002   compaction_stats_.num_output_files_blob =
2003       static_cast<int>(compact_->num_blob_output_files);
2004   compaction_stats_.bytes_written = compact_->total_bytes;
2005   compaction_stats_.bytes_written_blob = compact_->total_blob_bytes;
2006 
2007   if (compaction_stats_.num_input_records > compact_->num_output_records) {
2008     compaction_stats_.num_dropped_records =
2009         compaction_stats_.num_input_records - compact_->num_output_records;
2010   }
2011 }
2012 
UpdateCompactionInputStatsHelper(int * num_files,uint64_t * bytes_read,int input_level)2013 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
2014                                                      uint64_t* bytes_read,
2015                                                      int input_level) {
2016   const Compaction* compaction = compact_->compaction;
2017   auto num_input_files = compaction->num_input_files(input_level);
2018   *num_files += static_cast<int>(num_input_files);
2019 
2020   for (size_t i = 0; i < num_input_files; ++i) {
2021     const auto* file_meta = compaction->input(input_level, i);
2022     *bytes_read += file_meta->fd.GetFileSize();
2023     compaction_stats_.num_input_records +=
2024         static_cast<uint64_t>(file_meta->num_entries);
2025   }
2026 }
2027 
UpdateCompactionJobStats(const InternalStats::CompactionStats & stats) const2028 void CompactionJob::UpdateCompactionJobStats(
2029     const InternalStats::CompactionStats& stats) const {
2030 #ifndef ROCKSDB_LITE
2031   compaction_job_stats_->elapsed_micros = stats.micros;
2032 
2033   // input information
2034   compaction_job_stats_->total_input_bytes =
2035       stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
2036   compaction_job_stats_->num_input_records = stats.num_input_records;
2037   compaction_job_stats_->num_input_files =
2038       stats.num_input_files_in_non_output_levels +
2039       stats.num_input_files_in_output_level;
2040   compaction_job_stats_->num_input_files_at_output_level =
2041       stats.num_input_files_in_output_level;
2042 
2043   // output information
2044   compaction_job_stats_->total_output_bytes = stats.bytes_written;
2045   compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
2046   compaction_job_stats_->num_output_records = compact_->num_output_records;
2047   compaction_job_stats_->num_output_files = stats.num_output_files;
2048   compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
2049 
2050   if (stats.num_output_files > 0) {
2051     CopyPrefix(compact_->SmallestUserKey(),
2052                CompactionJobStats::kMaxPrefixLength,
2053                &compaction_job_stats_->smallest_output_key_prefix);
2054     CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
2055                &compaction_job_stats_->largest_output_key_prefix);
2056   }
2057 #else
2058   (void)stats;
2059 #endif  // !ROCKSDB_LITE
2060 }
2061 
LogCompaction()2062 void CompactionJob::LogCompaction() {
2063   Compaction* compaction = compact_->compaction;
2064   ColumnFamilyData* cfd = compaction->column_family_data();
2065 
2066   // Let's check if anything will get logged. Don't prepare all the info if
2067   // we're not logging
2068   if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
2069     Compaction::InputLevelSummaryBuffer inputs_summary;
2070     ROCKS_LOG_INFO(
2071         db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
2072         cfd->GetName().c_str(), job_id_,
2073         compaction->InputLevelSummary(&inputs_summary), compaction->score());
2074     char scratch[2345];
2075     compaction->Summary(scratch, sizeof(scratch));
2076     ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
2077                    cfd->GetName().c_str(), scratch);
2078     // build event logger report
2079     auto stream = event_logger_->Log();
2080     stream << "job" << job_id_ << "event"
2081            << "compaction_started"
2082            << "compaction_reason"
2083            << GetCompactionReasonString(compaction->compaction_reason());
2084     for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
2085       stream << ("files_L" + ToString(compaction->level(i)));
2086       stream.StartArray();
2087       for (auto f : *compaction->inputs(i)) {
2088         stream << f->fd.GetNumber();
2089       }
2090       stream.EndArray();
2091     }
2092     stream << "score" << compaction->score() << "input_data_size"
2093            << compaction->CalculateTotalInputSize();
2094   }
2095 }
2096 
GetTableFileName(uint64_t file_number)2097 std::string CompactionJob::GetTableFileName(uint64_t file_number) {
2098   return TableFileName(compact_->compaction->immutable_cf_options()->cf_paths,
2099                        file_number, compact_->compaction->output_path_id());
2100 }
2101 
2102 #ifndef ROCKSDB_LITE
GetTableFileName(uint64_t file_number)2103 std::string CompactionServiceCompactionJob::GetTableFileName(
2104     uint64_t file_number) {
2105   return MakeTableFileName(output_path_, file_number);
2106 }
2107 
// Constructs a compaction job whose output files are named under
// `output_path` (see GetTableFileName()).
//
// Most arguments are forwarded unchanged to the CompactionJob base
// constructor. paranoid_file_checks and report_bg_io_stats are read from the
// compaction's mutable CF options, the job stats pointer is taken from
// `compaction_service_result->stats`, the priority is Env::Priority::USER,
// and the full-history timestamp low bound comes from the compaction's
// column family. `compaction_service_input` and `compaction_service_result`
// are also retained in compaction_input_ / compaction_result_.
//
// NOTE(review): several base-constructor arguments are passed as literal
// 0 / nullptr / kMaxSequenceNumber; their meaning depends on the base
// constructor's parameter list, which is not visible here - confirm against
// CompactionJob's declaration before changing them.
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
    VersionSet* versions, const std::atomic<bool>* shutting_down,
    LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
    const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_id, const std::string& db_session_id,
    const std::string& output_path,
    const CompactionServiceInput& compaction_service_input,
    CompactionServiceResult* compaction_service_result)
    : CompactionJob(
          job_id, compaction, db_options, mutable_db_options, file_options,
          versions, shutting_down, 0, log_buffer, nullptr, output_directory,
          nullptr, stats, db_mutex, db_error_handler, existing_snapshots,
          kMaxSequenceNumber, nullptr, table_cache, event_logger,
          compaction->mutable_cf_options()->paranoid_file_checks,
          compaction->mutable_cf_options()->report_bg_io_stats, dbname,
          &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
          nullptr, db_id, db_session_id,
          compaction->column_family_data()->GetFullHistoryTsLow()),
      output_path_(output_path),
      compaction_input_(compaction_service_input),
      compaction_result_(compaction_service_result) {}
2134 
// Runs the compaction described by compaction_input_ as exactly one
// subcompaction and records the outcome in *compaction_result_.
//
// In order, this function:
//   1. creates the single subcompaction state from the optional begin/end
//      keys and approximate size carried by compaction_input_;
//   2. runs ProcessKeyValueCompaction() on it and records elapsed / CPU time;
//   3. fsyncs the output directory (if one was given) when the subcompaction
//      succeeded;
//   4. aggregates statistics and copies I/O counters, output level, output
//      path, per-file metadata, record count and byte count into
//      *compaction_result_.
//
// Returns the subcompaction status, or the directory fsync status if the
// subcompaction itself succeeded. The same value is stored in
// compact_->status.
Status CompactionServiceCompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);

  auto* c = compact_->compaction;
  assert(c->column_family_data() != nullptr);
  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);

  write_hint_ =
      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
  bottommost_level_ = c->bottommost_level();

  // The subcompaction state is given pointers to these function-local
  // Slices (or nullptr when the corresponding bound is absent).
  // NOTE(review): presumably those pointers must not be used after Run()
  // returns -- confirm against SubcompactionState.
  Slice begin = compaction_input_.begin;
  Slice end = compaction_input_.end;
  compact_->sub_compact_states.emplace_back(
      c, compaction_input_.has_begin ? &begin : nullptr,
      compaction_input_.has_end ? &end : nullptr,
      compaction_input_.approx_size);

  log_buffer_->FlushBufferToLog();
  LogCompaction();
  const uint64_t start_micros = db_options_.clock->NowMicros();
  // Pick the only sub-compaction we should have
  assert(compact_->sub_compact_states.size() == 1);
  SubcompactionState* sub_compact = compact_->sub_compact_states.data();

  ProcessKeyValueCompaction(sub_compact);

  // Wall-clock time covers only ProcessKeyValueCompaction(); CPU time is
  // taken from the subcompaction's own job stats.
  compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = sub_compact->compaction_job_stats.cpu_micros;

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  Status status = sub_compact->status;
  IOStatus io_s = sub_compact->io_status;

  // io_status_ keeps the first non-OK I/O status seen: it is only
  // overwritten while it is still OK.
  if (io_status_.ok()) {
    io_status_ = io_s;
  }

  // Only sync the output directory when the compaction itself succeeded.
  if (status.ok()) {
    constexpr IODebugContext* dbg = nullptr;

    if (output_directory_) {
      io_s = output_directory_->Fsync(IOOptions(), dbg);
    }
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  // A failed fsync turns an otherwise successful compaction into a failure.
  if (status.ok()) {
    status = io_s;
  }
  if (status.ok()) {
    // TODO: Add verify_table() and VerifyCompactionFileConsistency()
  }

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();

  // Snapshot the IOSTATS byte counters into the result before
  // RecordCompactionIOStats() is called.
  compaction_result_->bytes_written = IOSTATS(bytes_written);
  compaction_result_->bytes_read = IOSTATS(bytes_read);
  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  compact_->status = status;
  compact_->status.PermitUncheckedError();

  // Build compaction result
  // Note: the result is populated regardless of whether `status` is OK.
  compaction_result_->output_level = compact_->compaction->output_level();
  compaction_result_->output_path = output_path_;
  for (const auto& output_file : sub_compact->outputs) {
    auto& meta = output_file.meta;
    compaction_result_->output_files.emplace_back(
        MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
        meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
        meta.largest.Encode().ToString(), meta.oldest_ancester_time,
        meta.file_creation_time, output_file.validator.GetHash(),
        meta.marked_for_compaction);
  }
  compaction_result_->num_output_records = sub_compact->num_output_records;
  compaction_result_->total_bytes = sub_compact->total_bytes;

  return status;
}
2224 
CleanupCompaction()2225 void CompactionServiceCompactionJob::CleanupCompaction() {
2226   CompactionJob::CleanupCompaction();
2227 }
2228 
// Internal binary format for the input and result data.
// The version is written as a fixed 32-bit prefix in front of the serialized
// CompactionServiceInput / CompactionServiceResult payload (see their Write()
// methods) and is decoded and checked first by the matching Read() methods.
enum BinaryFormatVersion : uint32_t {
  kOptionsString = 1,  // Use string format similar to Option string format
};
2233 
2234 // offset_of is used to get the offset of a class data member
2235 // ex: offset_of(&ColumnFamilyDescriptor::options)
2236 // This call will return the offset of options in ColumnFamilyDescriptor class
2237 //
2238 // This is the same as offsetof() but allow us to work with non standard-layout
2239 // classes and structures
2240 // refs:
2241 // http://en.cppreference.com/w/cpp/concept/StandardLayoutType
2242 // https://gist.github.com/graphitemaster/494f21190bb2c63c5516
2243 static ColumnFamilyDescriptor dummy_cfd("", ColumnFamilyOptions());
2244 template <typename T1>
offset_of(T1 ColumnFamilyDescriptor::* member)2245 int offset_of(T1 ColumnFamilyDescriptor::*member) {
2246   return int(size_t(&(dummy_cfd.*member)) - size_t(&dummy_cfd));
2247 }
2248 
2249 static CompactionServiceInput dummy_cs_input;
2250 template <typename T1>
offset_of(T1 CompactionServiceInput::* member)2251 int offset_of(T1 CompactionServiceInput::*member) {
2252   return int(size_t(&(dummy_cs_input.*member)) - size_t(&dummy_cs_input));
2253 }
2254 
2255 static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
2256     {"name",
2257      {offset_of(&ColumnFamilyDescriptor::name), OptionType::kEncodedString,
2258       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2259     {"options",
2260      {offset_of(&ColumnFamilyDescriptor::options), OptionType::kConfigurable,
2261       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2262       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80502() 2263          const std::string& value, void* addr) {
2264         auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
2265         return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
2266                                                 value, cf_options);
2267       },
2268       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80602() 2269          const void* addr, std::string* value) {
2270         const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
2271         std::string result;
2272         auto status =
2273             GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
2274         *value = "{" + result + "}";
2275         return status;
2276       },
2277       [](const ConfigOptions& opts, const std::string& name, const void* addr1,
__anond592dbd80702() 2278          const void* addr2, std::string* mismatch) {
2279         const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
2280         const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
2281         auto this_conf = CFOptionsAsConfigurable(*this_one);
2282         auto that_conf = CFOptionsAsConfigurable(*that_one);
2283         std::string mismatch_opt;
2284         bool result =
2285             this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
2286         if (!result) {
2287           *mismatch = name + "." + mismatch_opt;
2288         }
2289         return result;
2290       }}},
2291 };
2292 
2293 static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
2294     {"column_family",
2295      OptionTypeInfo::Struct("column_family", &cfd_type_info,
2296                             offset_of(&CompactionServiceInput::column_family),
2297                             OptionVerificationType::kNormal,
2298                             OptionTypeFlags::kNone)},
2299     {"db_options",
2300      {offset_of(&CompactionServiceInput::db_options), OptionType::kConfigurable,
2301       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2302       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80802() 2303          const std::string& value, void* addr) {
2304         auto options = static_cast<DBOptions*>(addr);
2305         return GetDBOptionsFromString(opts, DBOptions(), value, options);
2306       },
2307       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80902() 2308          const void* addr, std::string* value) {
2309         const auto options = static_cast<const DBOptions*>(addr);
2310         std::string result;
2311         auto status = GetStringFromDBOptions(opts, *options, &result);
2312         *value = "{" + result + "}";
2313         return status;
2314       },
2315       [](const ConfigOptions& opts, const std::string& name, const void* addr1,
__anond592dbd80a02() 2316          const void* addr2, std::string* mismatch) {
2317         const auto this_one = static_cast<const DBOptions*>(addr1);
2318         const auto that_one = static_cast<const DBOptions*>(addr2);
2319         auto this_conf = DBOptionsAsConfigurable(*this_one);
2320         auto that_conf = DBOptionsAsConfigurable(*that_one);
2321         std::string mismatch_opt;
2322         bool result =
2323             this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
2324         if (!result) {
2325           *mismatch = name + "." + mismatch_opt;
2326         }
2327         return result;
2328       }}},
2329     {"snapshots", OptionTypeInfo::Vector<uint64_t>(
2330                       offset_of(&CompactionServiceInput::snapshots),
2331                       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2332                       {0, OptionType::kUInt64T})},
2333     {"input_files", OptionTypeInfo::Vector<std::string>(
2334                         offset_of(&CompactionServiceInput::input_files),
2335                         OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2336                         {0, OptionType::kEncodedString})},
2337     {"output_level",
2338      {offset_of(&CompactionServiceInput::output_level), OptionType::kInt,
2339       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2340     {"has_begin",
2341      {offset_of(&CompactionServiceInput::has_begin), OptionType::kBoolean,
2342       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2343     {"begin",
2344      {offset_of(&CompactionServiceInput::begin), OptionType::kEncodedString,
2345       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2346     {"has_end",
2347      {offset_of(&CompactionServiceInput::has_end), OptionType::kBoolean,
2348       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2349     {"end",
2350      {offset_of(&CompactionServiceInput::end), OptionType::kEncodedString,
2351       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2352     {"approx_size",
2353      {offset_of(&CompactionServiceInput::approx_size), OptionType::kUInt64T,
2354       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2355 };
2356 
// Field table for CompactionServiceOutputFile: maps each serialized field
// name to the member's offset and option type. It describes the elements of
// the "output_files" vector in cs_result_type_info.
static std::unordered_map<std::string, OptionTypeInfo>
    cs_output_file_type_info = {
        {"file_name",
         {offsetof(struct CompactionServiceOutputFile, file_name),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"smallest_seqno",
         {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"largest_seqno",
         {offsetof(struct CompactionServiceOutputFile, largest_seqno),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"smallest_internal_key",
         {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"largest_internal_key",
         {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"oldest_ancester_time",
         {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"file_creation_time",
         {offsetof(struct CompactionServiceOutputFile, file_creation_time),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"paranoid_hash",
         {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"marked_for_compaction",
         {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
          OptionType::kBoolean, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
};
2396 
// Field table for CompactionJobStats: maps each serialized field name to the
// member's offset and option type. It describes the "stats" member of
// CompactionServiceResult in cs_result_type_info.
// Each entry's key is spelled the same as the struct member it points at.
static std::unordered_map<std::string, OptionTypeInfo>
    compaction_job_stats_type_info = {
        {"elapsed_micros",
         {offsetof(struct CompactionJobStats, elapsed_micros),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"cpu_micros",
         {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
          OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
        {"num_input_records",
         {offsetof(struct CompactionJobStats, num_input_records),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_blobs_read",
         {offsetof(struct CompactionJobStats, num_blobs_read),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_input_files",
         {offsetof(struct CompactionJobStats, num_input_files),
          OptionType::kSizeT, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_input_files_at_output_level",
         {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
          OptionType::kSizeT, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_output_records",
         {offsetof(struct CompactionJobStats, num_output_records),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_output_files",
         {offsetof(struct CompactionJobStats, num_output_files),
          OptionType::kSizeT, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_output_files_blob",
         {offsetof(struct CompactionJobStats, num_output_files_blob),
          OptionType::kSizeT, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"is_full_compaction",
         {offsetof(struct CompactionJobStats, is_full_compaction),
          OptionType::kBoolean, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"is_manual_compaction",
         {offsetof(struct CompactionJobStats, is_manual_compaction),
          OptionType::kBoolean, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_input_bytes",
         {offsetof(struct CompactionJobStats, total_input_bytes),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_blob_bytes_read",
         {offsetof(struct CompactionJobStats, total_blob_bytes_read),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_output_bytes",
         {offsetof(struct CompactionJobStats, total_output_bytes),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_output_bytes_blob",
         {offsetof(struct CompactionJobStats, total_output_bytes_blob),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_records_replaced",
         {offsetof(struct CompactionJobStats, num_records_replaced),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_input_raw_key_bytes",
         {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"total_input_raw_value_bytes",
         {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_input_deletion_records",
         {offsetof(struct CompactionJobStats, num_input_deletion_records),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_expired_deletion_records",
         {offsetof(struct CompactionJobStats, num_expired_deletion_records),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_corrupt_keys",
         {offsetof(struct CompactionJobStats, num_corrupt_keys),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"file_write_nanos",
         {offsetof(struct CompactionJobStats, file_write_nanos),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"file_range_sync_nanos",
         {offsetof(struct CompactionJobStats, file_range_sync_nanos),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"file_fsync_nanos",
         {offsetof(struct CompactionJobStats, file_fsync_nanos),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"file_prepare_write_nanos",
         {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"smallest_output_key_prefix",
         {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"largest_output_key_prefix",
         {offsetof(struct CompactionJobStats, largest_output_key_prefix),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_single_del_fallthru",
         {offsetof(struct CompactionJobStats, num_single_del_fallthru),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"num_single_del_mismatch",
         {offsetof(struct CompactionJobStats, num_single_del_mismatch),
          OptionType::kUInt64T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
};
2515 
2516 namespace {
2517 // this is a helper struct to serialize and deserialize class Status, because
2518 // Status's members are not public.
2519 struct StatusSerializationAdapter {
2520   uint8_t code;
2521   uint8_t subcode;
2522   uint8_t severity;
2523   std::string message;
2524 
StatusSerializationAdapterROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2525   StatusSerializationAdapter() {}
StatusSerializationAdapterROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2526   explicit StatusSerializationAdapter(const Status& s) {
2527     code = s.code();
2528     subcode = s.subcode();
2529     severity = s.severity();
2530     auto msg = s.getState();
2531     message = msg ? msg : "";
2532   }
2533 
GetStatusROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2534   Status GetStatus() {
2535     return Status(static_cast<Status::Code>(code),
2536                   static_cast<Status::SubCode>(subcode),
2537                   static_cast<Status::Severity>(severity), message);
2538   }
2539 };
2540 }  // namespace
2541 
// Field table for StatusSerializationAdapter: maps each serialized field name
// to the member's offset and option type. The "status" entry of
// cs_result_type_info uses it to parse, serialize and compare Status values.
static std::unordered_map<std::string, OptionTypeInfo>
    status_adapter_type_info = {
        {"code",
         {offsetof(struct StatusSerializationAdapter, code),
          OptionType::kUInt8T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"subcode",
         {offsetof(struct StatusSerializationAdapter, subcode),
          OptionType::kUInt8T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"severity",
         {offsetof(struct StatusSerializationAdapter, severity),
          OptionType::kUInt8T, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"message",
         {offsetof(struct StatusSerializationAdapter, message),
          OptionType::kEncodedString, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
};
2561 
2562 static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
2563     {"status",
2564      {offsetof(struct CompactionServiceResult, status),
2565       OptionType::kCustomizable, OptionVerificationType::kNormal,
2566       OptionTypeFlags::kNone,
2567       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80c02() 2568          const std::string& value, void* addr) {
2569         auto status_obj = static_cast<Status*>(addr);
2570         StatusSerializationAdapter adapter;
2571         Status s = OptionTypeInfo::ParseType(
2572             opts, value, status_adapter_type_info, &adapter);
2573         *status_obj = adapter.GetStatus();
2574         return s;
2575       },
2576       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80d02() 2577          const void* addr, std::string* value) {
2578         const auto status_obj = static_cast<const Status*>(addr);
2579         StatusSerializationAdapter adapter(*status_obj);
2580         std::string result;
2581         Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
2582                                                  &adapter, &result);
2583         *value = "{" + result + "}";
2584         return s;
2585       },
2586       [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80e02() 2587          const void* addr1, const void* addr2, std::string* mismatch) {
2588         const auto status1 = static_cast<const Status*>(addr1);
2589         const auto status2 = static_cast<const Status*>(addr2);
2590         StatusSerializationAdapter adatper1(*status1);
2591         StatusSerializationAdapter adapter2(*status2);
2592         return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
2593                                              &adatper1, &adapter2, mismatch);
2594       }}},
2595     {"output_files",
2596      OptionTypeInfo::Vector<CompactionServiceOutputFile>(
2597          offsetof(struct CompactionServiceResult, output_files),
2598          OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2599          OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
2600                                 OptionVerificationType::kNormal,
2601                                 OptionTypeFlags::kNone))},
2602     {"output_level",
2603      {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
2604       OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2605     {"output_path",
2606      {offsetof(struct CompactionServiceResult, output_path),
2607       OptionType::kEncodedString, OptionVerificationType::kNormal,
2608       OptionTypeFlags::kNone}},
2609     {"num_output_records",
2610      {offsetof(struct CompactionServiceResult, num_output_records),
2611       OptionType::kUInt64T, OptionVerificationType::kNormal,
2612       OptionTypeFlags::kNone}},
2613     {"total_bytes",
2614      {offsetof(struct CompactionServiceResult, total_bytes),
2615       OptionType::kUInt64T, OptionVerificationType::kNormal,
2616       OptionTypeFlags::kNone}},
2617     {"bytes_read",
2618      {offsetof(struct CompactionServiceResult, bytes_read),
2619       OptionType::kUInt64T, OptionVerificationType::kNormal,
2620       OptionTypeFlags::kNone}},
2621     {"bytes_written",
2622      {offsetof(struct CompactionServiceResult, bytes_written),
2623       OptionType::kUInt64T, OptionVerificationType::kNormal,
2624       OptionTypeFlags::kNone}},
2625     {"stats", OptionTypeInfo::Struct(
2626                   "stats", &compaction_job_stats_type_info,
2627                   offsetof(struct CompactionServiceResult, stats),
2628                   OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
2629 };
2630 
Read(const std::string & data_str,CompactionServiceInput * obj)2631 Status CompactionServiceInput::Read(const std::string& data_str,
2632                                     CompactionServiceInput* obj) {
2633   if (data_str.size() <= sizeof(BinaryFormatVersion)) {
2634     return Status::InvalidArgument("Invalid CompactionServiceInput string");
2635   }
2636   auto format_version = DecodeFixed32(data_str.data());
2637   if (format_version == kOptionsString) {
2638     ConfigOptions cf;
2639     cf.invoke_prepare_options = false;
2640     cf.ignore_unknown_options = true;
2641     return OptionTypeInfo::ParseType(
2642         cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
2643         obj);
2644   } else {
2645     return Status::NotSupported(
2646         "Compaction Service Input data version not supported: " +
2647         ToString(format_version));
2648   }
2649 }
2650 
Write(std::string * output)2651 Status CompactionServiceInput::Write(std::string* output) {
2652   char buf[sizeof(BinaryFormatVersion)];
2653   EncodeFixed32(buf, kOptionsString);
2654   output->append(buf, sizeof(BinaryFormatVersion));
2655   ConfigOptions cf;
2656   cf.invoke_prepare_options = false;
2657   return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
2658 }
2659 
Read(const std::string & data_str,CompactionServiceResult * obj)2660 Status CompactionServiceResult::Read(const std::string& data_str,
2661                                      CompactionServiceResult* obj) {
2662   if (data_str.size() <= sizeof(BinaryFormatVersion)) {
2663     return Status::InvalidArgument("Invalid CompactionServiceResult string");
2664   }
2665   auto format_version = DecodeFixed32(data_str.data());
2666   if (format_version == kOptionsString) {
2667     ConfigOptions cf;
2668     cf.invoke_prepare_options = false;
2669     cf.ignore_unknown_options = true;
2670     return OptionTypeInfo::ParseType(
2671         cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
2672         obj);
2673   } else {
2674     return Status::NotSupported(
2675         "Compaction Service Result data version not supported: " +
2676         ToString(format_version));
2677   }
2678 }
2679 
Write(std::string * output)2680 Status CompactionServiceResult::Write(std::string* output) {
2681   char buf[sizeof(BinaryFormatVersion)];
2682   EncodeFixed32(buf, kOptionsString);
2683   output->append(buf, sizeof(BinaryFormatVersion));
2684   ConfigOptions cf;
2685   cf.invoke_prepare_options = false;
2686   return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
2687 }
2688 
2689 #ifndef NDEBUG
TEST_Equals(CompactionServiceResult * other)2690 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
2691   std::string mismatch;
2692   return TEST_Equals(other, &mismatch);
2693 }
2694 
TEST_Equals(CompactionServiceResult * other,std::string * mismatch)2695 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
2696                                           std::string* mismatch) {
2697   ConfigOptions cf;
2698   cf.invoke_prepare_options = false;
2699   return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
2700                                        mismatch);
2701 }
2702 
TEST_Equals(CompactionServiceInput * other)2703 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
2704   std::string mismatch;
2705   return TEST_Equals(other, &mismatch);
2706 }
2707 
TEST_Equals(CompactionServiceInput * other,std::string * mismatch)2708 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
2709                                          std::string* mismatch) {
2710   ConfigOptions cf;
2711   cf.invoke_prepare_options = false;
2712   return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
2713                                        mismatch);
2714 }
2715 #endif  // NDEBUG
2716 #endif  // !ROCKSDB_LITE
2717 
2718 }  // namespace ROCKSDB_NAMESPACE
2719