1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction/compaction_job.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <functional>
15 #include <list>
16 #include <memory>
17 #include <random>
18 #include <set>
19 #include <thread>
20 #include <utility>
21 #include <vector>
22
23 #include "db/blob/blob_file_addition.h"
24 #include "db/blob/blob_file_builder.h"
25 #include "db/builder.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/db_iter.h"
28 #include "db/dbformat.h"
29 #include "db/error_handler.h"
30 #include "db/event_helpers.h"
31 #include "db/log_reader.h"
32 #include "db/log_writer.h"
33 #include "db/memtable.h"
34 #include "db/memtable_list.h"
35 #include "db/merge_context.h"
36 #include "db/merge_helper.h"
37 #include "db/output_validator.h"
38 #include "db/range_del_aggregator.h"
39 #include "db/version_set.h"
40 #include "file/filename.h"
41 #include "file/read_write_util.h"
42 #include "file/sst_file_manager_impl.h"
43 #include "file/writable_file_writer.h"
44 #include "logging/log_buffer.h"
45 #include "logging/logging.h"
46 #include "monitoring/iostats_context_imp.h"
47 #include "monitoring/perf_context_imp.h"
48 #include "monitoring/thread_status_util.h"
49 #include "options/configurable_helper.h"
50 #include "options/options_helper.h"
51 #include "port/port.h"
52 #include "rocksdb/db.h"
53 #include "rocksdb/env.h"
54 #include "rocksdb/sst_partitioner.h"
55 #include "rocksdb/statistics.h"
56 #include "rocksdb/status.h"
57 #include "rocksdb/table.h"
58 #include "rocksdb/utilities/options_type.h"
59 #include "table/block_based/block.h"
60 #include "table/block_based/block_based_table_factory.h"
61 #include "table/merging_iterator.h"
62 #include "table/table_builder.h"
63 #include "test_util/sync_point.h"
64 #include "util/coding.h"
65 #include "util/hash.h"
66 #include "util/mutexlock.h"
67 #include "util/random.h"
68 #include "util/stop_watch.h"
69 #include "util/string_util.h"
70
71 namespace ROCKSDB_NAMESPACE {
72
GetCompactionReasonString(CompactionReason compaction_reason)73 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
74 switch (compaction_reason) {
75 case CompactionReason::kUnknown:
76 return "Unknown";
77 case CompactionReason::kLevelL0FilesNum:
78 return "LevelL0FilesNum";
79 case CompactionReason::kLevelMaxLevelSize:
80 return "LevelMaxLevelSize";
81 case CompactionReason::kUniversalSizeAmplification:
82 return "UniversalSizeAmplification";
83 case CompactionReason::kUniversalSizeRatio:
84 return "UniversalSizeRatio";
85 case CompactionReason::kUniversalSortedRunNum:
86 return "UniversalSortedRunNum";
87 case CompactionReason::kFIFOMaxSize:
88 return "FIFOMaxSize";
89 case CompactionReason::kFIFOReduceNumFiles:
90 return "FIFOReduceNumFiles";
91 case CompactionReason::kFIFOTtl:
92 return "FIFOTtl";
93 case CompactionReason::kManualCompaction:
94 return "ManualCompaction";
95 case CompactionReason::kFilesMarkedForCompaction:
96 return "FilesMarkedForCompaction";
97 case CompactionReason::kBottommostFiles:
98 return "BottommostFiles";
99 case CompactionReason::kTtl:
100 return "Ttl";
101 case CompactionReason::kFlush:
102 return "Flush";
103 case CompactionReason::kExternalSstIngestion:
104 return "ExternalSstIngestion";
105 case CompactionReason::kPeriodicCompaction:
106 return "PeriodicCompaction";
107 case CompactionReason::kNumOfReasons:
108 // fall through
109 default:
110 assert(false);
111 return "Invalid";
112 }
113 }
114
115 // Maintains state for each sub-compaction
116 struct CompactionJob::SubcompactionState {
117 const Compaction* compaction;
118 std::unique_ptr<CompactionIterator> c_iter;
119
120 // The boundaries of the key-range this compaction is interested in. No two
121 // subcompactions may have overlapping key-ranges.
122 // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
123 Slice *start, *end;
124
125 // The return status of this subcompaction
126 Status status;
127
128 // The return IO Status of this subcompaction
129 IOStatus io_status;
130
131 // Files produced by this subcompaction
132 struct Output {
OutputROCKSDB_NAMESPACE::CompactionJob::SubcompactionState::Output133 Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
134 bool _enable_order_check, bool _enable_hash, bool _finished = false,
135 uint64_t precalculated_hash = 0)
136 : meta(std::move(_meta)),
137 validator(_icmp, _enable_order_check, _enable_hash,
138 precalculated_hash),
139 finished(_finished) {}
140 FileMetaData meta;
141 OutputValidator validator;
142 bool finished;
143 std::shared_ptr<const TableProperties> table_properties;
144 };
145
146 // State kept for output being generated
147 std::vector<Output> outputs;
148 std::vector<BlobFileAddition> blob_file_additions;
149 std::unique_ptr<WritableFileWriter> outfile;
150 std::unique_ptr<TableBuilder> builder;
151
current_outputROCKSDB_NAMESPACE::CompactionJob::SubcompactionState152 Output* current_output() {
153 if (outputs.empty()) {
154 // This subcompaction's output could be empty if compaction was aborted
155 // before this subcompaction had a chance to generate any output files.
156 // When subcompactions are executed sequentially this is more likely and
157 // will be particularly likely for the later subcompactions to be empty.
158 // Once they are run in parallel however it should be much rarer.
159 return nullptr;
160 } else {
161 return &outputs.back();
162 }
163 }
164
165 uint64_t current_output_file_size = 0;
166
167 // State during the subcompaction
168 uint64_t total_bytes = 0;
169 uint64_t num_output_records = 0;
170 CompactionJobStats compaction_job_stats;
171 uint64_t approx_size = 0;
172 // An index that used to speed up ShouldStopBefore().
173 size_t grandparent_index = 0;
174 // The number of bytes overlapping between the current output and
175 // grandparent files used in ShouldStopBefore().
176 uint64_t overlapped_bytes = 0;
177 // A flag determine whether the key has been seen in ShouldStopBefore()
178 bool seen_key = false;
179
SubcompactionStateROCKSDB_NAMESPACE::CompactionJob::SubcompactionState180 SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
181 : compaction(c), start(_start), end(_end), approx_size(size) {
182 assert(compaction != nullptr);
183 }
184
185 // Adds the key and value to the builder
186 // If paranoid is true, adds the key-value to the paranoid hash
AddToBuilderROCKSDB_NAMESPACE::CompactionJob::SubcompactionState187 Status AddToBuilder(const Slice& key, const Slice& value) {
188 auto curr = current_output();
189 assert(builder != nullptr);
190 assert(curr != nullptr);
191 Status s = curr->validator.Add(key, value);
192 if (!s.ok()) {
193 return s;
194 }
195 builder->Add(key, value);
196 return Status::OK();
197 }
198
199 // Returns true iff we should stop building the current output
200 // before processing "internal_key".
ShouldStopBeforeROCKSDB_NAMESPACE::CompactionJob::SubcompactionState201 bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
202 const InternalKeyComparator* icmp =
203 &compaction->column_family_data()->internal_comparator();
204 const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
205
206 // Scan to find earliest grandparent file that contains key.
207 while (grandparent_index < grandparents.size() &&
208 icmp->Compare(internal_key,
209 grandparents[grandparent_index]->largest.Encode()) >
210 0) {
211 if (seen_key) {
212 overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
213 }
214 assert(grandparent_index + 1 >= grandparents.size() ||
215 icmp->Compare(
216 grandparents[grandparent_index]->largest.Encode(),
217 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
218 grandparent_index++;
219 }
220 seen_key = true;
221
222 if (overlapped_bytes + curr_file_size >
223 compaction->max_compaction_bytes()) {
224 // Too much overlap for current output; start new output
225 overlapped_bytes = 0;
226 return true;
227 }
228
229 return false;
230 }
231 };
232
233 // Maintains state for the entire compaction
234 struct CompactionJob::CompactionState {
235 Compaction* const compaction;
236
237 // REQUIRED: subcompaction states are stored in order of increasing
238 // key-range
239 std::vector<CompactionJob::SubcompactionState> sub_compact_states;
240 Status status;
241
242 size_t num_output_files = 0;
243 uint64_t total_bytes = 0;
244 size_t num_blob_output_files = 0;
245 uint64_t total_blob_bytes = 0;
246 uint64_t num_output_records = 0;
247
CompactionStateROCKSDB_NAMESPACE::CompactionJob::CompactionState248 explicit CompactionState(Compaction* c) : compaction(c) {}
249
SmallestUserKeyROCKSDB_NAMESPACE::CompactionJob::CompactionState250 Slice SmallestUserKey() {
251 for (const auto& sub_compact_state : sub_compact_states) {
252 if (!sub_compact_state.outputs.empty() &&
253 sub_compact_state.outputs[0].finished) {
254 return sub_compact_state.outputs[0].meta.smallest.user_key();
255 }
256 }
257 // If there is no finished output, return an empty slice.
258 return Slice(nullptr, 0);
259 }
260
LargestUserKeyROCKSDB_NAMESPACE::CompactionJob::CompactionState261 Slice LargestUserKey() {
262 for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
263 ++it) {
264 if (!it->outputs.empty() && it->current_output()->finished) {
265 assert(it->current_output() != nullptr);
266 return it->current_output()->meta.largest.user_key();
267 }
268 }
269 // If there is no finished output, return an empty slice.
270 return Slice(nullptr, 0);
271 }
272 };
273
AggregateStatistics()274 void CompactionJob::AggregateStatistics() {
275 assert(compact_);
276
277 for (SubcompactionState& sc : compact_->sub_compact_states) {
278 auto& outputs = sc.outputs;
279
280 if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
281 // An error occurred, so ignore the last output.
282 outputs.pop_back();
283 }
284
285 compact_->num_output_files += outputs.size();
286 compact_->total_bytes += sc.total_bytes;
287
288 const auto& blobs = sc.blob_file_additions;
289
290 compact_->num_blob_output_files += blobs.size();
291
292 for (const auto& blob : blobs) {
293 compact_->total_blob_bytes += blob.GetTotalBlobBytes();
294 }
295
296 compact_->num_output_records += sc.num_output_records;
297
298 compaction_job_stats_->Add(sc.compaction_job_stats);
299 }
300 }
301
CompactionJob(int job_id,Compaction * compaction,const ImmutableDBOptions & db_options,const MutableDBOptions & mutable_db_options,const FileOptions & file_options,VersionSet * versions,const std::atomic<bool> * shutting_down,const SequenceNumber preserve_deletes_seqnum,LogBuffer * log_buffer,FSDirectory * db_directory,FSDirectory * output_directory,FSDirectory * blob_output_directory,Statistics * stats,InstrumentedMutex * db_mutex,ErrorHandler * db_error_handler,std::vector<SequenceNumber> existing_snapshots,SequenceNumber earliest_write_conflict_snapshot,const SnapshotChecker * snapshot_checker,std::shared_ptr<Cache> table_cache,EventLogger * event_logger,bool paranoid_file_checks,bool measure_io_stats,const std::string & dbname,CompactionJobStats * compaction_job_stats,Env::Priority thread_pri,const std::shared_ptr<IOTracer> & io_tracer,const std::atomic<int> * manual_compaction_paused,const std::string & db_id,const std::string & db_session_id,std::string full_history_ts_low,BlobFileCompletionCallback * blob_callback)302 CompactionJob::CompactionJob(
303 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
304 const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
305 VersionSet* versions, const std::atomic<bool>* shutting_down,
306 const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
307 FSDirectory* db_directory, FSDirectory* output_directory,
308 FSDirectory* blob_output_directory, Statistics* stats,
309 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
310 std::vector<SequenceNumber> existing_snapshots,
311 SequenceNumber earliest_write_conflict_snapshot,
312 const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
313 EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
314 const std::string& dbname, CompactionJobStats* compaction_job_stats,
315 Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
316 const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
317 const std::string& db_session_id, std::string full_history_ts_low,
318 BlobFileCompletionCallback* blob_callback)
319 : compact_(new CompactionState(compaction)),
320 compaction_stats_(compaction->compaction_reason(), 1),
321 db_options_(db_options),
322 mutable_db_options_copy_(mutable_db_options),
323 log_buffer_(log_buffer),
324 output_directory_(output_directory),
325 stats_(stats),
326 bottommost_level_(false),
327 write_hint_(Env::WLTH_NOT_SET),
328 job_id_(job_id),
329 compaction_job_stats_(compaction_job_stats),
330 dbname_(dbname),
331 db_id_(db_id),
332 db_session_id_(db_session_id),
333 file_options_(file_options),
334 env_(db_options.env),
335 io_tracer_(io_tracer),
336 fs_(db_options.fs, io_tracer),
337 file_options_for_read_(
338 fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
339 versions_(versions),
340 shutting_down_(shutting_down),
341 manual_compaction_paused_(manual_compaction_paused),
342 preserve_deletes_seqnum_(preserve_deletes_seqnum),
343 db_directory_(db_directory),
344 blob_output_directory_(blob_output_directory),
345 db_mutex_(db_mutex),
346 db_error_handler_(db_error_handler),
347 existing_snapshots_(std::move(existing_snapshots)),
348 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
349 snapshot_checker_(snapshot_checker),
350 table_cache_(std::move(table_cache)),
351 event_logger_(event_logger),
352 paranoid_file_checks_(paranoid_file_checks),
353 measure_io_stats_(measure_io_stats),
354 thread_pri_(thread_pri),
355 full_history_ts_low_(std::move(full_history_ts_low)),
356 blob_callback_(blob_callback) {
357 assert(compaction_job_stats_ != nullptr);
358 assert(log_buffer_ != nullptr);
359 const auto* cfd = compact_->compaction->column_family_data();
360 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
361 db_options_.enable_thread_tracking);
362 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
363 ReportStartedCompaction(compaction);
364 }
365
~CompactionJob()366 CompactionJob::~CompactionJob() {
367 assert(compact_ == nullptr);
368 ThreadStatusUtil::ResetThreadStatus();
369 }
370
ReportStartedCompaction(Compaction * compaction)371 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
372 const auto* cfd = compact_->compaction->column_family_data();
373 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
374 db_options_.enable_thread_tracking);
375
376 ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
377 job_id_);
378
379 ThreadStatusUtil::SetThreadOperationProperty(
380 ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
381 (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
382 compact_->compaction->output_level());
383
384 // In the current design, a CompactionJob is always created
385 // for non-trivial compaction.
386 assert(compaction->IsTrivialMove() == false ||
387 compaction->is_manual_compaction() == true);
388
389 ThreadStatusUtil::SetThreadOperationProperty(
390 ThreadStatus::COMPACTION_PROP_FLAGS,
391 compaction->is_manual_compaction() +
392 (compaction->deletion_compaction() << 1));
393
394 ThreadStatusUtil::SetThreadOperationProperty(
395 ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
396 compaction->CalculateTotalInputSize());
397
398 IOSTATS_RESET(bytes_written);
399 IOSTATS_RESET(bytes_read);
400 ThreadStatusUtil::SetThreadOperationProperty(
401 ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
402 ThreadStatusUtil::SetThreadOperationProperty(
403 ThreadStatus::COMPACTION_BYTES_READ, 0);
404
405 // Set the thread operation after operation properties
406 // to ensure GetThreadList() can always show them all together.
407 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
408
409 compaction_job_stats_->is_manual_compaction =
410 compaction->is_manual_compaction();
411 compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
412 }
413
Prepare()414 void CompactionJob::Prepare() {
415 AutoThreadOperationStageUpdater stage_updater(
416 ThreadStatus::STAGE_COMPACTION_PREPARE);
417
418 // Generate file_levels_ for compaction before making Iterator
419 auto* c = compact_->compaction;
420 assert(c->column_family_data() != nullptr);
421 assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
422 compact_->compaction->level()) > 0);
423
424 write_hint_ =
425 c->column_family_data()->CalculateSSTWriteHint(c->output_level());
426 bottommost_level_ = c->bottommost_level();
427
428 if (c->ShouldFormSubcompactions()) {
429 {
430 StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
431 GenSubcompactionBoundaries();
432 }
433 assert(sizes_.size() == boundaries_.size() + 1);
434
435 for (size_t i = 0; i <= boundaries_.size(); i++) {
436 Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
437 Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
438 compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
439 }
440 RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
441 compact_->sub_compact_states.size());
442 } else {
443 constexpr Slice* start = nullptr;
444 constexpr Slice* end = nullptr;
445 constexpr uint64_t size = 0;
446
447 compact_->sub_compact_states.emplace_back(c, start, end, size);
448 }
449 }
450
451 struct RangeWithSize {
452 Range range;
453 uint64_t size;
454
RangeWithSizeROCKSDB_NAMESPACE::RangeWithSize455 RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
456 : range(a, b), size(s) {}
457 };
458
GenSubcompactionBoundaries()459 void CompactionJob::GenSubcompactionBoundaries() {
460 auto* c = compact_->compaction;
461 auto* cfd = c->column_family_data();
462 const Comparator* cfd_comparator = cfd->user_comparator();
463 std::vector<Slice> bounds;
464 int start_lvl = c->start_level();
465 int out_lvl = c->output_level();
466
467 // Add the starting and/or ending key of certain input files as a potential
468 // boundary
469 for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
470 int lvl = c->level(lvl_idx);
471 if (lvl >= start_lvl && lvl <= out_lvl) {
472 const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
473 size_t num_files = flevel->num_files;
474
475 if (num_files == 0) {
476 continue;
477 }
478
479 if (lvl == 0) {
480 // For level 0 add the starting and ending key of each file since the
481 // files may have greatly differing key ranges (not range-partitioned)
482 for (size_t i = 0; i < num_files; i++) {
483 bounds.emplace_back(flevel->files[i].smallest_key);
484 bounds.emplace_back(flevel->files[i].largest_key);
485 }
486 } else {
487 // For all other levels add the smallest/largest key in the level to
488 // encompass the range covered by that level
489 bounds.emplace_back(flevel->files[0].smallest_key);
490 bounds.emplace_back(flevel->files[num_files - 1].largest_key);
491 if (lvl == out_lvl) {
492 // For the last level include the starting keys of all files since
493 // the last level is the largest and probably has the widest key
494 // range. Since it's range partitioned, the ending key of one file
495 // and the starting key of the next are very close (or identical).
496 for (size_t i = 1; i < num_files; i++) {
497 bounds.emplace_back(flevel->files[i].smallest_key);
498 }
499 }
500 }
501 }
502 }
503
504 std::sort(bounds.begin(), bounds.end(),
505 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
506 return cfd_comparator->Compare(ExtractUserKey(a),
507 ExtractUserKey(b)) < 0;
508 });
509 // Remove duplicated entries from bounds
510 bounds.erase(
511 std::unique(bounds.begin(), bounds.end(),
512 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
513 return cfd_comparator->Compare(ExtractUserKey(a),
514 ExtractUserKey(b)) == 0;
515 }),
516 bounds.end());
517
518 // Combine consecutive pairs of boundaries into ranges with an approximate
519 // size of data covered by keys in that range
520 uint64_t sum = 0;
521 std::vector<RangeWithSize> ranges;
522 // Get input version from CompactionState since it's already referenced
523 // earlier in SetInputVersioCompaction::SetInputVersion and will not change
524 // when db_mutex_ is released below
525 auto* v = compact_->compaction->input_version();
526 for (auto it = bounds.begin();;) {
527 const Slice a = *it;
528 ++it;
529
530 if (it == bounds.end()) {
531 break;
532 }
533
534 const Slice b = *it;
535
536 // ApproximateSize could potentially create table reader iterator to seek
537 // to the index block and may incur I/O cost in the process. Unlock db
538 // mutex to reduce contention
539 db_mutex_->Unlock();
540 uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
541 b, start_lvl, out_lvl + 1,
542 TableReaderCaller::kCompaction);
543 db_mutex_->Lock();
544 ranges.emplace_back(a, b, size);
545 sum += size;
546 }
547
548 // Group the ranges into subcompactions
549 const double min_file_fill_percent = 4.0 / 5;
550 int base_level = v->storage_info()->base_level();
551 uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
552 sum / min_file_fill_percent /
553 MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
554 c->immutable_cf_options()->compaction_style, base_level,
555 c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
556 uint64_t subcompactions =
557 std::min({static_cast<uint64_t>(ranges.size()),
558 static_cast<uint64_t>(c->max_subcompactions()),
559 max_output_files});
560
561 if (subcompactions > 1) {
562 double mean = sum * 1.0 / subcompactions;
563 // Greedily add ranges to the subcompaction until the sum of the ranges'
564 // sizes becomes >= the expected mean size of a subcompaction
565 sum = 0;
566 for (size_t i = 0; i + 1 < ranges.size(); i++) {
567 sum += ranges[i].size;
568 if (subcompactions == 1) {
569 // If there's only one left to schedule then it goes to the end so no
570 // need to put an end boundary
571 continue;
572 }
573 if (sum >= mean) {
574 boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
575 sizes_.emplace_back(sum);
576 subcompactions--;
577 sum = 0;
578 }
579 }
580 sizes_.emplace_back(sum + ranges.back().size);
581 } else {
582 // Only one range so its size is the total sum of sizes computed above
583 sizes_.emplace_back(sum);
584 }
585 }
586
Run()587 Status CompactionJob::Run() {
588 AutoThreadOperationStageUpdater stage_updater(
589 ThreadStatus::STAGE_COMPACTION_RUN);
590 TEST_SYNC_POINT("CompactionJob::Run():Start");
591 log_buffer_->FlushBufferToLog();
592 LogCompaction();
593
594 const size_t num_threads = compact_->sub_compact_states.size();
595 assert(num_threads > 0);
596 const uint64_t start_micros = db_options_.clock->NowMicros();
597
598 // Launch a thread for each of subcompactions 1...num_threads-1
599 std::vector<port::Thread> thread_pool;
600 thread_pool.reserve(num_threads - 1);
601 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
602 thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
603 &compact_->sub_compact_states[i]);
604 }
605
606 // Always schedule the first subcompaction (whether or not there are also
607 // others) in the current thread to be efficient with resources
608 ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
609
610 // Wait for all other threads (if there are any) to finish execution
611 for (auto& thread : thread_pool) {
612 thread.join();
613 }
614
615 compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
616 compaction_stats_.cpu_micros = 0;
617 for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
618 compaction_stats_.cpu_micros +=
619 compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
620 }
621
622 RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
623 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
624 compaction_stats_.cpu_micros);
625
626 TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
627
628 // Check if any thread encountered an error during execution
629 Status status;
630 IOStatus io_s;
631 bool wrote_new_blob_files = false;
632
633 for (const auto& state : compact_->sub_compact_states) {
634 if (!state.status.ok()) {
635 status = state.status;
636 io_s = state.io_status;
637 break;
638 }
639
640 if (!state.blob_file_additions.empty()) {
641 wrote_new_blob_files = true;
642 }
643 }
644
645 if (io_status_.ok()) {
646 io_status_ = io_s;
647 }
648 if (status.ok()) {
649 constexpr IODebugContext* dbg = nullptr;
650
651 if (output_directory_) {
652 io_s = output_directory_->Fsync(IOOptions(), dbg);
653 }
654
655 if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
656 blob_output_directory_ != output_directory_) {
657 io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
658 }
659 }
660 if (io_status_.ok()) {
661 io_status_ = io_s;
662 }
663 if (status.ok()) {
664 status = io_s;
665 }
666 if (status.ok()) {
667 thread_pool.clear();
668 std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
669 for (const auto& state : compact_->sub_compact_states) {
670 for (const auto& output : state.outputs) {
671 files_output.emplace_back(&output);
672 }
673 }
674 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
675 auto prefix_extractor =
676 compact_->compaction->mutable_cf_options()->prefix_extractor.get();
677 std::atomic<size_t> next_file_idx(0);
678 auto verify_table = [&](Status& output_status) {
679 while (true) {
680 size_t file_idx = next_file_idx.fetch_add(1);
681 if (file_idx >= files_output.size()) {
682 break;
683 }
684 // Verify that the table is usable
685 // We set for_compaction to false and don't OptimizeForCompactionTableRead
686 // here because this is a special case after we finish the table building
687 // No matter whether use_direct_io_for_flush_and_compaction is true,
688 // we will regard this verification as user reads since the goal is
689 // to cache it here for further user reads
690 ReadOptions read_options;
691 InternalIterator* iter = cfd->table_cache()->NewIterator(
692 read_options, file_options_, cfd->internal_comparator(),
693 files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
694 prefix_extractor,
695 /*table_reader_ptr=*/nullptr,
696 cfd->internal_stats()->GetFileReadHist(
697 compact_->compaction->output_level()),
698 TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
699 /*skip_filters=*/false, compact_->compaction->output_level(),
700 MaxFileSizeForL0MetaPin(
701 *compact_->compaction->mutable_cf_options()),
702 /*smallest_compaction_key=*/nullptr,
703 /*largest_compaction_key=*/nullptr,
704 /*allow_unprepared_value=*/false);
705 auto s = iter->status();
706
707 if (s.ok() && paranoid_file_checks_) {
708 OutputValidator validator(cfd->internal_comparator(),
709 /*_enable_order_check=*/true,
710 /*_enable_hash=*/true);
711 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
712 s = validator.Add(iter->key(), iter->value());
713 if (!s.ok()) {
714 break;
715 }
716 }
717 if (s.ok()) {
718 s = iter->status();
719 }
720 if (s.ok() &&
721 !validator.CompareValidator(files_output[file_idx]->validator)) {
722 s = Status::Corruption("Paranoid checksums do not match");
723 }
724 }
725
726 delete iter;
727
728 if (!s.ok()) {
729 output_status = s;
730 break;
731 }
732 }
733 };
734 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
735 thread_pool.emplace_back(verify_table,
736 std::ref(compact_->sub_compact_states[i].status));
737 }
738 verify_table(compact_->sub_compact_states[0].status);
739 for (auto& thread : thread_pool) {
740 thread.join();
741 }
742 for (const auto& state : compact_->sub_compact_states) {
743 if (!state.status.ok()) {
744 status = state.status;
745 break;
746 }
747 }
748 }
749
750 TablePropertiesCollection tp;
751 for (const auto& state : compact_->sub_compact_states) {
752 for (const auto& output : state.outputs) {
753 auto fn =
754 TableFileName(state.compaction->immutable_cf_options()->cf_paths,
755 output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
756 tp[fn] = output.table_properties;
757 }
758 }
759 compact_->compaction->SetOutputTableProperties(std::move(tp));
760
761 // Finish up all book-keeping to unify the subcompaction results
762 AggregateStatistics();
763 UpdateCompactionStats();
764
765 RecordCompactionIOStats();
766 LogFlush(db_options_.info_log);
767 TEST_SYNC_POINT("CompactionJob::Run():End");
768
769 compact_->status = status;
770 return status;
771 }
772
Install(const MutableCFOptions & mutable_cf_options)773 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
774 assert(compact_);
775
776 AutoThreadOperationStageUpdater stage_updater(
777 ThreadStatus::STAGE_COMPACTION_INSTALL);
778 db_mutex_->AssertHeld();
779 Status status = compact_->status;
780
781 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
782 assert(cfd);
783
784 cfd->internal_stats()->AddCompactionStats(
785 compact_->compaction->output_level(), thread_pri_, compaction_stats_);
786
787 if (status.ok()) {
788 status = InstallCompactionResults(mutable_cf_options);
789 }
790 if (!versions_->io_status().ok()) {
791 io_status_ = versions_->io_status();
792 }
793
794 VersionStorageInfo::LevelSummaryStorage tmp;
795 auto vstorage = cfd->current()->storage_info();
796 const auto& stats = compaction_stats_;
797
798 double read_write_amp = 0.0;
799 double write_amp = 0.0;
800 double bytes_read_per_sec = 0;
801 double bytes_written_per_sec = 0;
802
803 const uint64_t bytes_read_non_output_and_blob =
804 stats.bytes_read_non_output_levels + stats.bytes_read_blob;
805 const uint64_t bytes_read_all =
806 stats.bytes_read_output_level + bytes_read_non_output_and_blob;
807 const uint64_t bytes_written_all =
808 stats.bytes_written + stats.bytes_written_blob;
809
810 if (bytes_read_non_output_and_blob > 0) {
811 read_write_amp = (bytes_written_all + bytes_read_all) /
812 static_cast<double>(bytes_read_non_output_and_blob);
813 write_amp =
814 bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
815 }
816 if (stats.micros > 0) {
817 bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
818 bytes_written_per_sec =
819 bytes_written_all / static_cast<double>(stats.micros);
820 }
821
822 const std::string& column_family_name = cfd->GetName();
823
824 constexpr double kMB = 1048576.0;
825
826 ROCKS_LOG_BUFFER(
827 log_buffer_,
828 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
829 "files in(%d, %d) out(%d +%d blob) "
830 "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
831 "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
832 ", records dropped: %" PRIu64 " output_compression: %s\n",
833 column_family_name.c_str(), vstorage->LevelSummary(&tmp),
834 bytes_read_per_sec, bytes_written_per_sec,
835 compact_->compaction->output_level(),
836 stats.num_input_files_in_non_output_levels,
837 stats.num_input_files_in_output_level, stats.num_output_files,
838 stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
839 stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
840 stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
841 write_amp, status.ToString().c_str(), stats.num_input_records,
842 stats.num_dropped_records,
843 CompressionTypeToString(compact_->compaction->output_compression())
844 .c_str());
845
846 const auto& blob_files = vstorage->GetBlobFiles();
847 if (!blob_files.empty()) {
848 ROCKS_LOG_BUFFER(log_buffer_,
849 "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
850 "\n",
851 column_family_name.c_str(), blob_files.begin()->first,
852 blob_files.rbegin()->first);
853 }
854
855 UpdateCompactionJobStats(stats);
856
857 auto stream = event_logger_->LogToBuffer(log_buffer_);
858 stream << "job" << job_id_ << "event"
859 << "compaction_finished"
860 << "compaction_time_micros" << stats.micros
861 << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
862 << compact_->compaction->output_level() << "num_output_files"
863 << compact_->num_output_files << "total_output_size"
864 << compact_->total_bytes;
865
866 if (compact_->num_blob_output_files > 0) {
867 stream << "num_blob_output_files" << compact_->num_blob_output_files
868 << "total_blob_output_size" << compact_->total_blob_bytes;
869 }
870
871 stream << "num_input_records" << stats.num_input_records
872 << "num_output_records" << compact_->num_output_records
873 << "num_subcompactions" << compact_->sub_compact_states.size()
874 << "output_compression"
875 << CompressionTypeToString(compact_->compaction->output_compression());
876
877 stream << "num_single_delete_mismatches"
878 << compaction_job_stats_->num_single_del_mismatch;
879 stream << "num_single_delete_fallthrough"
880 << compaction_job_stats_->num_single_del_fallthru;
881
882 if (measure_io_stats_) {
883 stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
884 stream << "file_range_sync_nanos"
885 << compaction_job_stats_->file_range_sync_nanos;
886 stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
887 stream << "file_prepare_write_nanos"
888 << compaction_job_stats_->file_prepare_write_nanos;
889 }
890
891 stream << "lsm_state";
892 stream.StartArray();
893 for (int level = 0; level < vstorage->num_levels(); ++level) {
894 stream << vstorage->NumLevelFiles(level);
895 }
896 stream.EndArray();
897
898 if (!blob_files.empty()) {
899 stream << "blob_file_head" << blob_files.begin()->first;
900 stream << "blob_file_tail" << blob_files.rbegin()->first;
901 }
902
903 CleanupCompaction();
904 return status;
905 }
906
907 #ifndef ROCKSDB_LITE
ProcessKeyValueCompactionWithCompactionService(SubcompactionState * sub_compact)908 void CompactionJob::ProcessKeyValueCompactionWithCompactionService(
909 SubcompactionState* sub_compact) {
910 assert(sub_compact);
911 assert(sub_compact->compaction);
912 assert(db_options_.compaction_service);
913
914 const Compaction* compaction = sub_compact->compaction;
915 CompactionServiceInput compaction_input;
916 compaction_input.output_level = compaction->output_level();
917
918 const std::vector<CompactionInputFiles>& inputs =
919 *(compact_->compaction->inputs());
920 for (const auto& files_per_level : inputs) {
921 for (const auto& file : files_per_level.files) {
922 compaction_input.input_files.emplace_back(
923 MakeTableFileName(file->fd.GetNumber()));
924 }
925 }
926 compaction_input.column_family.name =
927 compaction->column_family_data()->GetName();
928 compaction_input.column_family.options =
929 compaction->column_family_data()->GetLatestCFOptions();
930 compaction_input.db_options =
931 BuildDBOptions(db_options_, mutable_db_options_copy_);
932 compaction_input.snapshots = existing_snapshots_;
933 compaction_input.has_begin = sub_compact->start;
934 compaction_input.begin =
935 compaction_input.has_begin ? sub_compact->start->ToString() : "";
936 compaction_input.has_end = sub_compact->end;
937 compaction_input.end =
938 compaction_input.has_end ? sub_compact->end->ToString() : "";
939 compaction_input.approx_size = sub_compact->approx_size;
940
941 std::string compaction_input_binary;
942 Status s = compaction_input.Write(&compaction_input_binary);
943 if (!s.ok()) {
944 sub_compact->status = s;
945 return;
946 }
947
948 std::ostringstream input_files_oss;
949 bool is_first_one = true;
950 for (const auto& file : compaction_input.input_files) {
951 input_files_oss << (is_first_one ? "" : ", ") << file;
952 is_first_one = false;
953 }
954
955 ROCKS_LOG_INFO(
956 db_options_.info_log,
957 "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
958 compaction_input.column_family.name.c_str(), job_id_,
959 compaction_input.output_level, input_files_oss.str().c_str());
960 CompactionServiceJobStatus compaction_status =
961 db_options_.compaction_service->Start(compaction_input_binary, job_id_);
962 if (compaction_status != CompactionServiceJobStatus::kSuccess) {
963 sub_compact->status =
964 Status::Incomplete("CompactionService failed to start compaction job.");
965 return;
966 }
967
968 std::string compaction_result_binary;
969 compaction_status = db_options_.compaction_service->WaitForComplete(
970 job_id_, &compaction_result_binary);
971
972 CompactionServiceResult compaction_result;
973 s = CompactionServiceResult::Read(compaction_result_binary,
974 &compaction_result);
975 if (compaction_status != CompactionServiceJobStatus::kSuccess) {
976 sub_compact->status =
977 s.ok() ? compaction_result.status
978 : Status::Incomplete(
979 "CompactionService failed to run compaction job.");
980 compaction_result.status.PermitUncheckedError();
981 ROCKS_LOG_WARN(db_options_.info_log,
982 "[%s] [JOB %d] Remote compaction failed, status: %s",
983 compaction_input.column_family.name.c_str(), job_id_,
984 s.ToString().c_str());
985 return;
986 }
987
988 if (!s.ok()) {
989 sub_compact->status = s;
990 compaction_result.status.PermitUncheckedError();
991 return;
992 }
993 sub_compact->status = compaction_result.status;
994
995 std::ostringstream output_files_oss;
996 is_first_one = true;
997 for (const auto& file : compaction_result.output_files) {
998 output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
999 is_first_one = false;
1000 }
1001
1002 ROCKS_LOG_INFO(db_options_.info_log,
1003 "[%s] [JOB %d] Receive remote compaction result, output path: "
1004 "%s, files: %s",
1005 compaction_input.column_family.name.c_str(), job_id_,
1006 compaction_result.output_path.c_str(),
1007 output_files_oss.str().c_str());
1008
1009 if (!s.ok()) {
1010 sub_compact->status = s;
1011 return;
1012 }
1013
1014 for (const auto& file : compaction_result.output_files) {
1015 uint64_t file_num = versions_->NewFileNumber();
1016 auto src_file = compaction_result.output_path + "/" + file.file_name;
1017 auto tgt_file = TableFileName(compaction->immutable_cf_options()->cf_paths,
1018 file_num, compaction->output_path_id());
1019 s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
1020 if (!s.ok()) {
1021 sub_compact->status = s;
1022 return;
1023 }
1024
1025 FileMetaData meta;
1026 uint64_t file_size;
1027 s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
1028 if (!s.ok()) {
1029 sub_compact->status = s;
1030 return;
1031 }
1032 meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
1033 file.smallest_seqno, file.largest_seqno);
1034 meta.smallest.DecodeFrom(file.smallest_internal_key);
1035 meta.largest.DecodeFrom(file.largest_internal_key);
1036 meta.oldest_ancester_time = file.oldest_ancester_time;
1037 meta.file_creation_time = file.file_creation_time;
1038 meta.marked_for_compaction = file.marked_for_compaction;
1039
1040 auto cfd = compaction->column_family_data();
1041 sub_compact->outputs.emplace_back(std::move(meta),
1042 cfd->internal_comparator(), false, false,
1043 true, file.paranoid_hash);
1044 }
1045 sub_compact->compaction_job_stats = compaction_result.stats;
1046 sub_compact->num_output_records = compaction_result.num_output_records;
1047 sub_compact->approx_size = compaction_input.approx_size; // is this used?
1048 sub_compact->total_bytes = compaction_result.total_bytes;
1049 IOSTATS_ADD(bytes_written, compaction_result.bytes_written);
1050 IOSTATS_ADD(bytes_read, compaction_result.bytes_read);
1051 }
1052 #endif // !ROCKSDB_LITE
1053
// Processes the key/value pairs of one subcompaction.
//
// The input iterator is positioned at sub_compact->start (or the first key
// when start is null) and wrapped in a CompactionIterator. Every key the
// CompactionIterator yields, up to but excluding sub_compact->end, is added
// to the current output table; output files are opened on demand and
// finished when they reach the size limit or when the partitioner /
// ShouldStopBefore() asks for a cut.
//
// Nothing is returned. The final status is stored in sub_compact->status and
// per-subcompaction counters in sub_compact->compaction_job_stats.
//
// In non-LITE builds with db_options_.compaction_service set, all of the
// work is delegated to ProcessKeyValueCompactionWithCompactionService().
ProcessKeyValueCompaction(SubcompactionState * sub_compact)1054 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
1055 assert(sub_compact);
1056 assert(sub_compact->compaction);
1057
1058 #ifndef ROCKSDB_LITE
1059 if (db_options_.compaction_service) {
1060 return ProcessKeyValueCompactionWithCompactionService(sub_compact);
1061 }
1062 #endif // !ROCKSDB_LITE
1063
// CPU-time baseline; the difference is stored in cpu_micros at the end.
1064 uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000;
1065
1066 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1067
1068 // Create compaction filter and fail the compaction if
1069 // IgnoreSnapshots() = false because it is not supported anymore
1070 const CompactionFilter* compaction_filter =
1071 cfd->ioptions()->compaction_filter;
1072 std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
1073 if (compaction_filter == nullptr) {
1074 compaction_filter_from_factory =
1075 sub_compact->compaction->CreateCompactionFilter();
1076 compaction_filter = compaction_filter_from_factory.get();
1077 }
1078 if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
1079 sub_compact->status = Status::NotSupported(
1080 "CompactionFilter::IgnoreSnapshots() = false is not supported "
1081 "anymore.");
1082 return;
1083 }
1084
1085 CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
1086 existing_snapshots_);
1087 ReadOptions read_options;
1088 read_options.verify_checksums = true;
1089 read_options.fill_cache = false;
1090 // Compaction iterators shouldn't be confined to a single prefix.
1091 // Compactions use Seek() for
1092 // (a) concurrent compactions,
1093 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
1094 read_options.total_order_seek = true;
1095
1096 // Although the v2 aggregator is what the level iterator(s) know about,
1097 // the AddTombstones calls will be propagated down to the v1 aggregator.
1098 std::unique_ptr<InternalIterator> input(
1099 versions_->MakeInputIterator(read_options, sub_compact->compaction,
1100 &range_del_agg, file_options_for_read_));
1101
1102 AutoThreadOperationStageUpdater stage_updater(
1103 ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
1104
1105 // I/O measurement variables
1106 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
1107 const uint64_t kRecordStatsEvery = 1000;
1108 uint64_t prev_write_nanos = 0;
1109 uint64_t prev_fsync_nanos = 0;
1110 uint64_t prev_range_sync_nanos = 0;
1111 uint64_t prev_prepare_write_nanos = 0;
1112 uint64_t prev_cpu_write_nanos = 0;
1113 uint64_t prev_cpu_read_nanos = 0;
// When I/O stats are being measured, raise the perf level and snapshot the
// current IOSTATS counters so that deltas can be computed at the end.
1114 if (measure_io_stats_) {
1115 prev_perf_level = GetPerfLevel();
1116 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1117 prev_write_nanos = IOSTATS(write_nanos);
1118 prev_fsync_nanos = IOSTATS(fsync_nanos);
1119 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
1120 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
1121 prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
1122 prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
1123 }
1124
1125 MergeHelper merge(
1126 env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
1127 compaction_filter, db_options_.info_log.get(),
1128 false /* internal key corruption is expected */,
1129 existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
1130 snapshot_checker_, compact_->compaction->level(), db_options_.stats);
1131
1132 const MutableCFOptions* mutable_cf_options =
1133 sub_compact->compaction->mutable_cf_options();
1134 assert(mutable_cf_options);
1135
1136 std::vector<std::string> blob_file_paths;
1137
// A blob file builder is only created when blob files are enabled for this
// column family; otherwise the pointer stays null.
1138 std::unique_ptr<BlobFileBuilder> blob_file_builder(
1139 mutable_cf_options->enable_blob_files
1140 ? new BlobFileBuilder(versions_, fs_.get(),
1141 sub_compact->compaction->immutable_cf_options(),
1142 mutable_cf_options, &file_options_, job_id_,
1143 cfd->GetID(), cfd->GetName(),
1144 Env::IOPriority::IO_LOW, write_hint_,
1145 io_tracer_, blob_callback_, &blob_file_paths,
1146 &sub_compact->blob_file_additions)
1147 : nullptr);
1148
1149 TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
1150 TEST_SYNC_POINT_CALLBACK(
1151 "CompactionJob::Run():PausingManualCompaction:1",
1152 reinterpret_cast<void*>(
1153 const_cast<std::atomic<int>*>(manual_compaction_paused_)));
1154
// Position the input iterator at the start of this subcompaction's range,
// or at the very first key when the range has no lower bound.
1155 Slice* start = sub_compact->start;
1156 Slice* end = sub_compact->end;
1157 if (start != nullptr) {
1158 IterKey start_iter;
1159 start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
1160 input->Seek(start_iter.GetInternalKey());
1161 } else {
1162 input->SeekToFirst();
1163 }
1164
1165 Status status;
1166 const std::string* const full_history_ts_low =
1167 full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
1168 sub_compact->c_iter.reset(new CompactionIterator(
1169 input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
1170 &existing_snapshots_, earliest_write_conflict_snapshot_,
1171 snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
1172 /*expect_valid_internal_key=*/true, &range_del_agg,
1173 blob_file_builder.get(), db_options_.allow_data_in_errors,
1174 sub_compact->compaction, compaction_filter, shutting_down_,
1175 preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
1176 full_history_ts_low));
1177 auto c_iter = sub_compact->c_iter.get();
1178 c_iter->SeekToFirst();
1179 if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
1180 // ShouldStopBefore() maintains state based on keys processed so far. The
1181 // compaction loop always calls it on the "next" key, thus won't tell it the
1182 // first key. So we do that here.
1183 sub_compact->ShouldStopBefore(c_iter->key(),
1184 sub_compact->current_output_file_size);
1185 }
1186 const auto& c_iter_stats = c_iter->iter_stats();
1187
// No SST partitioner is created when the output level is 0.
1188 std::unique_ptr<SstPartitioner> partitioner =
1189 sub_compact->compaction->output_level() == 0
1190 ? nullptr
1191 : sub_compact->compaction->CreateSstPartitioner();
1192 std::string last_key_for_partitioner;
1193
1194 while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
1195 // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
1196 // returns true.
1197 const Slice& key = c_iter->key();
1198 const Slice& value = c_iter->value();
1199
1200 // If an end key (exclusive) is specified, stop as soon as the current key
1201 // is >= it, because the iterator has then left this subcompaction's range.
1202 if (end != nullptr &&
1203 cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
1204 break;
1205 }
// Once every kRecordStatsEvery input records, publish the dropped-key
// counters and I/O stats collected so far and reset the iterator's counters.
1206 if (c_iter_stats.num_input_records % kRecordStatsEvery ==
1207 kRecordStatsEvery - 1) {
1208 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1209 c_iter->ResetRecordCounts();
1210 RecordCompactionIOStats();
1211 }
1212
1213 // Open output file if necessary
1214 if (sub_compact->builder == nullptr) {
1215 status = OpenCompactionOutputFile(sub_compact);
1216 if (!status.ok()) {
1217 break;
1218 }
1219 }
1220 status = sub_compact->AddToBuilder(key, value);
1221 if (!status.ok()) {
1222 break;
1223 }
1224
1225 sub_compact->current_output_file_size =
1226 sub_compact->builder->EstimatedFileSize();
1227 const ParsedInternalKey& ikey = c_iter->ikey();
1228 sub_compact->current_output()->meta.UpdateBoundaries(
1229 key, value, ikey.sequence, ikey.type);
1230 sub_compact->num_output_records++;
1231
1232 // Close output file if it is big enough. Two possibilities determine it's
1233 // time to close it: (1) the current key should be this file's last key, (2)
1234 // the next key should not be in this file.
1235 //
1236 // TODO(aekmekji): determine if file should be closed earlier than this
1237 // during subcompactions (i.e. if output size, estimated by input size, is
1238 // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
1239 // and 0.6MB instead of 1MB and 0.2MB)
1240 bool output_file_ended = false;
1241 if (sub_compact->compaction->output_level() != 0 &&
1242 sub_compact->current_output_file_size >=
1243 sub_compact->compaction->max_output_file_size()) {
1244 // (1) this key terminates the file. For historical reasons, the iterator
1245 // status before advancing will be given to FinishCompactionOutputFile().
1246 output_file_ended = true;
1247 }
1248 TEST_SYNC_POINT_CALLBACK(
1249 "CompactionJob::Run():PausingManualCompaction:2",
1250 reinterpret_cast<void*>(
1251 const_cast<std::atomic<int>*>(manual_compaction_paused_)));
// Copy the current user key before advancing: the partitioner is later asked
// about the (previous key, next key) pair.
1252 if (partitioner.get()) {
1253 last_key_for_partitioner.assign(c_iter->user_key().data_,
1254 c_iter->user_key().size_);
1255 }
1256 c_iter->Next();
1257 if (c_iter->status().IsManualCompactionPaused()) {
1258 break;
1259 }
1260 if (!output_file_ended && c_iter->Valid()) {
1261 if (((partitioner.get() &&
1262 partitioner->ShouldPartition(PartitionerRequest(
1263 last_key_for_partitioner, c_iter->user_key(),
1264 sub_compact->current_output_file_size)) == kRequired) ||
1265 (sub_compact->compaction->output_level() != 0 &&
1266 sub_compact->ShouldStopBefore(
1267 c_iter->key(), sub_compact->current_output_file_size))) &&
1268 sub_compact->builder != nullptr) {
1269 // (2) this key belongs to the next file. For historical reasons, the
1270 // iterator status after advancing will be given to
1271 // FinishCompactionOutputFile().
1272 output_file_ended = true;
1273 }
1274 }
1275 if (output_file_ended) {
// Pass the next key (if any) so the finished file's range-tombstone upper
// bound can be derived from it.
1276 const Slice* next_key = nullptr;
1277 if (c_iter->Valid()) {
1278 next_key = &c_iter->key();
1279 }
1280 CompactionIterationStats range_del_out_stats;
1281 status = FinishCompactionOutputFile(input->status(), sub_compact,
1282 &range_del_agg, &range_del_out_stats,
1283 next_key);
1284 RecordDroppedKeys(range_del_out_stats,
1285 &sub_compact->compaction_job_stats);
1286 }
1287 }
1288
// Copy the iterator's counters into the subcompaction's job stats.
1289 sub_compact->compaction_job_stats.num_blobs_read =
1290 c_iter_stats.num_blobs_read;
1291 sub_compact->compaction_job_stats.total_blob_bytes_read =
1292 c_iter_stats.total_blob_bytes_read;
1293 sub_compact->compaction_job_stats.num_input_deletion_records =
1294 c_iter_stats.num_input_deletion_records;
1295 sub_compact->compaction_job_stats.num_corrupt_keys =
1296 c_iter_stats.num_input_corrupt_records;
1297 sub_compact->compaction_job_stats.num_single_del_fallthru =
1298 c_iter_stats.num_single_del_fallthru;
1299 sub_compact->compaction_job_stats.num_single_del_mismatch =
1300 c_iter_stats.num_single_del_mismatch;
1301 sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
1302 c_iter_stats.total_input_raw_key_bytes;
1303 sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
1304 c_iter_stats.total_input_raw_value_bytes;
1305
1306 RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1307 c_iter_stats.total_filter_time);
1308 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1309 RecordCompactionIOStats();
1310
// Work out why the loop ended. Each later condition replaces the status only
// while it is still OK (or, for shutdown / manual pause, ColumnFamilyDropped).
1311 if (status.ok() && cfd->IsDropped()) {
1312 status =
1313 Status::ColumnFamilyDropped("Column family dropped during compaction");
1314 }
1315 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1316 shutting_down_->load(std::memory_order_relaxed)) {
1317 status = Status::ShutdownInProgress("Database shutdown");
1318 }
1319 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1320 (manual_compaction_paused_ &&
1321 manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
1322 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1323 }
1324 if (status.ok()) {
1325 status = input->status();
1326 }
1327 if (status.ok()) {
1328 status = c_iter->status();
1329 }
1330
1331 if (status.ok() && sub_compact->builder == nullptr &&
1332 sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
1333 // handle subcompaction containing only range deletions
1334 status = OpenCompactionOutputFile(sub_compact);
1335 }
1336
1337 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1338 // close the output file.
1339 if (sub_compact->builder != nullptr) {
1340 CompactionIterationStats range_del_out_stats;
1341 Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
1342 &range_del_out_stats);
1343 if (!s.ok() && status.ok()) {
1344 status = s;
1345 }
1346 RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
1347 }
1348
// Finish the blob file on success; abandon it on failure.
1349 if (blob_file_builder) {
1350 if (status.ok()) {
1351 status = blob_file_builder->Finish();
1352 } else {
1353 blob_file_builder->Abandon();
1354 }
1355 blob_file_builder.reset();
1356 }
1357
// CPU time consumed since the baseline taken at the top of this function.
1358 sub_compact->compaction_job_stats.cpu_micros =
1359 db_options_.clock->CPUNanos() / 1000 - prev_cpu_micros;
1360
1361 if (measure_io_stats_) {
1362 sub_compact->compaction_job_stats.file_write_nanos +=
1363 IOSTATS(write_nanos) - prev_write_nanos;
1364 sub_compact->compaction_job_stats.file_fsync_nanos +=
1365 IOSTATS(fsync_nanos) - prev_fsync_nanos;
1366 sub_compact->compaction_job_stats.file_range_sync_nanos +=
1367 IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1368 sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1369 IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
// Subtract the cpu_write_nanos / cpu_read_nanos deltas (converted to
// microseconds) from the CPU time computed above.
1370 sub_compact->compaction_job_stats.cpu_micros -=
1371 (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1372 IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1373 1000;
// Restore the perf level that was in effect before it was raised above.
1374 if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1375 SetPerfLevel(prev_perf_level);
1376 }
1377 }
1378 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1379 if (!status.ok()) {
1380 if (sub_compact->c_iter) {
1381 sub_compact->c_iter->status().PermitUncheckedError();
1382 }
1383 if (input) {
1384 input->status().PermitUncheckedError();
1385 }
1386 }
1387 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
1388
// Destroy the compaction iterator before the input iterator it was built on,
// then publish the final status.
1389 sub_compact->c_iter.reset();
1390 input.reset();
1391 sub_compact->status = status;
1392 }
1393
RecordDroppedKeys(const CompactionIterationStats & c_iter_stats,CompactionJobStats * compaction_job_stats)1394 void CompactionJob::RecordDroppedKeys(
1395 const CompactionIterationStats& c_iter_stats,
1396 CompactionJobStats* compaction_job_stats) {
1397 if (c_iter_stats.num_record_drop_user > 0) {
1398 RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1399 c_iter_stats.num_record_drop_user);
1400 }
1401 if (c_iter_stats.num_record_drop_hidden > 0) {
1402 RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1403 c_iter_stats.num_record_drop_hidden);
1404 if (compaction_job_stats) {
1405 compaction_job_stats->num_records_replaced +=
1406 c_iter_stats.num_record_drop_hidden;
1407 }
1408 }
1409 if (c_iter_stats.num_record_drop_obsolete > 0) {
1410 RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1411 c_iter_stats.num_record_drop_obsolete);
1412 if (compaction_job_stats) {
1413 compaction_job_stats->num_expired_deletion_records +=
1414 c_iter_stats.num_record_drop_obsolete;
1415 }
1416 }
1417 if (c_iter_stats.num_record_drop_range_del > 0) {
1418 RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1419 c_iter_stats.num_record_drop_range_del);
1420 }
1421 if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1422 RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1423 c_iter_stats.num_range_del_drop_obsolete);
1424 }
1425 if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1426 RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1427 c_iter_stats.num_optimized_del_drop_obsolete);
1428 }
1429 }
1430
FinishCompactionOutputFile(const Status & input_status,SubcompactionState * sub_compact,CompactionRangeDelAggregator * range_del_agg,CompactionIterationStats * range_del_out_stats,const Slice * next_table_min_key)1431 Status CompactionJob::FinishCompactionOutputFile(
1432 const Status& input_status, SubcompactionState* sub_compact,
1433 CompactionRangeDelAggregator* range_del_agg,
1434 CompactionIterationStats* range_del_out_stats,
1435 const Slice* next_table_min_key /* = nullptr */) {
1436 AutoThreadOperationStageUpdater stage_updater(
1437 ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
1438 assert(sub_compact != nullptr);
1439 assert(sub_compact->outfile);
1440 assert(sub_compact->builder != nullptr);
1441 assert(sub_compact->current_output() != nullptr);
1442
1443 uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
1444 assert(output_number != 0);
1445
1446 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1447 const Comparator* ucmp = cfd->user_comparator();
1448 std::string file_checksum = kUnknownFileChecksum;
1449 std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
1450
1451 // Check for iterator errors
1452 Status s = input_status;
1453 auto meta = &sub_compact->current_output()->meta;
1454 assert(meta != nullptr);
1455 if (s.ok()) {
1456 Slice lower_bound_guard, upper_bound_guard;
1457 std::string smallest_user_key;
1458 const Slice *lower_bound, *upper_bound;
1459 bool lower_bound_from_sub_compact = false;
1460 if (sub_compact->outputs.size() == 1) {
1461 // For the first output table, include range tombstones before the min key
1462 // but after the subcompaction boundary.
1463 lower_bound = sub_compact->start;
1464 lower_bound_from_sub_compact = true;
1465 } else if (meta->smallest.size() > 0) {
1466 // For subsequent output tables, only include range tombstones from min
1467 // key onwards since the previous file was extended to contain range
1468 // tombstones falling before min key.
1469 smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
1470 lower_bound_guard = Slice(smallest_user_key);
1471 lower_bound = &lower_bound_guard;
1472 } else {
1473 lower_bound = nullptr;
1474 }
1475 if (next_table_min_key != nullptr) {
1476 // This may be the last file in the subcompaction in some cases, so we
1477 // need to compare the end key of subcompaction with the next file start
1478 // key. When the end key is chosen by the subcompaction, we know that
1479 // it must be the biggest key in output file. Therefore, it is safe to
1480 // use the smaller key as the upper bound of the output file, to ensure
1481 // that there is no overlapping between different output files.
1482 upper_bound_guard = ExtractUserKey(*next_table_min_key);
1483 if (sub_compact->end != nullptr &&
1484 ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
1485 upper_bound = sub_compact->end;
1486 } else {
1487 upper_bound = &upper_bound_guard;
1488 }
1489 } else {
1490 // This is the last file in the subcompaction, so extend until the
1491 // subcompaction ends.
1492 upper_bound = sub_compact->end;
1493 }
1494 auto earliest_snapshot = kMaxSequenceNumber;
1495 if (existing_snapshots_.size() > 0) {
1496 earliest_snapshot = existing_snapshots_[0];
1497 }
1498 bool has_overlapping_endpoints;
1499 if (upper_bound != nullptr && meta->largest.size() > 0) {
1500 has_overlapping_endpoints =
1501 ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
1502 } else {
1503 has_overlapping_endpoints = false;
1504 }
1505
1506 // The end key of the subcompaction must be bigger or equal to the upper
1507 // bound. If the end of subcompaction is null or the upper bound is null,
1508 // it means that this file is the last file in the compaction. So there
1509 // will be no overlapping between this file and others.
1510 assert(sub_compact->end == nullptr ||
1511 upper_bound == nullptr ||
1512 ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
1513 auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
1514 has_overlapping_endpoints);
1515 // Position the range tombstone output iterator. There may be tombstone
1516 // fragments that are entirely out of range, so make sure that we do not
1517 // include those.
1518 if (lower_bound != nullptr) {
1519 it->Seek(*lower_bound);
1520 } else {
1521 it->SeekToFirst();
1522 }
1523 TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
1524 for (; it->Valid(); it->Next()) {
1525 auto tombstone = it->Tombstone();
1526 if (upper_bound != nullptr) {
1527 int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
1528 if ((has_overlapping_endpoints && cmp < 0) ||
1529 (!has_overlapping_endpoints && cmp <= 0)) {
1530 // Tombstones starting after upper_bound only need to be included in
1531 // the next table. If the current SST ends before upper_bound, i.e.,
1532 // `has_overlapping_endpoints == false`, we can also skip over range
1533 // tombstones that start exactly at upper_bound. Such range tombstones
1534 // will be included in the next file and are not relevant to the point
1535 // keys or endpoints of the current file.
1536 break;
1537 }
1538 }
1539
1540 if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
1541 // TODO(andrewkr): tombstones that span multiple output files are
1542 // counted for each compaction output file, so lots of double counting.
1543 range_del_out_stats->num_range_del_drop_obsolete++;
1544 range_del_out_stats->num_record_drop_obsolete++;
1545 continue;
1546 }
1547
1548 auto kv = tombstone.Serialize();
1549 assert(lower_bound == nullptr ||
1550 ucmp->Compare(*lower_bound, kv.second) < 0);
1551 // Range tombstone is not supported by output validator yet.
1552 sub_compact->builder->Add(kv.first.Encode(), kv.second);
1553 InternalKey smallest_candidate = std::move(kv.first);
1554 if (lower_bound != nullptr &&
1555 ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
1556 // Pretend the smallest key has the same user key as lower_bound
1557 // (the max key in the previous table or subcompaction) in order for
1558 // files to appear key-space partitioned.
1559 //
1560 // When lower_bound is chosen by a subcompaction, we know that
1561 // subcompactions over smaller keys cannot contain any keys at
1562 // lower_bound. We also know that smaller subcompactions exist, because
1563 // otherwise the subcompaction would be unbounded on the left. As a
1564 // result, we know that no other files on the output level will contain
1565 // actual keys at lower_bound (an output file may have a largest key of
1566 // lower_bound@kMaxSequenceNumber, but this only indicates a large range
1567 // tombstone was truncated). Therefore, it is safe to use the
1568 // tombstone's sequence number, to ensure that keys at lower_bound at
1569 // lower levels are covered by truncated tombstones.
1570 //
1571 // If lower_bound was chosen by the smallest data key in the file,
1572 // choose lowest seqnum so this file's smallest internal key comes after
1573 // the previous file's largest. The fake seqnum is OK because the read
1574 // path's file-picking code only considers user key.
1575 smallest_candidate = InternalKey(
1576 *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
1577 kTypeRangeDeletion);
1578 }
1579 InternalKey largest_candidate = tombstone.SerializeEndKey();
1580 if (upper_bound != nullptr &&
1581 ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
1582 // Pretend the largest key has the same user key as upper_bound (the
1583 // min key in the following table or subcompaction) in order for files
1584 // to appear key-space partitioned.
1585 //
1586 // Choose highest seqnum so this file's largest internal key comes
1587 // before the next file's/subcompaction's smallest. The fake seqnum is
1588 // OK because the read path's file-picking code only considers the user
1589 // key portion.
1590 //
1591 // Note Seek() also creates InternalKey with (user_key,
1592 // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
1593 // kTypeRangeDeletion (0xF), so the range tombstone comes before the
1594 // Seek() key in InternalKey's ordering. So Seek() will look in the
1595 // next file for the user key.
1596 largest_candidate =
1597 InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
1598 }
1599 #ifndef NDEBUG
1600 SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
1601 if (meta->smallest.size() > 0) {
1602 smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
1603 }
1604 #endif
1605 meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
1606 tombstone.seq_,
1607 cfd->internal_comparator());
1608
1609 // The smallest key in a file is used for range tombstone truncation, so
1610 // it cannot have a seqnum of 0 (unless the smallest data key in a file
1611 // has a seqnum of 0). Otherwise, the truncated tombstone may expose
1612 // deleted keys at lower levels.
1613 assert(smallest_ikey_seqnum == 0 ||
1614 ExtractInternalKeyFooter(meta->smallest.Encode()) !=
1615 PackSequenceAndType(0, kTypeRangeDeletion));
1616 }
1617 }
1618 const uint64_t current_entries = sub_compact->builder->NumEntries();
1619 if (s.ok()) {
1620 s = sub_compact->builder->Finish();
1621 } else {
1622 sub_compact->builder->Abandon();
1623 }
1624 IOStatus io_s = sub_compact->builder->io_status();
1625 if (s.ok()) {
1626 s = io_s;
1627 }
1628 const uint64_t current_bytes = sub_compact->builder->FileSize();
1629 if (s.ok()) {
1630 meta->fd.file_size = current_bytes;
1631 meta->marked_for_compaction = sub_compact->builder->NeedCompact();
1632 }
1633 sub_compact->current_output()->finished = true;
1634 sub_compact->total_bytes += current_bytes;
1635
1636 // Finish and check for file errors
1637 if (s.ok()) {
1638 StopWatch sw(db_options_.clock, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
1639 io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
1640 }
1641 if (s.ok() && io_s.ok()) {
1642 io_s = sub_compact->outfile->Close();
1643 }
1644 if (s.ok() && io_s.ok()) {
1645 // Add the checksum information to file metadata.
1646 meta->file_checksum = sub_compact->outfile->GetFileChecksum();
1647 meta->file_checksum_func_name =
1648 sub_compact->outfile->GetFileChecksumFuncName();
1649 file_checksum = meta->file_checksum;
1650 file_checksum_func_name = meta->file_checksum_func_name;
1651 }
1652 if (s.ok()) {
1653 s = io_s;
1654 }
1655 if (sub_compact->io_status.ok()) {
1656 sub_compact->io_status = io_s;
1657 // Since this error is really a copy of the
1658 // "normal" status, it does not also need to be checked
1659 sub_compact->io_status.PermitUncheckedError();
1660 }
1661 sub_compact->outfile.reset();
1662
1663 TableProperties tp;
1664 if (s.ok()) {
1665 tp = sub_compact->builder->GetTableProperties();
1666 }
1667
1668 if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
1669 // If there is nothing to output, no necessary to generate a sst file.
1670 // This happens when the output level is bottom level, at the same time
1671 // the sub_compact output nothing.
1672 std::string fname =
1673 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1674 meta->fd.GetNumber(), meta->fd.GetPathId());
1675
1676 // TODO(AR) it is not clear if there are any larger implications if
1677 // DeleteFile fails here
1678 Status ds = env_->DeleteFile(fname);
1679 if (!ds.ok()) {
1680 ROCKS_LOG_WARN(
1681 db_options_.info_log,
1682 "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
1683 " at bottom level%s",
1684 cfd->GetName().c_str(), job_id_, output_number,
1685 meta->marked_for_compaction ? " (need compaction)" : "");
1686 }
1687
1688 // Also need to remove the file from outputs, or it will be added to the
1689 // VersionEdit.
1690 assert(!sub_compact->outputs.empty());
1691 sub_compact->outputs.pop_back();
1692 meta = nullptr;
1693 }
1694
1695 if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
1696 // Output to event logger and fire events.
1697 sub_compact->current_output()->table_properties =
1698 std::make_shared<TableProperties>(tp);
1699 ROCKS_LOG_INFO(db_options_.info_log,
1700 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1701 " keys, %" PRIu64 " bytes%s",
1702 cfd->GetName().c_str(), job_id_, output_number,
1703 current_entries, current_bytes,
1704 meta->marked_for_compaction ? " (need compaction)" : "");
1705 }
1706 std::string fname;
1707 FileDescriptor output_fd;
1708 uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
1709 if (meta != nullptr) {
1710 fname = GetTableFileName(meta->fd.GetNumber());
1711 output_fd = meta->fd;
1712 oldest_blob_file_number = meta->oldest_blob_file_number;
1713 } else {
1714 fname = "(nil)";
1715 }
1716 EventHelpers::LogAndNotifyTableFileCreationFinished(
1717 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
1718 job_id_, output_fd, oldest_blob_file_number, tp,
1719 TableFileCreationReason::kCompaction, s, file_checksum,
1720 file_checksum_func_name);
1721
1722 #ifndef ROCKSDB_LITE
1723 // Report new file to SstFileManagerImpl
1724 auto sfm =
1725 static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
1726 if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
1727 Status add_s = sfm->OnAddFile(fname);
1728 if (!add_s.ok() && s.ok()) {
1729 s = add_s;
1730 }
1731 if (sfm->IsMaxAllowedSpaceReached()) {
1732 // TODO(ajkr): should we return OK() if max space was reached by the final
1733 // compaction output file (similarly to how flush works when full)?
1734 s = Status::SpaceLimit("Max allowed space was reached");
1735 TEST_SYNC_POINT(
1736 "CompactionJob::FinishCompactionOutputFile:"
1737 "MaxAllowedSpaceReached");
1738 InstrumentedMutexLock l(db_mutex_);
1739 db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
1740 }
1741 }
1742 #endif
1743
1744 sub_compact->builder.reset();
1745 sub_compact->current_output_file_size = 0;
1746 return s;
1747 }
1748
InstallCompactionResults(const MutableCFOptions & mutable_cf_options)1749 Status CompactionJob::InstallCompactionResults(
1750 const MutableCFOptions& mutable_cf_options) {
1751 assert(compact_);
1752
1753 db_mutex_->AssertHeld();
1754
1755 auto* compaction = compact_->compaction;
1756 assert(compaction);
1757
1758 // paranoia: verify that the files that we started with
1759 // still exist in the current version and in the same original level.
1760 // This ensures that a concurrent compaction did not erroneously
1761 // pick the same files to compact_.
1762 if (!versions_->VerifyCompactionFileConsistency(compaction)) {
1763 Compaction::InputLevelSummaryBuffer inputs_summary;
1764
1765 ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
1766 compaction->column_family_data()->GetName().c_str(),
1767 job_id_, compaction->InputLevelSummary(&inputs_summary));
1768 return Status::Corruption("Compaction input files inconsistent");
1769 }
1770
1771 {
1772 Compaction::InputLevelSummaryBuffer inputs_summary;
1773 ROCKS_LOG_INFO(db_options_.info_log,
1774 "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1775 compaction->column_family_data()->GetName().c_str(), job_id_,
1776 compaction->InputLevelSummary(&inputs_summary),
1777 compact_->total_bytes + compact_->total_blob_bytes);
1778 }
1779
1780 VersionEdit* const edit = compaction->edit();
1781 assert(edit);
1782
1783 // Add compaction inputs
1784 compaction->AddInputDeletions(edit);
1785
1786 for (const auto& sub_compact : compact_->sub_compact_states) {
1787 for (const auto& out : sub_compact.outputs) {
1788 edit->AddFile(compaction->output_level(), out.meta);
1789 }
1790
1791 for (const auto& blob : sub_compact.blob_file_additions) {
1792 edit->AddBlobFile(blob);
1793 }
1794 }
1795
1796 return versions_->LogAndApply(compaction->column_family_data(),
1797 mutable_cf_options, edit, db_mutex_,
1798 db_directory_);
1799 }
1800
RecordCompactionIOStats()1801 void CompactionJob::RecordCompactionIOStats() {
1802 RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1803 RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1804 CompactionReason compaction_reason =
1805 compact_->compaction->compaction_reason();
1806 if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
1807 RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
1808 RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
1809 } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
1810 RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
1811 RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
1812 } else if (compaction_reason == CompactionReason::kTtl) {
1813 RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
1814 RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
1815 }
1816 ThreadStatusUtil::IncreaseThreadOperationProperty(
1817 ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1818 IOSTATS_RESET(bytes_read);
1819 ThreadStatusUtil::IncreaseThreadOperationProperty(
1820 ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1821 IOSTATS_RESET(bytes_written);
1822 }
1823
OpenCompactionOutputFile(SubcompactionState * sub_compact)1824 Status CompactionJob::OpenCompactionOutputFile(
1825 SubcompactionState* sub_compact) {
1826 assert(sub_compact != nullptr);
1827 assert(sub_compact->builder == nullptr);
1828 // no need to lock because VersionSet::next_file_number_ is atomic
1829 uint64_t file_number = versions_->NewFileNumber();
1830 std::string fname = GetTableFileName(file_number);
1831 // Fire events.
1832 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1833 #ifndef ROCKSDB_LITE
1834 EventHelpers::NotifyTableFileCreationStarted(
1835 cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
1836 TableFileCreationReason::kCompaction);
1837 #endif // !ROCKSDB_LITE
1838 // Make the output file
1839 std::unique_ptr<FSWritableFile> writable_file;
1840 #ifndef NDEBUG
1841 bool syncpoint_arg = file_options_.use_direct_writes;
1842 TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1843 &syncpoint_arg);
1844 #endif
1845
1846 // Pass temperature of botommost files to FileSystem.
1847 FileOptions fo_copy = file_options_;
1848 Temperature temperature = Temperature::kUnknown;
1849 if (bottommost_level_) {
1850 fo_copy.temperature = temperature =
1851 sub_compact->compaction->mutable_cf_options()->bottommost_temperature;
1852 }
1853
1854 Status s;
1855 IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
1856 s = io_s;
1857 if (sub_compact->io_status.ok()) {
1858 sub_compact->io_status = io_s;
1859 // Since this error is really a copy of the io_s that is checked below as s,
1860 // it does not also need to be checked.
1861 sub_compact->io_status.PermitUncheckedError();
1862 }
1863 if (!s.ok()) {
1864 ROCKS_LOG_ERROR(
1865 db_options_.info_log,
1866 "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1867 " fails at NewWritableFile with status %s",
1868 sub_compact->compaction->column_family_data()->GetName().c_str(),
1869 job_id_, file_number, s.ToString().c_str());
1870 LogFlush(db_options_.info_log);
1871 EventHelpers::LogAndNotifyTableFileCreationFinished(
1872 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
1873 fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
1874 TableProperties(), TableFileCreationReason::kCompaction, s,
1875 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
1876 return s;
1877 }
1878
1879 // Try to figure out the output file's oldest ancester time.
1880 int64_t temp_current_time = 0;
1881 auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
1882 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
1883 if (!get_time_status.ok()) {
1884 ROCKS_LOG_WARN(db_options_.info_log,
1885 "Failed to get current time. Status: %s",
1886 get_time_status.ToString().c_str());
1887 }
1888 uint64_t current_time = static_cast<uint64_t>(temp_current_time);
1889 uint64_t oldest_ancester_time =
1890 sub_compact->compaction->MinInputFileOldestAncesterTime();
1891 if (oldest_ancester_time == port::kMaxUint64) {
1892 oldest_ancester_time = current_time;
1893 }
1894
1895 // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1896 {
1897 FileMetaData meta;
1898 meta.fd = FileDescriptor(file_number,
1899 sub_compact->compaction->output_path_id(), 0);
1900 meta.oldest_ancester_time = oldest_ancester_time;
1901 meta.file_creation_time = current_time;
1902 meta.temperature = temperature;
1903 sub_compact->outputs.emplace_back(
1904 std::move(meta), cfd->internal_comparator(),
1905 /*enable_order_check=*/
1906 sub_compact->compaction->mutable_cf_options()
1907 ->check_flush_compaction_key_order,
1908 /*enable_hash=*/paranoid_file_checks_);
1909 }
1910
1911 writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
1912 writable_file->SetWriteLifeTimeHint(write_hint_);
1913 FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
1914 writable_file->SetPreallocationBlockSize(static_cast<size_t>(
1915 sub_compact->compaction->OutputFilePreallocationSize()));
1916 const auto& listeners =
1917 sub_compact->compaction->immutable_cf_options()->listeners;
1918 sub_compact->outfile.reset(new WritableFileWriter(
1919 std::move(writable_file), fname, file_options_, db_options_.clock,
1920 io_tracer_, db_options_.stats, listeners,
1921 db_options_.file_checksum_gen_factory.get(),
1922 tmp_set.Contains(FileType::kTableFile)));
1923
1924 TableBuilderOptions tboptions(
1925 *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
1926 cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
1927 sub_compact->compaction->output_compression(),
1928 sub_compact->compaction->output_compression_opts(), cfd->GetID(),
1929 cfd->GetName(), sub_compact->compaction->output_level(),
1930 bottommost_level_, TableFileCreationReason::kCompaction,
1931 oldest_ancester_time, 0 /* oldest_key_time */, current_time, db_id_,
1932 db_session_id_, sub_compact->compaction->max_output_file_size());
1933 sub_compact->builder.reset(
1934 NewTableBuilder(tboptions, sub_compact->outfile.get()));
1935 LogFlush(db_options_.info_log);
1936 return s;
1937 }
1938
CleanupCompaction()1939 void CompactionJob::CleanupCompaction() {
1940 for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1941 const auto& sub_status = sub_compact.status;
1942
1943 if (sub_compact.builder != nullptr) {
1944 // May happen if we get a shutdown call in the middle of compaction
1945 sub_compact.builder->Abandon();
1946 sub_compact.builder.reset();
1947 } else {
1948 assert(!sub_status.ok() || sub_compact.outfile == nullptr);
1949 }
1950 for (const auto& out : sub_compact.outputs) {
1951 // If this file was inserted into the table cache then remove
1952 // them here because this compaction was not committed.
1953 if (!sub_status.ok()) {
1954 TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1955 }
1956 }
1957 // TODO: sub_compact.io_status is not checked like status. Not sure if thats
1958 // intentional. So ignoring the io_status as of now.
1959 sub_compact.io_status.PermitUncheckedError();
1960 }
1961 delete compact_;
1962 compact_ = nullptr;
1963 }
1964
1965 #ifndef ROCKSDB_LITE
1966 namespace {
CopyPrefix(const Slice & src,size_t prefix_length,std::string * dst)1967 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1968 assert(prefix_length > 0);
1969 size_t length = src.size() > prefix_length ? prefix_length : src.size();
1970 dst->assign(src.data(), length);
1971 }
1972 } // namespace
1973
1974 #endif // !ROCKSDB_LITE
1975
UpdateCompactionStats()1976 void CompactionJob::UpdateCompactionStats() {
1977 assert(compact_);
1978
1979 Compaction* compaction = compact_->compaction;
1980 compaction_stats_.num_input_files_in_non_output_levels = 0;
1981 compaction_stats_.num_input_files_in_output_level = 0;
1982 for (int input_level = 0;
1983 input_level < static_cast<int>(compaction->num_input_levels());
1984 ++input_level) {
1985 if (compaction->level(input_level) != compaction->output_level()) {
1986 UpdateCompactionInputStatsHelper(
1987 &compaction_stats_.num_input_files_in_non_output_levels,
1988 &compaction_stats_.bytes_read_non_output_levels, input_level);
1989 } else {
1990 UpdateCompactionInputStatsHelper(
1991 &compaction_stats_.num_input_files_in_output_level,
1992 &compaction_stats_.bytes_read_output_level, input_level);
1993 }
1994 }
1995
1996 assert(compaction_job_stats_);
1997 compaction_stats_.bytes_read_blob =
1998 compaction_job_stats_->total_blob_bytes_read;
1999
2000 compaction_stats_.num_output_files =
2001 static_cast<int>(compact_->num_output_files);
2002 compaction_stats_.num_output_files_blob =
2003 static_cast<int>(compact_->num_blob_output_files);
2004 compaction_stats_.bytes_written = compact_->total_bytes;
2005 compaction_stats_.bytes_written_blob = compact_->total_blob_bytes;
2006
2007 if (compaction_stats_.num_input_records > compact_->num_output_records) {
2008 compaction_stats_.num_dropped_records =
2009 compaction_stats_.num_input_records - compact_->num_output_records;
2010 }
2011 }
2012
UpdateCompactionInputStatsHelper(int * num_files,uint64_t * bytes_read,int input_level)2013 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
2014 uint64_t* bytes_read,
2015 int input_level) {
2016 const Compaction* compaction = compact_->compaction;
2017 auto num_input_files = compaction->num_input_files(input_level);
2018 *num_files += static_cast<int>(num_input_files);
2019
2020 for (size_t i = 0; i < num_input_files; ++i) {
2021 const auto* file_meta = compaction->input(input_level, i);
2022 *bytes_read += file_meta->fd.GetFileSize();
2023 compaction_stats_.num_input_records +=
2024 static_cast<uint64_t>(file_meta->num_entries);
2025 }
2026 }
2027
UpdateCompactionJobStats(const InternalStats::CompactionStats & stats) const2028 void CompactionJob::UpdateCompactionJobStats(
2029 const InternalStats::CompactionStats& stats) const {
2030 #ifndef ROCKSDB_LITE
2031 compaction_job_stats_->elapsed_micros = stats.micros;
2032
2033 // input information
2034 compaction_job_stats_->total_input_bytes =
2035 stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
2036 compaction_job_stats_->num_input_records = stats.num_input_records;
2037 compaction_job_stats_->num_input_files =
2038 stats.num_input_files_in_non_output_levels +
2039 stats.num_input_files_in_output_level;
2040 compaction_job_stats_->num_input_files_at_output_level =
2041 stats.num_input_files_in_output_level;
2042
2043 // output information
2044 compaction_job_stats_->total_output_bytes = stats.bytes_written;
2045 compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
2046 compaction_job_stats_->num_output_records = compact_->num_output_records;
2047 compaction_job_stats_->num_output_files = stats.num_output_files;
2048 compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
2049
2050 if (stats.num_output_files > 0) {
2051 CopyPrefix(compact_->SmallestUserKey(),
2052 CompactionJobStats::kMaxPrefixLength,
2053 &compaction_job_stats_->smallest_output_key_prefix);
2054 CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
2055 &compaction_job_stats_->largest_output_key_prefix);
2056 }
2057 #else
2058 (void)stats;
2059 #endif // !ROCKSDB_LITE
2060 }
2061
LogCompaction()2062 void CompactionJob::LogCompaction() {
2063 Compaction* compaction = compact_->compaction;
2064 ColumnFamilyData* cfd = compaction->column_family_data();
2065
2066 // Let's check if anything will get logged. Don't prepare all the info if
2067 // we're not logging
2068 if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
2069 Compaction::InputLevelSummaryBuffer inputs_summary;
2070 ROCKS_LOG_INFO(
2071 db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
2072 cfd->GetName().c_str(), job_id_,
2073 compaction->InputLevelSummary(&inputs_summary), compaction->score());
2074 char scratch[2345];
2075 compaction->Summary(scratch, sizeof(scratch));
2076 ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
2077 cfd->GetName().c_str(), scratch);
2078 // build event logger report
2079 auto stream = event_logger_->Log();
2080 stream << "job" << job_id_ << "event"
2081 << "compaction_started"
2082 << "compaction_reason"
2083 << GetCompactionReasonString(compaction->compaction_reason());
2084 for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
2085 stream << ("files_L" + ToString(compaction->level(i)));
2086 stream.StartArray();
2087 for (auto f : *compaction->inputs(i)) {
2088 stream << f->fd.GetNumber();
2089 }
2090 stream.EndArray();
2091 }
2092 stream << "score" << compaction->score() << "input_data_size"
2093 << compaction->CalculateTotalInputSize();
2094 }
2095 }
2096
GetTableFileName(uint64_t file_number)2097 std::string CompactionJob::GetTableFileName(uint64_t file_number) {
2098 return TableFileName(compact_->compaction->immutable_cf_options()->cf_paths,
2099 file_number, compact_->compaction->output_path_id());
2100 }
2101
2102 #ifndef ROCKSDB_LITE
GetTableFileName(uint64_t file_number)2103 std::string CompactionServiceCompactionJob::GetTableFileName(
2104 uint64_t file_number) {
2105 return MakeTableFileName(output_path_, file_number);
2106 }
2107
CompactionServiceCompactionJob(int job_id,Compaction * compaction,const ImmutableDBOptions & db_options,const MutableDBOptions & mutable_db_options,const FileOptions & file_options,VersionSet * versions,const std::atomic<bool> * shutting_down,LogBuffer * log_buffer,FSDirectory * output_directory,Statistics * stats,InstrumentedMutex * db_mutex,ErrorHandler * db_error_handler,std::vector<SequenceNumber> existing_snapshots,std::shared_ptr<Cache> table_cache,EventLogger * event_logger,const std::string & dbname,const std::shared_ptr<IOTracer> & io_tracer,const std::string & db_id,const std::string & db_session_id,const std::string & output_path,const CompactionServiceInput & compaction_service_input,CompactionServiceResult * compaction_service_result)2108 CompactionServiceCompactionJob::CompactionServiceCompactionJob(
2109 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
2110 const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
2111 VersionSet* versions, const std::atomic<bool>* shutting_down,
2112 LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
2113 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
2114 std::vector<SequenceNumber> existing_snapshots,
2115 std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
2116 const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
2117 const std::string& db_id, const std::string& db_session_id,
2118 const std::string& output_path,
2119 const CompactionServiceInput& compaction_service_input,
2120 CompactionServiceResult* compaction_service_result)
2121 : CompactionJob(
2122 job_id, compaction, db_options, mutable_db_options, file_options,
2123 versions, shutting_down, 0, log_buffer, nullptr, output_directory,
2124 nullptr, stats, db_mutex, db_error_handler, existing_snapshots,
2125 kMaxSequenceNumber, nullptr, table_cache, event_logger,
2126 compaction->mutable_cf_options()->paranoid_file_checks,
2127 compaction->mutable_cf_options()->report_bg_io_stats, dbname,
2128 &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
2129 nullptr, db_id, db_session_id,
2130 compaction->column_family_data()->GetFullHistoryTsLow()),
2131 output_path_(output_path),
2132 compaction_input_(compaction_service_input),
2133 compaction_result_(compaction_service_result) {}
2134
Run()2135 Status CompactionServiceCompactionJob::Run() {
2136 AutoThreadOperationStageUpdater stage_updater(
2137 ThreadStatus::STAGE_COMPACTION_RUN);
2138
2139 auto* c = compact_->compaction;
2140 assert(c->column_family_data() != nullptr);
2141 assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
2142 compact_->compaction->level()) > 0);
2143
2144 write_hint_ =
2145 c->column_family_data()->CalculateSSTWriteHint(c->output_level());
2146 bottommost_level_ = c->bottommost_level();
2147
2148 Slice begin = compaction_input_.begin;
2149 Slice end = compaction_input_.end;
2150 compact_->sub_compact_states.emplace_back(
2151 c, compaction_input_.has_begin ? &begin : nullptr,
2152 compaction_input_.has_end ? &end : nullptr,
2153 compaction_input_.approx_size);
2154
2155 log_buffer_->FlushBufferToLog();
2156 LogCompaction();
2157 const uint64_t start_micros = db_options_.clock->NowMicros();
2158 // Pick the only sub-compaction we should have
2159 assert(compact_->sub_compact_states.size() == 1);
2160 SubcompactionState* sub_compact = compact_->sub_compact_states.data();
2161
2162 ProcessKeyValueCompaction(sub_compact);
2163
2164 compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
2165 compaction_stats_.cpu_micros = sub_compact->compaction_job_stats.cpu_micros;
2166
2167 RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
2168 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
2169 compaction_stats_.cpu_micros);
2170
2171 Status status = sub_compact->status;
2172 IOStatus io_s = sub_compact->io_status;
2173
2174 if (io_status_.ok()) {
2175 io_status_ = io_s;
2176 }
2177
2178 if (status.ok()) {
2179 constexpr IODebugContext* dbg = nullptr;
2180
2181 if (output_directory_) {
2182 io_s = output_directory_->Fsync(IOOptions(), dbg);
2183 }
2184 }
2185 if (io_status_.ok()) {
2186 io_status_ = io_s;
2187 }
2188 if (status.ok()) {
2189 status = io_s;
2190 }
2191 if (status.ok()) {
2192 // TODO: Add verify_table() and VerifyCompactionFileConsistency()
2193 }
2194
2195 // Finish up all book-keeping to unify the subcompaction results
2196 AggregateStatistics();
2197 UpdateCompactionStats();
2198
2199 compaction_result_->bytes_written = IOSTATS(bytes_written);
2200 compaction_result_->bytes_read = IOSTATS(bytes_read);
2201 RecordCompactionIOStats();
2202
2203 LogFlush(db_options_.info_log);
2204 compact_->status = status;
2205 compact_->status.PermitUncheckedError();
2206
2207 // Build compaction result
2208 compaction_result_->output_level = compact_->compaction->output_level();
2209 compaction_result_->output_path = output_path_;
2210 for (const auto& output_file : sub_compact->outputs) {
2211 auto& meta = output_file.meta;
2212 compaction_result_->output_files.emplace_back(
2213 MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
2214 meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
2215 meta.largest.Encode().ToString(), meta.oldest_ancester_time,
2216 meta.file_creation_time, output_file.validator.GetHash(),
2217 meta.marked_for_compaction);
2218 }
2219 compaction_result_->num_output_records = sub_compact->num_output_records;
2220 compaction_result_->total_bytes = sub_compact->total_bytes;
2221
2222 return status;
2223 }
2224
CleanupCompaction()2225 void CompactionServiceCompactionJob::CleanupCompaction() {
2226 CompactionJob::CleanupCompaction();
2227 }
2228
// Version tag of the internal binary format used for the input and result
// data.
enum BinaryFormatVersion : uint32_t {
  // String format similar to the Option string format.
  kOptionsString = 1,
};
2233
2234 // offset_of is used to get the offset of a class data member
2235 // ex: offset_of(&ColumnFamilyDescriptor::options)
2236 // This call will return the offset of options in ColumnFamilyDescriptor class
2237 //
2238 // This is the same as offsetof() but allow us to work with non standard-layout
2239 // classes and structures
2240 // refs:
2241 // http://en.cppreference.com/w/cpp/concept/StandardLayoutType
2242 // https://gist.github.com/graphitemaster/494f21190bb2c63c5516
2243 static ColumnFamilyDescriptor dummy_cfd("", ColumnFamilyOptions());
2244 template <typename T1>
offset_of(T1 ColumnFamilyDescriptor::* member)2245 int offset_of(T1 ColumnFamilyDescriptor::*member) {
2246 return int(size_t(&(dummy_cfd.*member)) - size_t(&dummy_cfd));
2247 }
2248
2249 static CompactionServiceInput dummy_cs_input;
2250 template <typename T1>
offset_of(T1 CompactionServiceInput::* member)2251 int offset_of(T1 CompactionServiceInput::*member) {
2252 return int(size_t(&(dummy_cs_input.*member)) - size_t(&dummy_cs_input));
2253 }
2254
2255 static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
2256 {"name",
2257 {offset_of(&ColumnFamilyDescriptor::name), OptionType::kEncodedString,
2258 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2259 {"options",
2260 {offset_of(&ColumnFamilyDescriptor::options), OptionType::kConfigurable,
2261 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2262 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80502() 2263 const std::string& value, void* addr) {
2264 auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
2265 return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
2266 value, cf_options);
2267 },
2268 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80602() 2269 const void* addr, std::string* value) {
2270 const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
2271 std::string result;
2272 auto status =
2273 GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
2274 *value = "{" + result + "}";
2275 return status;
2276 },
2277 [](const ConfigOptions& opts, const std::string& name, const void* addr1,
__anond592dbd80702() 2278 const void* addr2, std::string* mismatch) {
2279 const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
2280 const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
2281 auto this_conf = CFOptionsAsConfigurable(*this_one);
2282 auto that_conf = CFOptionsAsConfigurable(*that_one);
2283 std::string mismatch_opt;
2284 bool result =
2285 this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
2286 if (!result) {
2287 *mismatch = name + "." + mismatch_opt;
2288 }
2289 return result;
2290 }}},
2291 };
2292
2293 static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
2294 {"column_family",
2295 OptionTypeInfo::Struct("column_family", &cfd_type_info,
2296 offset_of(&CompactionServiceInput::column_family),
2297 OptionVerificationType::kNormal,
2298 OptionTypeFlags::kNone)},
2299 {"db_options",
2300 {offset_of(&CompactionServiceInput::db_options), OptionType::kConfigurable,
2301 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2302 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80802() 2303 const std::string& value, void* addr) {
2304 auto options = static_cast<DBOptions*>(addr);
2305 return GetDBOptionsFromString(opts, DBOptions(), value, options);
2306 },
2307 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80902() 2308 const void* addr, std::string* value) {
2309 const auto options = static_cast<const DBOptions*>(addr);
2310 std::string result;
2311 auto status = GetStringFromDBOptions(opts, *options, &result);
2312 *value = "{" + result + "}";
2313 return status;
2314 },
2315 [](const ConfigOptions& opts, const std::string& name, const void* addr1,
__anond592dbd80a02() 2316 const void* addr2, std::string* mismatch) {
2317 const auto this_one = static_cast<const DBOptions*>(addr1);
2318 const auto that_one = static_cast<const DBOptions*>(addr2);
2319 auto this_conf = DBOptionsAsConfigurable(*this_one);
2320 auto that_conf = DBOptionsAsConfigurable(*that_one);
2321 std::string mismatch_opt;
2322 bool result =
2323 this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
2324 if (!result) {
2325 *mismatch = name + "." + mismatch_opt;
2326 }
2327 return result;
2328 }}},
2329 {"snapshots", OptionTypeInfo::Vector<uint64_t>(
2330 offset_of(&CompactionServiceInput::snapshots),
2331 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2332 {0, OptionType::kUInt64T})},
2333 {"input_files", OptionTypeInfo::Vector<std::string>(
2334 offset_of(&CompactionServiceInput::input_files),
2335 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2336 {0, OptionType::kEncodedString})},
2337 {"output_level",
2338 {offset_of(&CompactionServiceInput::output_level), OptionType::kInt,
2339 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2340 {"has_begin",
2341 {offset_of(&CompactionServiceInput::has_begin), OptionType::kBoolean,
2342 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2343 {"begin",
2344 {offset_of(&CompactionServiceInput::begin), OptionType::kEncodedString,
2345 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2346 {"has_end",
2347 {offset_of(&CompactionServiceInput::has_end), OptionType::kBoolean,
2348 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2349 {"end",
2350 {offset_of(&CompactionServiceInput::end), OptionType::kEncodedString,
2351 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2352 {"approx_size",
2353 {offset_of(&CompactionServiceInput::approx_size), OptionType::kUInt64T,
2354 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2355 };
2356
2357 static std::unordered_map<std::string, OptionTypeInfo>
2358 cs_output_file_type_info = {
2359 {"file_name",
2360 {offsetof(struct CompactionServiceOutputFile, file_name),
2361 OptionType::kEncodedString, OptionVerificationType::kNormal,
2362 OptionTypeFlags::kNone}},
2363 {"smallest_seqno",
2364 {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
2365 OptionType::kUInt64T, OptionVerificationType::kNormal,
2366 OptionTypeFlags::kNone}},
2367 {"largest_seqno",
2368 {offsetof(struct CompactionServiceOutputFile, largest_seqno),
2369 OptionType::kUInt64T, OptionVerificationType::kNormal,
2370 OptionTypeFlags::kNone}},
2371 {"smallest_internal_key",
2372 {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
2373 OptionType::kEncodedString, OptionVerificationType::kNormal,
2374 OptionTypeFlags::kNone}},
2375 {"largest_internal_key",
2376 {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
2377 OptionType::kEncodedString, OptionVerificationType::kNormal,
2378 OptionTypeFlags::kNone}},
2379 {"oldest_ancester_time",
2380 {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
2381 OptionType::kUInt64T, OptionVerificationType::kNormal,
2382 OptionTypeFlags::kNone}},
2383 {"file_creation_time",
2384 {offsetof(struct CompactionServiceOutputFile, file_creation_time),
2385 OptionType::kUInt64T, OptionVerificationType::kNormal,
2386 OptionTypeFlags::kNone}},
2387 {"paranoid_hash",
2388 {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
2389 OptionType::kUInt64T, OptionVerificationType::kNormal,
2390 OptionTypeFlags::kNone}},
2391 {"marked_for_compaction",
2392 {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
2393 OptionType::kBoolean, OptionVerificationType::kNormal,
2394 OptionTypeFlags::kNone}},
2395 };
2396
2397 static std::unordered_map<std::string, OptionTypeInfo>
2398 compaction_job_stats_type_info = {
2399 {"elapsed_micros",
2400 {offsetof(struct CompactionJobStats, elapsed_micros),
2401 OptionType::kUInt64T, OptionVerificationType::kNormal,
2402 OptionTypeFlags::kNone}},
2403 {"cpu_micros",
2404 {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
2405 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2406 {"num_input_records",
2407 {offsetof(struct CompactionJobStats, num_input_records),
2408 OptionType::kUInt64T, OptionVerificationType::kNormal,
2409 OptionTypeFlags::kNone}},
2410 {"num_blobs_read",
2411 {offsetof(struct CompactionJobStats, num_blobs_read),
2412 OptionType::kUInt64T, OptionVerificationType::kNormal,
2413 OptionTypeFlags::kNone}},
2414 {"num_input_files",
2415 {offsetof(struct CompactionJobStats, num_input_files),
2416 OptionType::kSizeT, OptionVerificationType::kNormal,
2417 OptionTypeFlags::kNone}},
2418 {"num_input_files_at_output_level",
2419 {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
2420 OptionType::kSizeT, OptionVerificationType::kNormal,
2421 OptionTypeFlags::kNone}},
2422 {"num_output_records",
2423 {offsetof(struct CompactionJobStats, num_output_records),
2424 OptionType::kUInt64T, OptionVerificationType::kNormal,
2425 OptionTypeFlags::kNone}},
2426 {"num_output_files",
2427 {offsetof(struct CompactionJobStats, num_output_files),
2428 OptionType::kSizeT, OptionVerificationType::kNormal,
2429 OptionTypeFlags::kNone}},
2430 {"num_output_files_blob",
2431 {offsetof(struct CompactionJobStats, num_output_files_blob),
2432 OptionType::kSizeT, OptionVerificationType::kNormal,
2433 OptionTypeFlags::kNone}},
2434 {"is_full_compaction",
2435 {offsetof(struct CompactionJobStats, is_full_compaction),
2436 OptionType::kBoolean, OptionVerificationType::kNormal,
2437 OptionTypeFlags::kNone}},
2438 {"is_manual_compaction",
2439 {offsetof(struct CompactionJobStats, is_manual_compaction),
2440 OptionType::kBoolean, OptionVerificationType::kNormal,
2441 OptionTypeFlags::kNone}},
2442 {"total_input_bytes",
2443 {offsetof(struct CompactionJobStats, total_input_bytes),
2444 OptionType::kUInt64T, OptionVerificationType::kNormal,
2445 OptionTypeFlags::kNone}},
2446 {"total_blob_bytes_read",
2447 {offsetof(struct CompactionJobStats, total_blob_bytes_read),
2448 OptionType::kUInt64T, OptionVerificationType::kNormal,
2449 OptionTypeFlags::kNone}},
2450 {"total_output_bytes",
2451 {offsetof(struct CompactionJobStats, total_output_bytes),
2452 OptionType::kUInt64T, OptionVerificationType::kNormal,
2453 OptionTypeFlags::kNone}},
2454 {"total_output_bytes_blob",
2455 {offsetof(struct CompactionJobStats, total_output_bytes_blob),
2456 OptionType::kUInt64T, OptionVerificationType::kNormal,
2457 OptionTypeFlags::kNone}},
2458 {"num_records_replaced",
2459 {offsetof(struct CompactionJobStats, num_records_replaced),
2460 OptionType::kUInt64T, OptionVerificationType::kNormal,
2461 OptionTypeFlags::kNone}},
2462 {"total_input_raw_key_bytes",
2463 {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
2464 OptionType::kUInt64T, OptionVerificationType::kNormal,
2465 OptionTypeFlags::kNone}},
2466 {"total_input_raw_value_bytes",
2467 {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
2468 OptionType::kUInt64T, OptionVerificationType::kNormal,
2469 OptionTypeFlags::kNone}},
2470 {"num_input_deletion_records",
2471 {offsetof(struct CompactionJobStats, num_input_deletion_records),
2472 OptionType::kUInt64T, OptionVerificationType::kNormal,
2473 OptionTypeFlags::kNone}},
2474 {"num_expired_deletion_records",
2475 {offsetof(struct CompactionJobStats, num_expired_deletion_records),
2476 OptionType::kUInt64T, OptionVerificationType::kNormal,
2477 OptionTypeFlags::kNone}},
2478 {"num_corrupt_keys",
2479 {offsetof(struct CompactionJobStats, num_corrupt_keys),
2480 OptionType::kUInt64T, OptionVerificationType::kNormal,
2481 OptionTypeFlags::kNone}},
2482 {"file_write_nanos",
2483 {offsetof(struct CompactionJobStats, file_write_nanos),
2484 OptionType::kUInt64T, OptionVerificationType::kNormal,
2485 OptionTypeFlags::kNone}},
2486 {"file_range_sync_nanos",
2487 {offsetof(struct CompactionJobStats, file_range_sync_nanos),
2488 OptionType::kUInt64T, OptionVerificationType::kNormal,
2489 OptionTypeFlags::kNone}},
2490 {"file_fsync_nanos",
2491 {offsetof(struct CompactionJobStats, file_fsync_nanos),
2492 OptionType::kUInt64T, OptionVerificationType::kNormal,
2493 OptionTypeFlags::kNone}},
2494 {"file_prepare_write_nanos",
2495 {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
2496 OptionType::kUInt64T, OptionVerificationType::kNormal,
2497 OptionTypeFlags::kNone}},
2498 {"smallest_output_key_prefix",
2499 {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
2500 OptionType::kEncodedString, OptionVerificationType::kNormal,
2501 OptionTypeFlags::kNone}},
2502 {"largest_output_key_prefix",
2503 {offsetof(struct CompactionJobStats, largest_output_key_prefix),
2504 OptionType::kEncodedString, OptionVerificationType::kNormal,
2505 OptionTypeFlags::kNone}},
2506 {"num_single_del_fallthru",
2507 {offsetof(struct CompactionJobStats, num_single_del_fallthru),
2508 OptionType::kUInt64T, OptionVerificationType::kNormal,
2509 OptionTypeFlags::kNone}},
2510 {"num_single_del_mismatch",
2511 {offsetof(struct CompactionJobStats, num_single_del_mismatch),
2512 OptionType::kUInt64T, OptionVerificationType::kNormal,
2513 OptionTypeFlags::kNone}},
2514 };
2515
2516 namespace {
2517 // this is a helper struct to serialize and deserialize class Status, because
2518 // Status's members are not public.
2519 struct StatusSerializationAdapter {
2520 uint8_t code;
2521 uint8_t subcode;
2522 uint8_t severity;
2523 std::string message;
2524
StatusSerializationAdapterROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2525 StatusSerializationAdapter() {}
StatusSerializationAdapterROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2526 explicit StatusSerializationAdapter(const Status& s) {
2527 code = s.code();
2528 subcode = s.subcode();
2529 severity = s.severity();
2530 auto msg = s.getState();
2531 message = msg ? msg : "";
2532 }
2533
GetStatusROCKSDB_NAMESPACE::__anond592dbd80b11::StatusSerializationAdapter2534 Status GetStatus() {
2535 return Status(static_cast<Status::Code>(code),
2536 static_cast<Status::SubCode>(subcode),
2537 static_cast<Status::Severity>(severity), message);
2538 }
2539 };
2540 } // namespace
2541
2542 static std::unordered_map<std::string, OptionTypeInfo>
2543 status_adapter_type_info = {
2544 {"code",
2545 {offsetof(struct StatusSerializationAdapter, code),
2546 OptionType::kUInt8T, OptionVerificationType::kNormal,
2547 OptionTypeFlags::kNone}},
2548 {"subcode",
2549 {offsetof(struct StatusSerializationAdapter, subcode),
2550 OptionType::kUInt8T, OptionVerificationType::kNormal,
2551 OptionTypeFlags::kNone}},
2552 {"severity",
2553 {offsetof(struct StatusSerializationAdapter, severity),
2554 OptionType::kUInt8T, OptionVerificationType::kNormal,
2555 OptionTypeFlags::kNone}},
2556 {"message",
2557 {offsetof(struct StatusSerializationAdapter, message),
2558 OptionType::kEncodedString, OptionVerificationType::kNormal,
2559 OptionTypeFlags::kNone}},
2560 };
2561
2562 static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
2563 {"status",
2564 {offsetof(struct CompactionServiceResult, status),
2565 OptionType::kCustomizable, OptionVerificationType::kNormal,
2566 OptionTypeFlags::kNone,
2567 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80c02() 2568 const std::string& value, void* addr) {
2569 auto status_obj = static_cast<Status*>(addr);
2570 StatusSerializationAdapter adapter;
2571 Status s = OptionTypeInfo::ParseType(
2572 opts, value, status_adapter_type_info, &adapter);
2573 *status_obj = adapter.GetStatus();
2574 return s;
2575 },
2576 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80d02() 2577 const void* addr, std::string* value) {
2578 const auto status_obj = static_cast<const Status*>(addr);
2579 StatusSerializationAdapter adapter(*status_obj);
2580 std::string result;
2581 Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
2582 &adapter, &result);
2583 *value = "{" + result + "}";
2584 return s;
2585 },
2586 [](const ConfigOptions& opts, const std::string& /*name*/,
__anond592dbd80e02() 2587 const void* addr1, const void* addr2, std::string* mismatch) {
2588 const auto status1 = static_cast<const Status*>(addr1);
2589 const auto status2 = static_cast<const Status*>(addr2);
2590 StatusSerializationAdapter adatper1(*status1);
2591 StatusSerializationAdapter adapter2(*status2);
2592 return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
2593 &adatper1, &adapter2, mismatch);
2594 }}},
2595 {"output_files",
2596 OptionTypeInfo::Vector<CompactionServiceOutputFile>(
2597 offsetof(struct CompactionServiceResult, output_files),
2598 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
2599 OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
2600 OptionVerificationType::kNormal,
2601 OptionTypeFlags::kNone))},
2602 {"output_level",
2603 {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
2604 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
2605 {"output_path",
2606 {offsetof(struct CompactionServiceResult, output_path),
2607 OptionType::kEncodedString, OptionVerificationType::kNormal,
2608 OptionTypeFlags::kNone}},
2609 {"num_output_records",
2610 {offsetof(struct CompactionServiceResult, num_output_records),
2611 OptionType::kUInt64T, OptionVerificationType::kNormal,
2612 OptionTypeFlags::kNone}},
2613 {"total_bytes",
2614 {offsetof(struct CompactionServiceResult, total_bytes),
2615 OptionType::kUInt64T, OptionVerificationType::kNormal,
2616 OptionTypeFlags::kNone}},
2617 {"bytes_read",
2618 {offsetof(struct CompactionServiceResult, bytes_read),
2619 OptionType::kUInt64T, OptionVerificationType::kNormal,
2620 OptionTypeFlags::kNone}},
2621 {"bytes_written",
2622 {offsetof(struct CompactionServiceResult, bytes_written),
2623 OptionType::kUInt64T, OptionVerificationType::kNormal,
2624 OptionTypeFlags::kNone}},
2625 {"stats", OptionTypeInfo::Struct(
2626 "stats", &compaction_job_stats_type_info,
2627 offsetof(struct CompactionServiceResult, stats),
2628 OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
2629 };
2630
Read(const std::string & data_str,CompactionServiceInput * obj)2631 Status CompactionServiceInput::Read(const std::string& data_str,
2632 CompactionServiceInput* obj) {
2633 if (data_str.size() <= sizeof(BinaryFormatVersion)) {
2634 return Status::InvalidArgument("Invalid CompactionServiceInput string");
2635 }
2636 auto format_version = DecodeFixed32(data_str.data());
2637 if (format_version == kOptionsString) {
2638 ConfigOptions cf;
2639 cf.invoke_prepare_options = false;
2640 cf.ignore_unknown_options = true;
2641 return OptionTypeInfo::ParseType(
2642 cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
2643 obj);
2644 } else {
2645 return Status::NotSupported(
2646 "Compaction Service Input data version not supported: " +
2647 ToString(format_version));
2648 }
2649 }
2650
Write(std::string * output)2651 Status CompactionServiceInput::Write(std::string* output) {
2652 char buf[sizeof(BinaryFormatVersion)];
2653 EncodeFixed32(buf, kOptionsString);
2654 output->append(buf, sizeof(BinaryFormatVersion));
2655 ConfigOptions cf;
2656 cf.invoke_prepare_options = false;
2657 return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
2658 }
2659
Read(const std::string & data_str,CompactionServiceResult * obj)2660 Status CompactionServiceResult::Read(const std::string& data_str,
2661 CompactionServiceResult* obj) {
2662 if (data_str.size() <= sizeof(BinaryFormatVersion)) {
2663 return Status::InvalidArgument("Invalid CompactionServiceResult string");
2664 }
2665 auto format_version = DecodeFixed32(data_str.data());
2666 if (format_version == kOptionsString) {
2667 ConfigOptions cf;
2668 cf.invoke_prepare_options = false;
2669 cf.ignore_unknown_options = true;
2670 return OptionTypeInfo::ParseType(
2671 cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
2672 obj);
2673 } else {
2674 return Status::NotSupported(
2675 "Compaction Service Result data version not supported: " +
2676 ToString(format_version));
2677 }
2678 }
2679
Write(std::string * output)2680 Status CompactionServiceResult::Write(std::string* output) {
2681 char buf[sizeof(BinaryFormatVersion)];
2682 EncodeFixed32(buf, kOptionsString);
2683 output->append(buf, sizeof(BinaryFormatVersion));
2684 ConfigOptions cf;
2685 cf.invoke_prepare_options = false;
2686 return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
2687 }
2688
2689 #ifndef NDEBUG
TEST_Equals(CompactionServiceResult * other)2690 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
2691 std::string mismatch;
2692 return TEST_Equals(other, &mismatch);
2693 }
2694
TEST_Equals(CompactionServiceResult * other,std::string * mismatch)2695 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
2696 std::string* mismatch) {
2697 ConfigOptions cf;
2698 cf.invoke_prepare_options = false;
2699 return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
2700 mismatch);
2701 }
2702
TEST_Equals(CompactionServiceInput * other)2703 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
2704 std::string mismatch;
2705 return TEST_Equals(other, &mismatch);
2706 }
2707
TEST_Equals(CompactionServiceInput * other,std::string * mismatch)2708 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
2709 std::string* mismatch) {
2710 ConfigOptions cf;
2711 cf.invoke_prepare_options = false;
2712 return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
2713 mismatch);
2714 }
2715 #endif // NDEBUG
2716 #endif // !ROCKSDB_LITE
2717
2718 } // namespace ROCKSDB_NAMESPACE
2719