1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <deque>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/compaction/compaction_iterator.h"
#include "port/port.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 
19 // Expects no merging attempts.
20 class NoMergingMergeOp : public MergeOperator {
21  public:
FullMergeV2(const MergeOperationInput &,MergeOperationOutput *) const22   bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
23                    MergeOperationOutput* /*merge_out*/) const override {
24     ADD_FAILURE();
25     return false;
26   }
PartialMergeMulti(const Slice &,const std::deque<Slice> &,std::string *,Logger *) const27   bool PartialMergeMulti(const Slice& /*key*/,
28                          const std::deque<Slice>& /*operand_list*/,
29                          std::string* /*new_value*/,
30                          Logger* /*logger*/) const override {
31     ADD_FAILURE();
32     return false;
33   }
Name() const34   const char* Name() const override {
35     return "CompactionIteratorTest NoMergingMergeOp";
36   }
37 };
38 
// Compaction filter that gets stuck when it sees a particular key,
// then gets unstuck when told to.
// Always returns Decision::kRemove.
42 class StallingFilter : public CompactionFilter {
43  public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const44   Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
45                     const Slice& /*existing_value*/, std::string* /*new_value*/,
46                     std::string* /*skip_until*/) const override {
47     int k = std::atoi(key.ToString().c_str());
48     last_seen.store(k);
49     while (k >= stall_at.load()) {
50       std::this_thread::yield();
51     }
52     return Decision::kRemove;
53   }
54 
Name() const55   const char* Name() const override {
56     return "CompactionIteratorTest StallingFilter";
57   }
58 
59   // Wait until the filter sees a key >= k and stalls at that key.
60   // If `exact`, asserts that the seen key is equal to k.
WaitForStall(int k,bool exact=true)61   void WaitForStall(int k, bool exact = true) {
62     stall_at.store(k);
63     while (last_seen.load() < k) {
64       std::this_thread::yield();
65     }
66     if (exact) {
67       EXPECT_EQ(k, last_seen.load());
68     }
69   }
70 
71   // Filter will stall on key >= stall_at. Advance stall_at to unstall.
72   mutable std::atomic<int> stall_at{0};
73   // Last key the filter was called with.
74   mutable std::atomic<int> last_seen{0};
75 };
76 
// Compaction filter that filters out all keys.
78 class FilterAllKeysCompactionFilter : public CompactionFilter {
79  public:
FilterV2(int,const Slice &,ValueType,const Slice &,std::string *,std::string *) const80   Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
81                     const Slice& /*existing_value*/, std::string* /*new_value*/,
82                     std::string* /*skip_until*/) const override {
83     return Decision::kRemove;
84   }
85 
Name() const86   const char* Name() const override { return "AllKeysCompactionFilter"; }
87 };
88 
89 class LoggingForwardVectorIterator : public InternalIterator {
90  public:
91   struct Action {
92     enum class Type {
93       SEEK_TO_FIRST,
94       SEEK,
95       NEXT,
96     };
97 
98     Type type;
99     std::string arg;
100 
ActionROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action101     explicit Action(Type _type, std::string _arg = "")
102         : type(_type), arg(_arg) {}
103 
operator ==ROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action104     bool operator==(const Action& rhs) const {
105       return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
106     }
107   };
108 
LoggingForwardVectorIterator(const std::vector<std::string> & keys,const std::vector<std::string> & values)109   LoggingForwardVectorIterator(const std::vector<std::string>& keys,
110                                const std::vector<std::string>& values)
111       : keys_(keys), values_(values), current_(keys.size()) {
112     assert(keys_.size() == values_.size());
113   }
114 
Valid() const115   bool Valid() const override { return current_ < keys_.size(); }
116 
SeekToFirst()117   void SeekToFirst() override {
118     log.emplace_back(Action::Type::SEEK_TO_FIRST);
119     current_ = 0;
120   }
SeekToLast()121   void SeekToLast() override { assert(false); }
122 
Seek(const Slice & target)123   void Seek(const Slice& target) override {
124     log.emplace_back(Action::Type::SEEK, target.ToString());
125     current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
126                keys_.begin();
127   }
128 
SeekForPrev(const Slice &)129   void SeekForPrev(const Slice& /*target*/) override { assert(false); }
130 
Next()131   void Next() override {
132     assert(Valid());
133     log.emplace_back(Action::Type::NEXT);
134     current_++;
135   }
Prev()136   void Prev() override { assert(false); }
137 
key() const138   Slice key() const override {
139     assert(Valid());
140     return Slice(keys_[current_]);
141   }
value() const142   Slice value() const override {
143     assert(Valid());
144     return Slice(values_[current_]);
145   }
146 
status() const147   Status status() const override { return Status::OK(); }
148 
149   std::vector<Action> log;
150 
151  private:
152   std::vector<std::string> keys_;
153   std::vector<std::string> values_;
154   size_t current_;
155 };
156 
157 class FakeCompaction : public CompactionIterator::CompactionProxy {
158  public:
159   FakeCompaction() = default;
160 
level(size_t) const161   int level(size_t /*compaction_input_level*/) const override { return 0; }
KeyNotExistsBeyondOutputLevel(const Slice &,std::vector<size_t> *) const162   bool KeyNotExistsBeyondOutputLevel(
163       const Slice& /*user_key*/,
164       std::vector<size_t>* /*level_ptrs*/) const override {
165     return is_bottommost_level || key_not_exists_beyond_output_level;
166   }
bottommost_level() const167   bool bottommost_level() const override { return is_bottommost_level; }
number_levels() const168   int number_levels() const override { return 1; }
GetLargestUserKey() const169   Slice GetLargestUserKey() const override {
170     return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
171   }
allow_ingest_behind() const172   bool allow_ingest_behind() const override { return false; }
173 
preserve_deletes() const174   bool preserve_deletes() const override { return false; }
175 
176   bool key_not_exists_beyond_output_level = false;
177 
178   bool is_bottommost_level = false;
179 };
180 
// A simplified snapshot checker which assumes each snapshot has a global
// last visible sequence.
183 class TestSnapshotChecker : public SnapshotChecker {
184  public:
TestSnapshotChecker(SequenceNumber last_committed_sequence,const std::unordered_map<SequenceNumber,SequenceNumber> & snapshots={{}})185   explicit TestSnapshotChecker(
186       SequenceNumber last_committed_sequence,
187       const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {{}})
188       : last_committed_sequence_(last_committed_sequence),
189         snapshots_(snapshots) {}
190 
CheckInSnapshot(SequenceNumber seq,SequenceNumber snapshot_seq) const191   SnapshotCheckerResult CheckInSnapshot(
192       SequenceNumber seq, SequenceNumber snapshot_seq) const override {
193     if (snapshot_seq == kMaxSequenceNumber) {
194       return seq <= last_committed_sequence_
195                  ? SnapshotCheckerResult::kInSnapshot
196                  : SnapshotCheckerResult::kNotInSnapshot;
197     }
198     assert(snapshots_.count(snapshot_seq) > 0);
199     return seq <= snapshots_.at(snapshot_seq)
200                ? SnapshotCheckerResult::kInSnapshot
201                : SnapshotCheckerResult::kNotInSnapshot;
202   }
203 
204  private:
205   SequenceNumber last_committed_sequence_;
206   // A map of valid snapshot to last visible sequence to the snapshot.
207   std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
208 };
209 
210 // Test param:
211 //   bool: whether to pass snapshot_checker to compaction iterator.
212 class CompactionIteratorTest : public testing::TestWithParam<bool> {
213  public:
CompactionIteratorTest()214   CompactionIteratorTest()
215       : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
216 
InitIterators(const std::vector<std::string> & ks,const std::vector<std::string> & vs,const std::vector<std::string> & range_del_ks,const std::vector<std::string> & range_del_vs,SequenceNumber last_sequence,SequenceNumber last_committed_sequence=kMaxSequenceNumber,MergeOperator * merge_op=nullptr,CompactionFilter * filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber)217   void InitIterators(
218       const std::vector<std::string>& ks, const std::vector<std::string>& vs,
219       const std::vector<std::string>& range_del_ks,
220       const std::vector<std::string>& range_del_vs,
221       SequenceNumber last_sequence,
222       SequenceNumber last_committed_sequence = kMaxSequenceNumber,
223       MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
224       bool bottommost_level = false,
225       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
226     std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
227         new test::VectorIterator(range_del_ks, range_del_vs));
228     auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
229         std::move(unfragmented_range_del_iter), icmp_);
230     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
231         new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
232                                              kMaxSequenceNumber));
233     range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
234     range_del_agg_->AddTombstones(std::move(range_del_iter));
235 
236     std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
237     if (filter || bottommost_level) {
238       compaction_proxy_ = new FakeCompaction();
239       compaction_proxy_->is_bottommost_level = bottommost_level;
240       compaction.reset(compaction_proxy_);
241     }
242     bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
243     if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
244       snapshot_checker_.reset(
245           new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
246     }
247     merge_helper_.reset(
248         new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
249                         0 /*latest_snapshot*/, snapshot_checker_.get(),
250                         0 /*level*/, nullptr /*statistics*/, &shutting_down_));
251 
252     iter_.reset(new LoggingForwardVectorIterator(ks, vs));
253     iter_->SeekToFirst();
254     c_iter_.reset(new CompactionIterator(
255         iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
256         earliest_write_conflict_snapshot, snapshot_checker_.get(),
257         Env::Default(), false /* report_detailed_time */, false,
258         range_del_agg_.get(), std::move(compaction), filter, &shutting_down_));
259   }
260 
AddSnapshot(SequenceNumber snapshot,SequenceNumber last_visible_seq=kMaxSequenceNumber)261   void AddSnapshot(SequenceNumber snapshot,
262                    SequenceNumber last_visible_seq = kMaxSequenceNumber) {
263     snapshots_.push_back(snapshot);
264     snapshot_map_[snapshot] = last_visible_seq;
265   }
266 
UseSnapshotChecker() const267   virtual bool UseSnapshotChecker() const { return false; }
268 
RunTest(const std::vector<std::string> & input_keys,const std::vector<std::string> & input_values,const std::vector<std::string> & expected_keys,const std::vector<std::string> & expected_values,SequenceNumber last_committed_seq=kMaxSequenceNumber,MergeOperator * merge_operator=nullptr,CompactionFilter * compaction_filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber)269   void RunTest(
270       const std::vector<std::string>& input_keys,
271       const std::vector<std::string>& input_values,
272       const std::vector<std::string>& expected_keys,
273       const std::vector<std::string>& expected_values,
274       SequenceNumber last_committed_seq = kMaxSequenceNumber,
275       MergeOperator* merge_operator = nullptr,
276       CompactionFilter* compaction_filter = nullptr,
277       bool bottommost_level = false,
278       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) {
279     InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
280                   last_committed_seq, merge_operator, compaction_filter,
281                   bottommost_level, earliest_write_conflict_snapshot);
282     c_iter_->SeekToFirst();
283     for (size_t i = 0; i < expected_keys.size(); i++) {
284       std::string info = "i = " + ToString(i);
285       ASSERT_TRUE(c_iter_->Valid()) << info;
286       ASSERT_OK(c_iter_->status()) << info;
287       ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
288       ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
289       c_iter_->Next();
290     }
291     ASSERT_FALSE(c_iter_->Valid());
292   }
293 
294   const Comparator* cmp_;
295   const InternalKeyComparator icmp_;
296   std::vector<SequenceNumber> snapshots_;
297   // A map of valid snapshot to last visible sequence to the snapshot.
298   std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
299   std::unique_ptr<MergeHelper> merge_helper_;
300   std::unique_ptr<LoggingForwardVectorIterator> iter_;
301   std::unique_ptr<CompactionIterator> c_iter_;
302   std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
303   std::unique_ptr<SnapshotChecker> snapshot_checker_;
304   std::atomic<bool> shutting_down_{false};
305   FakeCompaction* compaction_proxy_;
306 };
307 
308 // It is possible that the output of the compaction iterator is empty even if
309 // the input is not.
TEST_P(CompactionIteratorTest, EmptyResult) {
  // Input: a single deletion (seq 5) followed by a value (seq 3) for the
  // same user key. The compaction is expected to emit nothing.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 5, kTypeSingleDeletion),
      test::KeyStr("a", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"", "val"};

  InitIterators(input_keys, input_values, /*range_del_ks=*/{},
                /*range_del_vs=*/{}, /*last_sequence=*/5);

  c_iter_->SeekToFirst();
  ASSERT_FALSE(c_iter_->Valid());
}
317 
318 // If there is a corruption after a single deletion, the corrupted key should
319 // be preserved.
TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
  // The second entry is built with the `corrupt` flag of test::KeyStr set.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 5, kTypeSingleDeletion),
      test::KeyStr("a", 3, kTypeValue, true),
      test::KeyStr("b", 10, kTypeValue)};
  const std::vector<std::string> input_values = {"", "val", "val2"};

  InitIterators(input_keys, input_values, /*range_del_ks=*/{},
                /*range_del_vs=*/{}, /*last_sequence=*/10);

  // Every input key is expected to come out unchanged and in the same order.
  c_iter_->SeekToFirst();
  for (const std::string& expected_key : input_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_FALSE(c_iter_->Valid());
}
338 
TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("morning", 2, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"zao", "zao", "wan"};
  // One range tombstone ["ma", "mz") at sequence 4.
  const std::vector<std::string> tombstone_keys = {
      test::KeyStr("ma", 4, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_values = {"mz"};

  InitIterators(input_keys, input_values, tombstone_keys, tombstone_values,
                /*last_sequence=*/5);

  // Expected survivors: "morning"@5 and "night"@3 ("morning"@2 is dropped).
  const std::vector<std::string> expected_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};

  c_iter_->SeekToFirst();
  for (const std::string& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_FALSE(c_iter_->Valid());
}
354 
TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
  AddSnapshot(10);

  // One range tombstone ["ma", "mz") at sequence 28.
  const std::vector<std::string> tombstone_keys = {
      test::KeyStr("ma", 28, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_values = {"mz"};

  const std::vector<std::string> point_keys = {
      test::KeyStr("morning", 15, kTypeValue),
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue),
      test::KeyStr("night", 20, kTypeValue)};
  const std::vector<std::string> point_values = {"zao 15", "zao 5", "wan 40",
                                                 "wan 20"};

  InitIterators(point_keys, point_values, tombstone_keys, tombstone_values,
                /*last_sequence=*/40);

  // Expected survivors: "morning"@5 and "night"@40.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue)};

  c_iter_->SeekToFirst();
  for (const std::string& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_FALSE(c_iter_->Valid());
}
375 
TEST_P(CompactionIteratorTest,CompactionFilterSkipUntil)376 TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
377   class Filter : public CompactionFilter {
378     Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
379                       const Slice& existing_value, std::string* /*new_value*/,
380                       std::string* skip_until) const override {
381       std::string k = key.ToString();
382       std::string v = existing_value.ToString();
383       // See InitIterators() call below for the sequence of keys and their
384       // filtering decisions. Here we closely assert that compaction filter is
385       // called with the expected keys and only them, and with the right values.
386       if (k == "a") {
387         EXPECT_EQ(ValueType::kValue, t);
388         EXPECT_EQ("av50", v);
389         return Decision::kKeep;
390       }
391       if (k == "b") {
392         EXPECT_EQ(ValueType::kValue, t);
393         EXPECT_EQ("bv60", v);
394         *skip_until = "d+";
395         return Decision::kRemoveAndSkipUntil;
396       }
397       if (k == "e") {
398         EXPECT_EQ(ValueType::kMergeOperand, t);
399         EXPECT_EQ("em71", v);
400         return Decision::kKeep;
401       }
402       if (k == "f") {
403         if (v == "fm65") {
404           EXPECT_EQ(ValueType::kMergeOperand, t);
405           *skip_until = "f";
406         } else {
407           EXPECT_EQ("fm30", v);
408           EXPECT_EQ(ValueType::kMergeOperand, t);
409           *skip_until = "g+";
410         }
411         return Decision::kRemoveAndSkipUntil;
412       }
413       if (k == "h") {
414         EXPECT_EQ(ValueType::kValue, t);
415         EXPECT_EQ("hv91", v);
416         return Decision::kKeep;
417       }
418       if (k == "i") {
419         EXPECT_EQ(ValueType::kMergeOperand, t);
420         EXPECT_EQ("im95", v);
421         *skip_until = "z";
422         return Decision::kRemoveAndSkipUntil;
423       }
424       ADD_FAILURE();
425       return Decision::kKeep;
426     }
427 
428     const char* Name() const override {
429       return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
430     }
431   };
432 
433   NoMergingMergeOp merge_op;
434   Filter filter;
435   InitIterators(
436       {test::KeyStr("a", 50, kTypeValue),  // keep
437        test::KeyStr("a", 45, kTypeMerge),
438        test::KeyStr("b", 60, kTypeValue),  // skip to "d+"
439        test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
440        test::KeyStr("d", 70, kTypeMerge),
441        test::KeyStr("e", 71, kTypeMerge),  // keep
442        test::KeyStr("f", 65, kTypeMerge),  // skip to "f", aka keep
443        test::KeyStr("f", 30, kTypeMerge),  // skip to "g+"
444        test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
445        test::KeyStr("h", 91, kTypeValue),  // keep
446        test::KeyStr("i", 95, kTypeMerge),  // skip to "z"
447        test::KeyStr("j", 99, kTypeValue)},
448       {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
449        "fv25", "gv90", "hv91", "im95", "jv99"},
450       {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
451 
452   // Compaction should output just "a", "e" and "h" keys.
453   c_iter_->SeekToFirst();
454   ASSERT_TRUE(c_iter_->Valid());
455   ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
456   ASSERT_EQ("av50", c_iter_->value().ToString());
457   c_iter_->Next();
458   ASSERT_TRUE(c_iter_->Valid());
459   ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
460   ASSERT_EQ("em71", c_iter_->value().ToString());
461   c_iter_->Next();
462   ASSERT_TRUE(c_iter_->Valid());
463   ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
464   ASSERT_EQ("hv91", c_iter_->value().ToString());
465   c_iter_->Next();
466   ASSERT_FALSE(c_iter_->Valid());
467 
468   // Check that the compaction iterator did the correct sequence of calls on
469   // the underlying iterator.
470   using A = LoggingForwardVectorIterator::Action;
471   using T = A::Type;
472   std::vector<A> expected_actions = {
473       A(T::SEEK_TO_FIRST),
474       A(T::NEXT),
475       A(T::NEXT),
476       A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
477       A(T::NEXT),
478       A(T::NEXT),
479       A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
480       A(T::NEXT),
481       A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
482   ASSERT_EQ(expected_actions, iter_->log);
483 }
484 
// Sets the shutdown flag while the compaction filter is stalled on key 2 and
// checks that SeekToFirst() then ends with ShutdownInProgress without the
// filter being invoked for any later key.
TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
  NoMergingMergeOp merge_op;
  StallingFilter filter;
  InitIterators(
      {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
       test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      &merge_op, &filter);
  // Don't leave tombstones (kTypeDeletion) for filtered keys.
  compaction_proxy_->key_not_exists_beyond_output_level = true;

  std::atomic<bool> seek_done{false};
  // EXPECT_* (not ASSERT_*) inside the worker so that seek_done is always
  // published, even when an expectation fails.
  ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
    c_iter_->SeekToFirst();
    EXPECT_FALSE(c_iter_->Valid());
    EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
    seek_done.store(true);
  });

  // Let key 1 through.
  filter.WaitForStall(1);

  // Shutdown during compaction filter call for key 2.
  filter.WaitForStall(2);
  shutting_down_.store(true);
  EXPECT_FALSE(seek_done.load());

  // Unstall filter and wait for SeekToFirst() to return.
  filter.stall_at.store(3);
  compaction_thread.join();
  // A gtest expectation instead of assert(): it is still checked when NDEBUG
  // is defined and reports a failure rather than aborting the process.
  EXPECT_TRUE(seek_done.load());

  // Check that filter was never called again.
  EXPECT_EQ(2, filter.last_seen.load());
}
520 
521 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
522 // a merge operand, not for a value.
// Variant of the shutdown-in-filter scenario in which keys 2 and 3 are merge
// operands: the shutdown flag is raised while the filter is stalled on key 2,
// and SeekToFirst() is expected to end with ShutdownInProgress without the
// filter being invoked for any later key.
TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
  NoMergingMergeOp merge_op;
  StallingFilter filter;
  InitIterators(
      {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
       test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      &merge_op, &filter);
  compaction_proxy_->key_not_exists_beyond_output_level = true;

  std::atomic<bool> seek_done{false};
  // EXPECT_* rather than ASSERT_*: a failing ASSERT_* would return from the
  // lambda early and skip seek_done.store(true), hiding the real failure
  // behind the seek_done check below.
  ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
    c_iter_->SeekToFirst();
    EXPECT_FALSE(c_iter_->Valid());
    EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
    seek_done.store(true);
  });

  // Let key 1 through.
  filter.WaitForStall(1);

  // Shutdown during compaction filter call for key 2.
  filter.WaitForStall(2);
  shutting_down_.store(true);
  EXPECT_FALSE(seek_done.load());

  // Unstall filter and wait for SeekToFirst() to return.
  filter.stall_at.store(3);
  compaction_thread.join();
  // A gtest expectation instead of assert(): it is still checked when NDEBUG
  // is defined and reports a failure rather than aborting the process.
  EXPECT_TRUE(seek_done.load());

  // Check that filter was never called again.
  EXPECT_EQ(2, filter.last_seen.load());
}
557 
TEST_P(CompactionIteratorTest,SingleMergeOperand)558 TEST_P(CompactionIteratorTest, SingleMergeOperand) {
559   class Filter : public CompactionFilter {
560     Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
561                       const Slice& existing_value, std::string* /*new_value*/,
562                       std::string* /*skip_until*/) const override {
563       std::string k = key.ToString();
564       std::string v = existing_value.ToString();
565 
566       // See InitIterators() call below for the sequence of keys and their
567       // filtering decisions. Here we closely assert that compaction filter is
568       // called with the expected keys and only them, and with the right values.
569       if (k == "a") {
570         EXPECT_EQ(ValueType::kMergeOperand, t);
571         EXPECT_EQ("av1", v);
572         return Decision::kKeep;
573       } else if (k == "b") {
574         EXPECT_EQ(ValueType::kMergeOperand, t);
575         return Decision::kKeep;
576       } else if (k == "c") {
577         return Decision::kKeep;
578       }
579 
580       ADD_FAILURE();
581       return Decision::kKeep;
582     }
583 
584     const char* Name() const override {
585       return "CompactionIteratorTest.SingleMergeOperand::Filter";
586     }
587   };
588 
589   class SingleMergeOp : public MergeOperator {
590    public:
591     bool FullMergeV2(const MergeOperationInput& merge_in,
592                      MergeOperationOutput* merge_out) const override {
593       // See InitIterators() call below for why "c" is the only key for which
594       // FullMergeV2 should be called.
595       EXPECT_EQ("c", merge_in.key.ToString());
596 
597       std::string temp_value;
598       if (merge_in.existing_value != nullptr) {
599         temp_value = merge_in.existing_value->ToString();
600       }
601 
602       for (auto& operand : merge_in.operand_list) {
603         temp_value.append(operand.ToString());
604       }
605       merge_out->new_value = temp_value;
606 
607       return true;
608     }
609 
610     bool PartialMergeMulti(const Slice& key,
611                            const std::deque<Slice>& operand_list,
612                            std::string* new_value,
613                            Logger* /*logger*/) const override {
614       std::string string_key = key.ToString();
615       EXPECT_TRUE(string_key == "a" || string_key == "b");
616 
617       if (string_key == "a") {
618         EXPECT_EQ(1, operand_list.size());
619       } else if (string_key == "b") {
620         EXPECT_EQ(2, operand_list.size());
621       }
622 
623       std::string temp_value;
624       for (auto& operand : operand_list) {
625         temp_value.append(operand.ToString());
626       }
627       swap(temp_value, *new_value);
628 
629       return true;
630     }
631 
632     const char* Name() const override {
633       return "CompactionIteratorTest SingleMergeOp";
634     }
635 
636     bool AllowSingleOperand() const override { return true; }
637   };
638 
639   SingleMergeOp merge_op;
640   Filter filter;
641   InitIterators(
642       // a should invoke PartialMergeMulti with a single merge operand.
643       {test::KeyStr("a", 50, kTypeMerge),
644        // b should invoke PartialMergeMulti with two operands.
645        test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
646        // c should invoke FullMerge due to kTypeValue at the beginning.
647        test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
648       {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
649       kMaxSequenceNumber, &merge_op, &filter);
650 
651   c_iter_->SeekToFirst();
652   ASSERT_TRUE(c_iter_->Valid());
653   ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
654   ASSERT_EQ("av1", c_iter_->value().ToString());
655   c_iter_->Next();
656   ASSERT_TRUE(c_iter_->Valid());
657   ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
658   c_iter_->Next();
659   ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
660 }
661 
662 // In bottommost level, values earlier than earliest snapshot can be output
663 // with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
  AddSnapshot(1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)};
  const std::vector<std::string> input_values = {"v1", "v2"};

  // "a" is expected with sequence 0; "b" keeps its sequence number 2.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v1", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
