1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 
7 #include <string>
8 #include <vector>
9 
10 #include "db/compaction/compaction_iterator.h"
11 #include "port/port.h"
12 #include "test_util/testharness.h"
13 #include "test_util/testutil.h"
14 #include "util/string_util.h"
15 #include "utilities/merge_operators.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 
19 // Expects no merging attempts.
20 class NoMergingMergeOp : public MergeOperator {
21  public:
FullMergeV2(const MergeOperationInput &,MergeOperationOutput *) const22   bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
23                    MergeOperationOutput* /*merge_out*/) const override {
24     ADD_FAILURE();
25     return false;
26   }
PartialMergeMulti(const Slice &,const std::deque<Slice> &,std::string *,Logger *) const27   bool PartialMergeMulti(const Slice& /*key*/,
28                          const std::deque<Slice>& /*operand_list*/,
29                          std::string* /*new_value*/,
30                          Logger* /*logger*/) const override {
31     ADD_FAILURE();
32     return false;
33   }
Name() const34   const char* Name() const override {
35     return "CompactionIteratorTest NoMergingMergeOp";
36   }
37 };
38 
39 // Compaction filter that gets stuck when it sees a particular key,
40 // then gets unstuck when told to.
41 // Always returns Decision::kRemove.
42 class StallingFilter : public CompactionFilter {
43  public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const44   Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
45                     const Slice& /*existing_value*/, std::string* /*new_value*/,
46                     std::string* /*skip_until*/) const override {
47     int k = std::atoi(key.ToString().c_str());
48     last_seen.store(k);
49     while (k >= stall_at.load()) {
50       std::this_thread::yield();
51     }
52     return Decision::kRemove;
53   }
54 
Name() const55   const char* Name() const override {
56     return "CompactionIteratorTest StallingFilter";
57   }
58 
59   // Wait until the filter sees a key >= k and stalls at that key.
60   // If `exact`, asserts that the seen key is equal to k.
WaitForStall(int k,bool exact=true)61   void WaitForStall(int k, bool exact = true) {
62     stall_at.store(k);
63     while (last_seen.load() < k) {
64       std::this_thread::yield();
65     }
66     if (exact) {
67       EXPECT_EQ(k, last_seen.load());
68     }
69   }
70 
71   // Filter will stall on key >= stall_at. Advance stall_at to unstall.
72   mutable std::atomic<int> stall_at{0};
73   // Last key the filter was called with.
74   mutable std::atomic<int> last_seen{0};
75 };
76 
// Compaction filter that filters out all keys.
78 class FilterAllKeysCompactionFilter : public CompactionFilter {
79  public:
FilterV2(int,const Slice &,ValueType,const Slice &,std::string *,std::string *) const80   Decision FilterV2(int /*level*/, const Slice& /*key*/, ValueType /*type*/,
81                     const Slice& /*existing_value*/, std::string* /*new_value*/,
82                     std::string* /*skip_until*/) const override {
83     return Decision::kRemove;
84   }
85 
Name() const86   const char* Name() const override { return "AllKeysCompactionFilter"; }
87 };
88 
89 class LoggingForwardVectorIterator : public InternalIterator {
90  public:
91   struct Action {
92     enum class Type {
93       SEEK_TO_FIRST,
94       SEEK,
95       NEXT,
96     };
97 
98     Type type;
99     std::string arg;
100 
ActionROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action101     explicit Action(Type _type, std::string _arg = "")
102         : type(_type), arg(_arg) {}
103 
operator ==ROCKSDB_NAMESPACE::LoggingForwardVectorIterator::Action104     bool operator==(const Action& rhs) const {
105       return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
106     }
107   };
108 
LoggingForwardVectorIterator(const std::vector<std::string> & keys,const std::vector<std::string> & values)109   LoggingForwardVectorIterator(const std::vector<std::string>& keys,
110                                const std::vector<std::string>& values)
111       : keys_(keys), values_(values), current_(keys.size()) {
112     assert(keys_.size() == values_.size());
113   }
114 
Valid() const115   bool Valid() const override { return current_ < keys_.size(); }
116 
SeekToFirst()117   void SeekToFirst() override {
118     log.emplace_back(Action::Type::SEEK_TO_FIRST);
119     current_ = 0;
120   }
SeekToLast()121   void SeekToLast() override { assert(false); }
122 
Seek(const Slice & target)123   void Seek(const Slice& target) override {
124     log.emplace_back(Action::Type::SEEK, target.ToString());
125     current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
126                keys_.begin();
127   }
128 
SeekForPrev(const Slice &)129   void SeekForPrev(const Slice& /*target*/) override { assert(false); }
130 
Next()131   void Next() override {
132     assert(Valid());
133     log.emplace_back(Action::Type::NEXT);
134     current_++;
135   }
Prev()136   void Prev() override { assert(false); }
137 
key() const138   Slice key() const override {
139     assert(Valid());
140     return Slice(keys_[current_]);
141   }
value() const142   Slice value() const override {
143     assert(Valid());
144     return Slice(values_[current_]);
145   }
146 
status() const147   Status status() const override { return Status::OK(); }
148 
149   std::vector<Action> log;
150 
151  private:
152   std::vector<std::string> keys_;
153   std::vector<std::string> values_;
154   size_t current_;
155 };
156 
157 class FakeCompaction : public CompactionIterator::CompactionProxy {
158  public:
level() const159   int level() const override { return 0; }
160 
KeyNotExistsBeyondOutputLevel(const Slice &,std::vector<size_t> *) const161   bool KeyNotExistsBeyondOutputLevel(
162       const Slice& /*user_key*/,
163       std::vector<size_t>* /*level_ptrs*/) const override {
164     return is_bottommost_level || key_not_exists_beyond_output_level;
165   }
166 
bottommost_level() const167   bool bottommost_level() const override { return is_bottommost_level; }
168 
number_levels() const169   int number_levels() const override { return 1; }
170 
GetLargestUserKey() const171   Slice GetLargestUserKey() const override {
172     return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
173   }
174 
allow_ingest_behind() const175   bool allow_ingest_behind() const override { return is_allow_ingest_behind; }
176 
preserve_deletes() const177   bool preserve_deletes() const override { return false; }
178 
enable_blob_garbage_collection() const179   bool enable_blob_garbage_collection() const override { return false; }
180 
blob_garbage_collection_age_cutoff() const181   double blob_garbage_collection_age_cutoff() const override { return 0.0; }
182 
input_version() const183   Version* input_version() const override { return nullptr; }
184 
185   bool key_not_exists_beyond_output_level = false;
186 
187   bool is_bottommost_level = false;
188 
189   bool is_allow_ingest_behind = false;
190 };
191 
192 // A simplified snapshot checker which assumes each snapshot has a global
193 // last visible sequence.
194 class TestSnapshotChecker : public SnapshotChecker {
195  public:
TestSnapshotChecker(SequenceNumber last_committed_sequence,const std::unordered_map<SequenceNumber,SequenceNumber> & snapshots={{}})196   explicit TestSnapshotChecker(
197       SequenceNumber last_committed_sequence,
198       const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {{}})
199       : last_committed_sequence_(last_committed_sequence),
200         snapshots_(snapshots) {}
201 
CheckInSnapshot(SequenceNumber seq,SequenceNumber snapshot_seq) const202   SnapshotCheckerResult CheckInSnapshot(
203       SequenceNumber seq, SequenceNumber snapshot_seq) const override {
204     if (snapshot_seq == kMaxSequenceNumber) {
205       return seq <= last_committed_sequence_
206                  ? SnapshotCheckerResult::kInSnapshot
207                  : SnapshotCheckerResult::kNotInSnapshot;
208     }
209     assert(snapshots_.count(snapshot_seq) > 0);
210     return seq <= snapshots_.at(snapshot_seq)
211                ? SnapshotCheckerResult::kInSnapshot
212                : SnapshotCheckerResult::kNotInSnapshot;
213   }
214 
215  private:
216   SequenceNumber last_committed_sequence_;
217   // A map of valid snapshot to last visible sequence to the snapshot.
218   std::unordered_map<SequenceNumber, SequenceNumber> snapshots_;
219 };
220 
221 // Test param:
222 //   bool: whether to pass snapshot_checker to compaction iterator.
223 class CompactionIteratorTest : public testing::TestWithParam<bool> {
224  public:
CompactionIteratorTest()225   CompactionIteratorTest()
226       : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
227 
CompactionIteratorTest(const Comparator * ucmp)228   explicit CompactionIteratorTest(const Comparator* ucmp)
229       : cmp_(ucmp), icmp_(cmp_), snapshots_({}) {}
230 
InitIterators(const std::vector<std::string> & ks,const std::vector<std::string> & vs,const std::vector<std::string> & range_del_ks,const std::vector<std::string> & range_del_vs,SequenceNumber last_sequence,SequenceNumber last_committed_sequence=kMaxSequenceNumber,MergeOperator * merge_op=nullptr,CompactionFilter * filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber,bool key_not_exists_beyond_output_level=false,const std::string * full_history_ts_low=nullptr)231   void InitIterators(
232       const std::vector<std::string>& ks, const std::vector<std::string>& vs,
233       const std::vector<std::string>& range_del_ks,
234       const std::vector<std::string>& range_del_vs,
235       SequenceNumber last_sequence,
236       SequenceNumber last_committed_sequence = kMaxSequenceNumber,
237       MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr,
238       bool bottommost_level = false,
239       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
240       bool key_not_exists_beyond_output_level = false,
241       const std::string* full_history_ts_low = nullptr) {
242     std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
243         new test::VectorIterator(range_del_ks, range_del_vs));
244     auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
245         std::move(unfragmented_range_del_iter), icmp_);
246     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
247         new FragmentedRangeTombstoneIterator(tombstone_list, icmp_,
248                                              kMaxSequenceNumber));
249     range_del_agg_.reset(new CompactionRangeDelAggregator(&icmp_, snapshots_));
250     range_del_agg_->AddTombstones(std::move(range_del_iter));
251 
252     std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
253     if (filter || bottommost_level || key_not_exists_beyond_output_level) {
254       compaction_proxy_ = new FakeCompaction();
255       compaction_proxy_->is_bottommost_level = bottommost_level;
256       compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
257       compaction_proxy_->key_not_exists_beyond_output_level =
258           key_not_exists_beyond_output_level;
259       compaction.reset(compaction_proxy_);
260     }
261     bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
262     if (use_snapshot_checker || last_committed_sequence < kMaxSequenceNumber) {
263       snapshot_checker_.reset(
264           new TestSnapshotChecker(last_committed_sequence, snapshot_map_));
265     }
266     merge_helper_.reset(
267         new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false,
268                         0 /*latest_snapshot*/, snapshot_checker_.get(),
269                         0 /*level*/, nullptr /*statistics*/, &shutting_down_));
270 
271     if (c_iter_) {
272       // Since iter_ is still used in ~CompactionIterator(), we call
273       // ~CompactionIterator() first.
274       c_iter_.reset();
275     }
276     iter_.reset(new LoggingForwardVectorIterator(ks, vs));
277     iter_->SeekToFirst();
278     c_iter_.reset(new CompactionIterator(
279         iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
280         earliest_write_conflict_snapshot, snapshot_checker_.get(),
281         Env::Default(), false /* report_detailed_time */, false,
282         range_del_agg_.get(), nullptr /* blob_file_builder */,
283         true /*allow_data_in_errors*/, std::move(compaction), filter,
284         &shutting_down_, /*preserve_deletes_seqnum=*/0,
285         /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr,
286         full_history_ts_low));
287   }
288 
AddSnapshot(SequenceNumber snapshot,SequenceNumber last_visible_seq=kMaxSequenceNumber)289   void AddSnapshot(SequenceNumber snapshot,
290                    SequenceNumber last_visible_seq = kMaxSequenceNumber) {
291     snapshots_.push_back(snapshot);
292     snapshot_map_[snapshot] = last_visible_seq;
293   }
294 
UseSnapshotChecker() const295   virtual bool UseSnapshotChecker() const { return false; }
296 
AllowIngestBehind() const297   virtual bool AllowIngestBehind() const { return false; }
298 
RunTest(const std::vector<std::string> & input_keys,const std::vector<std::string> & input_values,const std::vector<std::string> & expected_keys,const std::vector<std::string> & expected_values,SequenceNumber last_committed_seq=kMaxSequenceNumber,MergeOperator * merge_operator=nullptr,CompactionFilter * compaction_filter=nullptr,bool bottommost_level=false,SequenceNumber earliest_write_conflict_snapshot=kMaxSequenceNumber,bool key_not_exists_beyond_output_level=false,const std::string * full_history_ts_low=nullptr)299   void RunTest(
300       const std::vector<std::string>& input_keys,
301       const std::vector<std::string>& input_values,
302       const std::vector<std::string>& expected_keys,
303       const std::vector<std::string>& expected_values,
304       SequenceNumber last_committed_seq = kMaxSequenceNumber,
305       MergeOperator* merge_operator = nullptr,
306       CompactionFilter* compaction_filter = nullptr,
307       bool bottommost_level = false,
308       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
309       bool key_not_exists_beyond_output_level = false,
310       const std::string* full_history_ts_low = nullptr) {
311     InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber,
312                   last_committed_seq, merge_operator, compaction_filter,
313                   bottommost_level, earliest_write_conflict_snapshot,
314                   key_not_exists_beyond_output_level, full_history_ts_low);
315     c_iter_->SeekToFirst();
316     for (size_t i = 0; i < expected_keys.size(); i++) {
317       std::string info = "i = " + ToString(i);
318       ASSERT_TRUE(c_iter_->Valid()) << info;
319       ASSERT_OK(c_iter_->status()) << info;
320       ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
321       ASSERT_EQ(expected_values[i], c_iter_->value().ToString()) << info;
322       c_iter_->Next();
323     }
324     ASSERT_OK(c_iter_->status());
325     ASSERT_FALSE(c_iter_->Valid());
326   }
327 
ClearSnapshots()328   void ClearSnapshots() {
329     snapshots_.clear();
330     snapshot_map_.clear();
331   }
332 
333   const Comparator* cmp_;
334   const InternalKeyComparator icmp_;
335   std::vector<SequenceNumber> snapshots_;
336   // A map of valid snapshot to last visible sequence to the snapshot.
337   std::unordered_map<SequenceNumber, SequenceNumber> snapshot_map_;
338   std::unique_ptr<MergeHelper> merge_helper_;
339   std::unique_ptr<LoggingForwardVectorIterator> iter_;
340   std::unique_ptr<CompactionIterator> c_iter_;
341   std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
342   std::unique_ptr<SnapshotChecker> snapshot_checker_;
343   std::atomic<bool> shutting_down_{false};
344   FakeCompaction* compaction_proxy_;
345 };
346 
347 // It is possible that the output of the compaction iterator is empty even if
348 // the input is not.
TEST_P(CompactionIteratorTest, EmptyResult) {
  // Input: a single deletion of "a" followed by an older value for "a".
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 5, kTypeSingleDeletion),
      test::KeyStr("a", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"", "val"};
  const SequenceNumber last_sequence = 5;
  InitIterators(input_keys, input_values, {}, {}, last_sequence);

  // Nothing is expected to come out, and without any error.
  c_iter_->SeekToFirst();
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
357 
358 // If there is a corruption after a single deletion, the corrupted key should
359 // be preserved.
TEST_P(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
  // The middle entry is built with the extra `true` flag of test::KeyStr,
  // which this test uses to produce the corrupted key.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 5, kTypeSingleDeletion),
      test::KeyStr("a", 3, kTypeValue, true),
      test::KeyStr("b", 10, kTypeValue)};
  const std::vector<std::string> input_values = {"", "val", "val2"};
  InitIterators(input_keys, input_values, {}, {}, 10);

  // Every input key is expected back, unchanged and in the same order.
  c_iter_->SeekToFirst();
  for (const std::string& expected_key : input_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
379 
// A range tombstone ["ma", "mz") at sequence 4: the test expects the older
// "morning" entry (seq 2) to be dropped while the newer one (seq 5) and
// "night" are kept.
TEST_P(CompactionIteratorTest, SimpleRangeDeletion) {
  const std::vector<std::string> input_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("morning", 2, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"zao", "zao", "wan"};
  const std::vector<std::string> tombstone_starts = {
      test::KeyStr("ma", 4, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_ends = {"mz"};
  InitIterators(input_keys, input_values, tombstone_starts, tombstone_ends, 5);

  const std::vector<std::string> expected_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 3, kTypeValue)};
  c_iter_->SeekToFirst();
  for (const std::string& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
396 
// Range tombstone ["ma", "mz") at sequence 28 combined with a snapshot at
// sequence 10: the test expects only "morning"@5 and "night"@40 to be output.
TEST_P(CompactionIteratorTest, RangeDeletionWithSnapshots) {
  AddSnapshot(10);

  const std::vector<std::string> tombstone_starts = {
      test::KeyStr("ma", 28, kTypeRangeDeletion)};
  const std::vector<std::string> tombstone_ends = {"mz"};

  const std::vector<std::string> input_keys = {
      test::KeyStr("morning", 15, kTypeValue),
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue),
      test::KeyStr("night", 20, kTypeValue)};
  const std::vector<std::string> input_values = {"zao 15", "zao 5", "wan 40",
                                                 "wan 20"};
  InitIterators(input_keys, input_values, tombstone_starts, tombstone_ends,
                40);

  const std::vector<std::string> expected_keys = {
      test::KeyStr("morning", 5, kTypeValue),
      test::KeyStr("night", 40, kTypeValue)};
  c_iter_->SeekToFirst();
  for (const std::string& expected_key : expected_keys) {
    ASSERT_TRUE(c_iter_->Valid());
    ASSERT_EQ(expected_key, c_iter_->key().ToString());
    c_iter_->Next();
  }
  ASSERT_OK(c_iter_->status());
  ASSERT_FALSE(c_iter_->Valid());
}
418 
TEST_P(CompactionIteratorTest,CompactionFilterSkipUntil)419 TEST_P(CompactionIteratorTest, CompactionFilterSkipUntil) {
420   class Filter : public CompactionFilter {
421     Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
422                       const Slice& existing_value, std::string* /*new_value*/,
423                       std::string* skip_until) const override {
424       std::string k = key.ToString();
425       std::string v = existing_value.ToString();
426       // See InitIterators() call below for the sequence of keys and their
427       // filtering decisions. Here we closely assert that compaction filter is
428       // called with the expected keys and only them, and with the right values.
429       if (k == "a") {
430         EXPECT_EQ(ValueType::kValue, t);
431         EXPECT_EQ("av50", v);
432         return Decision::kKeep;
433       }
434       if (k == "b") {
435         EXPECT_EQ(ValueType::kValue, t);
436         EXPECT_EQ("bv60", v);
437         *skip_until = "d+";
438         return Decision::kRemoveAndSkipUntil;
439       }
440       if (k == "e") {
441         EXPECT_EQ(ValueType::kMergeOperand, t);
442         EXPECT_EQ("em71", v);
443         return Decision::kKeep;
444       }
445       if (k == "f") {
446         if (v == "fm65") {
447           EXPECT_EQ(ValueType::kMergeOperand, t);
448           *skip_until = "f";
449         } else {
450           EXPECT_EQ("fm30", v);
451           EXPECT_EQ(ValueType::kMergeOperand, t);
452           *skip_until = "g+";
453         }
454         return Decision::kRemoveAndSkipUntil;
455       }
456       if (k == "h") {
457         EXPECT_EQ(ValueType::kValue, t);
458         EXPECT_EQ("hv91", v);
459         return Decision::kKeep;
460       }
461       if (k == "i") {
462         EXPECT_EQ(ValueType::kMergeOperand, t);
463         EXPECT_EQ("im95", v);
464         *skip_until = "z";
465         return Decision::kRemoveAndSkipUntil;
466       }
467       ADD_FAILURE();
468       return Decision::kKeep;
469     }
470 
471     const char* Name() const override {
472       return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
473     }
474   };
475 
476   NoMergingMergeOp merge_op;
477   Filter filter;
478   InitIterators(
479       {test::KeyStr("a", 50, kTypeValue),  // keep
480        test::KeyStr("a", 45, kTypeMerge),
481        test::KeyStr("b", 60, kTypeValue),  // skip to "d+"
482        test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
483        test::KeyStr("d", 70, kTypeMerge),
484        test::KeyStr("e", 71, kTypeMerge),  // keep
485        test::KeyStr("f", 65, kTypeMerge),  // skip to "f", aka keep
486        test::KeyStr("f", 30, kTypeMerge),  // skip to "g+"
487        test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
488        test::KeyStr("h", 91, kTypeValue),  // keep
489        test::KeyStr("i", 95, kTypeMerge),  // skip to "z"
490        test::KeyStr("j", 99, kTypeValue)},
491       {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
492        "fv25", "gv90", "hv91", "im95", "jv99"},
493       {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, &merge_op, &filter);
494 
495   // Compaction should output just "a", "e" and "h" keys.
496   c_iter_->SeekToFirst();
497   ASSERT_TRUE(c_iter_->Valid());
498   ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
499   ASSERT_EQ("av50", c_iter_->value().ToString());
500   c_iter_->Next();
501   ASSERT_TRUE(c_iter_->Valid());
502   ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
503   ASSERT_EQ("em71", c_iter_->value().ToString());
504   c_iter_->Next();
505   ASSERT_TRUE(c_iter_->Valid());
506   ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
507   ASSERT_EQ("hv91", c_iter_->value().ToString());
508   c_iter_->Next();
509   ASSERT_OK(c_iter_->status());
510   ASSERT_FALSE(c_iter_->Valid());
511 
512   // Check that the compaction iterator did the correct sequence of calls on
513   // the underlying iterator.
514   using A = LoggingForwardVectorIterator::Action;
515   using T = A::Type;
516   std::vector<A> expected_actions = {
517       A(T::SEEK_TO_FIRST),
518       A(T::NEXT),
519       A(T::NEXT),
520       A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
521       A(T::NEXT),
522       A(T::NEXT),
523       A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
524       A(T::NEXT),
525       A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
526   ASSERT_EQ(expected_actions, iter_->log);
527 }
528 
// Raises the shutdown flag while the compaction filter is blocked on key 2
// and checks that SeekToFirst() then stops with a ShutdownInProgress status
// without calling the filter for any later key.
TEST_P(CompactionIteratorTest, ShuttingDownInFilter) {
  NoMergingMergeOp merge_op;
  StallingFilter filter;
  InitIterators(
      {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeValue),
       test::KeyStr("3", 3, kTypeValue), test::KeyStr("4", 4, kTypeValue)},
      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      &merge_op, &filter);
  // Don't leave tombstones (kTypeDeletion) for filtered keys.
  compaction_proxy_->key_not_exists_beyond_output_level = true;

  // Run the compaction iterator on its own thread so this thread can drive
  // the stalling filter. EXPECT_* (not ASSERT_*) keeps the lambda running to
  // the seek_done store even when an expectation fails.
  std::atomic<bool> seek_done{false};
  ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
    c_iter_->SeekToFirst();
    EXPECT_FALSE(c_iter_->Valid());
    EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
    seek_done.store(true);
  });

  // Let key 1 through.
  filter.WaitForStall(1);

  // Shutdown during compaction filter call for key 2.
  filter.WaitForStall(2);
  shutting_down_.store(true);
  EXPECT_FALSE(seek_done.load());

  // Unstall filter and wait for SeekToFirst() to return.
  filter.stall_at.store(3);
  compaction_thread.join();
  // Use a gtest assertion rather than assert(): it is still checked when the
  // test is built with NDEBUG, and a failure is reported instead of aborting
  // the whole test binary.
  ASSERT_TRUE(seek_done.load());

  // Check that filter was never called again.
  EXPECT_EQ(2, filter.last_seen.load());
}
564 
565 // Same as ShuttingDownInFilter, but shutdown happens during filter call for
566 // a merge operand, not for a value.
TEST_P(CompactionIteratorTest, ShuttingDownInMerge) {
  NoMergingMergeOp merge_op;
  StallingFilter filter;
  InitIterators(
      {test::KeyStr("1", 1, kTypeValue), test::KeyStr("2", 2, kTypeMerge),
       test::KeyStr("3", 3, kTypeMerge), test::KeyStr("4", 4, kTypeValue)},
      {"v1", "v2", "v3", "v4"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      &merge_op, &filter);
  // Don't leave tombstones (kTypeDeletion) for filtered keys.
  compaction_proxy_->key_not_exists_beyond_output_level = true;

  // Run the compaction iterator on its own thread so this thread can drive
  // the stalling filter. EXPECT_* is used instead of ASSERT_*: a failing
  // ASSERT_* would return from the lambda before seek_done is set, hiding the
  // real failure behind the seek_done check below.
  std::atomic<bool> seek_done{false};
  ROCKSDB_NAMESPACE::port::Thread compaction_thread([&] {
    c_iter_->SeekToFirst();
    EXPECT_FALSE(c_iter_->Valid());
    EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
    seek_done.store(true);
  });

  // Let key 1 through.
  filter.WaitForStall(1);

  // Shutdown during compaction filter call for key 2.
  filter.WaitForStall(2);
  shutting_down_.store(true);
  EXPECT_FALSE(seek_done.load());

  // Unstall filter and wait for SeekToFirst() to return.
  filter.stall_at.store(3);
  compaction_thread.join();
  // Use a gtest assertion rather than assert(): it is still checked when the
  // test is built with NDEBUG, and a failure is reported instead of aborting
  // the whole test binary.
  ASSERT_TRUE(seek_done.load());

  // Check that filter was never called again.
  EXPECT_EQ(2, filter.last_seen.load());
}
601 
TEST_P(CompactionIteratorTest,SingleMergeOperand)602 TEST_P(CompactionIteratorTest, SingleMergeOperand) {
603   class Filter : public CompactionFilter {
604     Decision FilterV2(int /*level*/, const Slice& key, ValueType t,
605                       const Slice& existing_value, std::string* /*new_value*/,
606                       std::string* /*skip_until*/) const override {
607       std::string k = key.ToString();
608       std::string v = existing_value.ToString();
609 
610       // See InitIterators() call below for the sequence of keys and their
611       // filtering decisions. Here we closely assert that compaction filter is
612       // called with the expected keys and only them, and with the right values.
613       if (k == "a") {
614         EXPECT_EQ(ValueType::kMergeOperand, t);
615         EXPECT_EQ("av1", v);
616         return Decision::kKeep;
617       } else if (k == "b") {
618         EXPECT_EQ(ValueType::kMergeOperand, t);
619         return Decision::kKeep;
620       } else if (k == "c") {
621         return Decision::kKeep;
622       }
623 
624       ADD_FAILURE();
625       return Decision::kKeep;
626     }
627 
628     const char* Name() const override {
629       return "CompactionIteratorTest.SingleMergeOperand::Filter";
630     }
631   };
632 
633   class SingleMergeOp : public MergeOperator {
634    public:
635     bool FullMergeV2(const MergeOperationInput& merge_in,
636                      MergeOperationOutput* merge_out) const override {
637       // See InitIterators() call below for why "c" is the only key for which
638       // FullMergeV2 should be called.
639       EXPECT_EQ("c", merge_in.key.ToString());
640 
641       std::string temp_value;
642       if (merge_in.existing_value != nullptr) {
643         temp_value = merge_in.existing_value->ToString();
644       }
645 
646       for (auto& operand : merge_in.operand_list) {
647         temp_value.append(operand.ToString());
648       }
649       merge_out->new_value = temp_value;
650 
651       return true;
652     }
653 
654     bool PartialMergeMulti(const Slice& key,
655                            const std::deque<Slice>& operand_list,
656                            std::string* new_value,
657                            Logger* /*logger*/) const override {
658       std::string string_key = key.ToString();
659       EXPECT_TRUE(string_key == "a" || string_key == "b");
660 
661       if (string_key == "a") {
662         EXPECT_EQ(1, operand_list.size());
663       } else if (string_key == "b") {
664         EXPECT_EQ(2, operand_list.size());
665       }
666 
667       std::string temp_value;
668       for (auto& operand : operand_list) {
669         temp_value.append(operand.ToString());
670       }
671       swap(temp_value, *new_value);
672 
673       return true;
674     }
675 
676     const char* Name() const override {
677       return "CompactionIteratorTest SingleMergeOp";
678     }
679 
680     bool AllowSingleOperand() const override { return true; }
681   };
682 
683   SingleMergeOp merge_op;
684   Filter filter;
685   InitIterators(
686       // a should invoke PartialMergeMulti with a single merge operand.
687       {test::KeyStr("a", 50, kTypeMerge),
688        // b should invoke PartialMergeMulti with two operands.
689        test::KeyStr("b", 70, kTypeMerge), test::KeyStr("b", 60, kTypeMerge),
690        // c should invoke FullMerge due to kTypeValue at the beginning.
691        test::KeyStr("c", 90, kTypeMerge), test::KeyStr("c", 80, kTypeValue)},
692       {"av1", "bv2", "bv1", "cv2", "cv1"}, {}, {}, kMaxSequenceNumber,
693       kMaxSequenceNumber, &merge_op, &filter);
694 
695   c_iter_->SeekToFirst();
696   ASSERT_TRUE(c_iter_->Valid());
697   ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), c_iter_->key().ToString());
698   ASSERT_EQ("av1", c_iter_->value().ToString());
699   c_iter_->Next();
700   ASSERT_TRUE(c_iter_->Valid());
701   ASSERT_EQ("bv1bv2", c_iter_->value().ToString());
702   c_iter_->Next();
703   ASSERT_OK(c_iter_->status());
704   ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
705 }
706 
707 // In bottommost level, values earlier than earliest snapshot can be output
708 // with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
  AddSnapshot(1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)};
  const std::vector<std::string> input_values = {"v1", "v2"};
  // "a" (seq 1) is expected back with sequence 0; "b" (seq 2) keeps its own.
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v1", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          kMaxSequenceNumber /*last_committed_seq*/,
          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
          true /*bottommost_level*/);
}
718 
719 // In bottommost level, deletions earlier than earliest snapshot can be removed
720 // permanently.
TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
  AddSnapshot(1);

  // The deletion of "a" at sequence 1 is expected to disappear from the
  // output. The deletion of "b" at sequence 3 is expected to stay, followed by
  // the older value of "b" emitted with sequence 0.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 3, kTypeDeletion),
      test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 3, kTypeDeletion), test::KeyStr("b", 0, kTypeValue)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
