1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 7 #include <algorithm> 8 #include <cinttypes> 9 #include <deque> 10 #include <string> 11 #include <unordered_set> 12 #include <vector> 13 14 #include "db/compaction/compaction.h" 15 #include "db/compaction/compaction_iteration_stats.h" 16 #include "db/merge_helper.h" 17 #include "db/pinned_iterators_manager.h" 18 #include "db/range_del_aggregator.h" 19 #include "db/snapshot_checker.h" 20 #include "options/cf_options.h" 21 #include "rocksdb/compaction_filter.h" 22 23 namespace ROCKSDB_NAMESPACE { 24 25 class BlobFileBuilder; 26 27 // A wrapper of internal iterator whose purpose is to count how 28 // many entries there are in the iterator. 29 class SequenceIterWrapper : public InternalIterator { 30 public: SequenceIterWrapper(InternalIterator * iter,const Comparator * cmp,bool need_count_entries)31 SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp, 32 bool need_count_entries) 33 : icmp_(cmp, /*named=*/false), 34 inner_iter_(iter), 35 need_count_entries_(need_count_entries) {} Valid()36 bool Valid() const override { return inner_iter_->Valid(); } status()37 Status status() const override { return inner_iter_->status(); } Next()38 void Next() override { 39 num_itered_++; 40 inner_iter_->Next(); 41 } Seek(const Slice & target)42 void Seek(const Slice& target) override { 43 if (!need_count_entries_) { 44 inner_iter_->Seek(target); 45 } else { 46 // For flush cases, we need to count total number of entries, so we 47 // do Next() rather than Seek(). 48 while (inner_iter_->Valid() && 49 icmp_.Compare(inner_iter_->key(), target) < 0) { 50 Next(); 51 } 52 } 53 } key()54 Slice key() const override { return inner_iter_->key(); } value()55 Slice value() const override { return inner_iter_->value(); } 56 57 // Unused InternalIterator methods SeekToFirst()58 void SeekToFirst() override { assert(false); } Prev()59 void Prev() override { assert(false); } SeekForPrev(const Slice &)60 void SeekForPrev(const Slice& /* target */) override { assert(false); } SeekToLast()61 void SeekToLast() override { assert(false); } 62 num_itered()63 uint64_t num_itered() const { return num_itered_; } 64 65 private: 66 InternalKeyComparator icmp_; 67 InternalIterator* inner_iter_; // not owned 68 uint64_t num_itered_ = 0; 69 bool need_count_entries_; 70 }; 71 72 class CompactionIterator { 73 public: 74 // A wrapper around Compaction. Has a much smaller interface, only what 75 // CompactionIterator uses. Tests can override it. 76 class CompactionProxy { 77 public: 78 virtual ~CompactionProxy() = default; 79 80 virtual int level() const = 0; 81 82 virtual bool KeyNotExistsBeyondOutputLevel( 83 const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0; 84 85 virtual bool bottommost_level() const = 0; 86 87 virtual int number_levels() const = 0; 88 89 virtual Slice GetLargestUserKey() const = 0; 90 91 virtual bool allow_ingest_behind() const = 0; 92 93 virtual bool preserve_deletes() const = 0; 94 95 virtual bool enable_blob_garbage_collection() const = 0; 96 97 virtual double blob_garbage_collection_age_cutoff() const = 0; 98 99 virtual Version* input_version() const = 0; 100 }; 101 102 class RealCompaction : public CompactionProxy { 103 public: RealCompaction(const Compaction * compaction)104 explicit RealCompaction(const Compaction* compaction) 105 : compaction_(compaction) { 106 assert(compaction_); 107 assert(compaction_->immutable_cf_options()); 108 assert(compaction_->mutable_cf_options()); 109 } 110 level()111 int level() const override { return compaction_->level(); } 112 KeyNotExistsBeyondOutputLevel(const Slice & user_key,std::vector<size_t> * level_ptrs)113 bool KeyNotExistsBeyondOutputLevel( 114 const Slice& user_key, std::vector<size_t>* level_ptrs) const override { 115 return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs); 116 } 117 bottommost_level()118 bool bottommost_level() const override { 119 return compaction_->bottommost_level(); 120 } 121 number_levels()122 int number_levels() const override { return compaction_->number_levels(); } 123 GetLargestUserKey()124 Slice GetLargestUserKey() const override { 125 return compaction_->GetLargestUserKey(); 126 } 127 allow_ingest_behind()128 bool allow_ingest_behind() const override { 129 return compaction_->immutable_cf_options()->allow_ingest_behind; 130 } 131 preserve_deletes()132 bool preserve_deletes() const override { 133 return compaction_->immutable_cf_options()->preserve_deletes; 134 } 135 enable_blob_garbage_collection()136 bool enable_blob_garbage_collection() const override { 137 return compaction_->mutable_cf_options()->enable_blob_garbage_collection; 138 } 139 blob_garbage_collection_age_cutoff()140 double blob_garbage_collection_age_cutoff() const override { 141 return compaction_->mutable_cf_options() 142 ->blob_garbage_collection_age_cutoff; 143 } 144 input_version()145 Version* input_version() const override { 146 return compaction_->input_version(); 147 } 148 149 private: 150 const Compaction* compaction_; 151 }; 152 153 CompactionIterator(InternalIterator* input, const Comparator* cmp, 154 MergeHelper* merge_helper, SequenceNumber last_sequence, 155 std::vector<SequenceNumber>* snapshots, 156 SequenceNumber earliest_write_conflict_snapshot, 157 const SnapshotChecker* snapshot_checker, Env* env, 158 bool report_detailed_time, bool expect_valid_internal_key, 159 CompactionRangeDelAggregator* range_del_agg, 160 BlobFileBuilder* blob_file_builder, 161 bool allow_data_in_errors, 162 const Compaction* compaction = nullptr, 163 const CompactionFilter* compaction_filter = nullptr, 164 const std::atomic<bool>* shutting_down = nullptr, 165 const SequenceNumber preserve_deletes_seqnum = 0, 166 const std::atomic<int>* manual_compaction_paused = nullptr, 167 const std::shared_ptr<Logger> info_log = nullptr, 168 const std::string* full_history_ts_low = nullptr); 169 170 // Constructor with custom CompactionProxy, used for tests. 171 CompactionIterator(InternalIterator* input, const Comparator* cmp, 172 MergeHelper* merge_helper, SequenceNumber last_sequence, 173 std::vector<SequenceNumber>* snapshots, 174 SequenceNumber earliest_write_conflict_snapshot, 175 const SnapshotChecker* snapshot_checker, Env* env, 176 bool report_detailed_time, bool expect_valid_internal_key, 177 CompactionRangeDelAggregator* range_del_agg, 178 BlobFileBuilder* blob_file_builder, 179 bool allow_data_in_errors, 180 std::unique_ptr<CompactionProxy> compaction, 181 const CompactionFilter* compaction_filter = nullptr, 182 const std::atomic<bool>* shutting_down = nullptr, 183 const SequenceNumber preserve_deletes_seqnum = 0, 184 const std::atomic<int>* manual_compaction_paused = nullptr, 185 const std::shared_ptr<Logger> info_log = nullptr, 186 const std::string* full_history_ts_low = nullptr); 187 188 ~CompactionIterator(); 189 190 void ResetRecordCounts(); 191 192 // Seek to the beginning of the compaction iterator output. 193 // 194 // REQUIRED: Call only once. 195 void SeekToFirst(); 196 197 // Produces the next record in the compaction. 198 // 199 // REQUIRED: SeekToFirst() has been called. 200 void Next(); 201 202 // Getters key()203 const Slice& key() const { return key_; } value()204 const Slice& value() const { return value_; } status()205 const Status& status() const { return status_; } ikey()206 const ParsedInternalKey& ikey() const { return ikey_; } Valid()207 bool Valid() const { return valid_; } user_key()208 const Slice& user_key() const { return current_user_key_; } iter_stats()209 const CompactionIterationStats& iter_stats() const { return iter_stats_; } num_input_entry_scanned()210 uint64_t num_input_entry_scanned() const { return input_.num_itered(); } 211 212 private: 213 // Processes the input stream to find the next output 214 void NextFromInput(); 215 216 // Do final preparations before presenting the output to the callee. 217 void PrepareOutput(); 218 219 // Passes the output value to the blob file builder (if any), and replaces it 220 // with the corresponding blob reference if it has been actually written to a 221 // blob file (i.e. if it passed the value size check). Returns true if the 222 // value got extracted to a blob file, false otherwise. 223 bool ExtractLargeValueIfNeededImpl(); 224 225 // Extracts large values as described above, and updates the internal key's 226 // type to kTypeBlobIndex if the value got extracted. Should only be called 227 // for regular values (kTypeValue). 228 void ExtractLargeValueIfNeeded(); 229 230 // Relocates valid blobs residing in the oldest blob files if garbage 231 // collection is enabled. Relocated blobs are written to new blob files or 232 // inlined in the LSM tree depending on the current settings (i.e. 233 // enable_blob_files and min_blob_size). Should only be called for blob 234 // references (kTypeBlobIndex). 235 // 236 // Note: the stacked BlobDB implementation's compaction filter based GC 237 // algorithm is also called from here. 238 void GarbageCollectBlobIfNeeded(); 239 240 // Invoke compaction filter if needed. 241 // Return true on success, false on failures (e.g.: kIOError). 242 bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until); 243 244 // Given a sequence number, return the sequence number of the 245 // earliest snapshot that this sequence number is visible in. 246 // The snapshots themselves are arranged in ascending order of 247 // sequence numbers. 248 // Employ a sequential search because the total number of 249 // snapshots are typically small. 250 inline SequenceNumber findEarliestVisibleSnapshot( 251 SequenceNumber in, SequenceNumber* prev_snapshot); 252 253 // Checks whether the currently seen ikey_ is needed for 254 // incremental (differential) snapshot and hence can't be dropped 255 // or seqnum be zero-ed out even if all other conditions for it are met. 256 inline bool ikeyNotNeededForIncrementalSnapshot(); 257 KeyCommitted(SequenceNumber sequence)258 inline bool KeyCommitted(SequenceNumber sequence) { 259 return snapshot_checker_ == nullptr || 260 snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) == 261 SnapshotCheckerResult::kInSnapshot; 262 } 263 264 bool IsInEarliestSnapshot(SequenceNumber sequence); 265 266 // Extract user-defined timestamp from user key if possible and compare it 267 // with *full_history_ts_low_ if applicable. UpdateTimestampAndCompareWithFullHistoryLow()268 inline void UpdateTimestampAndCompareWithFullHistoryLow() { 269 if (!timestamp_size_) { 270 return; 271 } 272 Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); 273 curr_ts_.assign(ts.data(), ts.size()); 274 if (full_history_ts_low_) { 275 cmp_with_history_ts_low_ = 276 cmp_->CompareTimestamp(ts, *full_history_ts_low_); 277 } 278 } 279 280 static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber( 281 const CompactionProxy* compaction); 282 283 SequenceIterWrapper input_; 284 const Comparator* cmp_; 285 MergeHelper* merge_helper_; 286 const std::vector<SequenceNumber>* snapshots_; 287 // List of snapshots released during compaction. 288 // findEarliestVisibleSnapshot() find them out from return of 289 // snapshot_checker, and make sure they will not be returned as 290 // earliest visible snapshot of an older value. 291 // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3. 292 std::unordered_set<SequenceNumber> released_snapshots_; 293 std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_; 294 const SequenceNumber earliest_write_conflict_snapshot_; 295 const SnapshotChecker* const snapshot_checker_; 296 Env* env_; 297 SystemClock* clock_; 298 bool report_detailed_time_; 299 bool expect_valid_internal_key_; 300 CompactionRangeDelAggregator* range_del_agg_; 301 BlobFileBuilder* blob_file_builder_; 302 std::unique_ptr<CompactionProxy> compaction_; 303 const CompactionFilter* compaction_filter_; 304 const std::atomic<bool>* shutting_down_; 305 const std::atomic<int>* manual_compaction_paused_; 306 const SequenceNumber preserve_deletes_seqnum_; 307 bool bottommost_level_; 308 bool valid_ = false; 309 bool visible_at_tip_; 310 SequenceNumber earliest_snapshot_; 311 SequenceNumber latest_snapshot_; 312 313 std::shared_ptr<Logger> info_log_; 314 315 bool allow_data_in_errors_; 316 317 // Comes from comparator. 318 const size_t timestamp_size_; 319 320 // Lower bound timestamp to retain full history in terms of user-defined 321 // timestamp. If a key's timestamp is older than full_history_ts_low_, then 322 // the key *may* be eligible for garbage collection (GC). The skipping logic 323 // is in `NextFromInput()` and `PrepareOutput()`. 324 // If nullptr, NO GC will be performed and all history will be preserved. 325 const std::string* const full_history_ts_low_; 326 327 // State 328 // 329 // Points to a copy of the current compaction iterator output (current_key_) 330 // if valid_. 331 Slice key_; 332 // Points to the value in the underlying iterator that corresponds to the 333 // current output. 334 Slice value_; 335 // The status is OK unless compaction iterator encounters a merge operand 336 // while not having a merge operator defined. 337 Status status_; 338 // Stores the user key, sequence number and type of the current compaction 339 // iterator output (or current key in the underlying iterator during 340 // NextFromInput()). 341 ParsedInternalKey ikey_; 342 // Stores whether ikey_.user_key is valid. If set to false, the user key is 343 // not compared against the current key in the underlying iterator. 344 bool has_current_user_key_ = false; 345 // If false, the iterator holds a copy of the current compaction iterator 346 // output (or current key in the underlying iterator during NextFromInput()). 347 bool at_next_ = false; 348 349 IterKey current_key_; 350 Slice current_user_key_; 351 std::string curr_ts_; 352 SequenceNumber current_user_key_sequence_; 353 SequenceNumber current_user_key_snapshot_; 354 355 // True if the iterator has already returned a record for the current key. 356 bool has_outputted_key_ = false; 357 358 // truncated the value of the next key and output it without applying any 359 // compaction rules. This is used for outputting a put after a single delete. 360 bool clear_and_output_next_key_ = false; 361 362 MergeOutputIterator merge_out_iter_; 363 // PinnedIteratorsManager used to pin input_ Iterator blocks while reading 364 // merge operands and then releasing them after consuming them. 365 PinnedIteratorsManager pinned_iters_mgr_; 366 367 uint64_t blob_garbage_collection_cutoff_file_number_; 368 369 std::string blob_index_; 370 PinnableSlice blob_value_; 371 std::string compaction_filter_value_; 372 InternalKey compaction_filter_skip_until_; 373 // "level_ptrs" holds indices that remember which file of an associated 374 // level we were last checking during the last call to compaction-> 375 // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function 376 // to pick off where it left off since each subcompaction's key range is 377 // increasing so a later call to the function must be looking for a key that 378 // is in or beyond the last file checked during the previous call 379 std::vector<size_t> level_ptrs_; 380 CompactionIterationStats iter_stats_; 381 382 // Used to avoid purging uncommitted values. The application can specify 383 // uncommitted values by providing a SnapshotChecker object. 384 bool current_key_committed_; 385 386 // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_) 387 int cmp_with_history_ts_low_; 388 389 const int level_; 390 AdvanceInputIter()391 void AdvanceInputIter() { input_.Next(); } 392 SkipUntil(const Slice & skip_until)393 void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); } 394 IsShuttingDown()395 bool IsShuttingDown() { 396 // This is a best-effort facility, so memory_order_relaxed is sufficient. 397 return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); 398 } 399 IsPausingManualCompaction()400 bool IsPausingManualCompaction() { 401 // This is a best-effort facility, so memory_order_relaxed is sufficient. 402 return manual_compaction_paused_ && 403 manual_compaction_paused_->load(std::memory_order_relaxed) > 0; 404 } 405 }; 406 } // namespace ROCKSDB_NAMESPACE 407