1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6
#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <deque>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/compaction/compaction_iterator.h"
#include "port/port.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
16
17 namespace ROCKSDB_NAMESPACE {
18
19 // Expects no merging attempts.
20 class NoMergingMergeOp : public MergeOperator {
21 public:
FullMergeV2(const MergeOperationInput &,MergeOperationOutput *) const22 bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
23 MergeOperationOutput* /*merge_out*/) const override {
24 ADD_FAILURE();
25 return false;
26 }
PartialMergeMulti(const Slice &,const std::deque<Slice> &,std::string *,Logger *) const27 bool PartialMergeMulti(const Slice& /*key*/,
28 const std::deque<Slice>& /*operand_list*/,
29 std::string* /*new_value*/,
30 Logger* /*logger*/) const override {
31 ADD_FAILURE();
32 return false;
33 }
Name() const34 const char* Name() const override {
35 return "CompactionIteratorTest NoMergingMergeOp";
36 }
37 };
38
39 // Compaction filter that gets stuck when it sees a particular key,
40 // then gets unstuck when told to.
41 // Always returns Decision::kRemove.
42 class StallingFilter : public CompactionFilter {
43 public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const44 Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
45 const Slice& /*existing_value*/, std::string* /*new_value*/,
46 std::string* /*skip_until*/) const override {
47 int k = std::atoi(key.ToString().c_str());
48 last_seen.store(k);
49 while (k >= stall_at.load()) {
50 std::this_thread::yield();
51 }
52 return Decision::kRemove;
53 }
54
Name() const55 const char* Name() const override {
56 return "CompactionIteratorTest StallingFilter";
57 }
58
59 // Wait until the filter sees a key >= k and stalls at that key.
60 // If `exact`, asserts that the seen key is equal to k.
WaitForStall(int k,bool exact=true)61 void WaitForStall(int k, bool exact = true) {
62 stall_at.store(k);
63 while (last_seen.load() < k) {
64 std::this_thread::yield();
65 }
66 if (exact) {
67 EXPECT_EQ(k, last_seen.load());
68 }
69 }
70
71 // Filter will stall on key >= stall_at. Advance stall_at to unstall.
72 mutable std::atomic<int> stall_at{0};
73 // Last key the filter was called with.
74 mutable std::atomic<int> last_seen{0};
75 };
76
77 // Compaction filter that filter out all keys.
78 class FilterAllKeysCompactionFilter : public CompactionFilter {
79 public:
FilterV2(int,const Slice &,ValueType,const Slice &,std::string *,std::string *) const80 Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
81 const Slice& /*existing_value*/, std::string* /*new_value*/,
82 std::string* /*skip_until*/) const override {
83 return Decision::kRemove;
84 }
85
Name() const86 const char* Name() const override { return "AllKeysCompactionFilter"; }
87 };
88
89 class LoggingForwardVectorIterator : public InternalIterator {
90 public:
91 struct Action {
92 enum class Type {
93 SEEK_TO_FIRST,
94 SEEK,
95 NEXT,
96 };
97
98 Type type;
99 std::string arg;
100
ActionROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action101 explicit Action(Type _type, std::string _arg = "")
102 : type(_type), arg(_arg) {}
103
operator ==ROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action104 bool operator==(const Action& rhs) const {
105 return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
106 }
107 };
108
LoggingForwardVectorIterator(const std::vector<std::string> & keys,const std::vector<std::string> & values)109 LoggingForwardVectorIterator(const std::vector<std::string>& keys,
110 const std::vector<std::string>& values)
111 : keys_(keys), values_(values), current_(keys.size()) {
112 assert(keys_.size() == values_.size());
113 }
114
Valid() const115 bool Valid() const override { return current_ < keys_.size(); }
116
SeekToFirst()117 void SeekToFirst() override {
118 log.emplace_back(Action::Type::SEEK_TO_FIRST);
119 current_ = 0;
120 }
SeekToLast()121 void SeekToLast() override { assert(false); }
122
Seek(const Slice & target)123 void Seek(const Slice& target) override {
124 log.emplace_back(Action::Type::SEEK, target.ToString());
125 current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
126 keys_.begin();
127 }
128
SeekForPrev(const Slice &)129 void SeekForPrev(const Slice& /*target*/) override { assert(false); }
130
Next()131 void Next() override {
132 assert(Valid());
133 log.emplace_back(Action::Type::NEXT);
134 current_++;
135 }
Prev()136 void Prev() override { assert(false); }
137
key() const138 Slice key() const override {
139 assert(Valid());
140 return Slice(keys_[current_]);
141 }
value() const142 Slice value() const override {
143 assert(Valid());
144 return Slice(values_[current_]);
145 }
146
status() const147 Status status() const override { return Status::OK(); }
148
149 std::vector<Action> log;
150
151 private:
152 std::vector<std::string> keys_;
153 std::vector<std::string> values_;
154 size_t current_;
155 };
156
157 class FakeCompaction : public CompactionIterator::CompactionProxy {
158 public:
level() const159 int level() const override { return 0; }
160
KeyNotExistsBeyondOutputLevel(const Slice &,std::vector<size_t> *) const161 bool KeyNotExistsBeyondOutputLevel(
162 const Slice& /*user_key*/,
163 std::vector<size_t>* /*level_ptrs*/) const override {
164 return is_bottommost_level || key_not_exists_beyond_output_level;
165 }
166
bottommost_level() const167 bool bottommost_level() const override { return is_bottommost_level; }
168
number_levels() const169 int number_levels() const override { return 1; }
170
GetLargestUserKey() const171 Slice GetLargestUserKey() const override {
172 return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
173 }
174
allow_ingest_behind() const175 bool allow_ingest_behind() const override { return is_allow_ingest_behind; }
176
preserve_deletes() const177 bool preserve_deletes() const override { return false; }
178
enable_blob_garbage_collection() const179 bool enable_blob_garbage_collection() const override { return false; }
180
blob_garbage_collection_age_cutoff() const181 double blob_garbage_collection_age_cutoff() const override { return 0.0; }
182
input_version() const183 Version* input_version() const override { return nullptr; }
184
185 bool key_not_exists_beyond_output_level = false;
186
187 bool is_bottommost_level = false;
188
189 bool is_allow_ingest_behind = false;
190 };
191
192 // A simplified snapshot checker which assumes each snapshot has a global
193 // last visible sequence.
194 class TestSnapshotChecker : public SnapshotChecker {
195 public:
TestSnapshotChecker(SequenceNumber last_committed_sequence,const std::unordered_map<SequenceNumber,SequenceNumber> & snapshots={{}})196 explicit TestSnapshotChecker(
197 SequenceNumber last_committed_sequence,
198 const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {{}})
199 : last_committed_sequence_(last_committed_sequence),
200 snapshots_(snapshots) {}
201
CheckInSnapshot(SequenceNumber seq,SequenceNumber snapshot_seq) const202 SnapshotCheckerResult CheckInSnapshot(
203 SequenceNumber seq, SequenceNumber snapshot_seq) const override {
204 if (snapshot_seq == kMaxSequenceNumber) {
205 return seq <= last_committed_sequence_
206 ? SnapshotCheckerResult::kInSnapshot
207 : SnapshotCheckerResult::kNotInSnapshot;
208 }
209 assert(snapshots_.count(snapshot_seq) > 0);
210 return seq <= snapshots_.at(snapshot_seq)
211 ? SnapshotCheckerResult::kInSnapshot
212 : SnapshotCheckerResult::kNotInSnapshot;
213 }
214
215 private:
216 SequenceNumber last_committed_sequence_;
217 // A map of valid snapshot to last visible sequence to the snapshot.
218 std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
219 };
220
221 // Test param:
222 // bool: whether to pass snapshot_checker to compaction iterator.
223 class CompactionIteratorTest : public testing::TestWithParam<bool> {
224 public:
CompactionIteratorTest()225 CompactionIteratorTest()
226 : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
227
CompactionIteratorTest(const Comparator * ucmp)228 explicit CompactionIteratorTest(const Comparator* ucmp)
229 : cmp_(ucmp), icmp_(cmp_), snapshots_({}) {}
230
InitIterators(const std::vector<std::string> & ks,const std::vector<std::string> & vs,const std::vector<std::string> & range_del_ks,const std::vector<std::string> & range_del_vs,SequenceNumber last_sequence,SequenceNumber last_committed_sequence=kMaxSequenceNumber,MergeOperator * merge_op=nullptr,CompactionFilter * filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber,bool key_not_exists_beyond_output_level=false,const std::string * full_history_ts_low=nullptr)231 void InitIterators(
232 const std::vector<std::string>& ks, const std::vector<std::string>& vs,
233 const std::vector<std::string>& range_del_ks,
234 const std::vector<std::string>& range_del_vs,
235 SequenceNumber last_sequence,
236 SequenceNumber last_committed_sequence = kMaxSequenceNumber,
237 MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
238 bool bottommost_level = false,
239 SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
240 bool key_not_exists_beyond_output_level = false,
241 const std::string* full_history_ts_low = nullptr) {
242 std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
243 new test::VectorIterator(range_del_ks, range_del_vs));
244 auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
245 std::move(unfragmented_range_del_iter), icmp_);
246 std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
247 new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
248 kMaxSequenceNumber));
249 range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
250 range_del_agg_->AddTombstones(std::move(range_del_iter));
251
252 std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
253 if (filter || bottommost_level || key_not_exists_beyond_output_level) {
254 compaction_proxy_ = new FakeCompaction();
255 compaction_proxy_->is_bottommost_level = bottommost_level;
256 compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
257 compaction_proxy_->key_not_exists_beyond_output_level =
258 key_not_exists_beyond_output_level;
259 compaction.reset(compaction_proxy_);
260 }
261 bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
262 if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
263 snapshot_checker_.reset(
264 new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
265 }
266 merge_helper_.reset(
267 new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
268 0 /*latest_snapshot*/, snapshot_checker_.get(),
269 0 /*level*/, nullptr /*statistics*/, &shutting_down_));
270
271 if (c_iter_) {
272 // Since iter_ is still used in ~CompactionIterator(), we call
273 // ~CompactionIterator() first.
274 c_iter_.reset();
275 }
276 iter_.reset(new LoggingForwardVectorIterator(ks, vs));
277 iter_->SeekToFirst();
278 c_iter_.reset(new CompactionIterator(
279 iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
280 earliest_write_conflict_snapshot, snapshot_checker_.get(),
281 Env::Default(), false /* report_detailed_time */, false,
282 range_del_agg_.get(), nullptr /* blob_file_builder */,
283 true /*allow_data_in_errors*/, std::move(compaction), filter,
284 &shutting_down_, /*preserve_deletes_seqnum=*/0,
285 /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr,
286 full_history_ts_low));
287 }
288
AddSnapshot(SequenceNumber snapshot,SequenceNumber last_visible_seq=kMaxSequenceNumber)289 void AddSnapshot(SequenceNumber snapshot,
290 SequenceNumber last_visible_seq = kMaxSequenceNumber) {
291 snapshots_.push_back(snapshot);
292 snapshot_map_[snapshot] = last_visible_seq;
293 }
294
UseSnapshotChecker() const295 virtual bool UseSnapshotChecker() const { return false; }
296
AllowIngestBehind() const297 virtual bool AllowIngestBehind() const { return false; }
298
RunTest(const std::vector<std::string> & input_keys,const std::vector<std::string> & input_values,const std::vector<std::string> & expected_keys,const std::vector<std::string> & expected_values,SequenceNumber last_committed_seq=kMaxSequenceNumber,MergeOperator * merge_operator=nullptr,CompactionFilter * compaction_filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber,bool key_not_exists_beyond_output_level=false,const std::string * full_history_ts_low=nullptr)299 void RunTest(
300 const std::vector<std::string>& input_keys,
301 const std::vector<std::string>& input_values,
302 const std::vector<std::string>& expected_keys,
303 const std::vector<std::string>& expected_values,
304 SequenceNumber last_committed_seq = kMaxSequenceNumber,
305 MergeOperator* merge_operator = nullptr,
306 CompactionFilter* compaction_filter = nullptr,
307 bool bottommost_level = false,
308 SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
309 bool key_not_exists_beyond_output_level = false,
310 const std::string* full_history_ts_low = nullptr) {
311 InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
312 last_committed_seq, merge_operator, compaction_filter,
313 bottommost_level, earliest_write_conflict_snapshot,
314 key_not_exists_beyond_output_level, full_history_ts_low);
315 c_iter_->SeekToFirst();
316 for (size_t i = 0; i < expected_keys.size(); i++) {
317 std::string info = "i = " + ToString(i);
318 ASSERT_TRUE(c_iter_->Valid()) << info;
319 ASSERT_OK(c_iter_->status()) << info;
320 ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
321 ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
322 c_iter_->Next();
323 }
324 ASSERT_OK(c_iter_->status());
325 ASSERT_FALSE(c_iter_->Valid());
326 }
327
ClearSnapshots()328 void ClearSnapshots() {
329 snapshots_.clear();
330 snapshot_map_.clear();
331 }
332
333 const Comparator* cmp_;
334 const InternalKeyComparator icmp_;
335 std::vector<SequenceNumber> snapshots_;
336 // A map of valid snapshot to last visible sequence to the snapshot.
337 std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
338 std::unique_ptr<MergeHelper> merge_helper_;
339 std::unique_ptr<LoggingForwardVectorIterator> iter_;
340 std::unique_ptr<CompactionIterator> c_iter_;
341 std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
342 std::unique_ptr<SnapshotChecker> snapshot_checker_;
343 std::atomic<bool> shutting_down_{false};
344 FakeCompaction* compaction_proxy_;
345 };
346
347 // It is possible that the output of the compaction iterator is empty even if
348 // the input is not.
TEST_P(CompactionIteratorTest,EmptyResult)349 TEST_P(CompactionIteratorTest, EmptyResult) {
350 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
351 test::KeyStr("a", 3, kTypeValue)},
352 {"", "val"}, {}, {}, 5);
353 c_iter_->SeekToFirst();
354 ASSERT_OK(c_iter_->status());
355 ASSERT_FALSE(c_iter_->Valid());
356 }
357
358 // If there is a corruption after a single deletion, the corrupted key should
359 // be preserved.
TEST_P(CompactionIteratorTest,CorruptionAfterSingleDeletion)360 TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
361 InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
362 test::KeyStr("a", 3, kTypeValue, true),
363 test::KeyStr("b", 10, kTypeValue)},
364 {"", "val", "val2"}, {}, {}, 10);
365 c_iter_->SeekToFirst();
366 ASSERT_TRUE(c_iter_->Valid());
367 ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
368 c_iter_->key().ToString());
369 c_iter_->Next();
370 ASSERT_TRUE(c_iter_->Valid());
371 ASSERT_EQ(test::KeyStr("a", 3, kTypeValue, true), c_iter_->key().ToString());
372 c_iter_->Next();
373 ASSERT_TRUE(c_iter_->Valid());
374 ASSERT_EQ(test::KeyStr("b", 10, kTypeValue), c_iter_->key().ToString());
375 c_iter_->Next();
376 ASSERT_OK(c_iter_->status());
377 ASSERT_FALSE(c_iter_->Valid());
378 }
379
// A range tombstone ["ma", "mz") at sequence 4 is supplied alongside three
// point keys; the test expects "morning"@5 and "night"@3 in the output and
// "morning"@2 to be absent.
TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
  const std::string morning_new = test::KeyStr("morning", 5, kTypeValue);
  const std::string morning_old = test::KeyStr("morning", 2, kTypeValue);
  const std::string night = test::KeyStr("night", 3, kTypeValue);
  const std::vector<std::string> tombstone_starts = {
      test::KeyStr("ma", 4, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_ends = {"mz"};

  InitIterators({morning_new, morning_old, night}, {"zao", "zao", "wan"},
                tombstone_starts, tombstone_ends, /*last_sequence=*/5);

  c_iter_->SeekToFirst();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(morning_new, c_iter_->key().ToString());

  c_iter_->Next();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(night, c_iter_->key().ToString());

  c_iter_->Next();
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
396
// Same idea as SimpleRangeDeletion, but with a snapshot at sequence 10 and a
// range tombstone ["ma", "mz") at sequence 28. The test expects exactly
// "morning"@5 and "night"@40 in the output.
TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
  AddSnapshot(10);

  const std::vector<std::string> tombstone_starts = {
      test::KeyStr("ma", 28, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_ends = {"mz"};

  const std::string morning_15 = test::KeyStr("morning", 15, kTypeValue);
  const std::string morning_5 = test::KeyStr("morning", 5, kTypeValue);
  const std::string night_40 = test::KeyStr("night", 40, kTypeValue);
  const std::string night_20 = test::KeyStr("night", 20, kTypeValue);
  const std::vector<std::string> point_keys = {morning_15, morning_5, night_40,
                                               night_20};
  const std::vector<std::string> point_values = {"zao 15", "zao 5", "wan 40",
                                                 "wan 20"};

  InitIterators(point_keys, point_values, tombstone_starts, tombstone_ends,
                /*last_sequence=*/40);

  c_iter_->SeekToFirst();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(morning_5, c_iter_->key().ToString());

  c_iter_->Next();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(night_40, c_iter_->key().ToString());

  c_iter_->Next();
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
418
// Exercises Decision::kRemoveAndSkipUntil: checks which keys the filter is
// invoked on, which entries survive, and (through the logging input iterator)
// which Next()/Seek() calls the compaction iterator issues.
TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
  class Filter : public CompactionFilter {
    Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
                      const Slice& existing_value, std::string* /*new_value*/,
                      std::string* skip_until) const override {
      const std::string user_key = key.ToString();
      const std::string value = existing_value.ToString();
      // See InitIterators() call below for the sequence of keys and their
      // filtering decisions. Here we closely assert that compaction filter is
      // called with the expected keys and only them, and with the right
      // values.
      if (user_key == "a") {
        EXPECT_EQ(ValueType::kValue, t);
        EXPECT_EQ("av50", value);
        return Decision::kKeep;
      } else if (user_key == "b") {
        EXPECT_EQ(ValueType::kValue, t);
        EXPECT_EQ("bv60", value);
        *skip_until = "d+";
        return Decision::kRemoveAndSkipUntil;
      } else if (user_key == "e") {
        EXPECT_EQ(ValueType::kMergeOperand, t);
        EXPECT_EQ("em71", value);
        return Decision::kKeep;
      } else if (user_key == "f") {
        if (value != "fm65") {
          // Second operand of "f": skip past "g".
          EXPECT_EQ("fm30", value);
          EXPECT_EQ(ValueType::kMergeOperand, t);
          *skip_until = "g+";
        } else {
          // First operand of "f": skipping to "f" itself.
          EXPECT_EQ(ValueType::kMergeOperand, t);
          *skip_until = "f";
        }
        return Decision::kRemoveAndSkipUntil;
      } else if (user_key == "h") {
        EXPECT_EQ(ValueType::kValue, t);
        EXPECT_EQ("hv91", value);
        return Decision::kKeep;
      } else if (user_key == "i") {
        EXPECT_EQ(ValueType::kMergeOperand, t);
        EXPECT_EQ("im95", value);
        *skip_until = "z";
        return Decision::kRemoveAndSkipUntil;
      }
      // Any other key means the filter was invoked where it should not be.
      ADD_FAILURE();
      return Decision::kKeep;
    }

    const char* Name() const override {
      return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
    }
  };

  NoMergingMergeOp merge_op;
  Filter filter;

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 50, kTypeValue),  // keep
      test::KeyStr("a", 45, kTypeMerge),
      test::KeyStr("b", 60, kTypeValue),  // skip to "d+"
      test::KeyStr("b", 40, kTypeValue),
      test::KeyStr("c", 35, kTypeValue),
      test::KeyStr("d", 70, kTypeMerge),
      test::KeyStr("e", 71, kTypeMerge),  // keep
      test::KeyStr("f", 65, kTypeMerge),  // skip to "f", aka keep
      test::KeyStr("f", 30, kTypeMerge),  // skip to "g+"
      test::KeyStr("f", 25, kTypeValue),
      test::KeyStr("g", 90, kTypeValue),
      test::KeyStr("h", 91, kTypeValue),  // keep
      test::KeyStr("i", 95, kTypeMerge),  // skip to "z"
      test::KeyStr("j", 99, kTypeValue)};
  const std::vector<std::string> input_values = {
      "av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71",
      "fm65", "fm30", "fv25", "gv90", "hv91", "im95", "jv99"};
  InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
                kMaxSequenceNumber, &merge_op, &filter);

  // Compaction should output just "a", "e" and "h" keys.
  c_iter_->SeekToFirst();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
  ASSERT_EQ("av50", c_iter_->value().ToString());

  c_iter_->Next();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
  ASSERT_EQ("em71", c_iter_->value().ToString());

  c_iter_->Next();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
  ASSERT_EQ("hv91", c_iter_->value().ToString());

  c_iter_->Next();
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());

  // Check that the compaction iterator did the correct sequence of calls on
  // the underlying iterator.
  using A = LoggingForwardVectorIterator::Action;
  using T = A::Type;
  // A logged Seek() to the seek key built from `user_key`.
  const auto seek_to = [](const char* user_key) {
    return A(T::SEEK,
             test::KeyStr(user_key, kMaxSequenceNumber, kValueTypeForSeek));
  };
  const A first(T::SEEK_TO_FIRST);
  const A next(T::NEXT);
  std::vector<A> expected_actions = {
      first,         next, next,
      seek_to("d+"), next, next,
      seek_to("g+"), next,
      seek_to("z")};
  ASSERT_EQ(expected_actions, iter_->log);
}
528
// Sets the shutdown flag while the compaction filter is blocked inside its
// call for key 2, and checks that SeekToFirst() then ends with a
// ShutdownInProgress status without calling the filter on any later key.
TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
  NoMergingMergeOp merge_op;
  StallingFilter filter;
  InitIterators(
      {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
       test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      &merge_op, &filter);
  // Don't leave tombstones (kTypeDeletion) for filtered keys.
  compaction_proxy_->key_not_exists_beyond_output_level = true;

  std::atomic<bool> seek_done{false};
  // Non-fatal EXPECT_* is used inside the thread so that seek_done is always
  // published, even when an expectation fails.
  ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
    c_iter_->SeekToFirst();
    EXPECT_FALSE(c_iter_->Valid());
    EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
    seek_done.store(true);
  });

  // Let key 1 through.
  filter.WaitForStall(1);

  // Shutdown during compaction filter call for key 2.
  filter.WaitForStall(2);
  shutting_down_.store(true);
  EXPECT_FALSE(seek_done.load());

  // Unstall filter and wait for SeekToFirst() to return.
  filter.stall_at.store(3);
  compaction_thread.join();
  // A gtest assertion rather than assert(): it is still checked when NDEBUG
  // is defined, and reports a failure instead of aborting the test binary.
  ASSERT_TRUE(seek_done.load());

  // Check that filter was never called again.
  EXPECT_EQ(2, filter.last_seen.load());
}
564
565 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
566 // a merge operand, not for a value.
TEST_P(CompactionIteratorTest,ShuttingDownInMerge)567 TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
568 NoMergingMergeOp merge_op;
569 StallingFilter filter;
570 InitIterators(
571 {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
572 test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
573 {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
574 &merge_op, &filter);
575 compaction_proxy_->key_not_exists_beyond_output_level = true;
576
577 std::atomic<bool> seek_done{false};
578 ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
579 c_iter_->SeekToFirst();
580 ASSERT_FALSE(c_iter_->Valid());
581 ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());
582 seek_done.store(true);
583 });
584
585 // Let key 1 through.
586 filter.WaitForStall(1);
587
588 // Shutdown during compaction filter call for key 2.
589 filter.WaitForStall(2);
590 shutting_down_.store(true);
591 EXPECT_FALSE(seek_done.load());
592
593 // Unstall filter and wait for SeekToFirst() to return.
594 filter.stall_at.store(3);
595 compaction_thread.join();
596 assert(seek_done.load());
597
598 // Check that filter was never called again.
599 EXPECT_EQ(2, filter.last_seen.load());
600 }
601
// Uses a merge operator that allows single operands (AllowSingleOperand()
// returns true) and checks the merged values produced for three keys:
// "a" (one merge operand), "b" (two merge operands) and "c" (a merge operand
// on top of a value).
TEST_P(CompactionIteratorTest, SingleMergeOperand) {
  class Filter : public CompactionFilter {
    Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
                      const Slice& existing_value, std::string* /*new_value*/,
                      std::string* /*skip_until*/) const override {
      std::string k = key.ToString();
      std::string v = existing_value.ToString();

      // See InitIterators() call below for the sequence of keys and their
      // filtering decisions. Here we closely assert that compaction filter is
      // called with the expected keys and only them, and with the right
      // values.
      if (k == "a") {
        EXPECT_EQ(ValueType::kMergeOperand, t);
        EXPECT_EQ("av1", v);
        return Decision::kKeep;
      } else if (k == "b") {
        EXPECT_EQ(ValueType::kMergeOperand, t);
        return Decision::kKeep;
      } else if (k == "c") {
        return Decision::kKeep;
      }

      ADD_FAILURE();
      return Decision::kKeep;
    }

    const char* Name() const override {
      return "CompactionIteratorTest.SingleMergeOperand::Filter";
    }
  };

  class SingleMergeOp : public MergeOperator {
   public:
    // Concatenates the existing value (if any) and all operands, in order.
    bool FullMergeV2(const MergeOperationInput& merge_in,
                     MergeOperationOutput* merge_out) const override {
      // See InitIterators() call below for why "c" is the only key for which
      // FullMergeV2 should be called.
      EXPECT_EQ("c", merge_in.key.ToString());

      std::string temp_value;
      if (merge_in.existing_value != nullptr) {
        temp_value = merge_in.existing_value->ToString();
      }

      for (auto& operand : merge_in.operand_list) {
        temp_value.append(operand.ToString());
      }
      merge_out->new_value = temp_value;

      return true;
    }

    // Concatenates all operands, in order, into *new_value.
    bool PartialMergeMulti(const Slice& key,
                           const std::deque<Slice>& operand_list,
                           std::string* new_value,
                           Logger* /*logger*/) const override {
      std::string string_key = key.ToString();
      EXPECT_TRUE(string_key == "a" || string_key == "b");

      // Unsigned literals keep the comparison with size() sign-consistent.
      if (string_key == "a") {
        EXPECT_EQ(1u, operand_list.size());
      } else if (string_key == "b") {
        EXPECT_EQ(2u, operand_list.size());
      }

      std::string temp_value;
      for (auto& operand : operand_list) {
        temp_value.append(operand.ToString());
      }
      new_value->swap(temp_value);

      return true;
    }

    const char* Name() const override {
      return "CompactionIteratorTest SingleMergeOp";
    }

    bool AllowSingleOperand() const override { return true; }
  };

  SingleMergeOp merge_op;
  Filter filter;
  InitIterators(
      // a should invoke PartialMergeMulti with a single merge operand.
      {test::KeyStr("a", 50, kTypeMerge),
       // b should invoke PartialMergeMulti with two operands.
       test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
       // c should invoke FullMerge due to kTypeValue at the beginning.
       test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
      {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
      kMaxSequenceNumber, &merge_op, &filter);

  c_iter_->SeekToFirst();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
  ASSERT_EQ("av1", c_iter_->value().ToString());
  c_iter_->Next();
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
  c_iter_->Next();
  ASSERT_OK(c_iter_->status());
  // value() must only be read from a valid iterator; fail cleanly otherwise.
  ASSERT_TRUE(c_iter_->Valid());
  ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}
706
707 // In bottommost level, values earlier than earliest snapshot can be output
708 // with sequence = 0.
TEST_P(CompactionIteratorTest,ZeroOutSequenceAtBottomLevel)709 TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
710 AddSnapshot(1);
711 RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
712 {"v1", "v2"},
713 {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
714 {"v1", "v2"}, kMaxSequenceNumber /*last_committed_seq*/,
715 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
716 true /*bottommost_level*/);
717 }
718
719 // In bottommost level, deletions earlier than earliest snapshot can be removed
720 // permanently.
TEST_P(CompactionIteratorTest,RemoveDeletionAtBottomLevel)721 TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
722 AddSnapshot(1);
723 RunTest(
724 {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 3, kTypeDeletion),
725 test::KeyStr("b", 1, kTypeValue)},
726 {"", "", ""},
727 {test::KeyStr("b", 3, kTypeDeletion), test::KeyStr("b", 0, kTypeValue)},
728 {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
729 nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
730 true /*bottommost_level*/);
731 }
732
733 // In bottommost level, single deletions earlier than earliest snapshot can be
734 // removed permanently.
TEST_P(CompactionIteratorTest,RemoveSingleDeletionAtBottomLevel)735 TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
736 AddSnapshot(1);
737 RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
738 test::KeyStr("b", 2, kTypeSingleDeletion)},
739 {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
740 kMaxSequenceNumber /*last_committed_seq*/, nullptr /*merge_operator*/,
741 nullptr /*compaction_filter*/, true /*bottommost_level*/);
742 }
743
// At the bottommost level, three merge operands for "a" are expected to be
// emitted as a single kTypeValue entry at sequence 0 holding the
// string-append result, and "b" as a value at sequence 0.
TEST_P(CompactionIteratorTest, ConvertToPutAtBottom) {
  std::shared_ptr<MergeOperator> merge_op =
      MergeOperators::CreateStringAppendOperator();

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"a4", "a3", "a2", "b1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 0, kTypeValue)};
  const std::vector<std::string> expected_values = {"a2,a3,a4", "b1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/, merge_op.get(),
          nullptr /*compaction_filter*/, true /*bottommost_level*/);
}
755
// Run every TEST_P(CompactionIteratorTest, ...) twice: once with the bool
// parameter true and once with it false (the parameter is read through
// GetParam() in CompactionIteratorTest::InitIterators()).
INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
                        testing::Values(true, false));