732 
733 // In bottommost level, single deletions earlier than earliest snapshot can be
734 // removed permanently.
TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
  AddSnapshot(1);

  // Of the two single deletions, only the one for "b" at sequence 2 is
  // expected in the output.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeSingleDeletion),
      test::KeyStr("b", 2, kTypeSingleDeletion)};
  const std::vector<std::string> input_values = {"", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 2, kTypeSingleDeletion)};
  const std::vector<std::string> expected_values = {""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
743 
TEST_P(CompactionIteratorTest, ConvertToPutAtBottom) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();

  // Three merge operands for "a" are expected to collapse into a single value
  // entry "a2,a3,a4" with sequence 0; the value of "b" is expected to be
  // emitted with sequence 0 as well.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"a4", "a3", "a2", "b1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 0, kTypeValue)};
  const std::vector<std::string> expected_values = {"a2,a3,a4", "b1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber, string_append.get(),
          /*compaction_filter=*/nullptr, /*bottommost_level=*/true);
}
755 
// Instantiates every TEST_P of CompactionIteratorTest once per boolean
// parameter value (true and false).
// NOTE(review): what the parameter controls is decided by the fixture, which
// is defined outside this part of the file -- confirm there.
INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
                        testing::Values(true, false));
758 
// Tests how CompactionIterator works together with SnapshotChecker.
760 class CompactionIteratorWithSnapshotCheckerTest
761     : public CompactionIteratorTest {
762  public:
UseSnapshotChecker() const763   bool UseSnapshotChecker() const override { return true; }
764 };
765 
766 // Uncommitted keys (keys with seq > last_committed_seq) should be output as-is
767 // while committed version of these keys should get compacted as usual.
768 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Value) {
  // With last_committed_seq == 2, "foo"@3 is uncommitted and expected as-is.
  // Among the committed versions only "foo"@2 is expected to remain.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeValue), test::KeyStr("foo", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v3", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2);
}
778 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Deletion) {
  // With last_committed_seq == 1, the deletion at sequence 2 is uncommitted.
  // Both it and the committed value at sequence 1 are expected unchanged.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 2, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 2, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/1);
}
788 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_Merge) {
  const std::shared_ptr<MergeOperator> string_append =
      MergeOperators::CreateStringAppendOperator();

  // With last_committed_seq == 2, the merge at sequence 3 is uncommitted and
  // expected as-is. The merge at sequence 2 and the value at sequence 1 are
  // expected to be combined into one value entry "v1,v2" at sequence 2.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeValue)};
  const std::vector<std::string> expected_values = {"v3", "v1,v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2, string_append.get());
}
799 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_SingleDelete) {
  // With last_committed_seq == 1, the single deletion at sequence 2 is
  // uncommitted. Neither it nor the value at sequence 1 is expected to change.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 2, kTypeSingleDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/1);
}
809 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       PreserveUncommittedKeys_BlobIndex) {
  // Same layout as the plain-value case but with blob index entries: with
  // last_committed_seq == 2, the entries at sequence 3 and 2 are expected to
  // remain and the one at sequence 1 is expected to be dropped.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v3", "v2"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2);
}
820 
// Tests that the compaction iterator dedups keys visible to the same snapshot.
822 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Value) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // Expected output keeps sequences 4, 3 and 1; the value at sequence 2 is
  // expected to be dropped.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeValue),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/3);
}
833 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Deletion) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // Expected output keeps the value at 4, the deletion at 3 and the value at
  // 1; the value at sequence 2 is expected to be dropped.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 3, kTypeDeletion),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/3);
}
846 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_Merge) {
  // NOTE(review): presumably snapshot 2 sees up to sequence 1 and snapshot 4
  // sees up to sequence 3 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);
  AddSnapshot(4, 3);
  const std::shared_ptr<MergeOperator> string_append =
      MergeOperators::CreateStringAppendOperator();

  // The merges at sequence 3 and 2 are expected to be combined into a single
  // merge entry "v2,v3" at sequence 3; everything else is expected unchanged.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 2, kTypeMerge),
      test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v5", "v4", "v3", "v2",
                                                 "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 5, kTypeMerge), test::KeyStr("foo", 4, kTypeMerge),
      test::KeyStr("foo", 3, kTypeMerge), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v5", "v4", "v2,v3",
                                                    "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/4, string_append.get());
}
860 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       DedupSameSnapshot_SingleDeletion) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // The single deletion at sequence 3 and the value at sequence 2 are both
  // expected to vanish, leaving only the entries at sequence 4 and 1.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeValue),
      test::KeyStr("foo", 3, kTypeSingleDeletion),
      test::KeyStr("foo", 2, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v4", "", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeValue), test::KeyStr("foo", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"v4", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/3);
}
872 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, DedupSameSnapshot_BlobIndex) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // Blob index variant of the plain-value dedup case: sequences 4, 3 and 1
  // are expected to remain, sequence 2 is expected to be dropped.
  const std::vector<std::string> input_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 2, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"v4", "v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("foo", 4, kTypeBlobIndex),
      test::KeyStr("foo", 3, kTypeBlobIndex),
      test::KeyStr("foo", 1, kTypeBlobIndex)};
  const std::vector<std::string> expected_values = {"v4", "v3", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/3);
}
885 
// At the bottommost level, sequence numbers can be zeroed out and deletions can
// be removed, but only when they are visible to the earliest snapshot.
888 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotZeroOutSequenceIfNotVisibleToEarliestSnapshot) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // Only "a"@1 is expected to be rewritten to sequence 0; "b"@2 and "c"@3 are
  // expected to keep their sequence numbers.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"v1", "v2", "v3"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
      test::KeyStr("c", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"v1", "v2", "v3"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
901 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfNotVisibleToEarliestSnapshot) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
      test::KeyStr("c", 3, kTypeDeletion)};
  const std::vector<std::string> input_values = {"", "", ""};
  // NOTE(review): no keys are expected, yet two expected values are supplied.
  // The sizes disagree, and the empty expectation looks at odds with the test
  // name. Presumably the surplus values are never compared -- confirm against
  // RunTest before changing either list.
  const std::vector<std::string> expected_keys = {};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
