1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #include <cinttypes> 7 8 #include "db/compaction/compaction_iterator.h" 9 #include "db/snapshot_checker.h" 10 #include "port/likely.h" 11 #include "rocksdb/listener.h" 12 #include "table/internal_iterator.h" 13 #include "test_util/sync_point.h" 14 15 #define DEFINITELY_IN_SNAPSHOT(seq, snapshot) \ 16 ((seq) <= (snapshot) && \ 17 (snapshot_checker_ == nullptr || \ 18 LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \ 19 SnapshotCheckerResult::kInSnapshot))) 20 21 #define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot) \ 22 ((seq) > (snapshot) || \ 23 (snapshot_checker_ != nullptr && \ 24 UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \ 25 SnapshotCheckerResult::kNotInSnapshot))) 26 27 #define IN_EARLIEST_SNAPSHOT(seq) \ 28 ((seq) <= earliest_snapshot_ && \ 29 (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq)))) 30 31 namespace ROCKSDB_NAMESPACE { 32 33 CompactionIterator::CompactionIterator( 34 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, 35 SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, 36 SequenceNumber earliest_write_conflict_snapshot, 37 const SnapshotChecker* snapshot_checker, Env* env, 38 bool report_detailed_time, bool expect_valid_internal_key, 39 CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction, 40 const CompactionFilter* compaction_filter, 41 const std::atomic<bool>* shutting_down, 42 const SequenceNumber preserve_deletes_seqnum, 43 const std::atomic<bool>* manual_compaction_paused, 44 const std::shared_ptr<Logger> info_log) 45 : CompactionIterator( 46 input, cmp, merge_helper, last_sequence, snapshots, 47 earliest_write_conflict_snapshot, snapshot_checker, env, 48 report_detailed_time, expect_valid_internal_key, range_del_agg, 49 std::unique_ptr<CompactionProxy>( 50 compaction ? new CompactionProxy(compaction) : nullptr), 51 compaction_filter, shutting_down, preserve_deletes_seqnum, 52 manual_compaction_paused, info_log) {} 53 54 CompactionIterator::CompactionIterator( 55 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, 56 SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots, 57 SequenceNumber earliest_write_conflict_snapshot, 58 const SnapshotChecker* snapshot_checker, Env* env, 59 bool report_detailed_time, bool expect_valid_internal_key, 60 CompactionRangeDelAggregator* range_del_agg, 61 std::unique_ptr<CompactionProxy> compaction, 62 const CompactionFilter* compaction_filter, 63 const std::atomic<bool>* shutting_down, 64 const SequenceNumber preserve_deletes_seqnum, 65 const std::atomic<bool>* manual_compaction_paused, 66 const std::shared_ptr<Logger> info_log) 67 : input_(input), 68 cmp_(cmp), 69 merge_helper_(merge_helper), 70 snapshots_(snapshots), 71 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), 72 snapshot_checker_(snapshot_checker), 73 env_(env), 74 report_detailed_time_(report_detailed_time), 75 expect_valid_internal_key_(expect_valid_internal_key), 76 range_del_agg_(range_del_agg), 77 compaction_(std::move(compaction)), 78 compaction_filter_(compaction_filter), 79 shutting_down_(shutting_down), 80 manual_compaction_paused_(manual_compaction_paused), 81 preserve_deletes_seqnum_(preserve_deletes_seqnum), 82 current_user_key_sequence_(0), 83 current_user_key_snapshot_(0), 84 merge_out_iter_(merge_helper_), 85 current_key_committed_(false), 86 info_log_(info_log) { 87 assert(compaction_filter_ == nullptr || compaction_ != nullptr); 88 assert(snapshots_ != nullptr); 89 bottommost_level_ = 90 compaction_ == nullptr ? false : compaction_->bottommost_level(); 91 if (compaction_ != nullptr) { 92 level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0); 93 } 94 if (snapshots_->size() == 0) { 95 // optimize for fast path if there are no snapshots 96 visible_at_tip_ = true; 97 earliest_snapshot_iter_ = snapshots_->end(); 98 earliest_snapshot_ = kMaxSequenceNumber; 99 latest_snapshot_ = 0; 100 } else { 101 visible_at_tip_ = false; 102 earliest_snapshot_iter_ = snapshots_->begin(); 103 earliest_snapshot_ = snapshots_->at(0); 104 latest_snapshot_ = snapshots_->back(); 105 } 106 #ifndef NDEBUG 107 // findEarliestVisibleSnapshot assumes this ordering. 108 for (size_t i = 1; i < snapshots_->size(); ++i) { 109 assert(snapshots_->at(i - 1) < snapshots_->at(i)); 110 } 111 #endif 112 input_->SetPinnedItersMgr(&pinned_iters_mgr_); 113 TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); 114 } 115 116 CompactionIterator::~CompactionIterator() { 117 // input_ Iteartor lifetime is longer than pinned_iters_mgr_ lifetime 118 input_->SetPinnedItersMgr(nullptr); 119 } 120 121 void CompactionIterator::ResetRecordCounts() { 122 iter_stats_.num_record_drop_user = 0; 123 iter_stats_.num_record_drop_hidden = 0; 124 iter_stats_.num_record_drop_obsolete = 0; 125 iter_stats_.num_record_drop_range_del = 0; 126 iter_stats_.num_range_del_drop_obsolete = 0; 127 iter_stats_.num_optimized_del_drop_obsolete = 0; 128 } 129 130 void CompactionIterator::SeekToFirst() { 131 NextFromInput(); 132 PrepareOutput(); 133 } 134 135 void CompactionIterator::Next() { 136 // If there is a merge output, return it before continuing to process the 137 // input. 138 if (merge_out_iter_.Valid()) { 139 merge_out_iter_.Next(); 140 141 // Check if we returned all records of the merge output. 142 if (merge_out_iter_.Valid()) { 143 key_ = merge_out_iter_.key(); 144 value_ = merge_out_iter_.value(); 145 bool valid_key __attribute__((__unused__)); 146 valid_key = ParseInternalKey(key_, &ikey_); 147 // MergeUntil stops when it encounters a corrupt key and does not 148 // include them in the result, so we expect the keys here to be valid. 149 assert(valid_key); 150 if (!valid_key) { 151 ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction", 152 key_.ToString(true).c_str()); 153 } 154 155 // Keep current_key_ in sync. 156 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); 157 key_ = current_key_.GetInternalKey(); 158 ikey_.user_key = current_key_.GetUserKey(); 159 valid_ = true; 160 } else { 161 // We consumed all pinned merge operands, release pinned iterators 162 pinned_iters_mgr_.ReleasePinnedData(); 163 // MergeHelper moves the iterator to the first record after the merged 164 // records, so even though we reached the end of the merge output, we do 165 // not want to advance the iterator. 166 NextFromInput(); 167 } 168 } else { 169 // Only advance the input iterator if there is no merge output and the 170 // iterator is not already at the next record. 171 if (!at_next_) { 172 input_->Next(); 173 } 174 NextFromInput(); 175 } 176 177 if (valid_) { 178 // Record that we've outputted a record for the current key. 179 has_outputted_key_ = true; 180 } 181 182 PrepareOutput(); 183 } 184 185 void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, 186 Slice* skip_until) { 187 if (compaction_filter_ != nullptr && 188 (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) { 189 // If the user has specified a compaction filter and the sequence 190 // number is greater than any external snapshot, then invoke the 191 // filter. If the return value of the compaction filter is true, 192 // replace the entry with a deletion marker. 193 CompactionFilter::Decision filter; 194 compaction_filter_value_.clear(); 195 compaction_filter_skip_until_.Clear(); 196 CompactionFilter::ValueType value_type = 197 ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue 198 : CompactionFilter::ValueType::kBlobIndex; 199 // Hack: pass internal key to BlobIndexCompactionFilter since it needs 200 // to get sequence number. 201 Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_; 202 { 203 StopWatchNano timer(env_, report_detailed_time_); 204 filter = compaction_filter_->FilterV2( 205 compaction_->level(), filter_key, value_type, value_, 206 &compaction_filter_value_, compaction_filter_skip_until_.rep()); 207 iter_stats_.total_filter_time += 208 env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; 209 } 210 211 if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && 212 cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 213 0) { 214 // Can't skip to a key smaller than the current one. 215 // Keep the key as per FilterV2 documentation. 216 filter = CompactionFilter::Decision::kKeep; 217 } 218 219 if (filter == CompactionFilter::Decision::kRemove) { 220 // convert the current key to a delete; key_ is pointing into 221 // current_key_ at this point, so updating current_key_ updates key() 222 ikey_.type = kTypeDeletion; 223 current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); 224 // no value associated with delete 225 value_.clear(); 226 iter_stats_.num_record_drop_user++; 227 } else if (filter == CompactionFilter::Decision::kChangeValue) { 228 value_ = compaction_filter_value_; 229 } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { 230 *need_skip = true; 231 compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, 232 kValueTypeForSeek); 233 *skip_until = compaction_filter_skip_until_.Encode(); 234 } 235 } 236 } 237 238 void CompactionIterator::NextFromInput() { 239 at_next_ = false; 240 valid_ = false; 241 242 while (!valid_ && input_->Valid() && !IsPausingManualCompaction() && 243 !IsShuttingDown()) { 244 key_ = input_->key(); 245 value_ = input_->value(); 246 iter_stats_.num_input_records++; 247 248 if (!ParseInternalKey(key_, &ikey_)) { 249 // If `expect_valid_internal_key_` is false, return the corrupted key 250 // and let the caller decide what to do with it. 251 // TODO(noetzli): We should have a more elegant solution for this. 252 if (expect_valid_internal_key_) { 253 assert(!"Corrupted internal key not expected."); 254 status_ = Status::Corruption("Corrupted internal key not expected."); 255 break; 256 } 257 key_ = current_key_.SetInternalKey(key_); 258 has_current_user_key_ = false; 259 current_user_key_sequence_ = kMaxSequenceNumber; 260 current_user_key_snapshot_ = 0; 261 iter_stats_.num_input_corrupt_records++; 262 valid_ = true; 263 break; 264 } 265 TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); 266 267 // Update input statistics 268 if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { 269 iter_stats_.num_input_deletion_records++; 270 } 271 iter_stats_.total_input_raw_key_bytes += key_.size(); 272 iter_stats_.total_input_raw_value_bytes += value_.size(); 273 274 // If need_skip is true, we should seek the input iterator 275 // to internal key skip_until and continue from there. 276 bool need_skip = false; 277 // Points either into compaction_filter_skip_until_ or into 278 // merge_helper_->compaction_filter_skip_until_. 279 Slice skip_until; 280 281 // Check whether the user key changed. After this if statement current_key_ 282 // is a copy of the current input key (maybe converted to a delete by the 283 // compaction filter). ikey_.user_key is pointing to the copy. 284 if (!has_current_user_key_ || 285 !cmp_->Equal(ikey_.user_key, current_user_key_)) { 286 // First occurrence of this user key 287 // Copy key for output 288 key_ = current_key_.SetInternalKey(key_, &ikey_); 289 current_user_key_ = ikey_.user_key; 290 has_current_user_key_ = true; 291 has_outputted_key_ = false; 292 current_user_key_sequence_ = kMaxSequenceNumber; 293 current_user_key_snapshot_ = 0; 294 current_key_committed_ = KeyCommitted(ikey_.sequence); 295 296 // Apply the compaction filter to the first committed version of the user 297 // key. 298 if (current_key_committed_) { 299 InvokeFilterIfNeeded(&need_skip, &skip_until); 300 } 301 } else { 302 // Update the current key to reflect the new sequence number/type without 303 // copying the user key. 304 // TODO(rven): Compaction filter does not process keys in this path 305 // Need to have the compaction filter process multiple versions 306 // if we have versions on both sides of a snapshot 307 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); 308 key_ = current_key_.GetInternalKey(); 309 ikey_.user_key = current_key_.GetUserKey(); 310 311 // Note that newer version of a key is ordered before older versions. If a 312 // newer version of a key is committed, so as the older version. No need 313 // to query snapshot_checker_ in that case. 314 if (UNLIKELY(!current_key_committed_)) { 315 assert(snapshot_checker_ != nullptr); 316 current_key_committed_ = KeyCommitted(ikey_.sequence); 317 // Apply the compaction filter to the first committed version of the 318 // user key. 319 if (current_key_committed_) { 320 InvokeFilterIfNeeded(&need_skip, &skip_until); 321 } 322 } 323 } 324 325 if (UNLIKELY(!current_key_committed_)) { 326 assert(snapshot_checker_ != nullptr); 327 valid_ = true; 328 break; 329 } 330 331 // If there are no snapshots, then this kv affect visibility at tip. 332 // Otherwise, search though all existing snapshots to find the earliest 333 // snapshot that is affected by this kv. 334 SequenceNumber last_sequence __attribute__((__unused__)); 335 last_sequence = current_user_key_sequence_; 336 current_user_key_sequence_ = ikey_.sequence; 337 SequenceNumber last_snapshot = current_user_key_snapshot_; 338 SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot 339 current_user_key_snapshot_ = 340 visible_at_tip_ 341 ? earliest_snapshot_ 342 : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot); 343 344 if (need_skip) { 345 // This case is handled below. 346 } else if (clear_and_output_next_key_) { 347 // In the previous iteration we encountered a single delete that we could 348 // not compact out. We will keep this Put, but can drop it's data. 349 // (See Optimization 3, below.) 350 assert(ikey_.type == kTypeValue); 351 if (ikey_.type != kTypeValue) { 352 ROCKS_LOG_FATAL(info_log_, 353 "Unexpected key type %d for compaction output", 354 ikey_.type); 355 } 356 assert(current_user_key_snapshot_ == last_snapshot); 357 if (current_user_key_snapshot_ != last_snapshot) { 358 ROCKS_LOG_FATAL(info_log_, 359 "current_user_key_snapshot_ (%" PRIu64 360 ") != last_snapshot (%" PRIu64 ")", 361 current_user_key_snapshot_, last_snapshot); 362 } 363 364 value_.clear(); 365 valid_ = true; 366 clear_and_output_next_key_ = false; 367 } else if (ikey_.type == kTypeSingleDeletion) { 368 // We can compact out a SingleDelete if: 369 // 1) We encounter the corresponding PUT -OR- we know that this key 370 // doesn't appear past this output level 371 // =AND= 372 // 2) We've already returned a record in this snapshot -OR- 373 // there are no earlier earliest_write_conflict_snapshot. 374 // 375 // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to 376 // allow Transactions to do write-conflict checking (if we compacted away 377 // all keys, then we wouldn't know that a write happened in this 378 // snapshot). If there is no earlier snapshot, then we know that there 379 // are no active transactions that need to know about any writes. 380 // 381 // Optimization 3: 382 // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT 383 // true, then we must output a SingleDelete. In this case, we will decide 384 // to also output the PUT. While we are compacting less by outputting the 385 // PUT now, hopefully this will lead to better compaction in the future 386 // when Rule 2 is later true (Ie, We are hoping we can later compact out 387 // both the SingleDelete and the Put, while we couldn't if we only 388 // outputted the SingleDelete now). 389 // In this case, we can save space by removing the PUT's value as it will 390 // never be read. 391 // 392 // Deletes and Merges are not supported on the same key that has a 393 // SingleDelete as it is not possible to correctly do any partial 394 // compaction of such a combination of operations. The result of mixing 395 // those operations for a given key is documented as being undefined. So 396 // we can choose how to handle such a combinations of operations. We will 397 // try to compact out as much as we can in these cases. 398 // We will report counts on these anomalous cases. 399 400 // The easiest way to process a SingleDelete during iteration is to peek 401 // ahead at the next key. 402 ParsedInternalKey next_ikey; 403 input_->Next(); 404 405 // Check whether the next key exists, is not corrupt, and is the same key 406 // as the single delete. 407 if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && 408 cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { 409 // Check whether the next key belongs to the same snapshot as the 410 // SingleDelete. 411 if (prev_snapshot == 0 || 412 DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) { 413 if (next_ikey.type == kTypeSingleDeletion) { 414 // We encountered two SingleDeletes in a row. This could be due to 415 // unexpected user input. 416 // Skip the first SingleDelete and let the next iteration decide how 417 // to handle the second SingleDelete 418 419 // First SingleDelete has been skipped since we already called 420 // input_->Next(). 421 ++iter_stats_.num_record_drop_obsolete; 422 ++iter_stats_.num_single_del_mismatch; 423 } else if (has_outputted_key_ || 424 DEFINITELY_IN_SNAPSHOT( 425 ikey_.sequence, earliest_write_conflict_snapshot_)) { 426 // Found a matching value, we can drop the single delete and the 427 // value. It is safe to drop both records since we've already 428 // outputted a key in this snapshot, or there is no earlier 429 // snapshot (Rule 2 above). 430 431 // Note: it doesn't matter whether the second key is a Put or if it 432 // is an unexpected Merge or Delete. We will compact it out 433 // either way. We will maintain counts of how many mismatches 434 // happened 435 if (next_ikey.type != kTypeValue && 436 next_ikey.type != kTypeBlobIndex) { 437 ++iter_stats_.num_single_del_mismatch; 438 } 439 440 ++iter_stats_.num_record_drop_hidden; 441 ++iter_stats_.num_record_drop_obsolete; 442 // Already called input_->Next() once. Call it a second time to 443 // skip past the second key. 444 input_->Next(); 445 } else { 446 // Found a matching value, but we cannot drop both keys since 447 // there is an earlier snapshot and we need to leave behind a record 448 // to know that a write happened in this snapshot (Rule 2 above). 449 // Clear the value and output the SingleDelete. (The value will be 450 // outputted on the next iteration.) 451 452 // Setting valid_ to true will output the current SingleDelete 453 valid_ = true; 454 455 // Set up the Put to be outputted in the next iteration. 456 // (Optimization 3). 457 clear_and_output_next_key_ = true; 458 } 459 } else { 460 // We hit the next snapshot without hitting a put, so the iterator 461 // returns the single delete. 462 valid_ = true; 463 } 464 } else { 465 // We are at the end of the input, could not parse the next key, or hit 466 // a different key. The iterator returns the single delete if the key 467 // possibly exists beyond the current output level. We set 468 // has_current_user_key to false so that if the iterator is at the next 469 // key, we do not compare it again against the previous key at the next 470 // iteration. If the next key is corrupt, we return before the 471 // comparison, so the value of has_current_user_key does not matter. 472 has_current_user_key_ = false; 473 if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) && 474 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, 475 &level_ptrs_)) { 476 // Key doesn't exist outside of this range. 477 // Can compact out this SingleDelete. 478 ++iter_stats_.num_record_drop_obsolete; 479 ++iter_stats_.num_single_del_fallthru; 480 if (!bottommost_level_) { 481 ++iter_stats_.num_optimized_del_drop_obsolete; 482 } 483 } else { 484 // Output SingleDelete 485 valid_ = true; 486 } 487 } 488 489 if (valid_) { 490 at_next_ = true; 491 } 492 } else if (last_snapshot == current_user_key_snapshot_ || 493 (last_snapshot > 0 && 494 last_snapshot < current_user_key_snapshot_)) { 495 // If the earliest snapshot is which this key is visible in 496 // is the same as the visibility of a previous instance of the 497 // same key, then this kv is not visible in any snapshot. 498 // Hidden by an newer entry for same user key 499 // 500 // Note: Dropping this key will not affect TransactionDB write-conflict 501 // checking since there has already been a record returned for this key 502 // in this snapshot. 503 assert(last_sequence >= current_user_key_sequence_); 504 if (last_sequence < current_user_key_sequence_) { 505 ROCKS_LOG_FATAL(info_log_, 506 "last_sequence (%" PRIu64 507 ") < current_user_key_sequence_ (%" PRIu64 ")", 508 last_sequence, current_user_key_sequence_); 509 } 510 511 ++iter_stats_.num_record_drop_hidden; // (A) 512 input_->Next(); 513 } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && 514 IN_EARLIEST_SNAPSHOT(ikey_.sequence) && 515 ikeyNotNeededForIncrementalSnapshot() && 516 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, 517 &level_ptrs_)) { 518 // TODO(noetzli): This is the only place where we use compaction_ 519 // (besides the constructor). We should probably get rid of this 520 // dependency and find a way to do similar filtering during flushes. 521 // 522 // For this user key: 523 // (1) there is no data in higher levels 524 // (2) data in lower levels will have larger sequence numbers 525 // (3) data in layers that are being compacted here and have 526 // smaller sequence numbers will be dropped in the next 527 // few iterations of this loop (by rule (A) above). 528 // Therefore this deletion marker is obsolete and can be dropped. 529 // 530 // Note: Dropping this Delete will not affect TransactionDB 531 // write-conflict checking since it is earlier than any snapshot. 532 // 533 // It seems that we can also drop deletion later than earliest snapshot 534 // given that: 535 // (1) The deletion is earlier than earliest_write_conflict_snapshot, and 536 // (2) No value exist earlier than the deletion. 537 ++iter_stats_.num_record_drop_obsolete; 538 if (!bottommost_level_) { 539 ++iter_stats_.num_optimized_del_drop_obsolete; 540 } 541 input_->Next(); 542 } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ && 543 ikeyNotNeededForIncrementalSnapshot()) { 544 // Handle the case where we have a delete key at the bottom most level 545 // We can skip outputting the key iff there are no subsequent puts for this 546 // key 547 ParsedInternalKey next_ikey; 548 input_->Next(); 549 // Skip over all versions of this key that happen to occur in the same snapshot 550 // range as the delete 551 while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && 552 cmp_->Equal(ikey_.user_key, next_ikey.user_key) && 553 (prev_snapshot == 0 || 554 DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { 555 input_->Next(); 556 } 557 // If you find you still need to output a row with this key, we need to output the 558 // delete too 559 if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && 560 cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { 561 valid_ = true; 562 at_next_ = true; 563 } 564 } else if (ikey_.type == kTypeMerge) { 565 if (!merge_helper_->HasOperator()) { 566 status_ = Status::InvalidArgument( 567 "merge_operator is not properly initialized."); 568 return; 569 } 570 571 pinned_iters_mgr_.StartPinning(); 572 // We know the merge type entry is not hidden, otherwise we would 573 // have hit (A) 574 // We encapsulate the merge related state machine in a different 575 // object to minimize change to the existing flow. 576 Status s = merge_helper_->MergeUntil(input_, range_del_agg_, 577 prev_snapshot, bottommost_level_); 578 merge_out_iter_.SeekToFirst(); 579 580 if (!s.ok() && !s.IsMergeInProgress()) { 581 status_ = s; 582 return; 583 } else if (merge_out_iter_.Valid()) { 584 // NOTE: key, value, and ikey_ refer to old entries. 585 // These will be correctly set below. 586 key_ = merge_out_iter_.key(); 587 value_ = merge_out_iter_.value(); 588 bool valid_key __attribute__((__unused__)); 589 valid_key = ParseInternalKey(key_, &ikey_); 590 // MergeUntil stops when it encounters a corrupt key and does not 591 // include them in the result, so we expect the keys here to valid. 592 assert(valid_key); 593 if (!valid_key) { 594 ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction", 595 key_.ToString(true).c_str()); 596 } 597 // Keep current_key_ in sync. 598 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); 599 key_ = current_key_.GetInternalKey(); 600 ikey_.user_key = current_key_.GetUserKey(); 601 valid_ = true; 602 } else { 603 // all merge operands were filtered out. reset the user key, since the 604 // batch consumed by the merge operator should not shadow any keys 605 // coming after the merges 606 has_current_user_key_ = false; 607 pinned_iters_mgr_.ReleasePinnedData(); 608 609 if (merge_helper_->FilteredUntil(&skip_until)) { 610 need_skip = true; 611 } 612 } 613 } else { 614 // 1. new user key -OR- 615 // 2. different snapshot stripe 616 bool should_delete = range_del_agg_->ShouldDelete( 617 key_, RangeDelPositioningMode::kForwardTraversal); 618 if (should_delete) { 619 ++iter_stats_.num_record_drop_hidden; 620 ++iter_stats_.num_record_drop_range_del; 621 input_->Next(); 622 } else { 623 valid_ = true; 624 } 625 } 626 627 if (need_skip) { 628 input_->Seek(skip_until); 629 } 630 } 631 632 if (!valid_ && IsShuttingDown()) { 633 status_ = Status::ShutdownInProgress(); 634 } 635 636 if (IsPausingManualCompaction()) { 637 status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused); 638 } 639 } 640 641 void CompactionIterator::PrepareOutput() { 642 if (valid_) { 643 if (compaction_filter_ && ikey_.type == kTypeBlobIndex) { 644 const auto blob_decision = compaction_filter_->PrepareBlobOutput( 645 user_key(), value_, &compaction_filter_value_); 646 647 if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { 648 status_ = Status::Corruption( 649 "Corrupted blob reference encountered during GC"); 650 valid_ = false; 651 } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { 652 status_ = Status::IOError("Could not relocate blob during GC"); 653 valid_ = false; 654 } else if (blob_decision == 655 CompactionFilter::BlobDecision::kChangeValue) { 656 value_ = compaction_filter_value_; 657 } 658 } 659 660 // Zeroing out the sequence number leads to better compression. 661 // If this is the bottommost level (no files in lower levels) 662 // and the earliest snapshot is larger than this seqno 663 // and the userkey differs from the last userkey in compaction 664 // then we can squash the seqno to zero. 665 // 666 // This is safe for TransactionDB write-conflict checking since transactions 667 // only care about sequence number larger than any active snapshots. 668 // 669 // Can we do the same for levels above bottom level as long as 670 // KeyNotExistsBeyondOutputLevel() return true? 671 if (valid_ && compaction_ != nullptr && 672 !compaction_->allow_ingest_behind() && 673 ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && 674 IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) { 675 assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); 676 if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { 677 ROCKS_LOG_FATAL(info_log_, 678 "Unexpected key type %d for seq-zero optimization", 679 ikey_.type); 680 } 681 ikey_.sequence = 0; 682 current_key_.UpdateInternalKey(0, ikey_.type); 683 } 684 } 685 } 686 687 inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( 688 SequenceNumber in, SequenceNumber* prev_snapshot) { 689 assert(snapshots_->size()); 690 if (snapshots_->size() == 0) { 691 ROCKS_LOG_FATAL(info_log_, 692 "No snapshot left in findEarliestVisibleSnapshot"); 693 } 694 auto snapshots_iter = std::lower_bound( 695 snapshots_->begin(), snapshots_->end(), in); 696 if (snapshots_iter == snapshots_->begin()) { 697 *prev_snapshot = 0; 698 } else { 699 *prev_snapshot = *std::prev(snapshots_iter); 700 assert(*prev_snapshot < in); 701 if (*prev_snapshot >= in) { 702 ROCKS_LOG_FATAL(info_log_, 703 "*prev_snapshot >= in in findEarliestVisibleSnapshot"); 704 } 705 } 706 if (snapshot_checker_ == nullptr) { 707 return snapshots_iter != snapshots_->end() 708 ? *snapshots_iter : kMaxSequenceNumber; 709 } 710 bool has_released_snapshot = !released_snapshots_.empty(); 711 for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) { 712 auto cur = *snapshots_iter; 713 assert(in <= cur); 714 if (in > cur) { 715 ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot"); 716 } 717 // Skip if cur is in released_snapshots. 718 if (has_released_snapshot && released_snapshots_.count(cur) > 0) { 719 continue; 720 } 721 auto res = snapshot_checker_->CheckInSnapshot(in, cur); 722 if (res == SnapshotCheckerResult::kInSnapshot) { 723 return cur; 724 } else if (res == SnapshotCheckerResult::kSnapshotReleased) { 725 released_snapshots_.insert(cur); 726 } 727 *prev_snapshot = cur; 728 } 729 return kMaxSequenceNumber; 730 } 731 732 // used in 2 places - prevents deletion markers to be dropped if they may be 733 // needed and disables seqnum zero-out in PrepareOutput for recent keys. 734 inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() { 735 return (!compaction_->preserve_deletes()) || 736 (ikey_.sequence < preserve_deletes_seqnum_); 737 } 738 739 bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { 740 assert(snapshot_checker_ != nullptr); 741 bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber || 742 (earliest_snapshot_iter_ != snapshots_->end() && 743 *earliest_snapshot_iter_ == earliest_snapshot_)); 744 assert(pre_condition); 745 if (!pre_condition) { 746 ROCKS_LOG_FATAL(info_log_, 747 "Pre-Condition is not hold in IsInEarliestSnapshot"); 748 } 749 auto in_snapshot = 750 snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); 751 while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) { 752 // Avoid the the current earliest_snapshot_ being return as 753 // earliest visible snapshot for the next value. So if a value's sequence 754 // is zero-ed out by PrepareOutput(), the next value will be compact out. 755 released_snapshots_.insert(earliest_snapshot_); 756 earliest_snapshot_iter_++; 757 758 if (earliest_snapshot_iter_ == snapshots_->end()) { 759 earliest_snapshot_ = kMaxSequenceNumber; 760 } else { 761 earliest_snapshot_ = *earliest_snapshot_iter_; 762 } 763 in_snapshot = 764 snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); 765 } 766 assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased); 767 if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) { 768 ROCKS_LOG_FATAL(info_log_, 769 "Unexpected released snapshot in IsInEarliestSnapshot"); 770 } 771 return in_snapshot == SnapshotCheckerResult::kInSnapshot; 772 } 773 774 } // namespace ROCKSDB_NAMESPACE 775