1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #include <string>
6 #include <vector>
7 
8 #include "db/db_test_util.h"
9 #include "db/forward_iterator.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/merge_operator.h"
12 #include "util/random.h"
13 #include "utilities/merge_operators.h"
14 #include "utilities/merge_operators/string_append/stringappend2.h"
15 
16 namespace ROCKSDB_NAMESPACE {
17 
18 class TestReadCallback : public ReadCallback {
19  public:
TestReadCallback(SnapshotChecker * snapshot_checker,SequenceNumber snapshot_seq)20   TestReadCallback(SnapshotChecker* snapshot_checker,
21                    SequenceNumber snapshot_seq)
22       : ReadCallback(snapshot_seq),
23         snapshot_checker_(snapshot_checker),
24         snapshot_seq_(snapshot_seq) {}
25 
IsVisibleFullCheck(SequenceNumber seq)26   bool IsVisibleFullCheck(SequenceNumber seq) override {
27     return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
28            SnapshotCheckerResult::kInSnapshot;
29   }
30 
31  private:
32   SnapshotChecker* snapshot_checker_;
33   SequenceNumber snapshot_seq_;
34 };
35 
36 // Test merge operator functionality.
37 class DBMergeOperatorTest : public DBTestBase {
38  public:
DBMergeOperatorTest()39   DBMergeOperatorTest()
40       : DBTestBase("db_merge_operator_test", /*env_do_fsync=*/false) {}
41 
GetWithReadCallback(SnapshotChecker * snapshot_checker,const Slice & key,const Snapshot * snapshot=nullptr)42   std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
43                                   const Slice& key,
44                                   const Snapshot* snapshot = nullptr) {
45     SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
46                                              : snapshot->GetSequenceNumber();
47     TestReadCallback read_callback(snapshot_checker, seq);
48     ReadOptions read_opt;
49     read_opt.snapshot = snapshot;
50     PinnableSlice value;
51     DBImpl::GetImplOptions get_impl_options;
52     get_impl_options.column_family = db_->DefaultColumnFamily();
53     get_impl_options.value = &value;
54     get_impl_options.callback = &read_callback;
55     Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
56     if (!s.ok()) {
57       return s.ToString();
58     }
59     return value.ToString();
60   }
61 };
62 
TEST_F(DBMergeOperatorTest,LimitMergeOperands)63 TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
64   class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
65    public:
66     LimitedStringAppendMergeOp(int limit, char delim)
67         : StringAppendTESTOperator(delim), limit_(limit) {}
68 
69     const char* Name() const override {
70       return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
71     }
72 
73     bool ShouldMerge(const std::vector<Slice>& operands) const override {
74       if (operands.size() > 0 && limit_ > 0 && operands.size() >= limit_) {
75         return true;
76       }
77       return false;
78     }
79 
80    private:
81     size_t limit_ = 0;
82   };
83 
84   Options options;
85   options.create_if_missing = true;
86   // Use only the latest two merge operands.
87   options.merge_operator =
88       std::make_shared<LimitedStringAppendMergeOp>(2, ',');
89   options.env = env_;
90   Reopen(options);
91   // All K1 values are in memtable.
92   ASSERT_OK(Merge("k1", "a"));
93   ASSERT_OK(Merge("k1", "b"));
94   ASSERT_OK(Merge("k1", "c"));
95   ASSERT_OK(Merge("k1", "d"));
96   std::string value;
97   ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
98   // Make sure that only the latest two merge operands are used. If this was
99   // not the case the value would be "a,b,c,d".
100   ASSERT_EQ(value, "c,d");
101 
102   // All K2 values are flushed to L0 into a single file.
103   ASSERT_OK(Merge("k2", "a"));
104   ASSERT_OK(Merge("k2", "b"));
105   ASSERT_OK(Merge("k2", "c"));
106   ASSERT_OK(Merge("k2", "d"));
107   ASSERT_OK(Flush());
108   ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
109   ASSERT_EQ(value, "c,d");
110 
111   // All K3 values are flushed and are in different files.
112   ASSERT_OK(Merge("k3", "ab"));
113   ASSERT_OK(Flush());
114   ASSERT_OK(Merge("k3", "bc"));
115   ASSERT_OK(Flush());
116   ASSERT_OK(Merge("k3", "cd"));
117   ASSERT_OK(Flush());
118   ASSERT_OK(Merge("k3", "de"));
119   ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
120   ASSERT_EQ(value, "cd,de");
121 
122   // All K4 values are in different levels
123   ASSERT_OK(Merge("k4", "ab"));
124   ASSERT_OK(Flush());
125   MoveFilesToLevel(4);
126   ASSERT_OK(Merge("k4", "bc"));
127   ASSERT_OK(Flush());
128   MoveFilesToLevel(3);
129   ASSERT_OK(Merge("k4", "cd"));
130   ASSERT_OK(Flush());
131   MoveFilesToLevel(1);
132   ASSERT_OK(Merge("k4", "de"));
133   ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
134   ASSERT_EQ(value, "cd,de");
135 }
136 
// A Get() over operands that include "corrupted" must report Corruption and
// must leave both operands stored as they were written.
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.merge_operator = std::make_shared<TestPutOperator>();
  Reopen(options);

  for (const char* operand : {"v1", "corrupted"}) {
    ASSERT_OK(Merge("k1", operand));
  }

  std::string value;
  const Status read_status = db_->Get(ReadOptions(), "k1", &value);
  ASSERT_TRUE(read_status.IsCorruption());
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}
149 
// With max_successive_merges set, the third Merge() triggers a merge at write
// time. That merge fails, yet the write itself succeeds and all three
// operands remain stored unmerged.
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  options.max_successive_merges = 3;
  options.merge_operator = std::make_shared<TestPutOperator>();
  Reopen(options);

  for (const char* operand : {"v1", "v2"}) {
    ASSERT_OK(Merge("k1", operand));
  }
  // Hitting max_successive_merges triggers a merge, which fails. The delta is
  // inserted nevertheless.
  ASSERT_OK(Merge("k1", "corrupted"));

  // Data should stay unmerged after the error.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}
