1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <cinttypes>
7 
8 #include "db/compaction/compaction_iterator.h"
9 #include "db/snapshot_checker.h"
10 #include "port/likely.h"
11 #include "rocksdb/listener.h"
12 #include "table/internal_iterator.h"
13 #include "test_util/sync_point.h"
14 
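// Helper macros for visibility checks that consult the optional
// snapshot_checker_. Without a snapshot_checker_ they reduce to plain
// sequence number comparisons, e.g. DEFINITELY_IN_SNAPSHOT(5, 10) is true
// and DEFINITELY_NOT_IN_SNAPSHOT(15, 10) is true. With a snapshot_checker_,
// both "DEFINITELY" macros can be false for the same (seq, snapshot) pair
// when the checker reports the snapshot as released.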
15 #define DEFINITELY_IN_SNAPSHOT(seq, snapshot)                       \
16   ((seq) <= (snapshot) &&                                           \
17    (snapshot_checker_ == nullptr ||                                 \
18     LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
19            SnapshotCheckerResult::kInSnapshot)))
20 
21 #define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot)                     \
22   ((seq) > (snapshot) ||                                              \
23    (snapshot_checker_ != nullptr &&                                   \
24     UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
25              SnapshotCheckerResult::kNotInSnapshot)))
26 
27 #define IN_EARLIEST_SNAPSHOT(seq) \
28   ((seq) <= earliest_snapshot_ && \
29    (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))))
30 
31 namespace ROCKSDB_NAMESPACE {
32 
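// Convenience constructor: wraps the raw Compaction pointer in a
// CompactionProxy and delegates to the main constructor below.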
33 CompactionIterator::CompactionIterator(
34     InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
35     SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
36     SequenceNumber earliest_write_conflict_snapshot,
37     const SnapshotChecker* snapshot_checker, Env* env,
38     bool report_detailed_time, bool expect_valid_internal_key,
39     CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
40     const CompactionFilter* compaction_filter,
41     const std::atomic<bool>* shutting_down,
42     const SequenceNumber preserve_deletes_seqnum,
43     const std::atomic<bool>* manual_compaction_paused,
44     const std::shared_ptr<Logger> info_log)
45     : CompactionIterator(
46           input, cmp, merge_helper, last_sequence, snapshots,
47           earliest_write_conflict_snapshot, snapshot_checker, env,
48           report_detailed_time, expect_valid_internal_key, range_del_agg,
49           std::unique_ptr<CompactionProxy>(
50               compaction ? new CompactionProxy(compaction) : nullptr),
51           compaction_filter, shutting_down, preserve_deletes_seqnum,
52           manual_compaction_paused, info_log) {}
53 
54 CompactionIterator::CompactionIterator(
55     InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
56     SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
57     SequenceNumber earliest_write_conflict_snapshot,
58     const SnapshotChecker* snapshot_checker, Env* env,
59     bool report_detailed_time, bool expect_valid_internal_key,
60     CompactionRangeDelAggregator* range_del_agg,
61     std::unique_ptr<CompactionProxy> compaction,
62     const CompactionFilter* compaction_filter,
63     const std::atomic<bool>* shutting_down,
64     const SequenceNumber preserve_deletes_seqnum,
65     const std::atomic<bool>* manual_compaction_paused,
66     const std::shared_ptr<Logger> info_log)
67     : input_(input),
68       cmp_(cmp),
69       merge_helper_(merge_helper),
70       snapshots_(snapshots),
71       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
72       snapshot_checker_(snapshot_checker),
73       env_(env),
74       report_detailed_time_(report_detailed_time),
75       expect_valid_internal_key_(expect_valid_internal_key),
76       range_del_agg_(range_del_agg),
77       compaction_(std::move(compaction)),
78       compaction_filter_(compaction_filter),
79       shutting_down_(shutting_down),
80       manual_compaction_paused_(manual_compaction_paused),
81       preserve_deletes_seqnum_(preserve_deletes_seqnum),
82       current_user_key_sequence_(0),
83       current_user_key_snapshot_(0),
84       merge_out_iter_(merge_helper_),
85       current_key_committed_(false),
86       info_log_(info_log) {
87   assert(compaction_filter_ == nullptr || compaction_ != nullptr);
88   assert(snapshots_ != nullptr);
89   bottommost_level_ =
90       compaction_ == nullptr ? false : compaction_->bottommost_level();
91   if (compaction_ != nullptr) {
92     level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
93   }
94   if (snapshots_->size() == 0) {
    // Optimize for the fast path when there are no snapshots.
96     visible_at_tip_ = true;
97     earliest_snapshot_iter_ = snapshots_->end();
98     earliest_snapshot_ = kMaxSequenceNumber;
99     latest_snapshot_ = 0;
100   } else {
101     visible_at_tip_ = false;
102     earliest_snapshot_iter_ = snapshots_->begin();
103     earliest_snapshot_ = snapshots_->at(0);
104     latest_snapshot_ = snapshots_->back();
105   }
106 #ifndef NDEBUG
107   // findEarliestVisibleSnapshot assumes this ordering.
108   for (size_t i = 1; i < snapshots_->size(); ++i) {
109     assert(snapshots_->at(i - 1) < snapshots_->at(i));
110   }
111 #endif
112   input_->SetPinnedItersMgr(&pinned_iters_mgr_);
113   TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
114 }
115 
116 CompactionIterator::~CompactionIterator() {
  // The input_ iterator's lifetime is longer than pinned_iters_mgr_'s.
118   input_->SetPinnedItersMgr(nullptr);
119 }
120 
121 void CompactionIterator::ResetRecordCounts() {
122   iter_stats_.num_record_drop_user = 0;
123   iter_stats_.num_record_drop_hidden = 0;
124   iter_stats_.num_record_drop_obsolete = 0;
125   iter_stats_.num_record_drop_range_del = 0;
126   iter_stats_.num_range_del_drop_obsolete = 0;
127   iter_stats_.num_optimized_del_drop_obsolete = 0;
128 }
129 
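// Note: SeekToFirst() does not seek the underlying input iterator. It assumes
// the caller has already positioned input_ and only computes the first output
// entry from the current position.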
130 void CompactionIterator::SeekToFirst() {
131   NextFromInput();
132   PrepareOutput();
133 }
134 
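// Produces the next output entry: any pending merge output is drained first,
// then the input is consumed again via NextFromInput().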
135 void CompactionIterator::Next() {
136   // If there is a merge output, return it before continuing to process the
137   // input.
138   if (merge_out_iter_.Valid()) {
139     merge_out_iter_.Next();
140 
141     // Check if we returned all records of the merge output.
142     if (merge_out_iter_.Valid()) {
143       key_ = merge_out_iter_.key();
144       value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__));
      valid_key = ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include it in the result, so we expect the keys here to be valid.
149       assert(valid_key);
150       if (!valid_key) {
151         ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
152                         key_.ToString(true).c_str());
153       }
154 
155       // Keep current_key_ in sync.
156       current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
157       key_ = current_key_.GetInternalKey();
158       ikey_.user_key = current_key_.GetUserKey();
159       valid_ = true;
160     } else {
      // We consumed all pinned merge operands; release the pinned iterators.
162       pinned_iters_mgr_.ReleasePinnedData();
163       // MergeHelper moves the iterator to the first record after the merged
164       // records, so even though we reached the end of the merge output, we do
165       // not want to advance the iterator.
166       NextFromInput();
167     }
168   } else {
169     // Only advance the input iterator if there is no merge output and the
170     // iterator is not already at the next record.
171     if (!at_next_) {
172       input_->Next();
173     }
174     NextFromInput();
175   }
176 
177   if (valid_) {
178     // Record that we've outputted a record for the current key.
179     has_outputted_key_ = true;
180   }
181 
182   PrepareOutput();
183 }
184 
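// Runs the compaction filter (if any) on value and blob-index entries and
// applies its decision: keep the entry, change its value, convert it into a
// deletion, or remove it and skip ahead to a later key (in which case
// *need_skip and *skip_until are set).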
185 void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
186                                               Slice* skip_until) {
187   if (compaction_filter_ != nullptr &&
188       (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
    // If the user has specified a compaction filter, invoke it for value and
    // blob-index entries. If the filter decides to remove the entry, replace
    // it with a deletion marker. Note that the callers only invoke this for
    // the first committed version of each user key.
193     CompactionFilter::Decision filter;
194     compaction_filter_value_.clear();
195     compaction_filter_skip_until_.Clear();
196     CompactionFilter::ValueType value_type =
197         ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
198                                  : CompactionFilter::ValueType::kBlobIndex;
    // Hack: pass the internal key to BlobIndexCompactionFilter since it needs
    // the sequence number.
201     Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
202     {
203       StopWatchNano timer(env_, report_detailed_time_);
204       filter = compaction_filter_->FilterV2(
205           compaction_->level(), filter_key, value_type, value_,
206           &compaction_filter_value_, compaction_filter_skip_until_.rep());
207       iter_stats_.total_filter_time +=
208           env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
209     }
210 
211     if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
212         cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
213             0) {
214       // Can't skip to a key smaller than the current one.
215       // Keep the key as per FilterV2 documentation.
216       filter = CompactionFilter::Decision::kKeep;
217     }
218 
219     if (filter == CompactionFilter::Decision::kRemove) {
220       // convert the current key to a delete; key_ is pointing into
221       // current_key_ at this point, so updating current_key_ updates key()
222       ikey_.type = kTypeDeletion;
223       current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
224       // no value associated with delete
225       value_.clear();
226       iter_stats_.num_record_drop_user++;
227     } else if (filter == CompactionFilter::Decision::kChangeValue) {
228       value_ = compaction_filter_value_;
229     } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
230       *need_skip = true;
231       compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
232                                                        kValueTypeForSeek);
233       *skip_until = compaction_filter_skip_until_.Encode();
234     }
235   }
236 }
237 
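// Advances over the input until an entry that should be surfaced is found
// (valid_ == true), the input is exhausted, or the compaction is shut down or
// paused. This is where snapshot visibility, Delete/SingleDelete elimination,
// merge operands and range tombstones are handled.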
238 void CompactionIterator::NextFromInput() {
239   at_next_ = false;
240   valid_ = false;
241 
242   while (!valid_ && input_->Valid() && !IsPausingManualCompaction() &&
243          !IsShuttingDown()) {
244     key_ = input_->key();
245     value_ = input_->value();
246     iter_stats_.num_input_records++;
247 
248     if (!ParseInternalKey(key_, &ikey_)) {
249       // If `expect_valid_internal_key_` is false, return the corrupted key
250       // and let the caller decide what to do with it.
251       // TODO(noetzli): We should have a more elegant solution for this.
252       if (expect_valid_internal_key_) {
253         assert(!"Corrupted internal key not expected.");
254         status_ = Status::Corruption("Corrupted internal key not expected.");
255         break;
256       }
257       key_ = current_key_.SetInternalKey(key_);
258       has_current_user_key_ = false;
259       current_user_key_sequence_ = kMaxSequenceNumber;
260       current_user_key_snapshot_ = 0;
261       iter_stats_.num_input_corrupt_records++;
262       valid_ = true;
263       break;
264     }
265     TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
266 
267     // Update input statistics
268     if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
269       iter_stats_.num_input_deletion_records++;
270     }
271     iter_stats_.total_input_raw_key_bytes += key_.size();
272     iter_stats_.total_input_raw_value_bytes += value_.size();
273 
274     // If need_skip is true, we should seek the input iterator
275     // to internal key skip_until and continue from there.
276     bool need_skip = false;
277     // Points either into compaction_filter_skip_until_ or into
278     // merge_helper_->compaction_filter_skip_until_.
279     Slice skip_until;
280 
281     // Check whether the user key changed. After this if statement current_key_
282     // is a copy of the current input key (maybe converted to a delete by the
283     // compaction filter). ikey_.user_key is pointing to the copy.
284     if (!has_current_user_key_ ||
285         !cmp_->Equal(ikey_.user_key, current_user_key_)) {
286       // First occurrence of this user key
287       // Copy key for output
288       key_ = current_key_.SetInternalKey(key_, &ikey_);
289       current_user_key_ = ikey_.user_key;
290       has_current_user_key_ = true;
291       has_outputted_key_ = false;
292       current_user_key_sequence_ = kMaxSequenceNumber;
293       current_user_key_snapshot_ = 0;
294       current_key_committed_ = KeyCommitted(ikey_.sequence);
295 
296       // Apply the compaction filter to the first committed version of the user
297       // key.
298       if (current_key_committed_) {
299         InvokeFilterIfNeeded(&need_skip, &skip_until);
300       }
301     } else {
302       // Update the current key to reflect the new sequence number/type without
303       // copying the user key.
304       // TODO(rven): Compaction filter does not process keys in this path
305       // Need to have the compaction filter process multiple versions
306       // if we have versions on both sides of a snapshot
307       current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
308       key_ = current_key_.GetInternalKey();
309       ikey_.user_key = current_key_.GetUserKey();
310 
      // Note that newer versions of a key are ordered before older versions.
      // If a newer version of a key is committed, so is the older version, so
      // there is no need to query snapshot_checker_ in that case.
314       if (UNLIKELY(!current_key_committed_)) {
315         assert(snapshot_checker_ != nullptr);
316         current_key_committed_ = KeyCommitted(ikey_.sequence);
317         // Apply the compaction filter to the first committed version of the
318         // user key.
319         if (current_key_committed_) {
320           InvokeFilterIfNeeded(&need_skip, &skip_until);
321         }
322       }
323     }
324 
325     if (UNLIKELY(!current_key_committed_)) {
326       assert(snapshot_checker_ != nullptr);
327       valid_ = true;
328       break;
329     }
330 
    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
334     SequenceNumber last_sequence __attribute__((__unused__));
335     last_sequence = current_user_key_sequence_;
336     current_user_key_sequence_ = ikey_.sequence;
337     SequenceNumber last_snapshot = current_user_key_snapshot_;
338     SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
339     current_user_key_snapshot_ =
340         visible_at_tip_
341             ? earliest_snapshot_
342             : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
343 
344     if (need_skip) {
345       // This case is handled below.
346     } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out.  We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
350       assert(ikey_.type == kTypeValue);
351       if (ikey_.type != kTypeValue) {
352         ROCKS_LOG_FATAL(info_log_,
353                         "Unexpected key type %d for compaction output",
354                         ikey_.type);
355       }
356       assert(current_user_key_snapshot_ == last_snapshot);
357       if (current_user_key_snapshot_ != last_snapshot) {
358         ROCKS_LOG_FATAL(info_log_,
359                         "current_user_key_snapshot_ (%" PRIu64
360                         ") != last_snapshot (%" PRIu64 ")",
361                         current_user_key_snapshot_, last_snapshot);
362       }
363 
364       value_.clear();
365       valid_ = true;
366       clear_and_output_next_key_ = false;
367     } else if (ikey_.type == kTypeSingleDeletion) {
368       // We can compact out a SingleDelete if:
369       // 1) We encounter the corresponding PUT -OR- we know that this key
370       //    doesn't appear past this output level
371       // =AND=
      // 2) We've already returned a record in this snapshot -OR-
      //    this SingleDelete is visible in the earliest write-conflict
      //    snapshot (i.e., no earlier snapshot needs to see the write).
374       //
375       // Rule 1 is needed for SingleDelete correctness.  Rule 2 is needed to
376       // allow Transactions to do write-conflict checking (if we compacted away
377       // all keys, then we wouldn't know that a write happened in this
378       // snapshot).  If there is no earlier snapshot, then we know that there
379       // are no active transactions that need to know about any writes.
380       //
381       // Optimization 3:
382       // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
383       // true, then we must output a SingleDelete.  In this case, we will decide
384       // to also output the PUT.  While we are compacting less by outputting the
385       // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact out
387       // both the SingleDelete and the Put, while we couldn't if we only
388       // outputted the SingleDelete now).
389       // In this case, we can save space by removing the PUT's value as it will
390       // never be read.
391       //
392       // Deletes and Merges are not supported on the same key that has a
393       // SingleDelete as it is not possible to correctly do any partial
394       // compaction of such a combination of operations.  The result of mixing
395       // those operations for a given key is documented as being undefined.  So
      // we can choose how to handle such combinations of operations.  We will
397       // try to compact out as much as we can in these cases.
398       // We will report counts on these anomalous cases.
399 
400       // The easiest way to process a SingleDelete during iteration is to peek
401       // ahead at the next key.
402       ParsedInternalKey next_ikey;
403       input_->Next();
404 
405       // Check whether the next key exists, is not corrupt, and is the same key
406       // as the single delete.
407       if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
408           cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
409         // Check whether the next key belongs to the same snapshot as the
410         // SingleDelete.
411         if (prev_snapshot == 0 ||
412             DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
413           if (next_ikey.type == kTypeSingleDeletion) {
414             // We encountered two SingleDeletes in a row.  This could be due to
415             // unexpected user input.
416             // Skip the first SingleDelete and let the next iteration decide how
417             // to handle the second SingleDelete
418 
419             // First SingleDelete has been skipped since we already called
420             // input_->Next().
421             ++iter_stats_.num_record_drop_obsolete;
422             ++iter_stats_.num_single_del_mismatch;
423           } else if (has_outputted_key_ ||
424                      DEFINITELY_IN_SNAPSHOT(
425                          ikey_.sequence, earliest_write_conflict_snapshot_)) {
426             // Found a matching value, we can drop the single delete and the
427             // value.  It is safe to drop both records since we've already
428             // outputted a key in this snapshot, or there is no earlier
429             // snapshot (Rule 2 above).
430 
431             // Note: it doesn't matter whether the second key is a Put or if it
432             // is an unexpected Merge or Delete.  We will compact it out
433             // either way. We will maintain counts of how many mismatches
434             // happened
435             if (next_ikey.type != kTypeValue &&
436                 next_ikey.type != kTypeBlobIndex) {
437               ++iter_stats_.num_single_del_mismatch;
438             }
439 
440             ++iter_stats_.num_record_drop_hidden;
441             ++iter_stats_.num_record_drop_obsolete;
442             // Already called input_->Next() once.  Call it a second time to
443             // skip past the second key.
444             input_->Next();
445           } else {
446             // Found a matching value, but we cannot drop both keys since
447             // there is an earlier snapshot and we need to leave behind a record
448             // to know that a write happened in this snapshot (Rule 2 above).
449             // Clear the value and output the SingleDelete. (The value will be
450             // outputted on the next iteration.)
451 
452             // Setting valid_ to true will output the current SingleDelete
453             valid_ = true;
454 
455             // Set up the Put to be outputted in the next iteration.
456             // (Optimization 3).
457             clear_and_output_next_key_ = true;
458           }
459         } else {
460           // We hit the next snapshot without hitting a put, so the iterator
461           // returns the single delete.
462           valid_ = true;
463         }
464       } else {
465         // We are at the end of the input, could not parse the next key, or hit
466         // a different key. The iterator returns the single delete if the key
467         // possibly exists beyond the current output level.  We set
468         // has_current_user_key to false so that if the iterator is at the next
469         // key, we do not compare it again against the previous key at the next
470         // iteration. If the next key is corrupt, we return before the
471         // comparison, so the value of has_current_user_key does not matter.
472         has_current_user_key_ = false;
473         if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
474             compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
475                                                        &level_ptrs_)) {
476           // Key doesn't exist outside of this range.
477           // Can compact out this SingleDelete.
478           ++iter_stats_.num_record_drop_obsolete;
479           ++iter_stats_.num_single_del_fallthru;
480           if (!bottommost_level_) {
481             ++iter_stats_.num_optimized_del_drop_obsolete;
482           }
483         } else {
484           // Output SingleDelete
485           valid_ = true;
486         }
487       }
488 
489       if (valid_) {
490         at_next_ = true;
491       }
492     } else if (last_snapshot == current_user_key_snapshot_ ||
493                (last_snapshot > 0 &&
494                 last_snapshot < current_user_key_snapshot_)) {
      // If the earliest snapshot in which this key is visible is the same as
      // the visibility of a previous instance of the same key, then this kv
      // is not visible in any snapshot: it is hidden by a newer entry for the
      // same user key.
499       //
500       // Note: Dropping this key will not affect TransactionDB write-conflict
501       // checking since there has already been a record returned for this key
502       // in this snapshot.
503       assert(last_sequence >= current_user_key_sequence_);
504       if (last_sequence < current_user_key_sequence_) {
505         ROCKS_LOG_FATAL(info_log_,
506                         "last_sequence (%" PRIu64
507                         ") < current_user_key_sequence_ (%" PRIu64 ")",
508                         last_sequence, current_user_key_sequence_);
509       }
510 
511       ++iter_stats_.num_record_drop_hidden;  // (A)
512       input_->Next();
513     } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
514                IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
515                ikeyNotNeededForIncrementalSnapshot() &&
516                compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
517                                                           &level_ptrs_)) {
518       // TODO(noetzli): This is the only place where we use compaction_
519       // (besides the constructor). We should probably get rid of this
520       // dependency and find a way to do similar filtering during flushes.
521       //
522       // For this user key:
523       // (1) there is no data in higher levels
524       // (2) data in lower levels will have larger sequence numbers
525       // (3) data in layers that are being compacted here and have
526       //     smaller sequence numbers will be dropped in the next
527       //     few iterations of this loop (by rule (A) above).
528       // Therefore this deletion marker is obsolete and can be dropped.
529       //
530       // Note:  Dropping this Delete will not affect TransactionDB
531       // write-conflict checking since it is earlier than any snapshot.
532       //
      // It seems that we could also drop a deletion later than the earliest
      // snapshot, given that:
      // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
      // (2) No value exists earlier than the deletion.
537       ++iter_stats_.num_record_drop_obsolete;
538       if (!bottommost_level_) {
539         ++iter_stats_.num_optimized_del_drop_obsolete;
540       }
541       input_->Next();
542     } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
543                ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level.
      // We can skip outputting the key iff there are no subsequent puts for
      // this key.
547       ParsedInternalKey next_ikey;
548       input_->Next();
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
551       while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
552              cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
553              (prev_snapshot == 0 ||
554               DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
555         input_->Next();
556       }
      // If an entry for this user key remains after the skip, we still need
      // to output the delete too.
559       if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
560           cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
561         valid_ = true;
562         at_next_ = true;
563       }
564     } else if (ikey_.type == kTypeMerge) {
565       if (!merge_helper_->HasOperator()) {
566         status_ = Status::InvalidArgument(
567             "merge_operator is not properly initialized.");
568         return;
569       }
570 
571       pinned_iters_mgr_.StartPinning();
572       // We know the merge type entry is not hidden, otherwise we would
573       // have hit (A)
574       // We encapsulate the merge related state machine in a different
575       // object to minimize change to the existing flow.
576       Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
577                                            prev_snapshot, bottommost_level_);
578       merge_out_iter_.SeekToFirst();
579 
580       if (!s.ok() && !s.IsMergeInProgress()) {
581         status_ = s;
582         return;
583       } else if (merge_out_iter_.Valid()) {
584         // NOTE: key, value, and ikey_ refer to old entries.
585         //       These will be correctly set below.
586         key_ = merge_out_iter_.key();
587         value_ = merge_out_iter_.value();
588         bool valid_key __attribute__((__unused__));
589         valid_key = ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include it in the result, so we expect the keys here to be valid.
592         assert(valid_key);
593         if (!valid_key) {
594           ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
595                           key_.ToString(true).c_str());
596         }
597         // Keep current_key_ in sync.
598         current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
599         key_ = current_key_.GetInternalKey();
600         ikey_.user_key = current_key_.GetUserKey();
601         valid_ = true;
602       } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
606         has_current_user_key_ = false;
607         pinned_iters_mgr_.ReleasePinnedData();
608 
609         if (merge_helper_->FilteredUntil(&skip_until)) {
610           need_skip = true;
611         }
612       }
613     } else {
614       // 1. new user key -OR-
615       // 2. different snapshot stripe
616       bool should_delete = range_del_agg_->ShouldDelete(
617           key_, RangeDelPositioningMode::kForwardTraversal);
618       if (should_delete) {
619         ++iter_stats_.num_record_drop_hidden;
620         ++iter_stats_.num_record_drop_range_del;
621         input_->Next();
622       } else {
623         valid_ = true;
624       }
625     }
626 
627     if (need_skip) {
628       input_->Seek(skip_until);
629     }
630   }
631 
632   if (!valid_ && IsShuttingDown()) {
633     status_ = Status::ShutdownInProgress();
634   }
635 
636   if (IsPausingManualCompaction()) {
637     status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
638   }
639 }
640 
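// Post-processes the entry about to be returned: lets the compaction filter
// rewrite blob references for blob-index entries, and zeroes out the sequence
// number when it is safe to do so at the bottommost level.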
641 void CompactionIterator::PrepareOutput() {
642   if (valid_) {
643     if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
644       const auto blob_decision = compaction_filter_->PrepareBlobOutput(
645           user_key(), value_, &compaction_filter_value_);
646 
647       if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
648         status_ = Status::Corruption(
649             "Corrupted blob reference encountered during GC");
650         valid_ = false;
651       } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
652         status_ = Status::IOError("Could not relocate blob during GC");
653         valid_ = false;
654       } else if (blob_decision ==
655                  CompactionFilter::BlobDecision::kChangeValue) {
656         value_ = compaction_filter_value_;
657       }
658     }
659 
660     // Zeroing out the sequence number leads to better compression.
661     // If this is the bottommost level (no files in lower levels)
662     // and the earliest snapshot is larger than this seqno
663     // and the userkey differs from the last userkey in compaction
664     // then we can squash the seqno to zero.
665     //
    // This is safe for TransactionDB write-conflict checking since
    // transactions only care about sequence numbers larger than any active
    // snapshot.
668     //
669     // Can we do the same for levels above bottom level as long as
670     // KeyNotExistsBeyondOutputLevel() return true?
671     if (valid_ && compaction_ != nullptr &&
672         !compaction_->allow_ingest_behind() &&
673         ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
674         IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
675       assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
676       if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
677         ROCKS_LOG_FATAL(info_log_,
678                         "Unexpected key type %d for seq-zero optimization",
679                         ikey_.type);
680       }
681       ikey_.sequence = 0;
682       current_key_.UpdateInternalKey(0, ikey_.type);
683     }
684   }
685 }
686 
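// Returns the earliest snapshot in which the sequence number `in` is visible,
// or kMaxSequenceNumber if it is not visible in any snapshot. *prev_snapshot
// is set to the most recent snapshot that cannot see it (0 if there is none).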
687 inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
688     SequenceNumber in, SequenceNumber* prev_snapshot) {
689   assert(snapshots_->size());
690   if (snapshots_->size() == 0) {
691     ROCKS_LOG_FATAL(info_log_,
692                     "No snapshot left in findEarliestVisibleSnapshot");
693   }
694   auto snapshots_iter = std::lower_bound(
695       snapshots_->begin(), snapshots_->end(), in);
696   if (snapshots_iter == snapshots_->begin()) {
697     *prev_snapshot = 0;
698   } else {
699     *prev_snapshot = *std::prev(snapshots_iter);
700     assert(*prev_snapshot < in);
701     if (*prev_snapshot >= in) {
702       ROCKS_LOG_FATAL(info_log_,
703                       "*prev_snapshot >= in in findEarliestVisibleSnapshot");
704     }
705   }
706   if (snapshot_checker_ == nullptr) {
707     return snapshots_iter != snapshots_->end()
708       ? *snapshots_iter : kMaxSequenceNumber;
709   }
710   bool has_released_snapshot = !released_snapshots_.empty();
711   for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
712     auto cur = *snapshots_iter;
713     assert(in <= cur);
714     if (in > cur) {
715       ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot");
716     }
717     // Skip if cur is in released_snapshots.
718     if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
719       continue;
720     }
721     auto res = snapshot_checker_->CheckInSnapshot(in, cur);
722     if (res == SnapshotCheckerResult::kInSnapshot) {
723       return cur;
724     } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
725       released_snapshots_.insert(cur);
726     }
727     *prev_snapshot = cur;
728   }
729   return kMaxSequenceNumber;
730 }
731 
// Used in two places: it prevents deletion markers from being dropped if they
// may still be needed, and it disables seqnum zero-out in PrepareOutput for
// recent keys.
734 inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
735   return (!compaction_->preserve_deletes()) ||
736          (ikey_.sequence < preserve_deletes_seqnum_);
737 }
738 
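// Returns true if `sequence` is visible in earliest_snapshot_ according to
// snapshot_checker_. Snapshots that the checker reports as released are
// recorded in released_snapshots_, and earliest_snapshot_ is advanced past
// them.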
739 bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
740   assert(snapshot_checker_ != nullptr);
741   bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
742                         (earliest_snapshot_iter_ != snapshots_->end() &&
743                          *earliest_snapshot_iter_ == earliest_snapshot_));
744   assert(pre_condition);
745   if (!pre_condition) {
746     ROCKS_LOG_FATAL(info_log_,
                    "Precondition does not hold in IsInEarliestSnapshot");
748   }
749   auto in_snapshot =
750       snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
751   while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
    // Avoid the current earliest_snapshot_ being returned as the earliest
    // visible snapshot for the next value. So if a value's sequence is
    // zeroed out by PrepareOutput(), the next value will be compacted out.
755     released_snapshots_.insert(earliest_snapshot_);
756     earliest_snapshot_iter_++;
757 
758     if (earliest_snapshot_iter_ == snapshots_->end()) {
759       earliest_snapshot_ = kMaxSequenceNumber;
760     } else {
761       earliest_snapshot_ = *earliest_snapshot_iter_;
762     }
763     in_snapshot =
764         snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
765   }
766   assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
767   if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) {
768     ROCKS_LOG_FATAL(info_log_,
769                     "Unexpected released snapshot in IsInEarliestSnapshot");
770   }
771   return in_snapshot == SnapshotCheckerResult::kInSnapshot;
772 }
773 
774 }  // namespace ROCKSDB_NAMESPACE
775