1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #include <string>
6 #include <vector>
7
8 #include "db/db_test_util.h"
9 #include "db/forward_iterator.h"
10 #include "port/stack_trace.h"
11 #include "rocksdb/merge_operator.h"
12 #include "util/random.h"
13 #include "utilities/merge_operators.h"
14 #include "utilities/merge_operators/string_append/stringappend2.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
// ReadCallback that delegates visibility decisions to a SnapshotChecker:
// a sequence number is visible to the read only if the checker reports it
// as committed with respect to snapshot_seq.
class TestReadCallback : public ReadCallback {
 public:
  TestReadCallback(SnapshotChecker* snapshot_checker,
                   SequenceNumber snapshot_seq)
      : ReadCallback(snapshot_seq),
        snapshot_checker_(snapshot_checker),
        snapshot_seq_(snapshot_seq) {}

  // Returns true iff `seq` is committed as of snapshot_seq_ per the checker.
  bool IsVisibleFullCheck(SequenceNumber seq) override {
    return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
           SnapshotCheckerResult::kInSnapshot;
  }

 private:
  SnapshotChecker* snapshot_checker_;  // not owned; must outlive the callback
  SequenceNumber snapshot_seq_;
};
35
36 // Test merge operator functionality.
// Test merge operator functionality.
class DBMergeOperatorTest : public DBTestBase {
 public:
  DBMergeOperatorTest()
      : DBTestBase("db_merge_operator_test", /*env_do_fsync=*/false) {}

  // Reads `key` through DBImpl::GetImpl() with a TestReadCallback wired to
  // `snapshot_checker`, so only sequence numbers the checker considers
  // committed are visible. When `snapshot` is null the read is as of the
  // latest sequence number. Returns the value on success, or the Status
  // string (e.g. "NotFound: ") on failure.
  std::string GetWithReadCallback(SnapshotChecker* snapshot_checker,
                                  const Slice& key,
                                  const Snapshot* snapshot = nullptr) {
    SequenceNumber seq = snapshot == nullptr ? db_->GetLatestSequenceNumber()
                                             : snapshot->GetSequenceNumber();
    TestReadCallback read_callback(snapshot_checker, seq);
    ReadOptions read_opt;
    read_opt.snapshot = snapshot;
    PinnableSlice value;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = db_->DefaultColumnFamily();
    get_impl_options.value = &value;
    get_impl_options.callback = &read_callback;
    Status s = dbfull()->GetImpl(read_opt, key, get_impl_options);
    if (!s.ok()) {
      return s.ToString();
    }
    return value.ToString();
  }
};
62
// Verify that MergeOperator::ShouldMerge() can limit how many operands are
// applied on read: only the newest `limit` operands contribute to the result,
// whether they live in the memtable, one L0 file, several files, or several
// levels.
TEST_F(DBMergeOperatorTest, LimitMergeOperands) {
  // String-append operator that asks the read path to merge as soon as
  // `limit` operands have been collected (newest first), dropping the rest.
  class LimitedStringAppendMergeOp : public StringAppendTESTOperator {
   public:
    // Takes the limit as size_t directly; the original accepted `int` and
    // narrowed it into the size_t member, silently turning a negative limit
    // into a huge unsigned value.
    LimitedStringAppendMergeOp(size_t limit, char delim)
        : StringAppendTESTOperator(delim), limit_(limit) {}

    const char* Name() const override {
      return "DBMergeOperatorTest::LimitedStringAppendMergeOp";
    }

    bool ShouldMerge(const std::vector<Slice>& operands) const override {
      // A limit of 0 disables early merging. The former
      // `operands.size() > 0` check was redundant: it is implied by
      // `operands.size() >= limit_` whenever `limit_ > 0`.
      return limit_ > 0 && operands.size() >= limit_;
    }

   private:
    size_t limit_ = 0;
  };

  Options options;
  options.create_if_missing = true;
  // Use only the latest two merge operands.
  options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
  options.env = env_;
  Reopen(options);

  // All k1 operands are in the memtable.
  ASSERT_OK(Merge("k1", "a"));
  ASSERT_OK(Merge("k1", "b"));
  ASSERT_OK(Merge("k1", "c"));
  ASSERT_OK(Merge("k1", "d"));
  std::string value;
  ASSERT_OK(db_->Get(ReadOptions(), "k1", &value));
  // Make sure that only the latest two merge operands are used. If this was
  // not the case the value would be "a,b,c,d".
  ASSERT_EQ(value, "c,d");

  // All k2 operands are flushed to L0 into a single file.
  ASSERT_OK(Merge("k2", "a"));
  ASSERT_OK(Merge("k2", "b"));
  ASSERT_OK(Merge("k2", "c"));
  ASSERT_OK(Merge("k2", "d"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
  ASSERT_EQ(value, "c,d");

  // All k3 operands are flushed and sit in different L0 files.
  ASSERT_OK(Merge("k3", "ab"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "bc"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "cd"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("k3", "de"));
  ASSERT_OK(db_->Get(ReadOptions(), "k3", &value));
  ASSERT_EQ(value, "cd,de");

  // All k4 operands are in different levels.
  ASSERT_OK(Merge("k4", "ab"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(4);
  ASSERT_OK(Merge("k4", "bc"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(3);
  ASSERT_OK(Merge("k4", "cd"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  ASSERT_OK(Merge("k4", "de"));
  ASSERT_OK(db_->Get(ReadOptions(), "k4", &value));
  ASSERT_EQ(value, "cd,de");
}
136
// TestPutOperator fails when it encounters the operand "corrupted". A Get()
// that must merge such an operand surfaces Corruption, and the unmerged
// entries remain in the DB untouched.
TEST_F(DBMergeOperatorTest, MergeErrorOnRead) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  std::string value;
  ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
  // Both operands are still stored individually (newest first).
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}});
}
149
// With max_successive_merges set, the write path tries to merge operands
// eagerly. If that merge fails, the write itself must still succeed and the
// operands must stay unmerged.
TEST_F(DBMergeOperatorTest, MergeErrorOnWrite) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.max_successive_merges = 3;
  options.env = env_;
  Reopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "v2"));
  // Will trigger a merge when hitting max_successive_merges and the merge
  // will fail. The delta will be inserted nevertheless.
  ASSERT_OK(Merge("k1", "corrupted"));
  // Data should stay unmerged after the error.
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v2"}, {"k1", "v1"}});
}
165
// Iterators must report Corruption (and become invalid) when a merge fails
// during iteration, in both directions, while leaving stored data intact.
TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
  Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new TestPutOperator());
  options.env = env_;

  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Merge("k1", "corrupted"));
  ASSERT_OK(Put("k2", "v2"));
  // Seeking directly onto the corrupted key fails the merge.
  auto* iter = db_->NewIterator(ReadOptions());
  iter->Seek("k1");
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  // Stepping backwards onto the corrupted key fails the same way.
  iter = db_->NewIterator(ReadOptions());
  iter->Seek("k2");
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  iter->Prev();
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});

  // Same scenario with the corrupted operand on the second key, hit via
  // forward iteration.
  DestroyAndReopen(options);
  ASSERT_OK(Merge("k1", "v1"));
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Merge("k2", "corrupted"));
  iter = db_->NewIterator(ReadOptions());
  iter->Seek("k1");
  ASSERT_TRUE(iter->Valid());
  ASSERT_OK(iter->status());
  iter->Next();
  ASSERT_FALSE(iter->Valid());
  ASSERT_TRUE(iter->status().IsCorruption());
  delete iter;
  VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}