165 
// Iterating onto a key whose merge fails must invalidate the iterator and
// surface a Corruption status, whether the key is reached by Seek(), Prev()
// or Next(). The stored operands must stay untouched.
//
// Each iterator lives in its own scope and is owned by a std::unique_ptr.
// A failing ASSERT_* returns from the test body immediately, so a raw pointer
// with a manual `delete` would leak the iterator on that path. The scopes
// also guarantee the iterator is gone before the DB is destroyed and
// reopened.
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;

  // Case 1: the failing merge is on k1, a healthy Put sits on k2.
  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  ASSERT_OK(Put("k2", "v2"));
  {
    // Seeking directly onto the bad key fails.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k1");
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  {
    // Stepping backwards from the good key onto the bad key fails.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k2");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    iter->Prev();
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

  // Case 2: k1 is healthy, the failing merge is on k2.
  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Merge("k2", "corrupted"));
  {
    // Stepping forwards from the good key onto the bad key fails.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    iter->Seek("k1");
    ASSERT_TRUE(iter->Valid());
    ASSERT_OK(iter->status());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    ASSERT_TRUE(iter->status().IsCorruption());
  }
  VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
205 
206 
207 class MergeOperatorPinningTest : public DBMergeOperatorTest,
208                                  public testing::WithParamInterface<bool> {
209  public:
MergeOperatorPinningTest()210   MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }
211 
212   bool disable_block_cache_;
213 };
214 
215 INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
216                         ::testing::Bool());
217 
218 #ifndef ROCKSDB_LITE
// Writes merge operands for a small set of keys across the memtable and
// levels L0-L4, with a block size of 1 so that every block holds one entry,
// then verifies the whole DB against the expected appended strings.
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;

  Options options = CurrentOptions();
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to a lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  std::map<std::string, std::string> true_data;
  int next_key_id = 0;
  int lvl_to_fill = 4;
  for (int batch_num = 1;; ++batch_num) {
    // One batch: kKeysPerFile keys, kOperandsPerKeyPerFile operands each.
    // Key ids wrap around at 35, so keys recur across batches.
    for (int j = 0; j < kKeysPerFile; ++j) {
      const std::string key = Key(next_key_id++ % 35);
      for (int k = 0; k < kOperandsPerKeyPerFile; ++k) {
        const std::string val = rnd.RandomString(kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));
        // Mirror the string-append operator: comma-separated operands.
        std::string& expected = true_data[key];
        if (!expected.empty()) {
          expected += ",";
        }
        expected += val;
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    // After every kFilesPerLevel flushes, push the files down to the level
    // currently being filled (level 0 needs no move) and go one level up.
    if (batch_num % kFilesPerLevel == 0) {
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      --lvl_to_fill;
    }
  }

  // Expected layout:
  //   3 L0 files
  //   1 L1 file
  //   3 L2 files
  //   1 L3 file
  //   3 L4 files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  VerifyDBFromMap(true_data);
}
281 
282 class MergeOperatorHook : public MergeOperator {
283  public:
MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)284   explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
285       : merge_op_(_merge_op) {}
286 
FullMergeV2(const MergeOperationInput & merge_in,MergeOperationOutput * merge_out) const287   bool FullMergeV2(const MergeOperationInput& merge_in,
288                    MergeOperationOutput* merge_out) const override {
289     before_merge_();
290     bool res = merge_op_->FullMergeV2(merge_in, merge_out);
291     after_merge_();
292     return res;
293   }
294 
Name() const295   const char* Name() const override { return merge_op_->Name(); }
296 
297   std::shared_ptr<MergeOperator> merge_op_;
__anonf2b492af0102() 298   std::function<void()> before_merge_ = []() {};
__anonf2b492af0202() 299   std::function<void()> after_merge_ = []() {};
300 };
301 
TEST_P(MergeOperatorPinningTest,EvictCacheBeforeMerge)302 TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
303   Options options = CurrentOptions();
304 
305   auto merge_hook =
306       std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
307   options.merge_operator = merge_hook;
308   options.disable_auto_compactions = true;
309   options.level0_slowdown_writes_trigger = (1 << 30);
310   options.level0_stop_writes_trigger = (1 << 30);
311   options.max_open_files = 20;
312   BlockBasedTableOptions bbto;
313   bbto.no_block_cache = disable_block_cache_;
314   if (bbto.no_block_cache == false) {
315     bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
316   } else {
317     bbto.block_cache = nullptr;
318   }
319   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
320   DestroyAndReopen(options);
321 
322   const int kNumOperands = 30;
323   const int kNumKeys = 1000;
324   const int kOperandSize = 100;
325   Random rnd(301);
326 
327   // 1000 keys every key have 30 operands, every operand is in a different file
328   std::map<std::string, std::string> true_data;
329   for (int i = 0; i < kNumOperands; i++) {
330     for (int j = 0; j < kNumKeys; j++) {
331       std::string k = Key(j);
332       std::string v = rnd.RandomString(kOperandSize);
333       ASSERT_OK(db_->Merge(WriteOptions(), k, v));
334 
335       true_data[k] = std::max(true_data[k], v);
336     }
337     ASSERT_OK(Flush());
338   }
339 
340   std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
341   ASSERT_EQ(file_numbers.size(), kNumOperands);
342   int merge_cnt = 0;
343 
344   // Code executed before merge operation
345   merge_hook->before_merge_ = [&]() {
346     // Evict all tables from cache before every merge operation
347     auto* table_cache = dbfull()->TEST_table_cache();
348     for (uint64_t num : file_numbers) {
349       TableCache::Evict(table_cache, num);
350     }
351     // Decrease cache capacity to force all unrefed blocks to be evicted
352     if (bbto.block_cache) {
353       bbto.block_cache->SetCapacity(1);
354     }
355     merge_cnt++;
356   };
357 
358   // Code executed after merge operation
359   merge_hook->after_merge_ = [&]() {
360     // Increase capacity again after doing the merge
361     if (bbto.block_cache) {
362       bbto.block_cache->SetCapacity(64 * 1024 * 1024);
363     }
364   };
365 
366   size_t total_reads;
367   VerifyDBFromMap(true_data, &total_reads);
368   ASSERT_EQ(merge_cnt, total_reads);
369 
370   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
371 
372   VerifyDBFromMap(true_data, &total_reads);
373 }
374 
// Runs a writer thread (merges interleaved with flushes and full compactions)
// concurrently with a reader thread that follows the keys with a tailing
// iterator, and checks that every key read back merges to its expected value.
//
// The reader owns its iterator through std::unique_ptr. A failing ASSERT_*
// inside the lambda returns from it immediately, so a raw pointer with a
// manual `delete` at the end would leak the iterator on that path.
TEST_P(MergeOperatorPinningTest, TailingIterator) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateMaxOperator();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 100;
  const int kNumWrites = 100000;

  // Writer: merges Key(k) into Key(k), moving to the next key roughly every
  // kNumOperands writes, with periodic flushes and full compactions.
  std::function<void()> writer_func = [&]() {
    int k = 0;
    for (int i = 0; i < kNumWrites; i++) {
      ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));

      if (i && i % kNumOperands == 0) {
        k++;
      }
      if (i && i % 127 == 0) {
        ASSERT_OK(Flush());
      }
      if (i && i % 317 == 0) {
        ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
      }
    }
  };

  // Reader: walks keys 0 .. kNumWrites / kNumOperands - 1 in order, polling
  // until each key becomes visible. Every operand of Key(i) equals Key(i), so
  // the max-merged value must equal the key.
  std::function<void()> reader_func = [&]() {
    ReadOptions ro;
    ro.tailing = true;
    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
    ASSERT_OK(iter->status());
    iter->SeekToFirst();
    for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
      while (!iter->Valid()) {
        // wait for the key to be written
        env_->SleepForMicroseconds(100);
        iter->Seek(Key(i));
      }
      ASSERT_EQ(iter->key(), Key(i));
      ASSERT_EQ(iter->value(), Key(i));

      iter->Next();
    }
    ASSERT_OK(iter->status());
  };

  ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
  ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);

  writer_thread.join();
  reader_thread.join();
}
431 
TEST_F(DBMergeOperatorTest,TailingIteratorMemtableUnrefedBySomeoneElse)432 TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
433   Options options = CurrentOptions();
434   options.merge_operator = MergeOperators::CreateStringAppendOperator();
435   DestroyAndReopen(options);
436 
437   // Overview of the test:
438   //  * There are two merge operands for the same key: one in an sst file,
439   //    another in a memtable.
440   //  * Seek a tailing iterator to this key.
441   //  * As part of the seek, the iterator will:
442   //      (a) first visit the operand in the memtable and tell ForwardIterator
443   //          to pin this operand, then
444   //      (b) move on to the operand in the sst file, then pass both operands
445   //          to merge operator.
446   //  * The memtable may get flushed and unreferenced by another thread between
447   //    (a) and (b). The test simulates it by flushing the memtable inside a
448   //    SyncPoint callback located between (a) and (b).
449   //  * In this case it's ForwardIterator's responsibility to keep the memtable
450   //    pinned until (b) is complete. There used to be a bug causing
451   //    ForwardIterator to not pin it in some circumstances. This test
452   //    reproduces it.
453 
454   ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
455   ASSERT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion A
456   ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));
457 
458   // Pin SuperVersion A
459   std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
460   ASSERT_OK(someone_else->status());
461 
462   bool pushed_first_operand = false;
463   bool stepped_to_next_operand = false;
464   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
465       "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
466         EXPECT_FALSE(pushed_first_operand);
467         pushed_first_operand = true;
468         EXPECT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion B
469       });
470   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
471       "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
472         EXPECT_FALSE(stepped_to_next_operand);
473         stepped_to_next_operand = true;
474         someone_else.reset(); // Unpin SuperVersion A
475       });
476   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
477 
478   ReadOptions ro;
479   ro.tailing = true;
480   std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
481   iter->Seek("key");
482 
483   ASSERT_OK(iter->status());
484   ASSERT_TRUE(iter->Valid());
485   EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
486   EXPECT_TRUE(pushed_first_operand);
487   EXPECT_TRUE(stepped_to_next_operand);
488 }
489 #endif  // ROCKSDB_LITE
490 
TEST_F(DBMergeOperatorTest,SnapshotCheckerAndReadCallback)491 TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
492   Options options = CurrentOptions();
493   options.merge_operator = MergeOperators::CreateStringAppendOperator();
494   DestroyAndReopen(options);
495 
496   class TestSnapshotChecker : public SnapshotChecker {
497    public:
498     SnapshotCheckerResult CheckInSnapshot(
499         SequenceNumber seq, SequenceNumber snapshot_seq) const override {
500       return IsInSnapshot(seq, snapshot_seq)
501                  ? SnapshotCheckerResult::kInSnapshot
502                  : SnapshotCheckerResult::kNotInSnapshot;
503     }
504 
505     bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
506       switch (snapshot_seq) {
507         case 0:
508           return seq == 0;
509         case 1:
510           return seq <= 1;
511         case 2:
512           // seq = 2 not visible to snapshot with seq = 2
513           return seq <= 1;
514         case 3:
515           return seq <= 3;
516         case 4:
517           // seq = 4 not visible to snpahost with seq = 4
518           return seq <= 3;
519         default:
520           // seq >=4 is uncommitted
521           return seq <= 4;
522       };
523     }
524   };
525   TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
526   dbfull()->SetSnapshotChecker(snapshot_checker);
527 
528   std::string value;
529   ASSERT_OK(Merge("foo", "v1"));
530   ASSERT_EQ(1, db_->GetLatestSequenceNumber());
531   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
532   ASSERT_OK(Merge("foo", "v2"));
533   ASSERT_EQ(2, db_->GetLatestSequenceNumber());
534   // v2 is not visible to latest snapshot, which has seq = 2.
535   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
536   // Take a snapshot with seq = 2.
537   const Snapshot* snapshot1 = db_->GetSnapshot();
538   ASSERT_EQ(2, snapshot1->GetSequenceNumber());
539   // v2 is not visible to snapshot1, which has seq = 2
540   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
541 
542   // Verify flush doesn't alter the result.
543   ASSERT_OK(Flush());
544   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
545   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
546 
547   ASSERT_OK(Merge("foo", "v3"));
548   ASSERT_EQ(3, db_->GetLatestSequenceNumber());
549   ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
550   ASSERT_OK(Merge("foo", "v4"));
551   ASSERT_EQ(4, db_->GetLatestSequenceNumber());
552   // v4 is not visible to latest snapshot, which has seq = 4.
553   ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
554   const Snapshot* snapshot2 = db_->GetSnapshot();
555   ASSERT_EQ(4, snapshot2->GetSequenceNumber());
556   // v4 is not visible to snapshot2, which has seq = 4.
557   ASSERT_EQ("v1,v2,v3",
558             GetWithReadCallback(snapshot_checker, "foo", snapshot2));
559 
560   // Verify flush doesn't alter the result.
561   ASSERT_OK(Flush());
562   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
563   ASSERT_EQ("v1,v2,v3",
564             GetWithReadCallback(snapshot_checker, "foo", snapshot2));
565   ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
566 
567   ASSERT_OK(Merge("foo", "v5"));
568   ASSERT_EQ(5, db_->GetLatestSequenceNumber());
569   // v5 is uncommitted
570   ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
571 
572   // full manual compaction.
573   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
574 
575   // Verify compaction doesn't alter the result.
576   ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
577   ASSERT_EQ("v1,v2,v3",
578             GetWithReadCallback(snapshot_checker, "foo", snapshot2));
579   ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));
580 
581   db_->ReleaseSnapshot(snapshot1);
582   db_->ReleaseSnapshot(snapshot2);
583 }
584 
585 class PerConfigMergeOperatorPinningTest
586     : public DBMergeOperatorTest,
587       public testing::WithParamInterface<std::tuple<bool, int>> {
588  public:
PerConfigMergeOperatorPinningTest()589   PerConfigMergeOperatorPinningTest() {
590     std::tie(disable_block_cache_, option_config_) = GetParam();
591   }
592 
593   bool disable_block_cache_;
594 };
595 
596 INSTANTIATE_TEST_CASE_P(
597     MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
598     ::testing::Combine(::testing::Bool(),
599                        ::testing::Range(static_cast<int>(DBTestBase::kDefault),
600                                         static_cast<int>(DBTestBase::kEnd))));
601 
TEST_P(PerConfigMergeOperatorPinningTest,Randomized)602 TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
603   if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
604     return;
605   }
606 
607   Options options = CurrentOptions();
608   options.merge_operator = MergeOperators::CreateMaxOperator();
609   BlockBasedTableOptions table_options;
610   table_options.no_block_cache = disable_block_cache_;
611   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
612   DestroyAndReopen(options);
613 
614   Random rnd(301);
615   std::map<std::string, std::string> true_data;
616 
617   const int kTotalMerges = 5000;
618   // Every key gets ~10 operands
619   const int kKeyRange = kTotalMerges / 10;
620   const int kOperandSize = 20;
621   const int kNumPutBefore = kKeyRange / 10;  // 10% value
622   const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
623   const int kNumDelete = kKeyRange / 10;     // 10% delete
624 
625   // kNumPutBefore keys will have base values
626   for (int i = 0; i < kNumPutBefore; i++) {
627     std::string key = Key(rnd.Next() % kKeyRange);
628     std::string value = rnd.RandomString(kOperandSize);
629     ASSERT_OK(db_->Put(WriteOptions(), key, value));
630 
631     true_data[key] = value;
632   }
633 
634   // Do kTotalMerges merges
635   for (int i = 0; i < kTotalMerges; i++) {
636     std::string key = Key(rnd.Next() % kKeyRange);
637     std::string value = rnd.RandomString(kOperandSize);
638     ASSERT_OK(db_->Merge(WriteOptions(), key, value));
639 
640     if (true_data[key] < value) {
641       true_data[key] = value;
642     }
643   }
644 
645   // Overwrite random kNumPutAfter keys
646   for (int i = 0; i < kNumPutAfter; i++) {
647     std::string key = Key(rnd.Next() % kKeyRange);
648     std::string value = rnd.RandomString(kOperandSize);
649     ASSERT_OK(db_->Put(WriteOptions(), key, value));
650 
651     true_data[key] = value;
652   }
653 
654   // Delete random kNumDelete keys
655   for (int i = 0; i < kNumDelete; i++) {
656     std::string key = Key(rnd.Next() % kKeyRange);
657     ASSERT_OK(db_->Delete(WriteOptions(), key));
658 
659     true_data.erase(key);
660   }
661 
662   VerifyDBFromMap(true_data);
663 }
664 
665 }  // namespace ROCKSDB_NAMESPACE
666 
main(int argc,char ** argv)667 int main(int argc, char** argv) {
668   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
669   ::testing::InitGoogleTest(&argc, argv);
670   return RUN_ALL_TESTS();
671 }
672