//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/compaction/compaction_iterator.h"

#include <iterator>
#include <limits>

#include "db/blob/blob_file_builder.h"
#include "db/blob/blob_index.h"
#include "db/snapshot_checker.h"
#include "logging/logging.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
    const Compaction* compaction, const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<int>* manual_compaction_paused,
    const std::atomic<bool>* manual_compaction_canceled,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, snapshot_checker, env,
          report_detailed_time, expect_valid_internal_key, range_del_agg,
          blob_file_builder, allow_data_in_errors,
          std::unique_ptr<CompactionProxy>(
              compaction ? new RealCompaction(compaction) : nullptr),
          compaction_filter, shutting_down, preserve_deletes_seqnum,
          manual_compaction_paused, manual_compaction_canceled, info_log,
          full_history_ts_low) {}

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
    std::unique_ptr<CompactionProxy> compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<int>* manual_compaction_paused,
    const std::atomic<bool>* manual_compaction_canceled,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low)
    : input_(input, cmp,
             !compaction || compaction->DoesInputReferenceBlobFiles()),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      clock_(env_->GetSystemClock().get()),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      blob_file_builder_(blob_file_builder),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      manual_compaction_canceled_(manual_compaction_canceled),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      info_log_(info_log),
      allow_data_in_errors_(allow_data_in_errors),
      timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
      full_history_ts_low_(full_history_ts_low),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      blob_garbage_collection_cutoff_file_number_(
          ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
      current_key_committed_(false),
      cmp_with_history_ts_low_(0),
      level_(compaction_ == nullptr ? 0 : compaction_->level()) {
  assert(snapshots_ != nullptr);
  bottommost_level_ = compaction_ == nullptr
                          ? false
                          : compaction_->bottommost_level() &&
                                !compaction_->allow_ingest_behind();
  if (compaction_ != nullptr) {
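    // Per-level file index hints used by KeyNotExistsBeyondOutputLevel() so
    // that subsequent (sorted) keys do not rescan each level from the start.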
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }
  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = true;
    earliest_snapshot_iter_ = snapshots_->end();
    earliest_snapshot_ = kMaxSequenceNumber;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = false;
    earliest_snapshot_iter_ = snapshots_->begin();
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
#ifndef NDEBUG
  // findEarliestVisibleSnapshot assumes this ordering.
  for (size_t i = 1; i < snapshots_->size(); ++i) {
    assert(snapshots_->at(i - 1) < snapshots_->at(i));
  }
  assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
         timestamp_size_ == full_history_ts_low_->size());
#endif
  input_.SetPinnedItersMgr(&pinned_iters_mgr_);
  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}

CompactionIterator::~CompactionIterator() {
  // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime
  input_.SetPinnedItersMgr(nullptr);
}

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}

void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include it in the result, so we expect the keys here to be valid.
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
                        s.getState());
      }

      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // We consumed all pinned merge operands, release pinned iterators
      pinned_iters_mgr_.ReleasePinnedData();
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      AdvanceInputIter();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  if (!compaction_filter_ ||
      (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
    return true;
  }
  bool error = false;
  // If the user has specified a compaction filter and the sequence
  // number is greater than any external snapshot, then invoke the
  // filter. If the return value of the compaction filter is true,
  // replace the entry with a deletion marker.
  CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined;
  compaction_filter_value_.clear();
  compaction_filter_skip_until_.Clear();
  CompactionFilter::ValueType value_type =
      ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
                               : CompactionFilter::ValueType::kBlobIndex;
  // Hack: pass the internal key to BlobIndexCompactionFilter since it needs
  // the sequence number.
  assert(compaction_filter_);
  Slice& filter_key =
      (ikey_.type == kTypeValue ||
       !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
          ? ikey_.user_key
          : key_;
  {
    StopWatchNano timer(clock_, report_detailed_time_);
    if (kTypeBlobIndex == ikey_.type) {
      blob_value_.Reset();
      filter = compaction_filter_->FilterBlobByKey(
          level_, filter_key, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
      if (CompactionFilter::Decision::kUndetermined == filter &&
          !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
        // For integrated BlobDB impl, CompactionIterator reads blob value.
        // For Stacked BlobDB impl, the corresponding CompactionFilter's
        // FilterV2 method should read the blob value.
        BlobIndex blob_index;
        Status s = blob_index.DecodeFrom(value_);
        if (!s.ok()) {
          status_ = s;
          valid_ = false;
          return false;
        }
        if (blob_index.HasTTL() || blob_index.IsInlined()) {
          status_ = Status::Corruption("Unexpected TTL/inlined blob index");
          valid_ = false;
          return false;
        }
        if (compaction_ == nullptr) {
          status_ =
              Status::Corruption("Unexpected blob index outside of compaction");
          valid_ = false;
          return false;
        }
        const Version* const version = compaction_->input_version();
        assert(version);

        uint64_t bytes_read = 0;
        s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index,
                             &blob_value_, &bytes_read);
        if (!s.ok()) {
          status_ = s;
          valid_ = false;
          return false;
        }

        ++iter_stats_.num_blobs_read;
        iter_stats_.total_blob_bytes_read += bytes_read;

        value_type = CompactionFilter::ValueType::kValue;
      }
    }
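    // Fall back to FilterV2 for plain values, and for blob values that
    // FilterBlobByKey left undetermined (for the integrated BlobDB case the
    // blob has been read into blob_value_ above).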
    if (CompactionFilter::Decision::kUndetermined == filter) {
      filter = compaction_filter_->FilterV2(
          level_, filter_key, value_type,
          blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
    }
    iter_stats_.total_filter_time +=
        env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
  }

  if (CompactionFilter::Decision::kUndetermined == filter) {
    // Should not reach here, since FilterV2 should never return kUndetermined.
    status_ =
        Status::NotSupported("FilterV2() should never return kUndetermined");
    valid_ = false;
    return false;
  }

  if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
      cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
          0) {
    // Can't skip to a key smaller than the current one.
    // Keep the key as per FilterV2 documentation.
    filter = CompactionFilter::Decision::kKeep;
  }

  if (filter == CompactionFilter::Decision::kRemove) {
    // convert the current key to a delete; key_ is pointing into
    // current_key_ at this point, so updating current_key_ updates key()
    ikey_.type = kTypeDeletion;
    current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
    // no value associated with delete
    value_.clear();
    iter_stats_.num_record_drop_user++;
  } else if (filter == CompactionFilter::Decision::kChangeValue) {
    if (ikey_.type == kTypeBlobIndex) {
      // value transfer from blob file to inlined data
      ikey_.type = kTypeValue;
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
    }
    value_ = compaction_filter_value_;
  } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
    *need_skip = true;
    compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                     kValueTypeForSeek);
    *skip_until = compaction_filter_skip_until_.Encode();
  } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
    // Only the StackableDB-based BlobDB impl's compaction filter should return
    // kChangeBlobIndex. Decision about rewriting blob and changing blob index
    // in the integrated BlobDB impl is made in subsequent call to
    // PrepareOutput() and its callees.
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
      status_ = Status::NotSupported(
          "Only stacked BlobDB's internal compaction filter can return "
          "kChangeBlobIndex.");
      valid_ = false;
      return false;
    }
    if (ikey_.type == kTypeValue) {
      // value transfer from inlined data to blob file
      ikey_.type = kTypeBlobIndex;
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
    }
    value_ = compaction_filter_value_;
  } else if (filter == CompactionFilter::Decision::kIOError) {
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
      status_ = Status::NotSupported(
          "CompactionFilter for integrated BlobDB should not return kIOError");
      valid_ = false;
      return false;
    }
    status_ = Status::IOError("Failed to access blob during compaction filter");
    error = true;
  }
  return !error;
}

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

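  // Keep processing input keys until we either produce an output record
  // (valid_ becomes true), exhaust the input, or the compaction is paused or
  // shut down.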
  while (!valid_ && input_.Valid() && !IsPausingManualCompaction() &&
         !IsShuttingDown()) {
    key_ = input_.key();
    value_ = input_.value();
    iter_stats_.num_input_records++;

    Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
    if (!pik_status.ok()) {
      iter_stats_.num_input_corrupt_records++;

      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      if (expect_valid_internal_key_) {
        status_ = pik_status;
        return;
      }
      key_ = current_key_.SetInternalKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      valid_ = true;
      break;
    }
    TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);

    // Update input statistics
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
        ikey_.type == kTypeDeletionWithTimestamp) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // If need_skip is true, we should seek the input iterator
    // to internal key skip_until and continue from there.
    bool need_skip = false;
    // Points either into compaction_filter_skip_until_ or into
    // merge_helper_->compaction_filter_skip_until_.
    Slice skip_until;

    bool user_key_equal_without_ts = false;
    int cmp_ts = 0;
    if (has_current_user_key_) {
      user_key_equal_without_ts =
          cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
      // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
      // previous key.
      cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
                                     ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_),
                                     curr_ts_)
                               : 0;
    }

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
      // First occurrence of this user key
      // Copy key for output
      key_ = current_key_.SetInternalKey(key_, &ikey_);

      // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
      // in next iteration to compare with the timestamp of next key.
      UpdateTimestampAndCompareWithFullHistoryLow();

      // If
      // (1) !has_current_user_key_, OR
      // (2) timestamp is disabled, OR
      // (3) all history will be preserved, OR
      // (4) the user key (excluding timestamp) is different from the previous
      //     key, OR
      // (5) the timestamp is NO older than *full_history_ts_low_,
      // then current_user_key_ must be treated as a different user key.
      // This means that if a user key (excluding ts) is the same as the
      // previous user key, and its ts is older than *full_history_ts_low_,
      // then we consider this key for GC, e.g. it may be dropped if certain
      // conditions match.
      if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
          !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0) {
        // Initialize for future comparisons for rule (A), etc.
        current_user_key_sequence_ = kMaxSequenceNumber;
        current_user_key_snapshot_ = 0;
        has_current_user_key_ = true;
      }
      current_user_key_ = ikey_.user_key;

      has_outputted_key_ = false;

      current_key_committed_ = KeyCommitted(ikey_.sequence);

      // Apply the compaction filter to the first committed version of the user
      // key.
      if (current_key_committed_ &&
          !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
        break;
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): Compaction filter does not process keys in this path
      // Need to have the compaction filter process multiple versions
      // if we have versions on both sides of a snapshot
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();

      // Note that a newer version of a key is ordered before older versions.
      // If a newer version of a key is committed, so is the older version.
      // No need to query snapshot_checker_ in that case.
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        current_key_committed_ = KeyCommitted(ikey_.sequence);
        // Apply the compaction filter to the first committed version of the
        // user key.
        if (current_key_committed_ &&
            !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
          break;
        }
      }
    }

    if (UNLIKELY(!current_key_committed_)) {
      assert(snapshot_checker_ != nullptr);
      valid_ = true;
      break;
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence = current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? earliest_snapshot_
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (need_skip) {
      // This case is handled below.
    } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out.  We will keep this Put, but can drop its value.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex);
      if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for compaction output",
                        ikey_.type);
      }
      assert(current_user_key_snapshot_ >= last_snapshot);
      if (current_user_key_snapshot_ < last_snapshot) {
        ROCKS_LOG_FATAL(info_log_,
                        "current_user_key_snapshot_ (%" PRIu64
                        ") < last_snapshot (%" PRIu64 ")",
                        current_user_key_snapshot_, last_snapshot);
      }

      if (ikey_.type == kTypeBlobIndex) {
        ikey_.type = kTypeValue;
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      }

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR-
      //    there is no earlier earliest_write_conflict_snapshot.
      //
      // Rule 1 is needed for SingleDelete correctness.  Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot).  If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete.  In this case, we will decide
      // to also output the PUT.  While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact out
      // both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations.  The result of mixing
      // those operations for a given key is documented as being undefined.  So
      // we can choose how to handle such combinations of operations.  We will
      // try to compact out as much as we can in these cases.
      // We will report counts on these anomalous cases.
      //
      // Note: If timestamp is enabled, then the record is eligible for
      // deletion only if, in addition to the above conditions (Rule 1 and
      // Rule 2), full_history_ts_low_ is specified and the key's timestamp is
      // older than *full_history_ts_low_. If it is not eligible for deletion,
      // then we will output the SingleDelete. Likewise, Optimization 3 is
      // applied only if full_history_ts_low_ is specified and the key's
      // timestamp is older than *full_history_ts_low_.

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      const bool is_timestamp_eligible_for_gc =
          (timestamp_size_ == 0 ||
           (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));

      ParsedInternalKey next_ikey;
      AdvanceInputIter();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_.Valid() &&
          ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
              .ok() &&
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
#ifndef NDEBUG
        const Compaction* c =
            compaction_ ? compaction_->real_compaction() : nullptr;
#endif
        TEST_SYNC_POINT_CALLBACK(
            "CompactionIterator::NextFromInput:SingleDelete:1",
            const_cast<Compaction*>(c));
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 ||
            DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
          TEST_SYNC_POINT_CALLBACK(
              "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes for the same key in a row. This
            // could be due to unexpected user input. Skip the first
            // SingleDelete and let the next iteration decide how to handle the
            // second SingleDelete.

            // First SingleDelete has been skipped since we already called
            // input_.Next().
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_mismatch;
          } else if (!is_timestamp_eligible_for_gc) {
            // We cannot drop the SingleDelete because timestamps are enabled
            // and the timestamp of this key is greater than or equal to
            // *full_history_ts_low_. We will output the SingleDelete.
            valid_ = true;
          } else if (has_outputted_key_ ||
                     DefinitelyInSnapshot(ikey_.sequence,
                                          earliest_write_conflict_snapshot_)) {
            // Found a matching value, we can drop the single delete and the
            // value.  It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete.  We will compact it out
            // either way. We will maintain counts of how many mismatches
            // happened.
            if (next_ikey.type != kTypeValue &&
                next_ikey.type != kTypeBlobIndex) {
              ++iter_stats_.num_single_del_mismatch;
            }

            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_.Next() once.  Call it a second time to
            // skip past the second key.
            AdvanceInputIter();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a record
            // to know that a write happened in this snapshot (Rule 2 above).
            // Clear the value and output the SingleDelete. (The value will be
            // outputted on the next iteration.)

            // Setting valid_ to true will output the current SingleDelete
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
            TEST_SYNC_POINT_CALLBACK(
                "CompactionIterator::NextFromInput:KeepSDForWW",
                /*arg=*/nullptr);
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // a different key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level.  We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr &&
            DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_) &&
            is_timestamp_eligible_for_gc) {
          // Key doesn't exist outside of this range.
          // Can compact out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
          ++iter_stats_.num_single_del_fallthru;
          if (!bottommost_level_) {
            ++iter_stats_.num_optimized_del_drop_obsolete;
          }
        } else {
          // Output SingleDelete
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_ ||
               (last_snapshot > 0 &&
                last_snapshot < current_user_key_snapshot_)) {
      // If the earliest snapshot in which this key is visible is the same as
      // the earliest snapshot in which a previous instance of the same key
      // was visible, then this kv is not visible in any snapshot. It is
      // hidden by a newer entry for the same user key.
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
      assert(last_sequence >= current_user_key_sequence_);
      if (last_sequence < current_user_key_sequence_) {
        ROCKS_LOG_FATAL(info_log_,
                        "last_sequence (%" PRIu64
                        ") < current_user_key_sequence_ (%" PRIu64 ")",
                        last_sequence, current_user_key_sequence_);
      }

      ++iter_stats_.num_record_drop_hidden;  // rule (A)
      AdvanceInputIter();
    } else if (compaction_ != nullptr &&
               (ikey_.type == kTypeDeletion ||
                (ikey_.type == kTypeDeletionWithTimestamp &&
                 cmp_with_history_ts_low_ < 0)) &&
               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
               ikeyNotNeededForIncrementalSnapshot() &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note:  Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      //
      // It seems that we could also drop deletions later than the earliest
      // snapshot given that:
      // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
      // (2) No value exists earlier than the deletion.
      //
      // Note also that a deletion marker of type kTypeDeletionWithTimestamp
      // will be treated as a different user key unless the timestamp is older
      // than *full_history_ts_low_.
      ++iter_stats_.num_record_drop_obsolete;
      if (!bottommost_level_) {
        ++iter_stats_.num_optimized_del_drop_obsolete;
      }
      AdvanceInputIter();
    } else if ((ikey_.type == kTypeDeletion ||
                (ikey_.type == kTypeDeletionWithTimestamp &&
                 cmp_with_history_ts_low_ < 0)) &&
               bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level.
      // We can skip outputting the key iff there are no subsequent puts for
      // this key.
      assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
                                 ikey_.user_key, &level_ptrs_));
      ParsedInternalKey next_ikey;
      AdvanceInputIter();
#ifndef NDEBUG
      const Compaction* c =
          compaction_ ? compaction_->real_compaction() : nullptr;
#endif
      TEST_SYNC_POINT_CALLBACK(
          "CompactionIterator::NextFromInput:BottommostDelete:1",
          const_cast<Compaction*>(c));
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
      //
      // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
      // considered to have a different user key unless the timestamp is older
      // than *full_history_ts_low_.
      while (!IsPausingManualCompaction() && !IsShuttingDown() &&
             input_.Valid() &&
             (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
                  .ok()) &&
             cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
             (prev_snapshot == 0 ||
              DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
        AdvanceInputIter();
      }
      // If a version of this user key remains after the skipped ones, we still
      // need to output the delete too.
      if (input_.Valid() &&
          (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
               .ok()) &&
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
        valid_ = true;
        at_next_ = true;
      }
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        status_ = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        return;
      }

      pinned_iters_mgr_.StartPinning();
      Version* version = compaction_ ? compaction_->input_version() : nullptr;

      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A).
      // We encapsulate the merge-related state machine in a different
      // object to minimize change to the existing flow.
      Status s = merge_helper_->MergeUntil(&input_, range_del_agg_,
                                           prev_snapshot, bottommost_level_,
                                           allow_data_in_errors_, version);
      merge_out_iter_.SeekToFirst();

      if (!s.ok() && !s.IsMergeInProgress()) {
        status_ = s;
        return;
      } else if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        //       These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include it in the result, so we expect the keys here to be valid.
        assert(pik_status.ok());
        if (!pik_status.ok()) {
          ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
                          pik_status.getState());
        }
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
        pinned_iters_mgr_.ReleasePinnedData();

        if (merge_helper_->FilteredUntil(&skip_until)) {
          need_skip = true;
        }
      }
    } else {
      // 1. new user key -OR-
      // 2. different snapshot stripe
      bool should_delete = range_del_agg_->ShouldDelete(
          key_, RangeDelPositioningMode::kForwardTraversal);
      if (should_delete) {
        ++iter_stats_.num_record_drop_hidden;
        ++iter_stats_.num_record_drop_range_del;
        AdvanceInputIter();
      } else {
        valid_ = true;
      }
    }

    if (need_skip) {
      SkipUntil(skip_until);
    }
  }

  if (!valid_ && IsShuttingDown()) {
    status_ = Status::ShutdownInProgress();
  }

  if (IsPausingManualCompaction()) {
    status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
}

bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
  if (!blob_file_builder_) {
    return false;
  }

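  // Hand the value to the blob file builder. If the builder writes it to a
  // blob file, it returns a non-empty blob index that replaces the inline
  // value below; otherwise blob_index_ stays empty and the value remains
  // inline.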
  blob_index_.clear();
  const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);

  if (!s.ok()) {
    status_ = s;
    valid_ = false;

    return false;
  }

  if (blob_index_.empty()) {
    return false;
  }

  value_ = blob_index_;

  return true;
}

void CompactionIterator::ExtractLargeValueIfNeeded() {
  assert(ikey_.type == kTypeValue);

  if (!ExtractLargeValueIfNeededImpl()) {
    return;
  }

  ikey_.type = kTypeBlobIndex;
  current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}

void CompactionIterator::GarbageCollectBlobIfNeeded() {
  assert(ikey_.type == kTypeBlobIndex);

  if (!compaction_) {
    return;
  }

  // GC for integrated BlobDB
  if (compaction_->enable_blob_garbage_collection()) {
    BlobIndex blob_index;

    {
      const Status s = blob_index.DecodeFrom(value_);

      if (!s.ok()) {
        status_ = s;
        valid_ = false;

        return;
      }
    }

    if (blob_index.IsInlined() || blob_index.HasTTL()) {
      status_ = Status::Corruption("Unexpected TTL/inlined blob index");
      valid_ = false;

      return;
    }

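    // Blobs residing in files at or beyond the GC cutoff file number are left
    // in place; only blobs in older files are read and relocated below.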
    if (blob_index.file_number() >=
        blob_garbage_collection_cutoff_file_number_) {
      return;
    }

    const Version* const version = compaction_->input_version();
    assert(version);

    uint64_t bytes_read = 0;

    {
      const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index,
                                        &blob_value_, &bytes_read);

      if (!s.ok()) {
        status_ = s;
        valid_ = false;

        return;
      }
    }

    ++iter_stats_.num_blobs_read;
    iter_stats_.total_blob_bytes_read += bytes_read;

    ++iter_stats_.num_blobs_relocated;
    iter_stats_.total_blob_bytes_relocated += blob_index.size();

    value_ = blob_value_;

    if (ExtractLargeValueIfNeededImpl()) {
      return;
    }

    ikey_.type = kTypeValue;
    current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);

    return;
  }

  // GC for stacked BlobDB
  if (compaction_filter_ &&
      compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
    const auto blob_decision = compaction_filter_->PrepareBlobOutput(
        user_key(), value_, &compaction_filter_value_);

    if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
      status_ =
          Status::Corruption("Corrupted blob reference encountered during GC");
      valid_ = false;

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
      status_ = Status::IOError("Could not relocate blob during GC");
      valid_ = false;

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
      value_ = compaction_filter_value_;

      return;
    }
  }
}

void CompactionIterator::PrepareOutput() {
  if (valid_) {
    if (ikey_.type == kTypeValue) {
      ExtractLargeValueIfNeeded();
    } else if (ikey_.type == kTypeBlobIndex) {
      GarbageCollectBlobIfNeeded();
    }

    // Zeroing out the sequence number leads to better compression.
    // If this is the bottommost level (no files in lower levels)
    // and the earliest snapshot is larger than this seqno
    // and the userkey differs from the last userkey in compaction
    // then we can squash the seqno to zero.
    //
    // This is safe for TransactionDB write-conflict checking since
    // transactions only care about sequence numbers larger than any active
    // snapshot.
    //
    // Can we do the same for levels above the bottom level as long as
    // KeyNotExistsBeyondOutputLevel() returns true?
    if (valid_ && compaction_ != nullptr &&
        !compaction_->allow_ingest_behind() &&
        ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
        DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
        ikey_.type != kTypeMerge) {
      assert(ikey_.type != kTypeDeletion);
      assert(ikey_.type != kTypeSingleDeletion ||
             (timestamp_size_ || full_history_ts_low_));
      if (ikey_.type == kTypeDeletion ||
          (ikey_.type == kTypeSingleDeletion &&
           (!timestamp_size_ || !full_history_ts_low_))) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for seq-zero optimization",
                        ikey_.type);
      }
      ikey_.sequence = 0;
      if (!timestamp_size_) {
        current_key_.UpdateInternalKey(0, ikey_.type);
      } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
        // We can also zero out timestamp for better compression.
        // For the same user key (excluding timestamp), the timestamp-based
        // history can be collapsed to save some space if the timestamp is
        // older than *full_history_ts_low_.
        const std::string kTsMin(timestamp_size_, static_cast<char>(0));
        const Slice ts_slice = kTsMin;
        ikey_.SetTimestamp(ts_slice);
        current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
      }
    }
  }
}

inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  if (snapshots_->size() == 0) {
    ROCKS_LOG_FATAL(info_log_,
                    "No snapshot left in findEarliestVisibleSnapshot");
  }
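  // Binary-search the sorted snapshot list for the first snapshot sequence
  // number that is >= in.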
  auto snapshots_iter =
      std::lower_bound(snapshots_->begin(), snapshots_->end(), in);
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    assert(*prev_snapshot < in);
    if (*prev_snapshot >= in) {
      ROCKS_LOG_FATAL(info_log_,
                      "*prev_snapshot >= in in findEarliestVisibleSnapshot");
    }
  }
  if (snapshot_checker_ == nullptr) {
    return snapshots_iter != snapshots_->end() ? *snapshots_iter
                                               : kMaxSequenceNumber;
  }
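  // With a snapshot checker, walk forward through the snapshot list, skipping
  // snapshots that are known to have been released, until we find one that
  // definitely contains sequence number `in`.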
  bool has_released_snapshot = !released_snapshots_.empty();
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    assert(in <= cur);
    if (in > cur) {
      ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot");
    }
    // Skip if cur is in released_snapshots.
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
      continue;
    }
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
    if (res == SnapshotCheckerResult::kInSnapshot) {
      return cur;
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
      released_snapshots_.insert(cur);
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}

// Used in two places: it prevents deletion markers from being dropped while
// they may still be needed, and it disables seqnum zero-out in PrepareOutput
// for recent keys.
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  return (!compaction_->preserve_deletes()) ||
         (ikey_.sequence < preserve_deletes_seqnum_);
}

bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
  assert(snapshot_checker_ != nullptr);
  bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
                        (earliest_snapshot_iter_ != snapshots_->end() &&
                         *earliest_snapshot_iter_ == earliest_snapshot_));
  assert(pre_condition);
  if (!pre_condition) {
    ROCKS_LOG_FATAL(
        info_log_,
        "Pre-condition does not hold in IsInCurrentEarliestSnapshot");
  }
  auto in_snapshot =
      snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
    // Avoid the current earliest_snapshot_ being returned as the earliest
    // visible snapshot for the next value. So if a value's sequence is
    // zeroed out by PrepareOutput(), the next value will be compacted out.
    released_snapshots_.insert(earliest_snapshot_);
    earliest_snapshot_iter_++;

    if (earliest_snapshot_iter_ == snapshots_->end()) {
      earliest_snapshot_ = kMaxSequenceNumber;
    } else {
      earliest_snapshot_ = *earliest_snapshot_iter_;
    }
    in_snapshot =
        snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  }
  assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
  if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) {
    ROCKS_LOG_FATAL(
        info_log_,
        "Unexpected released snapshot in IsInCurrentEarliestSnapshot");
  }
  return in_snapshot == SnapshotCheckerResult::kInSnapshot;
}

uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
    const CompactionProxy* compaction) {
  if (!compaction) {
    return 0;
  }

  if (!compaction->enable_blob_garbage_collection()) {
    return 0;
  }

  Version* const version = compaction->input_version();
  assert(version);

  const VersionStorageInfo* const storage_info = version->storage_info();
  assert(storage_info);

  const auto& blob_files = storage_info->GetBlobFiles();

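  // Skip over the oldest fraction of blob files given by the age cutoff; the
  // file number at that position becomes the GC cutoff, so blobs in files
  // with smaller file numbers are eligible for relocation.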
  auto it = blob_files.begin();
  std::advance(
      it, compaction->blob_garbage_collection_age_cutoff() * blob_files.size());

  return it != blob_files.end() ? it->first
                                : std::numeric_limits<uint64_t>::max();
}

}  // namespace ROCKSDB_NAMESPACE