912 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // The deletion of "a" at sequence 4 is expected to stay because an older
  // value of "a" is still emitted (with sequence 0); "b"@3 is expected
  // unchanged.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> input_values = {"", "", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
      test::KeyStr("b", 3, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
925 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // Only the single deletion of "a" at sequence 1 is expected to be removed;
  // the ones for "b"@2 and "c"@3 are expected to remain.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 1, kTypeSingleDeletion),
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> input_values = {"", "", ""};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("b", 2, kTypeSingleDeletion),
      test::KeyStr("c", 3, kTypeSingleDeletion)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true);
}
939 
// Single delete should not cancel out values that are not visible to the
// same set of snapshots.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       SingleDeleteAcrossSnapshotBoundary) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 1 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 1);

  // The single deletion at sequence 2 and the value at sequence 1 are both
  // expected in the output, with the value's payload intact.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", "v1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2);
}
952 
953 // Single delete should be kept in case it is not visible to the
954 // earliest write conflict snapshot. If a single delete is kept for this reason,
955 // corresponding value can be trimmed to save space.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       KeepSingleDeletionForWriteConflictChecking) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 0 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 0);

  // Both entries are expected in the output, but the value at sequence 1 is
  // expected to come out with an empty payload.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2, /*merge_operator=*/nullptr,
          /*compaction_filter=*/nullptr, /*bottommost_level=*/false,
          /*earliest_write_conflict_snapshot=*/2);
}
968 
969 // Same as above but with a blob index. In addition to the value getting
970 // trimmed, the type of the KV is changed to kTypeValue.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       KeepSingleDeletionForWriteConflictChecking_BlobIndex) {
  // NOTE(review): presumably registers snapshot 2 that sees writes up to
  // sequence 0 -- confirm against AddSnapshot's parameters.
  AddSnapshot(2, 0);

  // The blob index at sequence 1 is expected to come out as a kTypeValue
  // entry with an empty payload, behind the preserved single deletion.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeBlobIndex)};
  const std::vector<std::string> input_values = {"", "fake_blob_index"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeSingleDeletion),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2, /*merge_operator=*/nullptr,
          /*compaction_filter=*/nullptr, /*bottommost_level=*/false,
          /*earliest_write_conflict_snapshot=*/2);
}
983 
984 // Compaction filter should keep uncommitted key as-is, and
985 //   * Convert the latest value to deletion, and/or
986 //   * if latest value is a merge, apply filter to all subsequent merges.
987 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
  std::unique_ptr<CompactionFilter> remove_all_filter{
      new FilterAllKeysCompactionFilter()};

  // With last_committed_seq == 1, "a"@2 and "b"@3 are uncommitted and are
  // expected as-is. The committed "a"@1 and "c"@1 are expected to come out as
  // deletions with empty payloads.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeValue),
      test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v2", "v1", "v3", "v4"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("a", 1, kTypeDeletion),
      test::KeyStr("b", 3, kTypeValue), test::KeyStr("c", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v2", "", "v3", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/1, /*merge_operator=*/nullptr,
          remove_all_filter.get());
}
1000 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Deletion) {
  std::unique_ptr<CompactionFilter> remove_all_filter{
      new FilterAllKeysCompactionFilter()};

  // With last_committed_seq == 1, the deletion at sequence 2 is uncommitted
  // and expected as-is. The committed value at sequence 1 is expected to come
  // out as a deletion.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 2, kTypeDeletion), test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 2, kTypeDeletion),
      test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/1, /*merge_operator=*/nullptr,
          remove_all_filter.get());
}
1012 
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
       CompactionFilter_PartialMerge) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> remove_all_filter{
      new FilterAllKeysCompactionFilter()};

  // With last_committed_seq == 2, only the uncommitted merge at sequence 3 is
  // expected in the output; the two committed merge operands are expected to
  // be gone.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeMerge)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge)};
  const std::vector<std::string> expected_values = {"v3"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2, string_append.get(),
          remove_all_filter.get());
}
1024 
TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_FullMerge) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();
  std::unique_ptr<CompactionFilter> remove_all_filter{
      new FilterAllKeysCompactionFilter()};

  // With last_committed_seq == 2, the uncommitted merge at sequence 3 is
  // expected as-is. The committed merge and base value are expected to come
  // out as a single deletion at sequence 1.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 2, kTypeMerge),
      test::KeyStr("a", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"v3", "v2", "v1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 3, kTypeMerge), test::KeyStr("a", 1, kTypeDeletion)};
  const std::vector<std::string> expected_values = {"v3", ""};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/2, string_append.get(),
          remove_all_filter.get());
}
1038 
// Tests how CompactionIterator works together with AllowIngestBehind.
1040 class CompactionIteratorWithAllowIngestBehindTest
1041     : public CompactionIteratorTest {
1042  public:
AllowIngestBehind() const1043   bool AllowIngestBehind() const override { return true; }
1044 };
1045 
1046 // When allow_ingest_behind is set, compaction iterator is not targeting
1047 // the bottommost level since there is no guarantee there won't be further
1048 // data ingested under the compaction output in future.
TEST_P(CompactionIteratorWithAllowIngestBehindTest, NoConvertToPutAtBottom) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();

  // The three merge operands for "a" are expected to be combined into
  // "a2,a3,a4", but the result is expected to stay a merge entry at sequence
  // 4. "b" is expected to keep sequence 1.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeMerge), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"a4", "a3", "a2", "b1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"a2,a3,a4", "b1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber, string_append.get(),
          /*compaction_filter=*/nullptr, /*bottommost_level=*/true);
}
1060 
TEST_P(CompactionIteratorWithAllowIngestBehindTest,
       MergeToPutIfEncounteredPutAtBottom) {
  const auto string_append = MergeOperators::CreateStringAppendOperator();

  // Here the oldest entry for "a" is a value, so the merged result
  // "a2,a3,a4" is expected as a value entry at sequence 4. "b" is expected to
  // keep sequence 1.
  const std::vector<std::string> input_keys = {
      test::KeyStr("a", 4, kTypeMerge), test::KeyStr("a", 3, kTypeMerge),
      test::KeyStr("a", 2, kTypeValue), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> input_values = {"a4", "a3", "a2", "b1"};
  const std::vector<std::string> expected_keys = {
      test::KeyStr("a", 4, kTypeValue), test::KeyStr("b", 1, kTypeValue)};
  const std::vector<std::string> expected_values = {"a2,a3,a4", "b1"};

  RunTest(input_keys, input_values, expected_keys, expected_values,
          /*last_committed_seq=*/kMaxSequenceNumber, string_append.get(),
          /*compaction_filter=*/nullptr, /*bottommost_level=*/true);
}
1073 
// Instantiates every TEST_P of CompactionIteratorWithAllowIngestBehindTest
// once per boolean parameter value (true and false).
// NOTE(review): what the parameter controls is decided by the base fixture,
// which is defined outside this part of the file -- confirm there.
INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance,
                        CompactionIteratorWithAllowIngestBehindTest,
                        testing::Values(true, false));
