1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 #pragma once 10 11 #include <atomic> 12 #include <deque> 13 #include <functional> 14 #include <limits> 15 #include <set> 16 #include <string> 17 #include <utility> 18 #include <vector> 19 20 #include "db/column_family.h" 21 #include "db/compaction/compaction_iterator.h" 22 #include "db/dbformat.h" 23 #include "db/flush_scheduler.h" 24 #include "db/internal_stats.h" 25 #include "db/job_context.h" 26 #include "db/log_writer.h" 27 #include "db/memtable_list.h" 28 #include "db/range_del_aggregator.h" 29 #include "db/version_edit.h" 30 #include "db/write_controller.h" 31 #include "db/write_thread.h" 32 #include "logging/event_logger.h" 33 #include "options/cf_options.h" 34 #include "options/db_options.h" 35 #include "port/port.h" 36 #include "rocksdb/compaction_filter.h" 37 #include "rocksdb/compaction_job_stats.h" 38 #include "rocksdb/db.h" 39 #include "rocksdb/env.h" 40 #include "rocksdb/memtablerep.h" 41 #include "rocksdb/transaction_log.h" 42 #include "table/scoped_arena_iterator.h" 43 #include "util/autovector.h" 44 #include "util/stop_watch.h" 45 #include "util/thread_local.h" 46 47 namespace ROCKSDB_NAMESPACE { 48 49 class Arena; 50 class ErrorHandler; 51 class MemTable; 52 class SnapshotChecker; 53 class TableCache; 54 class Version; 55 class VersionEdit; 56 class VersionSet; 57 58 // CompactionJob is responsible for executing the compaction. Each (manual or 59 // automated) compaction corresponds to a CompactionJob object, and usually 60 // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob 61 // will divide the compaction into subcompactions and execute them in parallel 62 // if needed. 63 class CompactionJob { 64 public: 65 CompactionJob(int job_id, Compaction* compaction, 66 const ImmutableDBOptions& db_options, 67 const FileOptions& file_options, VersionSet* versions, 68 const std::atomic<bool>* shutting_down, 69 const SequenceNumber preserve_deletes_seqnum, 70 LogBuffer* log_buffer, Directory* db_directory, 71 Directory* output_directory, Statistics* stats, 72 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, 73 std::vector<SequenceNumber> existing_snapshots, 74 SequenceNumber earliest_write_conflict_snapshot, 75 const SnapshotChecker* snapshot_checker, 76 std::shared_ptr<Cache> table_cache, EventLogger* event_logger, 77 bool paranoid_file_checks, bool measure_io_stats, 78 const std::string& dbname, 79 CompactionJobStats* compaction_job_stats, 80 Env::Priority thread_pri, 81 const std::atomic<bool>* manual_compaction_paused = nullptr); 82 83 ~CompactionJob(); 84 85 // no copy/move 86 CompactionJob(CompactionJob&& job) = delete; 87 CompactionJob(const CompactionJob& job) = delete; 88 CompactionJob& operator=(const CompactionJob& job) = delete; 89 90 // REQUIRED: mutex held 91 // Prepare for the compaction by setting up boundaries for each subcompaction 92 void Prepare(); 93 // REQUIRED mutex not held 94 // Launch threads for each subcompaction and wait for them to finish. After 95 // that, verify table is usable and finally do bookkeeping to unify 96 // subcompaction results 97 Status Run(); 98 99 // REQUIRED: mutex held 100 // Add compaction input/output to the current version 101 Status Install(const MutableCFOptions& mutable_cf_options); 102 103 private: 104 struct SubcompactionState; 105 106 void AggregateStatistics(); 107 108 // Generates a histogram representing potential divisions of key ranges from 109 // the input. It adds the starting and/or ending keys of certain input files 110 // to the working set and then finds the approximate size of data in between 111 // each consecutive pair of slices. Then it divides these ranges into 112 // consecutive groups such that each group has a similar size. 113 void GenSubcompactionBoundaries(); 114 115 // update the thread status for starting a compaction. 116 void ReportStartedCompaction(Compaction* compaction); 117 void AllocateCompactionOutputFileNumbers(); 118 // Call compaction filter. Then iterate through input and compact the 119 // kv-pairs 120 void ProcessKeyValueCompaction(SubcompactionState* sub_compact); 121 122 Status FinishCompactionOutputFile( 123 const Status& input_status, SubcompactionState* sub_compact, 124 CompactionRangeDelAggregator* range_del_agg, 125 CompactionIterationStats* range_del_out_stats, 126 const Slice* next_table_min_key = nullptr); 127 Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); 128 void RecordCompactionIOStats(); 129 Status OpenCompactionOutputFile(SubcompactionState* sub_compact); 130 void CleanupCompaction(); 131 void UpdateCompactionJobStats( 132 const InternalStats::CompactionStats& stats) const; 133 void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats, 134 CompactionJobStats* compaction_job_stats = nullptr); 135 136 void UpdateCompactionStats(); 137 void UpdateCompactionInputStatsHelper( 138 int* num_files, uint64_t* bytes_read, int input_level); 139 140 void LogCompaction(); 141 142 int job_id_; 143 144 // CompactionJob state 145 struct CompactionState; 146 CompactionState* compact_; 147 CompactionJobStats* compaction_job_stats_; 148 InternalStats::CompactionStats compaction_stats_; 149 150 // DBImpl state 151 const std::string& dbname_; 152 const ImmutableDBOptions& db_options_; 153 const FileOptions file_options_; 154 155 Env* env_; 156 FileSystem* fs_; 157 // env_option optimized for compaction table reads 158 FileOptions file_options_for_read_; 159 VersionSet* versions_; 160 const std::atomic<bool>* shutting_down_; 161 const std::atomic<bool>* manual_compaction_paused_; 162 const SequenceNumber preserve_deletes_seqnum_; 163 LogBuffer* log_buffer_; 164 Directory* db_directory_; 165 Directory* output_directory_; 166 Statistics* stats_; 167 InstrumentedMutex* db_mutex_; 168 ErrorHandler* db_error_handler_; 169 // If there were two snapshots with seq numbers s1 and 170 // s2 and s1 < s2, and if we find two instances of a key k1 then lies 171 // entirely within s1 and s2, then the earlier version of k1 can be safely 172 // deleted because that version is not visible in any snapshot. 173 std::vector<SequenceNumber> existing_snapshots_; 174 175 // This is the earliest snapshot that could be used for write-conflict 176 // checking by a transaction. For any user-key newer than this snapshot, we 177 // should make sure not to remove evidence that a write occurred. 178 SequenceNumber earliest_write_conflict_snapshot_; 179 180 const SnapshotChecker* const snapshot_checker_; 181 182 std::shared_ptr<Cache> table_cache_; 183 184 EventLogger* event_logger_; 185 186 // Is this compaction creating a file in the bottom most level? 187 bool bottommost_level_; 188 bool paranoid_file_checks_; 189 bool measure_io_stats_; 190 // Stores the Slices that designate the boundaries for each subcompaction 191 std::vector<Slice> boundaries_; 192 // Stores the approx size of keys covered in the range of each subcompaction 193 std::vector<uint64_t> sizes_; 194 Env::WriteLifeTimeHint write_hint_; 195 Env::Priority thread_pri_; 196 }; 197 198 } // namespace ROCKSDB_NAMESPACE 199