205
206
// Parameterized on whether the block cache is disabled, so operand pinning
// is exercised both with cached blocks and with blocks read directly.
class MergeOperatorPinningTest : public DBMergeOperatorTest,
                                 public testing::WithParamInterface<bool> {
 public:
  MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }

  bool disable_block_cache_;
};

INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
                        ::testing::Bool());
217
218 #ifndef ROCKSDB_LITE
// Merge operands for the same key spread across many blocks, files, and
// levels must all be pinned correctly while the merge result is computed.
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  // Effectively disable write throttling so the manual LSM shaping below
  // is not disturbed.
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  // Expected merged value per key, maintained alongside the writes.
  std::map<std::string, std::string> true_data;
  int batch_num = 1;
  int lvl_to_fill = 4;
  int key_id = 0;
  while (true) {
    for (int j = 0; j < kKeysPerFile; j++) {
      std::string key = Key(key_id % 35);
      key_id++;
      for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
        std::string val = rnd.RandomString(kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));
        if (true_data[key].size() == 0) {
          true_data[key] = val;
        } else {
          true_data[key] += "," + val;
        }
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    if (batch_num % kFilesPerLevel == 0) {
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      lvl_to_fill--;
    }
    batch_num++;
  }

  // 3 L0 files
  // 1 L1 file
  // 3 L2 files
  // 1 L3 file
  // 3 L4 Files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  VerifyDBFromMap(true_data);
}
281
// Wraps another merge operator and invokes user-supplied hooks immediately
// before and after each FullMergeV2() call, so tests can perturb state
// (e.g. evict caches) exactly while a merge is in flight.
class MergeOperatorHook : public MergeOperator {
 public:
  explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
      : merge_op_(_merge_op) {}