673 
674 // In bottommost level, deletions earlier than earliest snapshot can be removed
675 // permanently.
TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
  AddSnapshot(1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeDeletion),
      test::KeyStr("b", 3, kTypeDeletion),
      test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "", ""};

  // The deletion of "a" is expected to vanish; the deletion of "b" at seq 3
  // stays, and the value of "b" is expected with sequence 0.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 3, kTypeDeletion), test::KeyStr("b", 0, kTypeValue)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
688 
689 // In bottommost level, single deletions earlier than earliest snapshot can be
690 // removed permanently.
TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
  AddSnapshot(1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeSingleDeletion),
      test::KeyStr("b", 2, kTypeSingleDeletion)};
  const std::vector<std::string> input_values = {"", ""};

  // Only the single deletion of "b" (seq 2) is expected to remain.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 2, kTypeSingleDeletion)};
  const std::vector<std::string> expected_values = {""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
699 
700 INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
701                         testing::Values(true, false));
702 
// Tests how CompactionIterator works together with SnapshotChecker.
704 class CompactionIteratorWithSnapshotCheckerTest
705     : public CompactionIteratorTest {
706  public:
UseSnapshotChecker() const707   bool UseSnapshotChecker() const override { return true; }
708 };
709 
710 // Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
711 // while committed version of these keys should get compacted as usual.
712 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Value) {
  // With last_committed_seq == 2, the version at seq 3 is uncommitted.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};

  // Expected: seq 3 and seq 2 remain, seq 1 is dropped.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v3", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/);
}
722 
// With last_committed_seq = 1, the deletion at seq 2 is uncommitted. Both
// the deletion and the committed value beneath it are expected to be emitted
// unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Deletion) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 2, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 2, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/);
}
732 
// With last_committed_seq = 2, the merge operand at seq 3 is uncommitted and
// is expected to be emitted unchanged. The committed merge at seq 2 is
// expected to be combined with the base value at seq 1 into a single value
// "v1,v2" carrying sequence number 2.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Merge) {
  const std::shared_ptr<MergeOperator> string_append =
      MergeOperators::CreateStringAppendOperator();

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeMerge),
      test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeMerge),
      test::KeyStr("foo", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v3", "v1,v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, string_append.get());
}
743 
// With last_committed_seq = 1, the single deletion at seq 2 is uncommitted.
// It is expected to be emitted unchanged, together with the committed value
// at seq 1 that it would otherwise cancel.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_SingleDelete) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/);
}
753 
// Blob-index counterpart of PreserveUncommittedKeys_Value. With
// last_committed_seq = 2, the entry at seq 3 is uncommitted and is expected
// to be emitted unchanged. Of the committed entries, only seq 2 is expected
// in the output.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_BlobIndex) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v3", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/);
}
764 
// Test that the compaction iterator dedups keys visible to the same snapshot.
766 
// Four versions of "foo" (seq 4..1) are compacted with a snapshot added via
// AddSnapshot(2, 1) and last_committed_seq = 3. The version at seq 2 is
// expected to be dropped, while seq 4, seq 3 and seq 1 are expected to be
// emitted unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
777 
// Same shape as DedupSameSnapshot_Value, but the entry at seq 3 is a
// deletion. The value at seq 2 is expected to be dropped. The uncommitted
// value at seq 4, the deletion at seq 3 and the value at seq 1 are expected
// to be emitted.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
790 
// Merge operands at seq 5..2 on top of a base value at seq 1, with snapshots
// added via AddSnapshot(2, 1) and AddSnapshot(4, 3) and
// last_committed_seq = 4. The operands at seq 3 and seq 2 are expected to be
// collapsed into one merge entry "v2,v3" at seq 3. The entries at seq 5,
// seq 4 and seq 1 are expected to be emitted unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
  AddSnapshot(2, 1);
  AddSnapshot(4, 3);
  const std::shared_ptr<MergeOperator> string_append =
      MergeOperators::CreateStringAppendOperator();

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 5, kTypeMerge),
      test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge),
      test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v5", "v4", "v3", "v2",
                                                 "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 5, kTypeMerge),
      test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v5", "v4", "v2,v3",
                                                    "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          4 /*last_committed_seq*/, string_append.get());
}
804 
// A single deletion at seq 3 sits directly above a value at seq 2, with a
// snapshot added via AddSnapshot(2, 1) and last_committed_seq = 3. The single
// deletion and the value at seq 2 are both expected to vanish, leaving only
// the uncommitted value at seq 4 and the value at seq 1.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       DedupSameSnapshot_SingleDeletion) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeSingleDeletion),
      test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
