1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 #pragma once 10 11 #include <atomic> 12 #include <deque> 13 #include <functional> 14 #include <limits> 15 #include <set> 16 #include <string> 17 #include <utility> 18 #include <vector> 19 20 #include "db/blob/blob_file_completion_callback.h" 21 #include "db/column_family.h" 22 #include "db/compaction/compaction_iterator.h" 23 #include "db/dbformat.h" 24 #include "db/flush_scheduler.h" 25 #include "db/internal_stats.h" 26 #include "db/job_context.h" 27 #include "db/log_writer.h" 28 #include "db/memtable_list.h" 29 #include "db/range_del_aggregator.h" 30 #include "db/version_edit.h" 31 #include "db/write_controller.h" 32 #include "db/write_thread.h" 33 #include "logging/event_logger.h" 34 #include "options/cf_options.h" 35 #include "options/db_options.h" 36 #include "port/port.h" 37 #include "rocksdb/compaction_filter.h" 38 #include "rocksdb/compaction_job_stats.h" 39 #include "rocksdb/db.h" 40 #include "rocksdb/env.h" 41 #include "rocksdb/memtablerep.h" 42 #include "rocksdb/transaction_log.h" 43 #include "table/scoped_arena_iterator.h" 44 #include "util/autovector.h" 45 #include "util/stop_watch.h" 46 #include "util/thread_local.h" 47 48 namespace ROCKSDB_NAMESPACE { 49 50 class Arena; 51 class ErrorHandler; 52 class MemTable; 53 class SnapshotChecker; 54 class SystemClock; 55 class TableCache; 56 class Version; 57 class VersionEdit; 58 class VersionSet; 59 60 // CompactionJob is responsible for executing the compaction. Each (manual or 61 // automated) compaction corresponds to a CompactionJob object, and usually 62 // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob 63 // will divide the compaction into subcompactions and execute them in parallel 64 // if needed. 65 class CompactionJob { 66 public: 67 CompactionJob( 68 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, 69 const MutableDBOptions& mutable_db_options, 70 const FileOptions& file_options, VersionSet* versions, 71 const std::atomic<bool>* shutting_down, 72 const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, 73 FSDirectory* db_directory, FSDirectory* output_directory, 74 FSDirectory* blob_output_directory, Statistics* stats, 75 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, 76 std::vector<SequenceNumber> existing_snapshots, 77 SequenceNumber earliest_write_conflict_snapshot, 78 const SnapshotChecker* snapshot_checker, 79 std::shared_ptr<Cache> table_cache, EventLogger* event_logger, 80 bool paranoid_file_checks, bool measure_io_stats, 81 const std::string& dbname, CompactionJobStats* compaction_job_stats, 82 Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, 83 const std::atomic<int>* manual_compaction_paused = nullptr, 84 const std::string& db_id = "", const std::string& db_session_id = "", 85 std::string full_history_ts_low = "", 86 BlobFileCompletionCallback* blob_callback = nullptr); 87 88 virtual ~CompactionJob(); 89 90 // no copy/move 91 CompactionJob(CompactionJob&& job) = delete; 92 CompactionJob(const CompactionJob& job) = delete; 93 CompactionJob& operator=(const CompactionJob& job) = delete; 94 95 // REQUIRED: mutex held 96 // Prepare for the compaction by setting up boundaries for each subcompaction 97 void Prepare(); 98 // REQUIRED mutex not held 99 // Launch threads for each subcompaction and wait for them to finish. After 100 // that, verify table is usable and finally do bookkeeping to unify 101 // subcompaction results 102 Status Run(); 103 104 // REQUIRED: mutex held 105 // Add compaction input/output to the current version 106 Status Install(const MutableCFOptions& mutable_cf_options); 107 108 // Return the IO status io_status()109 IOStatus io_status() const { return io_status_; } 110 111 protected: 112 struct SubcompactionState; 113 // CompactionJob state 114 struct CompactionState; 115 116 void AggregateStatistics(); 117 void UpdateCompactionStats(); 118 void LogCompaction(); 119 void RecordCompactionIOStats(); 120 void CleanupCompaction(); 121 122 // Call compaction filter. Then iterate through input and compact the 123 // kv-pairs 124 void ProcessKeyValueCompaction(SubcompactionState* sub_compact); 125 126 CompactionState* compact_; 127 InternalStats::CompactionStats compaction_stats_; 128 const ImmutableDBOptions& db_options_; 129 const MutableDBOptions mutable_db_options_copy_; 130 LogBuffer* log_buffer_; 131 FSDirectory* output_directory_; 132 Statistics* stats_; 133 // Is this compaction creating a file in the bottom most level? 134 bool bottommost_level_; 135 136 Env::WriteLifeTimeHint write_hint_; 137 138 IOStatus io_status_; 139 140 private: 141 // Generates a histogram representing potential divisions of key ranges from 142 // the input. It adds the starting and/or ending keys of certain input files 143 // to the working set and then finds the approximate size of data in between 144 // each consecutive pair of slices. Then it divides these ranges into 145 // consecutive groups such that each group has a similar size. 146 void GenSubcompactionBoundaries(); 147 148 void ProcessKeyValueCompactionWithCompactionService( 149 SubcompactionState* sub_compact); 150 151 // update the thread status for starting a compaction. 152 void ReportStartedCompaction(Compaction* compaction); 153 void AllocateCompactionOutputFileNumbers(); 154 155 Status FinishCompactionOutputFile( 156 const Status& input_status, SubcompactionState* sub_compact, 157 CompactionRangeDelAggregator* range_del_agg, 158 CompactionIterationStats* range_del_out_stats, 159 const Slice* next_table_min_key = nullptr); 160 Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); 161 Status OpenCompactionOutputFile(SubcompactionState* sub_compact); 162 void UpdateCompactionJobStats( 163 const InternalStats::CompactionStats& stats) const; 164 void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats, 165 CompactionJobStats* compaction_job_stats = nullptr); 166 167 void UpdateCompactionInputStatsHelper( 168 int* num_files, uint64_t* bytes_read, int input_level); 169 170 int job_id_; 171 172 CompactionJobStats* compaction_job_stats_; 173 174 // DBImpl state 175 const std::string& dbname_; 176 const std::string db_id_; 177 const std::string db_session_id_; 178 const FileOptions file_options_; 179 180 Env* env_; 181 std::shared_ptr<IOTracer> io_tracer_; 182 FileSystemPtr fs_; 183 // env_option optimized for compaction table reads 184 FileOptions file_options_for_read_; 185 VersionSet* versions_; 186 const std::atomic<bool>* shutting_down_; 187 const std::atomic<int>* manual_compaction_paused_; 188 const SequenceNumber preserve_deletes_seqnum_; 189 FSDirectory* db_directory_; 190 FSDirectory* blob_output_directory_; 191 InstrumentedMutex* db_mutex_; 192 ErrorHandler* db_error_handler_; 193 // If there were two snapshots with seq numbers s1 and 194 // s2 and s1 < s2, and if we find two instances of a key k1 then lies 195 // entirely within s1 and s2, then the earlier version of k1 can be safely 196 // deleted because that version is not visible in any snapshot. 197 std::vector<SequenceNumber> existing_snapshots_; 198 199 // This is the earliest snapshot that could be used for write-conflict 200 // checking by a transaction. For any user-key newer than this snapshot, we 201 // should make sure not to remove evidence that a write occurred. 202 SequenceNumber earliest_write_conflict_snapshot_; 203 204 const SnapshotChecker* const snapshot_checker_; 205 206 std::shared_ptr<Cache> table_cache_; 207 208 EventLogger* event_logger_; 209 210 bool paranoid_file_checks_; 211 bool measure_io_stats_; 212 // Stores the Slices that designate the boundaries for each subcompaction 213 std::vector<Slice> boundaries_; 214 // Stores the approx size of keys covered in the range of each subcompaction 215 std::vector<uint64_t> sizes_; 216 Env::Priority thread_pri_; 217 std::string full_history_ts_low_; 218 BlobFileCompletionCallback* blob_callback_; 219 220 // Get table file name in where it's outputting to, which should also be in 221 // `output_directory_`. 222 virtual std::string GetTableFileName(uint64_t file_number); 223 }; 224 225 // CompactionServiceInput is used the pass compaction information between two 226 // db instances. It contains the information needed to do a compaction. It 227 // doesn't contain the LSM tree information, which is passed though MANIFEST 228 // file. 229 struct CompactionServiceInput { 230 ColumnFamilyDescriptor column_family; 231 232 DBOptions db_options; 233 234 std::vector<SequenceNumber> snapshots; 235 236 // SST files for compaction, it should already be expended to include all the 237 // files needed for this compaction, for both input level files and output 238 // level files. 239 std::vector<std::string> input_files; 240 int output_level; 241 242 // information for subcompaction 243 bool has_begin = false; 244 std::string begin; 245 bool has_end = false; 246 std::string end; 247 uint64_t approx_size = 0; 248 249 // serialization interface to read and write the object 250 static Status Read(const std::string& data_str, CompactionServiceInput* obj); 251 Status Write(std::string* output); 252 253 // Initialize a dummy ColumnFamilyDescriptor CompactionServiceInputCompactionServiceInput254 CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {} 255 256 #ifndef NDEBUG 257 bool TEST_Equals(CompactionServiceInput* other); 258 bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch); 259 #endif // NDEBUG 260 }; 261 262 // CompactionServiceOutputFile is the metadata for the output SST file 263 struct CompactionServiceOutputFile { 264 std::string file_name; 265 SequenceNumber smallest_seqno; 266 SequenceNumber largest_seqno; 267 std::string smallest_internal_key; 268 std::string largest_internal_key; 269 uint64_t oldest_ancester_time; 270 uint64_t file_creation_time; 271 uint64_t paranoid_hash; 272 bool marked_for_compaction; 273 274 CompactionServiceOutputFile() = default; CompactionServiceOutputFileCompactionServiceOutputFile275 CompactionServiceOutputFile( 276 const std::string& name, SequenceNumber smallest, SequenceNumber largest, 277 std::string _smallest_internal_key, std::string _largest_internal_key, 278 uint64_t _oldest_ancester_time, uint64_t _file_creation_time, 279 uint64_t _paranoid_hash, bool _marked_for_compaction) 280 : file_name(name), 281 smallest_seqno(smallest), 282 largest_seqno(largest), 283 smallest_internal_key(std::move(_smallest_internal_key)), 284 largest_internal_key(std::move(_largest_internal_key)), 285 oldest_ancester_time(_oldest_ancester_time), 286 file_creation_time(_file_creation_time), 287 paranoid_hash(_paranoid_hash), 288 marked_for_compaction(_marked_for_compaction) {} 289 }; 290 291 // CompactionServiceResult contains the compaction result from a different db 292 // instance, with these information, the primary db instance with write 293 // permission is able to install the result to the DB. 294 struct CompactionServiceResult { 295 Status status; 296 std::vector<CompactionServiceOutputFile> output_files; 297 int output_level; 298 299 // location of the output files 300 std::string output_path; 301 302 // some statistics about the compaction 303 uint64_t num_output_records; 304 uint64_t total_bytes; 305 uint64_t bytes_read; 306 uint64_t bytes_written; 307 CompactionJobStats stats; 308 309 // serialization interface to read and write the object 310 static Status Read(const std::string& data_str, CompactionServiceResult* obj); 311 Status Write(std::string* output); 312 313 #ifndef NDEBUG 314 bool TEST_Equals(CompactionServiceResult* other); 315 bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch); 316 #endif // NDEBUG 317 }; 318 319 // CompactionServiceCompactionJob is an read-only compaction job, it takes 320 // input information from `compaction_service_input` and put result information 321 // in `compaction_service_result`, the SST files are generated to `output_path`. 322 class CompactionServiceCompactionJob : private CompactionJob { 323 public: 324 CompactionServiceCompactionJob( 325 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, 326 const MutableDBOptions& mutable_db_options, 327 const FileOptions& file_options, VersionSet* versions, 328 const std::atomic<bool>* shutting_down, LogBuffer* log_buffer, 329 FSDirectory* output_directory, Statistics* stats, 330 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, 331 std::vector<SequenceNumber> existing_snapshots, 332 std::shared_ptr<Cache> table_cache, EventLogger* event_logger, 333 const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer, 334 const std::string& db_id, const std::string& db_session_id, 335 const std::string& output_path, 336 const CompactionServiceInput& compaction_service_input, 337 CompactionServiceResult* compaction_service_result); 338 339 // Run the compaction in current thread and return the result 340 Status Run(); 341 342 void CleanupCompaction(); 343 io_status()344 IOStatus io_status() const { return CompactionJob::io_status(); } 345 346 private: 347 // Get table file name in output_path 348 std::string GetTableFileName(uint64_t file_number) override; 349 // Specific the compaction output path, otherwise it uses default DB path 350 const std::string output_path_; 351 352 // Compaction job input 353 const CompactionServiceInput& compaction_input_; 354 355 // Compaction job result 356 CompactionServiceResult* compaction_result_; 357 }; 358 359 } // namespace ROCKSDB_NAMESPACE 360