1077 
1078 class CompactionIteratorTsGcTest : public CompactionIteratorTest {
1079  public:
CompactionIteratorTsGcTest()1080   CompactionIteratorTsGcTest()
1081       : CompactionIteratorTest(test::ComparatorWithU64Ts()) {}
1082 };
1083 
TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
  constexpr char kKeyA[] = "a";
  constexpr char kKeyB[] = "b";
  const std::vector<std::string> keys = {
      test::KeyStr(/*ts=*/103, kKeyA, /*seq=*/4, kTypeValue),
      test::KeyStr(/*ts=*/102, kKeyA, /*seq=*/3, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/104, kKeyB, /*seq=*/5, kTypeValue)};
  const std::vector<std::string> values = {"a3", "", "b2"};

  // Every key carries a timestamp >= 102, so none of them is eligible for GC
  // and the output is expected to equal the input.
  std::string ts_low;
  PutFixed64(&ts_low, 102);

  // Each row is {bottommost_level, key_not_exists_beyond_output_level}.
  const bool level_flags[][2] = {{false, false}, {false, true}, {true, true}};
  for (const auto& flags : level_flags) {
    const bool bottommost_level = flags[0];
    const bool key_not_exists_beyond_output_level = flags[1];
    RunTest(keys, values, /*expected_keys=*/keys, /*expected_values=*/values,
            /*last_committed_seq=*/kMaxSequenceNumber,
            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
            bottommost_level,
            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
            key_not_exists_beyond_output_level, &ts_low);
  }
}
1111 
TEST_P(CompactionIteratorTsGcTest,AllKeysOlderThanThreshold)1112 TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
1113   constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
1114   const std::vector<std::string> input_keys = {
1115       test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/4,
1116                    kTypeDeletionWithTimestamp),
1117       test::KeyStr(/*ts=*/102, user_key[0], /*seq=*/3, kTypeValue),
1118       test::KeyStr(/*ts=*/101, user_key[0], /*seq=*/2, kTypeValue),
1119       test::KeyStr(/*ts=*/104, user_key[1], /*seq=*/5, kTypeValue)};
1120   const std::vector<std::string> input_values = {"", "a2", "a1", "b5"};
1121   std::string full_history_ts_low;
1122   PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
1123   {
1124     // With a snapshot at seq 3, both the deletion marker and the key at 3 must
1125     // be preserved.
1126     AddSnapshot(3);
1127     const std::vector<std::string> expected_keys = {
1128         input_keys[0], input_keys[1], input_keys[3]};
1129     const std::vector<std::string> expected_values = {"", "a2", "b5"};
1130     RunTest(input_keys, input_values, expected_keys, expected_values,
1131             /*last_committed_seq=*/kMaxSequenceNumber,
1132             /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
1133             /*bottommost_level=*/false,
1134             /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
1135             /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
1136     ClearSnapshots();
1137   }
1138   {
1139     // No snapshot, the deletion marker should be preserved because the user
1140     // key may appear beyond output level.
1141     const std::vector<std::string> expected_keys = {input_keys[0],
1142                                                     input_keys[3]};
1143     const std::vector<std::string> expected_values = {"", "b5"};
1144     RunTest(input_keys, input_values, expected_keys, expected_values,
1145             /*last_committed_seq=*/kMaxSequenceNumber,
1146             /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
1147             /*bottommost_level=*/false,
1148             /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
1149             /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
1150   }
1151   {
1152     // No snapshot, the deletion marker can be dropped because the user key
1153     // does not appear in higher levels.
1154     const std::vector<std::string> expected_keys = {input_keys[3]};
1155     const std::vector<std::string> expected_values = {"b5"};
1156     RunTest(input_keys, input_values, expected_keys, expected_values,
1157             /*last_committed_seq=*/kMaxSequenceNumber,
1158             /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
1159             /*bottommost_level=*/false,
1160             /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
1161             /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low);
1162   }
1163 }
1164 
TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
  constexpr char kUserKey[] = "a";
  const std::vector<std::string> keys_in = {
      test::KeyStr(/*ts=*/103, kUserKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kUserKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kUserKey, /*seq=*/2, kTypeValue),
      test::KeyStr(/*ts=*/100, kUserKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> values_in = {"", "a2", "a1", "a0"};

  // Keys whose timestamps are larger than or equal to 102 will be preserved,
  // i.e. only the first two input entries are expected in the output.
  std::string ts_low;
  PutFixed64(&ts_low, 102);
  const std::vector<std::string> keys_out = {keys_in[0], keys_in[1]};
  const std::vector<std::string> values_out = {"", "a2"};

  RunTest(keys_in, values_in, keys_out, values_out,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/false,
          /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
          /*key_not_exists_beyond_output_level=*/false, &ts_low);
}
1188 
TEST_P(CompactionIteratorTsGcTest, DropTombstones) {
  constexpr char kUserKey[] = "a";
  const std::vector<std::string> keys_in = {
      test::KeyStr(/*ts=*/103, kUserKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kUserKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kUserKey, /*seq=*/2, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/100, kUserKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> values_in = {"", "a2", "", "a0"};
  // In both scenarios only the two newest entries are expected to survive.
  const std::vector<std::string> keys_out = {keys_in[0], keys_in[1]};
  const std::vector<std::string> values_out = {"", "a2"};

  // Take a snapshot at seq 2.
  AddSnapshot(2);

  std::string ts_low;
  PutFixed64(&ts_low, 102);

  // Each row is {bottommost_level, key_not_exists_beyond_output_level}:
  //   1. non-bottommost level, but key does not exist beyond output level;
  //   2. bottommost level.
  const bool scenarios[][2] = {{false, true}, {true, false}};
  for (const auto& scenario : scenarios) {
    RunTest(keys_in, values_in, keys_out, values_out,
            /*last_committed_seq=*/kMaxSequenceNumber,
            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
            /*bottommost_level=*/scenario[0],
            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
            /*key_not_exists_beyond_output_level=*/scenario[1], &ts_low);
  }
}
1226 
TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
  constexpr char kUserKey[] = "a";
  const std::vector<std::string> keys_in = {
      test::KeyStr(/*ts=*/103, kUserKey, /*seq=*/4, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/102, kUserKey, /*seq=*/3, kTypeValue),
      test::KeyStr(/*ts=*/101, kUserKey, /*seq=*/2, kTypeDeletionWithTimestamp),
      test::KeyStr(/*ts=*/100, kUserKey, /*seq=*/1, kTypeValue)};
  const std::vector<std::string> values_in = {"", "a2", "", "a0"};

  // The first three entries are expected unchanged; the oldest one is
  // expected with both its timestamp and its sequence number rewritten to 0.
  const std::vector<std::string> keys_out = {
      keys_in[0], keys_in[1], keys_in[2],
      test::KeyStr(/*ts=*/0, kUserKey, /*seq=*/0, kTypeValue)};
  const std::vector<std::string> values_out = {"", "a2", "", "a0"};

  AddSnapshot(1);
  AddSnapshot(2);

  // Bottommost level and need to rewrite both ts and seq.
  std::string ts_low;
  PutFixed64(&ts_low, 102);
  RunTest(keys_in, values_in, keys_out, values_out,
          /*last_committed_seq=*/kMaxSequenceNumber,
          /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
          /*bottommost_level=*/true,
          /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
          /*key_not_exists_beyond_output_level=*/true, &ts_low);
}
1255 
// Instantiates every TEST_P of CompactionIteratorTsGcTest once per boolean
// parameter value (true and false).
// NOTE(review): what the parameter controls is decided by the base fixture,
// which is defined outside this part of the file -- confirm there.
INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
                        CompactionIteratorTsGcTest,
                        testing::Values(true, false));
1259 
1260 }  // namespace ROCKSDB_NAMESPACE
1261 
main(int argc,char ** argv)1262 int main(int argc, char** argv) {
1263   ::testing::InitGoogleTest(&argc, argv);
1264   return RUN_ALL_TESTS();
1265 }
1266