// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/compaction/compaction_iterator.h"

#include <iterator>
#include <limits>

#include "db/blob/blob_file_builder.h"
#include "db/blob/blob_index.h"
#include "db/snapshot_checker.h"
#include "logging/logging.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {
CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
    const Compaction* compaction, const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<int>* manual_compaction_paused,
    const std::atomic<bool>* manual_compaction_canceled,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, snapshot_checker, env,
          report_detailed_time, expect_valid_internal_key, range_del_agg,
          blob_file_builder, allow_data_in_errors,
          std::unique_ptr<CompactionProxy>(
              compaction ? new RealCompaction(compaction) : nullptr),
          compaction_filter, shutting_down, preserve_deletes_seqnum,
          manual_compaction_paused, manual_compaction_canceled, info_log,
          full_history_ts_low) {}
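// Note: the overload above merely wraps the raw Compaction pointer in a
// RealCompaction proxy (or passes a null proxy) and delegates to the
// CompactionProxy-based constructor below, which performs the actual
// initialization.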

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
    std::unique_ptr<CompactionProxy> compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<int>* manual_compaction_paused,
    const std::atomic<bool>* manual_compaction_canceled,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low)
    : input_(input, cmp,
             !compaction || compaction->DoesInputReferenceBlobFiles()),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      clock_(env_->GetSystemClock().get()),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      blob_file_builder_(blob_file_builder),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      manual_compaction_canceled_(manual_compaction_canceled),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      info_log_(info_log),
      allow_data_in_errors_(allow_data_in_errors),
      timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
      full_history_ts_low_(full_history_ts_low),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      blob_garbage_collection_cutoff_file_number_(
          ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
      current_key_committed_(false),
      cmp_with_history_ts_low_(0),
      level_(compaction_ == nullptr ? 0 : compaction_->level()) {
  assert(snapshots_ != nullptr);
  bottommost_level_ = compaction_ == nullptr
                          ? false
                          : compaction_->bottommost_level() &&
                                !compaction_->allow_ingest_behind();
  if (compaction_ != nullptr) {
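    // level_ptrs_ keeps one index per level so that
    // KeyNotExistsBeyondOutputLevel() can resume its per-level file search for
    // the next (larger) user key where the previous search left off, rather
    // than rescanning every level from the beginning.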
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }
  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = true;
    earliest_snapshot_iter_ = snapshots_->end();
    earliest_snapshot_ = kMaxSequenceNumber;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = false;
    earliest_snapshot_iter_ = snapshots_->begin();
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
#ifndef NDEBUG
  // findEarliestVisibleSnapshot assumes this ordering.
  for (size_t i = 1; i < snapshots_->size(); ++i) {
    assert(snapshots_->at(i - 1) < snapshots_->at(i));
  }
  assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
         timestamp_size_ == full_history_ts_low_->size());
#endif
  input_.SetPinnedItersMgr(&pinned_iters_mgr_);
  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}

CompactionIterator::~CompactionIterator() {
  // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime
  input_.SetPinnedItersMgr(nullptr);
}

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}

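// Note: SeekToFirst() does not seek the wrapped input iterator itself; it
// assumes the caller has already positioned the input at its first entry
// before driving the CompactionIterator.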
void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include them in the result, so we expect the keys here to be valid.
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
                        s.getState());
      }

      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // We consumed all pinned merge operands, release pinned iterators
      pinned_iters_mgr_.ReleasePinnedData();
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      AdvanceInputIter();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  if (!compaction_filter_ ||
      (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
    return true;
  }
  bool error = false;
  // If the user has specified a compaction filter and the sequence
  // number is greater than any external snapshot, then invoke the
  // filter. If the return value of the compaction filter is true,
  // replace the entry with a deletion marker.
  CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined;
  compaction_filter_value_.clear();
  compaction_filter_skip_until_.Clear();
  CompactionFilter::ValueType value_type =
      ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
                               : CompactionFilter::ValueType::kBlobIndex;
  // Hack: pass internal key to BlobIndexCompactionFilter since it needs
  // to get sequence number.
  assert(compaction_filter_);
  Slice& filter_key =
      (ikey_.type == kTypeValue ||
       !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
          ? ikey_.user_key
          : key_;
  {
    StopWatchNano timer(clock_, report_detailed_time_);
    if (kTypeBlobIndex == ikey_.type) {
      blob_value_.Reset();
      filter = compaction_filter_->FilterBlobByKey(
          level_, filter_key, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
      if (CompactionFilter::Decision::kUndetermined == filter &&
          !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
        // For integrated BlobDB impl, CompactionIterator reads blob value.
        // For Stacked BlobDB impl, the corresponding CompactionFilter's
        // FilterV2 method should read the blob value.
        BlobIndex blob_index;
        Status s = blob_index.DecodeFrom(value_);
        if (!s.ok()) {
          status_ = s;
          valid_ = false;
          return false;
        }
        if (blob_index.HasTTL() || blob_index.IsInlined()) {
          status_ = Status::Corruption("Unexpected TTL/inlined blob index");
          valid_ = false;
          return false;
        }
        if (compaction_ == nullptr) {
          status_ =
              Status::Corruption("Unexpected blob index outside of compaction");
          valid_ = false;
          return false;
        }
        const Version* const version = compaction_->input_version();
        assert(version);

        uint64_t bytes_read = 0;
        s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index,
                             &blob_value_, &bytes_read);
        if (!s.ok()) {
          status_ = s;
          valid_ = false;
          return false;
        }

        ++iter_stats_.num_blobs_read;
        iter_stats_.total_blob_bytes_read += bytes_read;

        value_type = CompactionFilter::ValueType::kValue;
      }
    }
    if (CompactionFilter::Decision::kUndetermined == filter) {
      filter = compaction_filter_->FilterV2(
          level_, filter_key, value_type,
          blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
          compaction_filter_skip_until_.rep());
    }
    iter_stats_.total_filter_time +=
        env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
  }

  if (CompactionFilter::Decision::kUndetermined == filter) {
    // Should not reach here, since FilterV2 should never return kUndetermined.
    status_ =
        Status::NotSupported("FilterV2() should never return kUndetermined");
    valid_ = false;
    return false;
  }

  if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
      cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
          0) {
    // Can't skip to a key smaller than the current one.
    // Keep the key as per FilterV2 documentation.
    filter = CompactionFilter::Decision::kKeep;
  }

  if (filter == CompactionFilter::Decision::kRemove) {
    // convert the current key to a delete; key_ is pointing into
    // current_key_ at this point, so updating current_key_ updates key()
    ikey_.type = kTypeDeletion;
    current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
    // no value associated with delete
    value_.clear();
    iter_stats_.num_record_drop_user++;
  } else if (filter == CompactionFilter::Decision::kChangeValue) {
    if (ikey_.type == kTypeBlobIndex) {
      // value transfer from blob file to inlined data
      ikey_.type = kTypeValue;
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
    }
    value_ = compaction_filter_value_;
  } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
    *need_skip = true;
    compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                     kValueTypeForSeek);
    *skip_until = compaction_filter_skip_until_.Encode();
  } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
    // Only the StackableDB-based BlobDB impl's compaction filter should return
    // kChangeBlobIndex. In the integrated BlobDB impl, the decision about
    // rewriting the blob and changing the blob index is made in a subsequent
    // call to PrepareOutput() and its callees.
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
      status_ = Status::NotSupported(
          "Only stacked BlobDB's internal compaction filter can return "
          "kChangeBlobIndex.");
      valid_ = false;
      return false;
    }
    if (ikey_.type == kTypeValue) {
      // value transfer from inlined data to blob file
      ikey_.type = kTypeBlobIndex;
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
    }
    value_ = compaction_filter_value_;
  } else if (filter == CompactionFilter::Decision::kIOError) {
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
      status_ = Status::NotSupported(
          "CompactionFilter for integrated BlobDB should not return kIOError");
      valid_ = false;
      return false;
    }
    status_ = Status::IOError("Failed to access blob during compaction filter");
    error = true;
  }
  return !error;
}

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_.Valid() && !IsPausingManualCompaction() &&
         !IsShuttingDown()) {
    key_ = input_.key();
    value_ = input_.value();
    iter_stats_.num_input_records++;

    Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
    if (!pik_status.ok()) {
      iter_stats_.num_input_corrupt_records++;

      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      if (expect_valid_internal_key_) {
        status_ = pik_status;
        return;
      }
      key_ = current_key_.SetInternalKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      valid_ = true;
      break;
    }
    TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);

    // Update input statistics
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
        ikey_.type == kTypeDeletionWithTimestamp) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // If need_skip is true, we should seek the input iterator
    // to internal key skip_until and continue from there.
    bool need_skip = false;
    // Points either into compaction_filter_skip_until_ or into
    // merge_helper_->compaction_filter_skip_until_.
    Slice skip_until;

    bool user_key_equal_without_ts = false;
    int cmp_ts = 0;
    if (has_current_user_key_) {
      user_key_equal_without_ts =
          cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
      // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
      // previous key.
      cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
                                     ExtractTimestampFromUserKey(
                                         ikey_.user_key, timestamp_size_),
                                     curr_ts_)
                               : 0;
    }

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
      // First occurrence of this user key
      // Copy key for output
      key_ = current_key_.SetInternalKey(key_, &ikey_);

      // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
      // in next iteration to compare with the timestamp of next key.
      UpdateTimestampAndCompareWithFullHistoryLow();

      // If
      // (1) !has_current_user_key_, OR
      // (2) timestamp is disabled, OR
      // (3) all history will be preserved, OR
      // (4) user key (excluding timestamp) is different from previous key, OR
      // (5) timestamp is NO older than *full_history_ts_low_
      // then current_user_key_ must be treated as a different user key.
      // This means, if a user key (excluding ts) is the same as the previous
      // user key, and its ts is older than *full_history_ts_low_, then we
      // consider this key for GC, e.g. it may be dropped if certain conditions
      // match.
      if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
          !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0) {
        // Initialize for future comparison for rule (A) etc.
        current_user_key_sequence_ = kMaxSequenceNumber;
        current_user_key_snapshot_ = 0;
        has_current_user_key_ = true;
      }
      current_user_key_ = ikey_.user_key;

      has_outputted_key_ = false;

      current_key_committed_ = KeyCommitted(ikey_.sequence);

      // Apply the compaction filter to the first committed version of the user
      // key.
      if (current_key_committed_ &&
          !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
        break;
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): Compaction filter does not process keys in this path
      // Need to have the compaction filter process multiple versions
      // if we have versions on both sides of a snapshot
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();

      // Note that newer versions of a key are ordered before older versions.
      // If a newer version of a key is committed, so is the older version. No
      // need to query snapshot_checker_ in that case.
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        current_key_committed_ = KeyCommitted(ikey_.sequence);
        // Apply the compaction filter to the first committed version of the
        // user key.
        if (current_key_committed_ &&
            !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
          break;
        }
      }
    }

    if (UNLIKELY(!current_key_committed_)) {
      assert(snapshot_checker_ != nullptr);
      valid_ = true;
      break;
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence = current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? earliest_snapshot_
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (need_skip) {
      // This case is handled below.
    } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out. We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex);
      if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for compaction output",
                        ikey_.type);
      }
      assert(current_user_key_snapshot_ >= last_snapshot);
      if (current_user_key_snapshot_ < last_snapshot) {
        ROCKS_LOG_FATAL(info_log_,
                        "current_user_key_snapshot_ (%" PRIu64
                        ") < last_snapshot (%" PRIu64 ")",
                        current_user_key_snapshot_, last_snapshot);
      }

      if (ikey_.type == kTypeBlobIndex) {
        ikey_.type = kTypeValue;
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      }

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR-
      //    there is no earlier earliest_write_conflict_snapshot.
      //
      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot). If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete. In this case, we will decide
      // to also output the PUT. While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact
      // out both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations. The result of mixing
      // those operations for a given key is documented as being undefined. So
      // we can choose how to handle such combinations of operations. We will
      // try to compact out as much as we can in these cases.
      // We will report counts on these anomalous cases.
      //
      // Note: If timestamp is enabled, the record is eligible for deletion
      // only if, in addition to the conditions above (Rule 1 and Rule 2),
      // full_history_ts_low_ is specified and the key's timestamp is less
      // than *full_history_ts_low_. If it is not eligible for deletion, we
      // will output the SingleDelete. The same requirement applies to
      // Optimization 3: it is applied only if full_history_ts_low_ is
      // specified and the key's timestamp is older than *full_history_ts_low_.

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      const bool is_timestamp_eligible_for_gc =
          (timestamp_size_ == 0 ||
           (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));

      ParsedInternalKey next_ikey;
      AdvanceInputIter();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_.Valid() &&
          ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
              .ok() &&
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
#ifndef NDEBUG
        const Compaction* c =
            compaction_ ? compaction_->real_compaction() : nullptr;
#endif
        TEST_SYNC_POINT_CALLBACK(
            "CompactionIterator::NextFromInput:SingleDelete:1",
            const_cast<Compaction*>(c));
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 ||
            DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
          TEST_SYNC_POINT_CALLBACK(
              "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes for same key in a row. This
            // could be due to unexpected user input. Skip the first
            // SingleDelete and let the next iteration decide how to handle the
            // second SingleDelete

            // First SingleDelete has been skipped since we already called
            // input_.Next().
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_mismatch;
          } else if (!is_timestamp_eligible_for_gc) {
            // We cannot drop the SingleDelete as timestamp is enabled, and
            // timestamp of this key is greater than or equal to
            // *full_history_ts_low_. We will output the SingleDelete.
            valid_ = true;
          } else if (has_outputted_key_ ||
                     DefinitelyInSnapshot(ikey_.sequence,
                                          earliest_write_conflict_snapshot_)) {
            // Found a matching value, we can drop the single delete and the
            // value. It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete. We will compact it out
            // either way. We will maintain counts of how many mismatches
            // happened
            if (next_ikey.type != kTypeValue &&
                next_ikey.type != kTypeBlobIndex) {
              ++iter_stats_.num_single_del_mismatch;
            }

            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_.Next() once. Call it a second time to
            // skip past the second key.
            AdvanceInputIter();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a record
            // to know that a write happened in this snapshot (Rule 2 above).
            // Clear the value and output the SingleDelete. (The value will be
            // outputted on the next iteration.)

            // Setting valid_ to true will output the current SingleDelete
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
            TEST_SYNC_POINT_CALLBACK(
                "CompactionIterator::NextFromInput:KeepSDForWW",
                /*arg=*/nullptr);
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // a different key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level. We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr &&
            DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_) &&
            is_timestamp_eligible_for_gc) {
          // Key doesn't exist outside of this range.
          // Can compact out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
          ++iter_stats_.num_single_del_fallthru;
          if (!bottommost_level_) {
            ++iter_stats_.num_optimized_del_drop_obsolete;
          }
        } else {
          // Output SingleDelete
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_ ||
               (last_snapshot > 0 &&
                last_snapshot < current_user_key_snapshot_)) {
      // If the earliest snapshot in which this key is visible is the same as
      // the earliest snapshot of a previous instance of the same user key,
      // then this kv is not visible in any snapshot: it is hidden by a newer
      // entry for the same user key.
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
      assert(last_sequence >= current_user_key_sequence_);
      if (last_sequence < current_user_key_sequence_) {
        ROCKS_LOG_FATAL(info_log_,
                        "last_sequence (%" PRIu64
                        ") < current_user_key_sequence_ (%" PRIu64 ")",
                        last_sequence, current_user_key_sequence_);
      }

      ++iter_stats_.num_record_drop_hidden;  // rule (A)
      AdvanceInputIter();
    } else if (compaction_ != nullptr &&
               (ikey_.type == kTypeDeletion ||
                (ikey_.type == kTypeDeletionWithTimestamp &&
                 cmp_with_history_ts_low_ < 0)) &&
               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
               ikeyNotNeededForIncrementalSnapshot() &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note: Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      //
      // It seems that we could also drop a deletion later than the earliest
      // snapshot, given that:
      // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
      // (2) No value exists earlier than the deletion.
      //
      // Note also that a deletion marker of type kTypeDeletionWithTimestamp
      // will be treated as a different user key unless the timestamp is older
      // than *full_history_ts_low_.
      ++iter_stats_.num_record_drop_obsolete;
      if (!bottommost_level_) {
        ++iter_stats_.num_optimized_del_drop_obsolete;
      }
      AdvanceInputIter();
    } else if ((ikey_.type == kTypeDeletion ||
                (ikey_.type == kTypeDeletionWithTimestamp &&
                 cmp_with_history_ts_low_ < 0)) &&
               bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level. We
      // can skip outputting the key iff there are no subsequent puts for this
      // key.
      assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
                                 ikey_.user_key, &level_ptrs_));
      ParsedInternalKey next_ikey;
      AdvanceInputIter();
#ifndef NDEBUG
      const Compaction* c =
          compaction_ ? compaction_->real_compaction() : nullptr;
#endif
      TEST_SYNC_POINT_CALLBACK(
          "CompactionIterator::NextFromInput:BottommostDelete:1",
          const_cast<Compaction*>(c));
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
      //
      // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
      // considered to have a different user key unless the timestamp is older
      // than *full_history_ts_low_.
      while (!IsPausingManualCompaction() && !IsShuttingDown() &&
             input_.Valid() &&
             (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
                  .ok()) &&
             cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
             (prev_snapshot == 0 ||
              DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
        AdvanceInputIter();
      }
      // If there is still an entry with this user key left after the skip, we
      // need to output the delete too.
      if (input_.Valid() &&
          (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
               .ok()) &&
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
        valid_ = true;
        at_next_ = true;
      }
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        status_ = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        return;
      }

      pinned_iters_mgr_.StartPinning();
      Version* version = compaction_ ? compaction_->input_version() : nullptr;
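      // Note (assumption): the input Version is passed down so that
      // MergeUntil() can resolve an existing base value if it happens to be
      // stored in a blob file.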

      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A).
      // We encapsulate the merge-related state machine in a different
      // object to minimize change to the existing flow.
      Status s = merge_helper_->MergeUntil(&input_, range_del_agg_,
                                           prev_snapshot, bottommost_level_,
                                           allow_data_in_errors_, version);
      merge_out_iter_.SeekToFirst();

      if (!s.ok() && !s.IsMergeInProgress()) {
        status_ = s;
        return;
      } else if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        // These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include them in the result, so we expect the keys here to be valid.
        assert(pik_status.ok());
        if (!pik_status.ok()) {
          ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
                          pik_status.getState());
        }
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // all merge operands were filtered out. reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges
        has_current_user_key_ = false;
        pinned_iters_mgr_.ReleasePinnedData();

        if (merge_helper_->FilteredUntil(&skip_until)) {
          need_skip = true;
        }
      }
    } else {
      // 1. new user key -OR-
      // 2. different snapshot stripe
      bool should_delete = range_del_agg_->ShouldDelete(
          key_, RangeDelPositioningMode::kForwardTraversal);
      if (should_delete) {
        ++iter_stats_.num_record_drop_hidden;
        ++iter_stats_.num_record_drop_range_del;
        AdvanceInputIter();
      } else {
        valid_ = true;
      }
    }

    if (need_skip) {
      SkipUntil(skip_until);
    }
  }

  if (!valid_ && IsShuttingDown()) {
    status_ = Status::ShutdownInProgress();
  }

  if (IsPausingManualCompaction()) {
    status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
}

bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
  if (!blob_file_builder_) {
    return false;
  }

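  // Note (assumption): BlobFileBuilder::Add() either writes the value to a
  // blob file and fills blob_index_ with a reference to it, or leaves
  // blob_index_ empty when the value is kept inline (e.g. because it is
  // smaller than the configured minimum blob size).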
  blob_index_.clear();
  const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);

  if (!s.ok()) {
    status_ = s;
    valid_ = false;

    return false;
  }

  if (blob_index_.empty()) {
    return false;
  }

  value_ = blob_index_;

  return true;
}

void CompactionIterator::ExtractLargeValueIfNeeded() {
  assert(ikey_.type == kTypeValue);

  if (!ExtractLargeValueIfNeededImpl()) {
    return;
  }

  ikey_.type = kTypeBlobIndex;
  current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}

void CompactionIterator::GarbageCollectBlobIfNeeded() {
  assert(ikey_.type == kTypeBlobIndex);

  if (!compaction_) {
    return;
  }

  // GC for integrated BlobDB
  if (compaction_->enable_blob_garbage_collection()) {
    BlobIndex blob_index;

    {
      const Status s = blob_index.DecodeFrom(value_);

      if (!s.ok()) {
        status_ = s;
        valid_ = false;

        return;
      }
    }

    if (blob_index.IsInlined() || blob_index.HasTTL()) {
      status_ = Status::Corruption("Unexpected TTL/inlined blob index");
      valid_ = false;

      return;
    }

    if (blob_index.file_number() >=
        blob_garbage_collection_cutoff_file_number_) {
      return;
    }

    const Version* const version = compaction_->input_version();
    assert(version);

    uint64_t bytes_read = 0;

    {
      const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index,
                                        &blob_value_, &bytes_read);

      if (!s.ok()) {
        status_ = s;
        valid_ = false;

        return;
      }
    }

    ++iter_stats_.num_blobs_read;
    iter_stats_.total_blob_bytes_read += bytes_read;

    ++iter_stats_.num_blobs_relocated;
    iter_stats_.total_blob_bytes_relocated += blob_index.size();

    value_ = blob_value_;

    if (ExtractLargeValueIfNeededImpl()) {
      return;
    }

    ikey_.type = kTypeValue;
    current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);

    return;
  }

  // GC for stacked BlobDB
  if (compaction_filter_ &&
      compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
    const auto blob_decision = compaction_filter_->PrepareBlobOutput(
        user_key(), value_, &compaction_filter_value_);

    if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
      status_ =
          Status::Corruption("Corrupted blob reference encountered during GC");
      valid_ = false;

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
      status_ = Status::IOError("Could not relocate blob during GC");
      valid_ = false;

      return;
    }

    if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
      value_ = compaction_filter_value_;

      return;
    }
  }
}

void CompactionIterator::PrepareOutput() {
  if (valid_) {
    if (ikey_.type == kTypeValue) {
      ExtractLargeValueIfNeeded();
    } else if (ikey_.type == kTypeBlobIndex) {
      GarbageCollectBlobIfNeeded();
    }

    // Zeroing out the sequence number leads to better compression.
    // If this is the bottommost level (no files in lower levels)
    // and the earliest snapshot is larger than this seqno
    // and the userkey differs from the last userkey in compaction
    // then we can squash the seqno to zero.
    //
    // This is safe for TransactionDB write-conflict checking since
    // transactions only care about sequence numbers larger than any active
    // snapshots.
    //
    // Can we do the same for levels above the bottom level as long as
    // KeyNotExistsBeyondOutputLevel() returns true?
    if (valid_ && compaction_ != nullptr &&
        !compaction_->allow_ingest_behind() &&
        ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
        DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
        ikey_.type != kTypeMerge) {
      assert(ikey_.type != kTypeDeletion);
      assert(ikey_.type != kTypeSingleDeletion ||
             (timestamp_size_ || full_history_ts_low_));
      if (ikey_.type == kTypeDeletion ||
          (ikey_.type == kTypeSingleDeletion &&
           (!timestamp_size_ || !full_history_ts_low_))) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for seq-zero optimization",
                        ikey_.type);
      }
      ikey_.sequence = 0;
      if (!timestamp_size_) {
        current_key_.UpdateInternalKey(0, ikey_.type);
      } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
        // We can also zero out timestamp for better compression.
        // For the same user key (excluding timestamp), the timestamp-based
        // history can be collapsed to save some space if the timestamp is
        // older than *full_history_ts_low_.
        const std::string kTsMin(timestamp_size_, static_cast<char>(0));
        const Slice ts_slice = kTsMin;
        ikey_.SetTimestamp(ts_slice);
        current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
      }
    }
  }
}

inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  if (snapshots_->size() == 0) {
    ROCKS_LOG_FATAL(info_log_,
                    "No snapshot left in findEarliestVisibleSnapshot");
  }
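  // snapshots_ is sorted in ascending order (verified in the constructor), so
  // lower_bound() returns the first snapshot whose sequence number is >= `in`,
  // i.e. the earliest snapshot that can see this key; any snapshot before it
  // is too old to see it.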
  auto snapshots_iter = std::lower_bound(
      snapshots_->begin(), snapshots_->end(), in);
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    assert(*prev_snapshot < in);
    if (*prev_snapshot >= in) {
      ROCKS_LOG_FATAL(info_log_,
                      "*prev_snapshot >= in in findEarliestVisibleSnapshot");
    }
  }
  if (snapshot_checker_ == nullptr) {
    return snapshots_iter != snapshots_->end()
        ? *snapshots_iter : kMaxSequenceNumber;
  }
  bool has_released_snapshot = !released_snapshots_.empty();
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    assert(in <= cur);
    if (in > cur) {
      ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot");
    }
    // Skip if cur is in released_snapshots.
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
      continue;
    }
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
    if (res == SnapshotCheckerResult::kInSnapshot) {
      return cur;
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
      released_snapshots_.insert(cur);
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}

// Used in two places: it prevents deletion markers from being dropped if they
// may still be needed, and it disables seqnum zero-out in PrepareOutput() for
// recent keys.
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  return (!compaction_->preserve_deletes()) ||
         (ikey_.sequence < preserve_deletes_seqnum_);
}

bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
  assert(snapshot_checker_ != nullptr);
  bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
                        (earliest_snapshot_iter_ != snapshots_->end() &&
                         *earliest_snapshot_iter_ == earliest_snapshot_));
  assert(pre_condition);
  if (!pre_condition) {
    ROCKS_LOG_FATAL(info_log_,
                    "Pre-Condition is not hold in IsInEarliestSnapshot");
  }
  auto in_snapshot =
      snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
    // Avoid the current earliest_snapshot_ being returned as the earliest
    // visible snapshot for the next value. So if a value's sequence is
    // zeroed out by PrepareOutput(), the next value will be compacted out.
    released_snapshots_.insert(earliest_snapshot_);
    earliest_snapshot_iter_++;

    if (earliest_snapshot_iter_ == snapshots_->end()) {
      earliest_snapshot_ = kMaxSequenceNumber;
    } else {
      earliest_snapshot_ = *earliest_snapshot_iter_;
    }
    in_snapshot =
        snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  }
  assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
  if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) {
    ROCKS_LOG_FATAL(info_log_,
                    "Unexpected released snapshot in IsInEarliestSnapshot");
  }
  return in_snapshot == SnapshotCheckerResult::kInSnapshot;
}

uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
    const CompactionProxy* compaction) {
  if (!compaction) {
    return 0;
  }

  if (!compaction->enable_blob_garbage_collection()) {
    return 0;
  }

  Version* const version = compaction->input_version();
  assert(version);

  const VersionStorageInfo* const storage_info = version->storage_info();
  assert(storage_info);

  const auto& blob_files = storage_info->GetBlobFiles();

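  // blob_files is keyed by blob file number, i.e. ordered from oldest to
  // newest file. Advancing by age_cutoff * size yields the first file that is
  // NOT eligible for garbage collection; blobs residing in files with numbers
  // smaller than the returned cutoff get relocated during this compaction.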
  auto it = blob_files.begin();
  std::advance(
      it, compaction->blob_garbage_collection_age_cutoff() * blob_files.size());

  return it != blob_files.end() ? it->first
                                : std::numeric_limits<uint64_t>::max();
}

}  // namespace ROCKSDB_NAMESPACE