758
759 // Tests how CompactionIterator work together with SnapshotChecker.
760 class CompactionIteratorWithSnapshotCheckerTest
761 : public CompactionIteratorTest {
762 public:
UseSnapshotChecker() const763 bool UseSnapshotChecker() const override { return true; }
764 };
765
766 // Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
767 // while committed version of these keys should get compacted as usual.
768
// With last_committed_seq == 2, "foo"@3 is uncommitted: the test expects
// "foo"@3 and "foo"@2 in the output and "foo"@1 to be dropped.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Value) {
  const std::string foo_3 = test::KeyStr("foo", 3, kTypeValue);
  const std::string foo_2 = test::KeyStr("foo", 2, kTypeValue);
  const std::string foo_1 = test::KeyStr("foo", 1, kTypeValue);

  RunTest({foo_3, foo_2, foo_1}, {"v3", "v2", "v1"}, {foo_3, foo_2},
          {"v3", "v2"}, 2 /*last_committed_seq*/);
}
778
// With last_committed_seq == 1, the deletion "foo"@2 is uncommitted: the test
// expects both the deletion and the older value to be output unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Deletion) {
  const std::vector<std::string> keys = {test::KeyStr("foo", 2, kTypeDeletion),
                                         test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> values = {"", "v1"};

  // Output is expected to be identical to the input.
  RunTest(keys, values, keys, values, 1 /*last_committed_seq*/);
}
788
// Two merge operands over a Put with last_committed_seq = 2: expects the
// uncommitted seq-3 operand to be output as-is, and the committed operand to
// be merged with the base value into a kTypeValue entry at seq 2.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Merge) {
  std::shared_ptr<MergeOperator> merge_op =
      MergeOperators::CreateStringAppendOperator();
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v3", "v1,v2"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, merge_op.get());
}
799
// An uncommitted single deletion (seq 2 > last_committed_seq 1) on top of a
// committed Put: expects both entries to be output unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_SingleDelete) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/);
}
809
// Same scenario as the kTypeValue variant but with kTypeBlobIndex entries:
// expects the uncommitted seq-3 entry and the newest committed entry (seq 2)
// to be output, and the seq-1 entry to be dropped.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_BlobIndex) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v3", "v2"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/);
}
820
// Test that the compaction iterator dedups keys visible to the same snapshot.
822
// Four Puts of "foo" with a snapshot registered via AddSnapshot(2, 1) and
// last_committed_seq = 3: expects the seq-2 entry to be dropped while the
// entries at seq 4, 3 and 1 are output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
833
// Like DedupSameSnapshot_Value, but the seq-3 entry is a deletion: expects the
// seq-2 Put to be dropped while the entries at seq 4, 3 and 1 are output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
846
// Four merge operands over a Put, with snapshots registered via
// AddSnapshot(2, 1) and AddSnapshot(4, 3) and last_committed_seq = 4: expects
// only the operands at seq 3 and 2 to be combined (into a kTypeMerge entry at
// seq 3); all other entries are output as they were.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
  AddSnapshot(2, 1);
  AddSnapshot(4, 3);
  std::shared_ptr<MergeOperator> merge_op =
      MergeOperators::CreateStringAppendOperator();
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v5", "v4", "v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v5", "v4", "v2,v3", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          4 /*last_committed_seq*/, merge_op.get());
}
860
// A single deletion at seq 3 directly above a Put at seq 2, with a snapshot
// registered via AddSnapshot(2, 1): expects the single deletion and the seq-2
// Put to both disappear, leaving only the Puts at seq 4 and seq 1.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       DedupSameSnapshot_SingleDeletion) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeSingleDeletion),
      test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