  bool FullMergeV2(const MergeOperationInput& merge_in,
                   MergeOperationOutput* merge_out) const override {
    before_merge_();
    bool res = merge_op_->FullMergeV2(merge_in, merge_out);
    after_merge_();
    return res;
  }

  const char* Name() const override { return merge_op_->Name(); }

  std::shared_ptr<MergeOperator> merge_op_;
  std::function<void()> before_merge_ = []() {};  // no-op by default
  std::function<void()> after_merge_ = []() {};   // no-op by default
};
301
// Evicts the table cache and shrinks the block cache immediately before
// every merge, verifying that operand slices stay valid (pinned) even when
// their backing blocks could otherwise be evicted mid-merge.
TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  Options options = CurrentOptions();

  auto merge_hook =
      std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  options.merge_operator = merge_hook;
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  // Small table-cache budget so evicted tables must be reopened.
  options.max_open_files = 20;
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  if (bbto.no_block_cache == false) {
    bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  } else {
    bbto.block_cache = nullptr;
  }
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 30;
  const int kNumKeys = 1000;
  const int kOperandSize = 100;
  Random rnd(301);

  // 1000 keys, each with 30 operands; every operand lands in a different file.
  std::map<std::string, std::string> true_data;
  for (int i = 0; i < kNumOperands; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      std::string k = Key(j);
      std::string v = rnd.RandomString(kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), k, v));

      // CreateMaxOperator keeps the max operand, so track the max here too.
      true_data[k] = std::max(true_data[k], v);
    }
    ASSERT_OK(Flush());
  }

  std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  ASSERT_EQ(file_numbers.size(), kNumOperands);
  int merge_cnt = 0;

  // Code executed before merge operation
  merge_hook->before_merge_ = [&]() {
    // Evict all tables from cache before every merge operation
    auto* table_cache = dbfull()->TEST_table_cache();
    for (uint64_t num : file_numbers) {
      TableCache::Evict(table_cache, num);
    }
    // Decrease cache capacity to force all unrefed blocks to be evicted
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(1);
    }
    merge_cnt++;
  };

  // Code executed after merge operation
  merge_hook->after_merge_ = [&]() {
    // Increase capacity again after doing the merge
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(64 * 1024 * 1024);
    }
  };

  size_t total_reads;
  VerifyDBFromMap(true_data, &total_reads);
  // Every read should have triggered exactly one merge.
  ASSERT_EQ(merge_cnt, total_reads);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // After full compaction reads no longer need merging; verify data again.
  VerifyDBFromMap(true_data, &total_reads);
}
374
// A tailing iterator must keep returning correct merged values while a
// concurrent writer merges operands, flushes, and compacts.
TEST_P(MergeOperatorPinningTest, TailingIterator) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateMaxOperator();
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 100;
  const int kNumWrites = 100000;

  // Writer: merges kNumOperands copies of Key(k) for successive keys,
  // flushing every 127 writes and compacting every 317 writes to churn the
  // LSM under the reader.
  std::function<void()> writer_func = [&]() {
    int k = 0;
    for (int i = 0; i < kNumWrites; i++) {
      ASSERT_OK(db_->Merge(WriteOptions(), Key(k), Key(k)));

      if (i && i % kNumOperands == 0) {
        k++;
      }
      if (i && i % 127 == 0) {
        ASSERT_OK(Flush());
      }
      if (i && i % 317 == 0) {
        ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
      }
    }
  };

  // Reader: tails the DB and expects each key's merged value to equal the
  // key itself (max of identical operands).
  std::function<void()> reader_func = [&]() {
    ReadOptions ro;
    ro.tailing = true;
    Iterator* iter = db_->NewIterator(ro);
    ASSERT_OK(iter->status());
    iter->SeekToFirst();
    for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
      while (!iter->Valid()) {
        // wait for the key to be written
        env_->SleepForMicroseconds(100);
        iter->Seek(Key(i));
      }
      ASSERT_EQ(iter->key(), Key(i));
      ASSERT_EQ(iter->value(), Key(i));

      iter->Next();
    }
    ASSERT_OK(iter->status());

    delete iter;
  };

  ROCKSDB_NAMESPACE::port::Thread writer_thread(writer_func);
  ROCKSDB_NAMESPACE::port::Thread reader_thread(reader_func);

  writer_thread.join();
  reader_thread.join();
}
431
// Regression test: ForwardIterator must keep the memtable pinned while
// collecting merge operands, even if everyone else unrefs it mid-merge.
TEST_F(DBMergeOperatorTest, TailingIteratorMemtableUnrefedBySomeoneElse) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Overview of the test:
  //  * There are two merge operands for the same key: one in an sst file,
  //    another in a memtable.
  //  * Seek a tailing iterator to this key.
  //  * As part of the seek, the iterator will:
  //      (a) first visit the operand in the memtable and tell ForwardIterator
  //          to pin this operand, then
  //      (b) move on to the operand in the sst file, then pass both operands
  //          to merge operator.
  //  * The memtable may get flushed and unreferenced by another thread between
  //    (a) and (b). The test simulates it by flushing the memtable inside a
  //    SyncPoint callback located between (a) and (b).
  //  * In this case it's ForwardIterator's responsibility to keep the memtable
  //    pinned until (b) is complete. There used to be a bug causing
  //    ForwardIterator to not pin it in some circumstances. This test
  //    reproduces it.

  ASSERT_OK(db_->Merge(WriteOptions(), "key", "sst"));
  ASSERT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion A
  ASSERT_OK(db_->Merge(WriteOptions(), "key", "memtable"));

  // Pin SuperVersion A
  std::unique_ptr<Iterator> someone_else(db_->NewIterator(ReadOptions()));
  ASSERT_OK(someone_else->status());

  bool pushed_first_operand = false;
  bool stepped_to_next_operand = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBIter::MergeValuesNewToOld:PushedFirstOperand", [&](void*) {
        EXPECT_FALSE(pushed_first_operand);
        pushed_first_operand = true;
        EXPECT_OK(db_->Flush(FlushOptions()));  // Switch to SuperVersion B
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBIter::MergeValuesNewToOld:SteppedToNextOperand", [&](void*) {
        EXPECT_FALSE(stepped_to_next_operand);
        stepped_to_next_operand = true;
        someone_else.reset();  // Unpin SuperVersion A
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ReadOptions ro;
  ro.tailing = true;
  std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
  iter->Seek("key");

  // Both operands must have been observed and merged in order.
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  EXPECT_EQ(std::string("sst,memtable"), iter->value().ToString());
  EXPECT_TRUE(pushed_first_operand);
  EXPECT_TRUE(stepped_to_next_operand);
}
489 #endif // ROCKSDB_LITE
490
// End-to-end check that merge reads respect both a SnapshotChecker and a
// ReadCallback: operands the checker deems uncommitted must be excluded from
// the merge result, and flush/compaction must not change what is visible.
TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  DestroyAndReopen(options);

  // Hard-coded visibility table: for each snapshot seq, decides which
  // sequence numbers count as committed.
  class TestSnapshotChecker : public SnapshotChecker {
   public:
    SnapshotCheckerResult CheckInSnapshot(
        SequenceNumber seq, SequenceNumber snapshot_seq) const override {
      return IsInSnapshot(seq, snapshot_seq)
                 ? SnapshotCheckerResult::kInSnapshot
                 : SnapshotCheckerResult::kNotInSnapshot;
    }

    bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
      switch (snapshot_seq) {
        case 0:
          return seq == 0;
        case 1:
          return seq <= 1;
        case 2:
          // seq = 2 not visible to snapshot with seq = 2
          return seq <= 1;
        case 3:
          return seq <= 3;
        case 4:
          // seq = 4 not visible to snapshot with seq = 4
          return seq <= 3;
        default:
          // seq >= 5 is uncommitted
          return seq <= 4;
      };
    }
  };
  // NOTE(review): presumably ownership passes to the DB via
  // SetSnapshotChecker — confirm against DBImpl.
  TestSnapshotChecker* snapshot_checker = new TestSnapshotChecker();
  dbfull()->SetSnapshotChecker(snapshot_checker);

  std::string value;
  ASSERT_OK(Merge("foo", "v1"));
  ASSERT_EQ(1, db_->GetLatestSequenceNumber());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  ASSERT_OK(Merge("foo", "v2"));
  ASSERT_EQ(2, db_->GetLatestSequenceNumber());
  // v2 is not visible to latest snapshot, which has seq = 2.
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));
  // Take a snapshot with seq = 2.
  const Snapshot* snapshot1 = db_->GetSnapshot();
  ASSERT_EQ(2, snapshot1->GetSequenceNumber());
  // v2 is not visible to snapshot1, which has seq = 2
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));

  // Verify flush doesn't alter the result.
  ASSERT_OK(Flush());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v3"));
  ASSERT_EQ(3, db_->GetLatestSequenceNumber());
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  ASSERT_OK(Merge("foo", "v4"));
  ASSERT_EQ(4, db_->GetLatestSequenceNumber());
  // v4 is not visible to latest snapshot, which has seq = 4.
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));
  const Snapshot* snapshot2 = db_->GetSnapshot();
  ASSERT_EQ(4, snapshot2->GetSequenceNumber());
  // v4 is not visible to snapshot2, which has seq = 4.
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));

  // Verify flush doesn't alter the result.
  ASSERT_OK(Flush());
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  ASSERT_EQ("v1,v2,v3", GetWithReadCallback(snapshot_checker, "foo"));

  ASSERT_OK(Merge("foo", "v5"));
  ASSERT_EQ(5, db_->GetLatestSequenceNumber());
  // v5 is uncommitted
  ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));

  // full manual compaction.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Verify compaction doesn't alter the result.
  ASSERT_EQ("v1", GetWithReadCallback(snapshot_checker, "foo", snapshot1));
  ASSERT_EQ("v1,v2,v3",
            GetWithReadCallback(snapshot_checker, "foo", snapshot2));
  ASSERT_EQ("v1,v2,v3,v4", GetWithReadCallback(snapshot_checker, "foo"));

  db_->ReleaseSnapshot(snapshot1);
  db_->ReleaseSnapshot(snapshot2);
}
584
// Runs the pinning test across every DBTestBase option configuration, in
// addition to the block-cache on/off dimension.
class PerConfigMergeOperatorPinningTest
    : public DBMergeOperatorTest,
      public testing::WithParamInterface<std::tuple<bool, int>> {
 public:
  PerConfigMergeOperatorPinningTest() {
    // Params: (disable block cache, DBTestBase option config index).
    std::tie(disable_block_cache_, option_config_) = GetParam();
  }

  bool disable_block_cache_;
};