816 
// Blob-index counterpart of DedupSameSnapshot_Value. The entry at seq 2 is
// expected to be dropped, while seq 4, seq 3 and seq 1 are expected to be
// emitted unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          3 /*last_committed_seq*/);
}
829 
// At the bottom level, sequence numbers can be zeroed out and deletions can
// be removed, but only when they are visible to the earliest snapshot.
832 
// Bottommost-level compaction of three distinct keys with a snapshot added
// via AddSnapshot(2, 1). Only "a" at seq 1 is expected to have its sequence
// number rewritten to 0. "b" at seq 2 and "c" at seq 3 are expected to keep
// their sequence numbers.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"v1", "v2", "v3"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue),
      test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"v1", "v2", "v3"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
845 
// Bottommost-level compaction of three deletions on distinct keys ("a"@1,
// "b"@2, "c"@3) with a snapshot added via AddSnapshot(2, 1). None of the
// keys has an older value underneath its deletion.
//
// The expectation encoded here is that the iterator emits nothing at all:
// the expected key list is empty, so the expected value list is empty too
// (it previously carried two stale "" entries that matched no key).
//
// NOTE(review): the test name suggests the deletions of "b" and "c" should
// be preserved, which disagrees with the empty expected output. Presumably
// the expectation was updated without renaming the test; confirm which of
// the two is intended.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);
  RunTest(
      {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
       test::KeyStr("c", 3, kTypeDeletion)},
      {"", "", ""},
      {} /*expected_keys*/,
      {} /*expected_values*/, kMaxSequenceNumber /*last_committed_seq*/,
      nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
      true /*bottommost_level*/);
}
858 
// Bottommost-level compaction where the deletion of "a" at seq 4 sits above
// an older value of "a" at seq 1, with a snapshot added via
// AddSnapshot(2, 1). The deletion is expected to be kept, the older value is
// expected to be emitted with sequence number 0, and "b" at seq 3 is expected
// to be emitted unchanged.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeDeletion),
      test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"", "", ""};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeDeletion),
      test::KeyStr("a", 0, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
872 
// Bottommost-level compaction of three single deletions on distinct keys
// with a snapshot added via AddSnapshot(2, 1). The single deletion of "a" at
// seq 1 is expected to be dropped, while those of "b" at seq 2 and "c" at
// seq 3 are expected to be kept.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeSingleDeletion),
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> input_values = {"", "", ""};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
886 
887 // Single delete should not cancel out values that not visible to the
888 // same set of snapshots
TEST_F(CompactionIteratorWithSnapshotCheckerTest,SingleDeleteAcrossSnapshotBoundary)889 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
890        SingleDeleteAcrossSnapshotBoundary) {
891   AddSnapshot(2, 1);
892   RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
893            test::KeyStr("a", 1, kTypeValue)},
894           {"", "v1"},
895           {test::KeyStr("a", 2, kTypeSingleDeletion),
896            test::KeyStr("a", 1, kTypeValue)},
897           {"", "v1"}, 2 /*last_committed_seq*/);
898 }
899 
900 // Single delete should be kept in case it is not visible to the
901 // earliest write conflict snapshot. If a single delete is kept for this reason,
902 // corresponding value can be trimmed to save space.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,KeepSingleDeletionForWriteConflictChecking)903 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
904        KeepSingleDeletionForWriteConflictChecking) {
905   AddSnapshot(2, 0);
906   RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
907            test::KeyStr("a", 1, kTypeValue)},
908           {"", "v1"},
909           {test::KeyStr("a", 2, kTypeSingleDeletion),
910            test::KeyStr("a", 1, kTypeValue)},
911           {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
912           nullptr /*compaction_filter*/, false /*bottommost_level*/,
913           2 /*earliest_write_conflict_snapshot*/);
914 }
915 
// Compaction filter should keep uncommitted keys as-is, and
//   * convert the latest value to a deletion, and/or
//   * if the latest value is a merge, apply the filter to all subsequent
//     merges.
919 
// Runs a FilterAllKeysCompactionFilter with last_committed_seq = 1.
// Entries above that sequence ("a"@2 and "b"@3) are expected to be emitted
// unchanged. The committed values "a"@1 and "c"@1 are expected to come out
// as deletions with empty payloads.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
  std::unique_ptr<CompactionFilter> filter_all(
      new FilterAllKeysCompactionFilter());

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeValue),
      test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue),
      test::KeyStr("c", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v2", "v1", "v3", "v4"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeValue),
      test::KeyStr("a", 1, kTypeDeletion),
      test::KeyStr("b", 3, kTypeValue),
      test::KeyStr("c", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v2", "", "v3", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/, nullptr /*merge_operator*/,
          filter_all.get());
}
932 
// Runs a FilterAllKeysCompactionFilter with last_committed_seq = 1. The
// deletion at seq 2 is above that sequence and is expected to be emitted
// unchanged. The committed value at seq 1 is expected to come out as a
// deletion with an empty payload.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
  std::unique_ptr<CompactionFilter> filter_all(
      new FilterAllKeysCompactionFilter());

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeDeletion),
      test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          1 /*last_committed_seq*/, nullptr /*merge_operator*/,
          filter_all.get());
}
944 
// Three merge operands with no base value, a string-append merge operator
// and a FilterAllKeysCompactionFilter, with last_committed_seq = 2. Only the
// operand at seq 3, which is above that sequence, is expected in the output.
// The operands at seq 2 and seq 1 are expected to be gone.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       CompactionFilter_PartialMerge) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> filter_all(
      new FilterAllKeysCompactionFilter());

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeMerge)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge)};
  const std::vector<std::string> expected_values = {"v3"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, string_append.get(), filter_all.get());
}
956 
// Two merge operands on top of a base value, a string-append merge operator
// and a FilterAllKeysCompactionFilter, with last_committed_seq = 2. The
// operand at seq 3 is above that sequence and is expected to be emitted
// unchanged. The committed remainder is expected to collapse into a single
// deletion at seq 1 with an empty payload.
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> filter_all(
      new FilterAllKeysCompactionFilter());

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};

  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v3", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          2 /*last_committed_seq*/, string_append.get(), filter_all.get());
}
970 
971 }  // namespace ROCKSDB_NAMESPACE
972 
main(int argc,char ** argv)973 int main(int argc, char** argv) {
974   ::testing::InitGoogleTest(&argc, argv);
975   return RUN_ALL_TESTS();
976 }
977