872
// Like DedupSameSnapshot_Value, but with kTypeBlobIndex entries: expects the
// seq-2 entry to be dropped while the entries at seq 4, 3 and 1 are output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
885
// At bottom level, sequence numbers can be zeroed out, and deletions can be
// removed, but only when they are visible to the earliest snapshot.
888
// Bottommost-level compaction of three Puts with a snapshot registered via
// AddSnapshot(2, 1): expects only "a" (seq 1) to have its sequence number
// zeroed out; "b" (seq 2) and "c" (seq 3) keep their sequence numbers.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"v1", "v2", "v3"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"v1", "v2", "v3"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
901
// Bottommost-level compaction of three deletions ("a"@1, "b"@2, "c"@3) with a
// snapshot registered via AddSnapshot(2, 1).
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
      test::KeyStr("c", 3, kTypeDeletion)};
  const std::vector<std::string> input_values = {"", "", ""};
  // NOTE(review): the expected key list is empty while the expected value list
  // has two entries. The sibling single-deletion test expects the "b" and "c"
  // tombstones to survive; confirm whether the empty key list here is
  // intentional or whether it silently weakens what this test checks.
  const std::vector<std::string> expected_keys{};
  const std::vector<std::string> expected_values = {"", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
912
// Bottommost-level compaction where a deletion of "a" at seq 4 sits above a
// Put of "a" at seq 1, with a snapshot registered via AddSnapshot(2, 1):
// expects the deletion to be kept, the older Put to be output with its
// sequence number zeroed, and "b"@3 to be output unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"", "", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
925
// Bottommost-level compaction of three single deletions with a snapshot
// registered via AddSnapshot(2, 1): expects only the one on "a" (seq 1) to be
// removed; those on "b" (seq 2) and "c" (seq 3) must be kept.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeSingleDeletion),
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> input_values = {"", "", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> expected_values = {"", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
939
940 // Single delete should not cancel out values that not visible to the
941 // same set of snapshots
TEST_F(CompactionIteratorWithSnapshotCheckerTest,SingleDeleteAcrossSnapshotBoundary)942 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
943 SingleDeleteAcrossSnapshotBoundary) {
944 AddSnapshot(2, 1);
945 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
946 test::KeyStr("a", 1, kTypeValue)},
947 {"", "v1"},
948 {test::KeyStr("a", 2, kTypeSingleDeletion),
949 test::KeyStr("a", 1, kTypeValue)},
950 {"", "v1"}, 2 /*last_committed_seq*/);
951 }
952
953 // Single delete should be kept in case it is not visible to the
954 // earliest write conflict snapshot. If a single delete is kept for this reason,
955 // corresponding value can be trimmed to save space.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,KeepSingleDeletionForWriteConflictChecking)956 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
957 KeepSingleDeletionForWriteConflictChecking) {
958 AddSnapshot(2, 0);
959 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
960 test::KeyStr("a", 1, kTypeValue)},
961 {"", "v1"},
962 {test::KeyStr("a", 2, kTypeSingleDeletion),
963 test::KeyStr("a", 1, kTypeValue)},
964 {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
965 nullptr /*compaction_filter*/, false /*bottommost_level*/,
966 2 /*earliest_write_conflict_snapshot*/);
967 }
968
969 // Same as above but with a blob index. In addition to the value getting
970 // trimmed, the type of the KV is changed to kTypeValue.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,KeepSingleDeletionForWriteConflictChecking_BlobIndex)971 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
972 KeepSingleDeletionForWriteConflictChecking_BlobIndex) {
973 AddSnapshot(2, 0);
974 RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
975 test::KeyStr("a", 1, kTypeBlobIndex)},
976 {"", "fake_blob_index"},
977 {test::KeyStr("a", 2, kTypeSingleDeletion),
978 test::KeyStr("a", 1, kTypeValue)},
979 {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
980 nullptr /*compaction_filter*/, false /*bottommost_level*/,
981 2 /*earliest_write_conflict_snapshot*/);
982 }
983
// Compaction filter should keep uncommitted keys as-is, and
//   * convert the latest value to a deletion, and/or
//   * if the latest value is a merge, apply the filter to all subsequent merges.
987
// Runs FilterAllKeysCompactionFilter with last_committed_seq = 1: expects the
// uncommitted Puts ("a"@2, "b"@3) to pass through untouched, while the
// committed Puts ("a"@1, "c"@1) are turned into deletions with empty values.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
  std::unique_ptr<CompactionFilter> filter_all{
      new FilterAllKeysCompactionFilter()};
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v2", "v1", "v3", "v4"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
      test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v2", "", "v3", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/, nullptr /*merge_operator*/,
          filter_all.get());
}
1000
// Runs FilterAllKeysCompactionFilter with last_committed_seq = 1: expects the
// uncommitted deletion at seq 2 to pass through untouched, while the committed
// Put at seq 1 is turned into a deletion.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
  std::unique_ptr<CompactionFilter> filter_all{
      new FilterAllKeysCompactionFilter()};
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeDeletion),
      test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/, nullptr /*merge_operator*/,
          filter_all.get());
}
1012
// Three merge operands with no base value, FilterAllKeysCompactionFilter and
// last_committed_seq = 2: expects only the uncommitted seq-3 operand to be
// output; the committed operands produce no output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       CompactionFilter_PartialMerge) {
  const auto merge_op = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> filter_all{
      new FilterAllKeysCompactionFilter()};
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeMerge)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge)};
  const std::vector<std::string> expected_values = {"v3"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, merge_op.get(), filter_all.get());
}
1024
// Two merge operands over a Put, FilterAllKeysCompactionFilter and
// last_committed_seq = 2: expects the uncommitted seq-3 operand to be output
// as-is, followed by a deletion of "a" at seq 1 with an empty value.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
  const auto merge_op = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> filter_all{
      new FilterAllKeysCompactionFilter()};
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v3", ""};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, merge_op.get(), filter_all.get());
}
1038
1039 // Tests how CompactionIterator work together with AllowIngestBehind.
1040 class CompactionIteratorWithAllowIngestBehindTest
1041 : public CompactionIteratorTest {
1042 public:
AllowIngestBehind() const1043 bool AllowIngestBehind() const override { return true; }
1044 };
1045
1046 // When allow_ingest_behind is set, compaction iterator is not targeting
1047 // the bottommost level since there is no guarantee there won't be further
1048 // data ingested under the compaction output in future.
TEST_P(CompactionIteratorWithAllowIngestBehindTest,NoConvertToPutAtBottom)1049 TEST_P(CompactionIteratorWithAllowIngestBehindTest, NoConvertToPutAtBottom) {
1050 std::shared_ptr<MergeOperator> merge_op =
1051 MergeOperators::CreateStringAppendOperator();
1052 RunTest({test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
1053 test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)},
1054 {"a4", "a3", "a2", "b1"},
1055 {test::KeyStr("a", 4, kTypeMerge), test::KeyStr("b", 1, kTypeValue)},
1056 {"a2,a3,a4", "b1"}, kMaxSequenceNumber /*last_committed_seq*/,
1057 merge_op.get(), nullptr /*compaction_filter*/,
1058 true /*bottomost_level*/);
1059 }
1060
// With allow_ingest_behind set, merge operands that sit on top of a Put are
// still expected to be folded into a single kTypeValue entry; the sequence
// numbers of the output keys are expected to be kept (not zeroed).
TEST_P(CompactionIteratorWithAllowIngestBehindTest,
       MergeToPutIfEncounteredPutAtBottom) {
  const auto merge_op = MergeOperators::CreateStringAppendOperator();
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"a4", "a3", "a2", "b1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeValue), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"a2,a3,a4", "b1"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
          /*compaction_filter=*/nullptr, /*bottommost_level=*/true);
}
1073
// Runs every TEST_P of CompactionIteratorWithAllowIngestBehindTest twice, once
// with the boolean test parameter set to true and once with it set to false.
INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance,
                        CompactionIteratorWithAllowIngestBehindTest,
                        testing::Values(true, false));