INSTANTIATE_TEST_CASE_P(
    MergeOperatorPinningTest, PerConfigMergeOperatorPinningTest,
    ::testing::Combine(::testing::Bool(),
                       ::testing::Range(static_cast<int>(DBTestBase::kDefault),
                                        static_cast<int>(DBTestBase::kEnd))));
601
// Randomized mix of puts, merges (max operator), and deletes, cross-checked
// against an in-memory model of the expected final value for each key.
TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
  // Merge is unsupported under some option configs; skip those.
  if (ShouldSkipOptions(option_config_, kSkipMergePut)) {
    return;
  }

  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreateMaxOperator();
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  DestroyAndReopen(options);

  Random rnd(301);
  // Model of the DB: key -> expected (max) value.
  std::map<std::string, std::string> true_data;

  const int kTotalMerges = 5000;
  // Every key gets ~10 operands
  const int kKeyRange = kTotalMerges / 10;
  const int kOperandSize = 20;
  const int kNumPutBefore = kKeyRange / 10;  // 10% value
  const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
  const int kNumDelete = kKeyRange / 10;     // 10% delete

  // kNumPutBefore keys will have base values
  for (int i = 0; i < kNumPutBefore; i++) {
    std::string key = Key(rnd.Next() % kKeyRange);
    std::string value = rnd.RandomString(kOperandSize);
    ASSERT_OK(db_->Put(WriteOptions(), key, value));

    true_data[key] = value;
  }

  // Do kTotalMerges merges
  for (int i = 0; i < kTotalMerges; i++) {
    std::string key = Key(rnd.Next() % kKeyRange);
    std::string value = rnd.RandomString(kOperandSize);
    ASSERT_OK(db_->Merge(WriteOptions(), key, value));

    // Max operator: model keeps the lexicographically larger operand.
    if (true_data[key] < value) {
      true_data[key] = value;
    }
  }

  // Overwrite random kNumPutAfter keys
  for (int i = 0; i < kNumPutAfter; i++) {
    std::string key = Key(rnd.Next() % kKeyRange);
    std::string value = rnd.RandomString(kOperandSize);
    ASSERT_OK(db_->Put(WriteOptions(), key, value));

    true_data[key] = value;
  }

  // Delete random kNumDelete keys
  for (int i = 0; i < kNumDelete; i++) {
    std::string key = Key(rnd.Next() % kKeyRange);
    ASSERT_OK(db_->Delete(WriteOptions(), key));

    true_data.erase(key);
  }

  VerifyDBFromMap(true_data);
}
664
665 } // namespace ROCKSDB_NAMESPACE
666
// Test entry point: install the stack-trace handler (for crash diagnostics),
// initialize gtest, and run every registered test.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}
672