// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <cinttypes>

#include "db/compaction/compaction_iterator.h"
#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
#include "test_util/sync_point.h"

#define DEFINITELY_IN_SNAPSHOT(seq, snapshot)                       \
  ((seq) <= (snapshot) &&                                           \
   (snapshot_checker_ == nullptr ||                                 \
    LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
           SnapshotCheckerResult::kInSnapshot)))

#define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot)                     \
  ((seq) > (snapshot) ||                                              \
   (snapshot_checker_ != nullptr &&                                   \
    UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
             SnapshotCheckerResult::kNotInSnapshot)))

#define IN_EARLIEST_SNAPSHOT(seq) \
  ((seq) <= earliest_snapshot_ && \
   (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))))
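
// Illustration (hypothetical values): with seq == 7, snapshot == 10 and no
// snapshot_checker_, DEFINITELY_IN_SNAPSHOT(7, 10) holds simply because
// 7 <= 10. When a checker is installed (e.g. by write-prepared transactions),
// it may answer kSnapshotReleased, in which case neither macro's condition
// holds and callers fall back to the conservative path.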

namespace rocksdb {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<bool>* manual_compaction_paused,
    const std::shared_ptr<Logger> info_log)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, snapshot_checker, env,
          report_detailed_time, expect_valid_internal_key, range_del_agg,
          std::unique_ptr<CompactionProxy>(
              compaction ? new CompactionProxy(compaction) : nullptr),
          compaction_filter, shutting_down, preserve_deletes_seqnum,
          manual_compaction_paused, info_log) {}

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    std::unique_ptr<CompactionProxy> compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum,
    const std::atomic<bool>* manual_compaction_paused,
    const std::shared_ptr<Logger> info_log)
    : input_(input),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      current_key_committed_(false),
      info_log_(info_log) {
  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
  assert(snapshots_ != nullptr);
  bottommost_level_ =
      compaction_ == nullptr ? false : compaction_->bottommost_level();
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }
  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = true;
    earliest_snapshot_iter_ = snapshots_->end();
    earliest_snapshot_ = kMaxSequenceNumber;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = false;
    earliest_snapshot_iter_ = snapshots_->begin();
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
#ifndef NDEBUG
  // findEarliestVisibleSnapshot assumes this ordering.
  for (size_t i = 1; i < snapshots_->size(); ++i) {
    assert(snapshots_->at(i - 1) < snapshots_->at(i));
  }
#endif
  input_->SetPinnedItersMgr(&pinned_iters_mgr_);
  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}

CompactionIterator::~CompactionIterator() {
  // The input_ iterator's lifetime is longer than pinned_iters_mgr_'s.
  input_->SetPinnedItersMgr(nullptr);
}

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}

void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__));
      valid_key = ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include them in the result, so we expect the keys here to be valid.
      assert(valid_key);
      if (!valid_key) {
        ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
                        key_.ToString(true).c_str());
      }

      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // We consumed all pinned merge operands, release pinned iterators.
      pinned_iters_mgr_.ReleasePinnedData();
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      input_->Next();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  if (compaction_filter_ != nullptr &&
      (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
    // If the user has specified a compaction filter and the sequence
    // number is greater than any external snapshot, then invoke the
    // filter. If the return value of the compaction filter is true,
    // replace the entry with a deletion marker.
    CompactionFilter::Decision filter;
    compaction_filter_value_.clear();
    compaction_filter_skip_until_.Clear();
    CompactionFilter::ValueType value_type =
        ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
                                 : CompactionFilter::ValueType::kBlobIndex;
    // Hack: pass the internal key to BlobIndexCompactionFilter since it needs
    // the sequence number.
    Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
    {
      StopWatchNano timer(env_, report_detailed_time_);
      filter = compaction_filter_->FilterV2(
          compaction_->level(), filter_key, value_type, value_,
          &compaction_filter_value_, compaction_filter_skip_until_.rep());
      iter_stats_.total_filter_time +=
          env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
    }

    if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
        cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
            0) {
      // Can't skip to a key smaller than the current one.
      // Keep the key as per the FilterV2 documentation.
      filter = CompactionFilter::Decision::kKeep;
    }
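
    // Illustration (hypothetical keys): if the current user key is "b" and
    // FilterV2 asked to skip until "a", the check above downgrades the
    // decision to kKeep, since the iterator cannot seek backwards. Skipping
    // until "z" instead would later trigger a Seek() that drops every key in
    // ["b", "z").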

    if (filter == CompactionFilter::Decision::kRemove) {
      // Convert the current key to a delete; key_ is pointing into
      // current_key_ at this point, so updating current_key_ updates key().
      ikey_.type = kTypeDeletion;
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
      // No value is associated with a delete.
      value_.clear();
      iter_stats_.num_record_drop_user++;
    } else if (filter == CompactionFilter::Decision::kChangeValue) {
      value_ = compaction_filter_value_;
    } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
      *need_skip = true;
      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                       kValueTypeForSeek);
      *skip_until = compaction_filter_skip_until_.Encode();
    }
  }
}

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_->Valid() && !IsPausingManualCompaction() &&
         !IsShuttingDown()) {
    key_ = input_->key();
    value_ = input_->value();
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.
      if (expect_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        status_ = Status::Corruption("Corrupted internal key not expected.");
        break;
      }
      key_ = current_key_.SetInternalKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }
    TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);

    // Update input statistics.
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // If need_skip is true, we should seek the input iterator
    // to the internal key skip_until and continue from there.
    bool need_skip = false;
    // Points either into compaction_filter_skip_until_ or into
    // merge_helper_->compaction_filter_skip_until_.
    Slice skip_until;

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ ||
        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
      // First occurrence of this user key.
      // Copy the key for output.
      key_ = current_key_.SetInternalKey(key_, &ikey_);
      current_user_key_ = ikey_.user_key;
      has_current_user_key_ = true;
      has_outputted_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      current_key_committed_ = KeyCommitted(ikey_.sequence);

      // Apply the compaction filter to the first committed version of the user
      // key.
      if (current_key_committed_) {
        InvokeFilterIfNeeded(&need_skip, &skip_until);
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): The compaction filter does not process keys in this path.
      // It needs to process multiple versions if there are versions on both
      // sides of a snapshot.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();

      // Note that a newer version of a key is ordered before older versions.
      // If a newer version of a key is committed, so is the older version. No
      // need to query snapshot_checker_ in that case.
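      //
      // Illustration (hypothetical seqnos): internal keys sort newest first,
      // so "foo"@5 is scanned before "foo"@3; if "foo"@5 is known to be
      // committed, "foo"@3 must be too, which is why the checker is consulted
      // only while current_key_committed_ is still false.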
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        current_key_committed_ = KeyCommitted(ikey_.sequence);
        // Apply the compaction filter to the first committed version of the
        // user key.
        if (current_key_committed_) {
          InvokeFilterIfNeeded(&need_skip, &skip_until);
        }
      }
    }

    if (UNLIKELY(!current_key_committed_)) {
      assert(snapshot_checker_ != nullptr);
      valid_ = true;
      break;
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence __attribute__((__unused__));
    last_sequence = current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? earliest_snapshot_
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (need_skip) {
      // This case is handled below.
    } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out. We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue);
      if (ikey_.type != kTypeValue) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for compaction output",
                        ikey_.type);
      }
      assert(current_user_key_snapshot_ == last_snapshot);
      if (current_user_key_snapshot_ != last_snapshot) {
        ROCKS_LOG_FATAL(info_log_,
                        "current_user_key_snapshot_ (%" PRIu64
                        ") != last_snapshot (%" PRIu64 ")",
                        current_user_key_snapshot_, last_snapshot);
      }

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR- this
      //    SingleDelete is visible in the earliest write-conflict snapshot
      //    (i.e., no write-conflict snapshot is earlier than this write).
      //
      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot). If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete. In this case, we will decide
      // to also output the PUT. While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact
      // out both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations. The result of mixing
      // those operations for a given key is documented as being undefined. So
      // we can choose how to handle such combinations of operations. We will
      // try to compact out as much as we can in these cases.
      // We will report counts on these anomalous cases.
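      //
      // Worked example (hypothetical seqnos): the input holds SD@12 then
      // PUT@10 for the same user key. If a record for this key was already
      // output in this snapshot, or SD@12 is within
      // earliest_write_conflict_snapshot_, both records are dropped (Rules 1
      // and 2 hold). Otherwise the SD is output for write-conflict checking
      // and, per Optimization 3, the PUT is output too with its value
      // cleared.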

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      ParsedInternalKey next_ikey;
      input_->Next();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 ||
            DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes in a row. This could be due to
            // unexpected user input.
            // Skip the first SingleDelete and let the next iteration decide
            // how to handle the second SingleDelete.

            // The first SingleDelete has been skipped since we already called
            // input_->Next().
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_mismatch;
          } else if (has_outputted_key_ ||
                     DEFINITELY_IN_SNAPSHOT(
                         ikey_.sequence, earliest_write_conflict_snapshot_)) {
            // Found a matching value, we can drop the single delete and the
            // value. It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete. We will compact it out
            // either way. We will maintain counts of how many mismatches
            // happened.
            if (next_ikey.type != kTypeValue &&
                next_ikey.type != kTypeBlobIndex) {
              ++iter_stats_.num_single_del_mismatch;
            }

            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_->Next() once. Call it a second time to
            // skip past the second key.
            input_->Next();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a
            // record to know that a write happened in this snapshot (Rule 2
            // above). Clear the value and output the SingleDelete. (The value
            // will be outputted on the next iteration.)

            // Setting valid_ to true will output the current SingleDelete.
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // a different key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level. We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_)) {
          // The key doesn't exist outside of this range, so we can compact
          // out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
          ++iter_stats_.num_single_del_fallthru;
          if (!bottommost_level_) {
            ++iter_stats_.num_optimized_del_drop_obsolete;
          }
        } else {
          // Output the SingleDelete.
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_ ||
               (last_snapshot > 0 &&
                last_snapshot < current_user_key_snapshot_)) {
      // If the earliest snapshot in which this key is visible is the same as
      // the visibility of a previous instance of the same key, then this kv
      // is not visible in any snapshot: it is hidden by a newer entry for the
      // same user key.
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
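      //
      // Illustration (hypothetical seqnos): with a single snapshot at 100,
      // PUT@8 and PUT@6 are both first visible in that same snapshot, so no
      // reader can ever observe PUT@6 behind PUT@8; PUT@6 is dropped here.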
      assert(last_sequence >= current_user_key_sequence_);
      if (last_sequence < current_user_key_sequence_) {
        ROCKS_LOG_FATAL(info_log_,
                        "last_sequence (%" PRIu64
                        ") < current_user_key_sequence_ (%" PRIu64 ")",
                        last_sequence, current_user_key_sequence_);
      }

      ++iter_stats_.num_record_drop_hidden;  // (A)
      input_->Next();
    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
               IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
               ikeyNotNeededForIncrementalSnapshot() &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note: Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      //
      // It seems that we could also drop a deletion later than the earliest
      // snapshot, given that:
      // (1) the deletion is earlier than earliest_write_conflict_snapshot, and
      // (2) no value exists earlier than the deletion.
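      //
      // Illustration (hypothetical setup): DEL@4 for key "k" is within the
      // earliest snapshot, "k" has no data below the output level, and any
      // older PUT for "k" inside this compaction will be dropped by rule (A)
      // on a later iteration, so the marker itself can be dropped now.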
      ++iter_stats_.num_record_drop_obsolete;
      if (!bottommost_level_) {
        ++iter_stats_.num_optimized_del_drop_obsolete;
      }
      input_->Next();
    } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
               ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level.
      // We can skip outputting the key iff there are no subsequent puts for
      // this key.
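      //
      // Illustration (hypothetical seqnos, one snapshot at 3): DEL@7 followed
      // by PUT@5 for the same key: PUT@5 is in the same snapshot stripe and
      // is skipped by the loop below. If a PUT@2 then remains (visible to the
      // snapshot at 3), the delete must still be output so the old value
      // stays shadowed for readers above the snapshot.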
      ParsedInternalKey next_ikey;
      input_->Next();
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
      while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
             cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
             (prev_snapshot == 0 ||
              DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
        input_->Next();
      }
      // If a version of this key remains that we still need to output, we
      // need to output the delete too.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        valid_ = true;
        at_next_ = true;
      }
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        status_ = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        return;
      }

      pinned_iters_mgr_.StartPinning();
      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A).
      // We encapsulate the merge related state machine in a different
      // object to minimize change to the existing flow.
      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
                                           prev_snapshot, bottommost_level_);
      merge_out_iter_.SeekToFirst();

      if (!s.ok() && !s.IsMergeInProgress()) {
        status_ = s;
        return;
      } else if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        //       These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        bool valid_key __attribute__((__unused__));
        valid_key = ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include them in the result, so we expect the keys here to be valid.
        assert(valid_key);
        if (!valid_key) {
          ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
                          key_.ToString(true).c_str());
        }
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
        pinned_iters_mgr_.ReleasePinnedData();

        if (merge_helper_->FilteredUntil(&skip_until)) {
          need_skip = true;
        }
      }
    } else {
      // 1. new user key -OR-
      // 2. different snapshot stripe
      bool should_delete = range_del_agg_->ShouldDelete(
          key_, RangeDelPositioningMode::kForwardTraversal);
      if (should_delete) {
        ++iter_stats_.num_record_drop_hidden;
        ++iter_stats_.num_record_drop_range_del;
        input_->Next();
      } else {
        valid_ = true;
      }
    }

    if (need_skip) {
      input_->Seek(skip_until);
    }
  }

  if (!valid_ && IsShuttingDown()) {
    status_ = Status::ShutdownInProgress();
  }

  if (IsPausingManualCompaction()) {
    status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
}

void CompactionIterator::PrepareOutput() {
  if (valid_) {
    if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
      const auto blob_decision = compaction_filter_->PrepareBlobOutput(
          user_key(), value_, &compaction_filter_value_);

      if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
        status_ = Status::Corruption(
            "Corrupted blob reference encountered during GC");
        valid_ = false;
      } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
        status_ = Status::IOError("Could not relocate blob during GC");
        valid_ = false;
      } else if (blob_decision ==
                 CompactionFilter::BlobDecision::kChangeValue) {
        value_ = compaction_filter_value_;
      }
    }

    // Zeroing out the sequence number leads to better compression.
    // If this is the bottommost level (no files in lower levels)
    // and the earliest snapshot is larger than this seqno
    // and the userkey differs from the last userkey in compaction,
    // then we can squash the seqno to zero.
    //
    // This is safe for TransactionDB write-conflict checking since
    // transactions only care about sequence numbers larger than any active
    // snapshot.
    //
    // Can we do the same for levels above the bottom level as long as
    // KeyNotExistsBeyondOutputLevel() returns true?
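    //
    // Illustration (hypothetical seqnos): a PUT@42 at the bottommost level
    // with the earliest snapshot at 100 can be rewritten as PUT@0; every
    // reader already sees it, and long runs of zero seqnos compress well.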
    if (valid_ && compaction_ != nullptr &&
        !compaction_->allow_ingest_behind() &&
        ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
        IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
      assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
      if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
        ROCKS_LOG_FATAL(info_log_,
                        "Unexpected key type %d for seq-zero optimization",
                        ikey_.type);
      }
      ikey_.sequence = 0;
      current_key_.UpdateInternalKey(0, ikey_.type);
    }
  }
}

inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  if (snapshots_->size() == 0) {
    ROCKS_LOG_FATAL(info_log_,
                    "No snapshot left in findEarliestVisibleSnapshot");
  }
  auto snapshots_iter = std::lower_bound(
      snapshots_->begin(), snapshots_->end(), in);
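  // Illustration (hypothetical values): with snapshots {5, 10, 20} and
  // in == 7, lower_bound yields 10 (the earliest snapshot that can see
  // seq 7) and *prev_snapshot is set to 5 below.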
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    assert(*prev_snapshot < in);
    if (*prev_snapshot >= in) {
      ROCKS_LOG_FATAL(info_log_,
                      "*prev_snapshot >= in in findEarliestVisibleSnapshot");
    }
  }
  if (snapshot_checker_ == nullptr) {
    return snapshots_iter != snapshots_->end() ? *snapshots_iter
                                               : kMaxSequenceNumber;
  }
  bool has_released_snapshot = !released_snapshots_.empty();
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    assert(in <= cur);
    if (in > cur) {
      ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot");
    }
    // Skip if cur is in released_snapshots.
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
      continue;
    }
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
    if (res == SnapshotCheckerResult::kInSnapshot) {
      return cur;
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
      released_snapshots_.insert(cur);
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}

// Used in two places: prevents deletion markers from being dropped if they
// may be needed, and disables seqnum zero-out in PrepareOutput for recent
// keys.
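// Illustration (hypothetical values): with preserve_deletes_seqnum_ == 100,
// a deletion at seq 90 is not needed (the function returns true, so it may
// be dropped or zeroed out), while one at seq 120 is preserved (returns
// false).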
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  return (!compaction_->preserve_deletes()) ||
         (ikey_.sequence < preserve_deletes_seqnum_);
}

bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
  assert(snapshot_checker_ != nullptr);
  bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
                        (earliest_snapshot_iter_ != snapshots_->end() &&
                         *earliest_snapshot_iter_ == earliest_snapshot_));
  assert(pre_condition);
  if (!pre_condition) {
    ROCKS_LOG_FATAL(info_log_,
                    "Pre-condition does not hold in IsInEarliestSnapshot");
  }
  auto in_snapshot =
      snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
    // Avoid the current earliest_snapshot_ being returned as the earliest
    // visible snapshot for the next value, so that if a value's sequence is
    // zeroed out by PrepareOutput(), the next value will be compacted out.
    released_snapshots_.insert(earliest_snapshot_);
    earliest_snapshot_iter_++;

    if (earliest_snapshot_iter_ == snapshots_->end()) {
      earliest_snapshot_ = kMaxSequenceNumber;
    } else {
      earliest_snapshot_ = *earliest_snapshot_iter_;
    }
    in_snapshot =
        snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  }
  assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
  if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) {
    ROCKS_LOG_FATAL(info_log_,
                    "Unexpected released snapshot in IsInEarliestSnapshot");
  }
  return in_snapshot == SnapshotCheckerResult::kInSnapshot;
}

}  // namespace rocksdb