1077
1078 class CompactionIteratorTsGcTest : public CompactionIteratorTest {
1079 public:
CompactionIteratorTsGcTest()1080 CompactionIteratorTsGcTest()
1081 : CompactionIteratorTest(test::ComparatorWithU64Ts()) {}
1082 };
1083
// With full_history_ts_low = 102 and every input timestamp >= 102, no entry
// is eligible for GC: the output is expected to equal the input for each of
// the three level configurations exercised below.
TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
  constexpr char kKeyA[] = "a";
  constexpr char kKeyB[] = "b";
  const std::vector<std::string> input_keys = {
      test::KeyStr(/*ts=*/103, kKeyA, /*seq=*/4, kTypeValue),
      test::KeyStr(/*ts=*/102, kKeyA, /*seq=*/3, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/104, kKeyB, /*seq=*/5, kTypeValue)};
  const std::vector<std::string> input_values = {"a3", "", "b2"};

  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, 102);

  // The expected output is the input itself, so the input vectors are passed
  // for both roles.
  const auto expect_unchanged = [&](bool bottommost_level,
                                    bool key_not_exists_beyond_output_level) {
    RunTest(input_keys, input_values, /*expected_keys=*/input_keys,
            /*expected_values=*/input_values,
            /*last_committed_seq=*/kMaxSequenceNumber,
            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
            bottommost_level,
            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
            key_not_exists_beyond_output_level, &full_history_ts_low);
  };

  expect_unchanged(/*bottommost_level=*/false,
                   /*key_not_exists_beyond_output_level=*/false);
  expect_unchanged(/*bottommost_level=*/false,
                   /*key_not_exists_beyond_output_level=*/true);
  expect_unchanged(/*bottommost_level=*/true,
                   /*key_not_exists_beyond_output_level=*/true);
}
1111
// full_history_ts_low is set to the maximum uint64_t, so every input timestamp
// is below the threshold. Three non-bottommost scenarios are checked in turn;
// they differ in whether a snapshot exists and in
// key_not_exists_beyond_output_level.
TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
  constexpr char kKeyA[] = "a";
  constexpr char kKeyB[] = "b";
  const std::vector<std::string> input_keys = {
      test::KeyStr(/*ts=*/103, kKeyA, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kKeyA, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kKeyA, /*seq=*/2, kTypeValue),
      test::KeyStr(/*ts=*/104, kKeyB, /*seq=*/5, kTypeValue)};
  const std::vector<std::string> input_values = {"", "a2", "a1", "b5"};

  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());

  // All scenarios share every RunTest argument except the expectations and
  // key_not_exists_beyond_output_level.
  const auto run_scenario = [&](const std::vector<std::string>& expected_keys,
                                const std::vector<std::string>& expected_values,
                                bool key_not_exists_beyond_output_level) {
    RunTest(input_keys, input_values, expected_keys, expected_values,
            /*last_committed_seq=*/kMaxSequenceNumber,
            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
            /*bottommost_level=*/false,
            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
            key_not_exists_beyond_output_level, &full_history_ts_low);
  };

  // Scenario 1: with a snapshot at seq 3, both the deletion marker and the
  // key at seq 3 must be preserved.
  AddSnapshot(3);
  const std::vector<std::string> keys_with_snapshot = {
      input_keys[0], input_keys[1], input_keys[3]};
  const std::vector<std::string> values_with_snapshot = {"", "a2", "b5"};
  run_scenario(keys_with_snapshot, values_with_snapshot,
               /*key_not_exists_beyond_output_level=*/false);
  ClearSnapshots();

  // Scenario 2: no snapshot. The deletion marker should be preserved because
  // the user key may appear beyond the output level.
  const std::vector<std::string> keys_marker_kept = {input_keys[0],
                                                     input_keys[3]};
  const std::vector<std::string> values_marker_kept = {"", "b5"};
  run_scenario(keys_marker_kept, values_marker_kept,
               /*key_not_exists_beyond_output_level=*/false);

  // Scenario 3: no snapshot. The deletion marker can be dropped because the
  // user key does not exist beyond the output level.
  const std::vector<std::string> keys_marker_dropped = {input_keys[3]};
  const std::vector<std::string> values_marker_dropped = {"b5"};
  run_scenario(keys_marker_dropped, values_marker_dropped,
               /*key_not_exists_beyond_output_level=*/true);
}
1164
// One user key with a timestamped deletion on top of three Puts and no
// snapshots. With full_history_ts_low = 102, the entries whose timestamps are
// >= 102 (the deletion and the first Put) are expected to be preserved and the
// two older Puts dropped.
TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
  constexpr char kKey[] = "a";
  const std::vector<std::string> input_keys = {
      test::KeyStr(/*ts=*/103, kKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kKey, /*seq=*/2, kTypeValue),
      test::KeyStr(/*ts=*/100, kKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "a2", "a1", "a0"};

  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, 102);

  // Only the first two input entries are expected to survive.
  const std::vector<std::string> expected_keys(input_keys.begin(),
                                               input_keys.begin() + 2);
  const std::vector<std::string> expected_values = {"", "a2"};
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/false,
          /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
          /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
}
1188
// One user key with two timestamped deletions interleaved with two Puts, a
// snapshot at seq 2 and full_history_ts_low = 102. In both level
// configurations below, only the two newest entries are expected in the
// output; the older tombstone (ts 101) and the Put beneath it are dropped.
TEST_P(CompactionIteratorTsGcTest, DropTombstones) {
  constexpr char kKey[] = "a";
  const std::vector<std::string> input_keys = {
      test::KeyStr(/*ts=*/103, kKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kKey, /*seq=*/2, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/100, kKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "a2", "", "a0"};
  const std::vector<std::string> expected_keys(input_keys.begin(),
                                               input_keys.begin() + 2);
  const std::vector<std::string> expected_values = {"", "a2"};

  // Take a snapshot at seq 2.
  AddSnapshot(2);

  // {bottommost_level, key_not_exists_beyond_output_level}:
  //   1. non-bottommost level, but key does not exist beyond output level;
  //   2. bottommost level.
  const std::vector<std::pair<bool, bool>> level_configs = {{false, true},
                                                            {true, false}};
  for (const auto& config : level_configs) {
    // Each run gets a freshly encoded threshold.
    std::string full_history_ts_low;
    PutFixed64(&full_history_ts_low, 102);
    RunTest(input_keys, input_values, expected_keys, expected_values,
            /*last_committed_seq=*/kMaxSequenceNumber,
            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
            /*bottommost_level=*/config.first,
            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
            /*key_not_exists_beyond_output_level=*/config.second,
            &full_history_ts_low);
  }
}
1226
// Bottommost-level compaction with snapshots at seq 1 and seq 2 and
// full_history_ts_low = 102. All four entries are expected to be output; the
// first three unchanged, and the oldest Put rewritten so that both its
// timestamp and its sequence number are 0.
TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
  constexpr char kKey[] = "a";
  const std::vector<std::string> input_keys = {
      test::KeyStr(/*ts=*/103, kKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kKey, /*seq=*/2, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/100, kKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "a2", "", "a0"};
  const std::vector<std::string> expected_keys = {
      input_keys[0], input_keys[1], input_keys[2],
      test::KeyStr(/*ts=*/0, kKey, /*seq=*/0, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "a2", "", "a0"};

  AddSnapshot(1);
  AddSnapshot(2);

  // Bottommost level: both ts and seq of the oldest entry need rewriting.
  std::string full_history_ts_low;
  PutFixed64(&full_history_ts_low, 102);
  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true,
          /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
          /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low);
}
1255
// Runs every TEST_P of CompactionIteratorTsGcTest twice, once with the boolean
// test parameter set to true and once with it set to false.
INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
                        CompactionIteratorTsGcTest,
                        testing::Values(true, false));
1259
1260 } // namespace ROCKSDB_NAMESPACE
1261
main(int argc,char ** argv)1262 int main(int argc, char** argv) {
1263 ::testing::InitGoogleTest(&argc, argv);
1264 return RUN_ALL_TESTS();
1265 }
1266