1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 #include <assert.h>
7
8 #include <iostream>
9 #include <memory>
10
11 #include "db/db_impl/db_impl.h"
12 #include "db/dbformat.h"
13 #include "db/write_batch_internal.h"
14 #include "port/stack_trace.h"
15 #include "rocksdb/cache.h"
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/merge_operator.h"
20 #include "rocksdb/utilities/db_ttl.h"
21 #include "test_util/testharness.h"
22 #include "util/coding.h"
23 #include "utilities/merge_operators.h"
24
25 namespace ROCKSDB_NAMESPACE {
26
// Set by main(): true when the test binary is started with at least one
// command-line argument. Despite the name, runTest() forwards it as the
// `test_compaction` flag of testCounters(), i.e. it enables the extra
// flush/compaction steps for the merge-based counters.
bool use_compression;
28
// Empty GoogleTest fixture shared by the TEST_F cases in this file.
class MergeTest : public testing::Test {};
30
// Incremented by CountMergeOperator::Merge on every invocation.
size_t num_merge_operator_calls = 0;

// Zeroes the Merge() call counter so a test can count calls made by the
// operations that follow.
void resetNumMergeOperatorCalls() {
  num_merge_operator_calls = 0;
}
33
// Incremented by CountMergeOperator::PartialMergeMulti on every invocation.
size_t num_partial_merge_calls = 0;

// Zeroes the PartialMergeMulti() call counter so a test can count calls made
// by the operations that follow.
void resetNumPartialMergeCalls() {
  num_partial_merge_calls = 0;
}
36
37 class CountMergeOperator : public AssociativeMergeOperator {
38 public:
CountMergeOperator()39 CountMergeOperator() {
40 mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
41 }
42
Merge(const Slice & key,const Slice * existing_value,const Slice & value,std::string * new_value,Logger * logger) const43 bool Merge(const Slice& key, const Slice* existing_value, const Slice& value,
44 std::string* new_value, Logger* logger) const override {
45 assert(new_value->empty());
46 ++num_merge_operator_calls;
47 if (existing_value == nullptr) {
48 new_value->assign(value.data(), value.size());
49 return true;
50 }
51
52 return mergeOperator_->PartialMerge(key, *existing_value, value, new_value,
53 logger);
54 }
55
PartialMergeMulti(const Slice & key,const std::deque<Slice> & operand_list,std::string * new_value,Logger * logger) const56 bool PartialMergeMulti(const Slice& key,
57 const std::deque<Slice>& operand_list,
58 std::string* new_value,
59 Logger* logger) const override {
60 assert(new_value->empty());
61 ++num_partial_merge_calls;
62 return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
63 logger);
64 }
65
Name() const66 const char* Name() const override { return "UInt64AddOperator"; }
67
68 private:
69 std::shared_ptr<MergeOperator> mergeOperator_;
70 };
71
72 class EnvMergeTest : public EnvWrapper {
73 public:
EnvMergeTest()74 EnvMergeTest() : EnvWrapper(Env::Default()) {}
75 // ~EnvMergeTest() override {}
76
NowNanos()77 uint64_t NowNanos() override {
78 ++now_nanos_count_;
79 return target()->NowNanos();
80 }
81
82 static uint64_t now_nanos_count_;
83
84 static std::unique_ptr<EnvMergeTest> singleton_;
85
GetInstance()86 static EnvMergeTest* GetInstance() {
87 if (nullptr == singleton_) singleton_.reset(new EnvMergeTest);
88 return singleton_.get();
89 }
90 };
91
92 uint64_t EnvMergeTest::now_nanos_count_{0};
93 std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_;
94
OpenDb(const std::string & dbname,const bool ttl=false,const size_t max_successive_merges=0)95 std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false,
96 const size_t max_successive_merges = 0) {
97 DB* db;
98 Options options;
99 options.create_if_missing = true;
100 options.merge_operator = std::make_shared<CountMergeOperator>();
101 options.max_successive_merges = max_successive_merges;
102 options.env = EnvMergeTest::GetInstance();
103 EXPECT_OK(DestroyDB(dbname, Options()));
104 Status s;
105 // DBWithTTL is not supported in ROCKSDB_LITE
106 #ifndef ROCKSDB_LITE
107 if (ttl) {
108 DBWithTTL* db_with_ttl;
109 s = DBWithTTL::Open(options, dbname, &db_with_ttl);
110 db = db_with_ttl;
111 } else {
112 s = DB::Open(options, dbname, &db);
113 }
114 #else
115 assert(!ttl);
116 s = DB::Open(options, dbname, &db);
117 #endif // !ROCKSDB_LITE
118 EXPECT_OK(s);
119 assert(s.ok());
120 // Allowed to call NowNanos during DB creation (in GenerateRawUniqueId() for
121 // session ID)
122 EnvMergeTest::now_nanos_count_ = 0;
123 return std::shared_ptr<DB>(db);
124 }
125
126 // Imagine we are maintaining a set of uint64 counters.
127 // Each counter has a distinct name. And we would like
128 // to support four high level operations:
129 // set, add, get and remove
130 // This is a quick implementation without a Merge operation.
131 class Counters {
132 protected:
133 std::shared_ptr<DB> db_;
134
135 WriteOptions put_option_;
136 ReadOptions get_option_;
137 WriteOptions delete_option_;
138
139 uint64_t default_;
140
141 public:
Counters(std::shared_ptr<DB> db,uint64_t defaultCount=0)142 explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
143 : db_(db),
144 put_option_(),
145 get_option_(),
146 delete_option_(),
147 default_(defaultCount) {
148 assert(db_);
149 }
150
~Counters()151 virtual ~Counters() {}
152
153 // public interface of Counters.
154 // All four functions return false
155 // if the underlying level db operation failed.
156
157 // mapped to a levedb Put
set(const std::string & key,uint64_t value)158 bool set(const std::string& key, uint64_t value) {
159 // just treat the internal rep of int64 as the string
160 char buf[sizeof(value)];
161 EncodeFixed64(buf, value);
162 Slice slice(buf, sizeof(value));
163 auto s = db_->Put(put_option_, key, slice);
164
165 if (s.ok()) {
166 return true;
167 } else {
168 std::cerr << s.ToString() << std::endl;
169 return false;
170 }
171 }
172
173 // mapped to a rocksdb Delete
remove(const std::string & key)174 bool remove(const std::string& key) {
175 auto s = db_->Delete(delete_option_, key);
176
177 if (s.ok()) {
178 return true;
179 } else {
180 std::cerr << s.ToString() << std::endl;
181 return false;
182 }
183 }
184
185 // mapped to a rocksdb Get
get(const std::string & key,uint64_t * value)186 bool get(const std::string& key, uint64_t* value) {
187 std::string str;
188 auto s = db_->Get(get_option_, key, &str);
189
190 if (s.IsNotFound()) {
191 // return default value if not found;
192 *value = default_;
193 return true;
194 } else if (s.ok()) {
195 // deserialization
196 if (str.size() != sizeof(uint64_t)) {
197 std::cerr << "value corruption\n";
198 return false;
199 }
200 *value = DecodeFixed64(&str[0]);
201 return true;
202 } else {
203 std::cerr << s.ToString() << std::endl;
204 return false;
205 }
206 }
207
208 // 'add' is implemented as get -> modify -> set
209 // An alternative is a single merge operation, see MergeBasedCounters
add(const std::string & key,uint64_t value)210 virtual bool add(const std::string& key, uint64_t value) {
211 uint64_t base = default_;
212 return get(key, &base) && set(key, base + value);
213 }
214
215 // convenience functions for testing
assert_set(const std::string & key,uint64_t value)216 void assert_set(const std::string& key, uint64_t value) {
217 assert(set(key, value));
218 }
219
assert_remove(const std::string & key)220 void assert_remove(const std::string& key) { assert(remove(key)); }
221
assert_get(const std::string & key)222 uint64_t assert_get(const std::string& key) {
223 uint64_t value = default_;
224 int result = get(key, &value);
225 assert(result);
226 if (result == 0) exit(1); // Disable unused variable warning.
227 return value;
228 }
229
assert_add(const std::string & key,uint64_t value)230 void assert_add(const std::string& key, uint64_t value) {
231 int result = add(key, value);
232 assert(result);
233 if (result == 0) exit(1); // Disable unused variable warning.
234 }
235 };
236
237 // Implement 'add' directly with the new Merge operation
238 class MergeBasedCounters : public Counters {
239 private:
240 WriteOptions merge_option_; // for merge
241
242 public:
MergeBasedCounters(std::shared_ptr<DB> db,uint64_t defaultCount=0)243 explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
244 : Counters(db, defaultCount), merge_option_() {}
245
246 // mapped to a rocksdb Merge operation
add(const std::string & key,uint64_t value)247 bool add(const std::string& key, uint64_t value) override {
248 char encoded[sizeof(uint64_t)];
249 EncodeFixed64(encoded, value);
250 Slice slice(encoded, sizeof(uint64_t));
251 auto s = db_->Merge(merge_option_, key, slice);
252
253 if (s.ok()) {
254 return true;
255 } else {
256 std::cerr << s.ToString() << std::endl;
257 return false;
258 }
259 }
260 };
261
dumpDb(DB * db)262 void dumpDb(DB* db) {
263 auto it = std::unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
264 for (it->SeekToFirst(); it->Valid(); it->Next()) {
265 // uint64_t value = DecodeFixed64(it->value().data());
266 // std::cout << it->key().ToString() << ": " << value << std::endl;
267 }
268 assert(it->status().ok()); // Check for any errors found during the scan
269 }
270
testCounters(Counters & counters,DB * db,bool test_compaction)271 void testCounters(Counters& counters, DB* db, bool test_compaction) {
272 FlushOptions o;
273 o.wait = true;
274
275 counters.assert_set("a", 1);
276
277 if (test_compaction) {
278 ASSERT_OK(db->Flush(o));
279 }
280
281 ASSERT_EQ(counters.assert_get("a"), 1);
282
283 counters.assert_remove("b");
284
285 // defaut value is 0 if non-existent
286 ASSERT_EQ(counters.assert_get("b"), 0);
287
288 counters.assert_add("a", 2);
289
290 if (test_compaction) {
291 ASSERT_OK(db->Flush(o));
292 }
293
294 // 1+2 = 3
295 ASSERT_EQ(counters.assert_get("a"), 3);
296
297 dumpDb(db);
298
299 // 1+...+49 = ?
300 uint64_t sum = 0;
301 for (int i = 1; i < 50; i++) {
302 counters.assert_add("b", i);
303 sum += i;
304 }
305 ASSERT_EQ(counters.assert_get("b"), sum);
306
307 dumpDb(db);
308
309 if (test_compaction) {
310 ASSERT_OK(db->Flush(o));
311
312 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
313
314 dumpDb(db);
315
316 ASSERT_EQ(counters.assert_get("a"), 3);
317 ASSERT_EQ(counters.assert_get("b"), sum);
318 }
319 }
320
// Runs a SetOptions() call, a manual CompactRange() and a non-waiting Flush()
// concurrently, ordering them with sync points, and then verifies that
// "test-key" (incremented once through `counters`) reads back as the fixed64
// encoding of 1.
//
// counters: counter wrapper used to add 1 to "test-key".
// db:       the database; it is cast to DBImpl* for SetOptions/CompactRange.
void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
  // Seed the DB with one flushed key before the concurrent phase starts.
  ASSERT_OK(db->Put({}, "1", "1"));
  ASSERT_OK(db->Flush(FlushOptions()));

  // Hands out small integer ids: the thread_local is initialized from `cnt`
  // the first time a given thread calls the lambda, so ids follow the order
  // in which threads first reach one of the callbacks below.
  std::atomic<int> cnt{0};
  const auto get_thread_id = [&cnt]() {
    thread_local int thread_id{cnt++};
    return thread_id;
  };
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Going by the sync point names used below, id 0 is expected to be the
  // SetOptions thread, id 1 the background compaction and id 2 the background
  // flush -- NOTE(review): this relies on the arrival order; confirm.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
        int thread_id = get_thread_id();
        if (1 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
        } else if (2 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
        }
      });
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
        int thread_id = get_thread_id();
        if (0 == thread_id) {
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::set_options_thread:0");
          TEST_SYNC_POINT(
              "testCountersWithFlushAndCompaction::set_options_thread:1");
        }
      });
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
        // `arg` is treated as the mutex held by the caller; it is released
        // while the two sync points below are processed and re-acquired
        // before the callback returns.
        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
        mutex->AssertHeld();
        int thread_id = get_thread_id();
        ASSERT_EQ(2, thread_id);
        mutex->Unlock();
        TEST_SYNC_POINT(
            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
        TEST_SYNC_POINT(
            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
        mutex->Lock();
      });
  // Ordering constraints between the sync points; each entry is presumably a
  // {predecessor, successor} pair -- confirm against SyncPoint::LoadDependency.
  // Note that the first successor is spelled "...WithCompactionAndFlush",
  // matching the TEST_SYNC_POINT below.
  SyncPoint::GetInstance()->LoadDependency({
      {"testCountersWithFlushAndCompaction::set_options_thread:0",
       "testCountersWithCompactionAndFlush:BeforeCompact"},
      {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
       "testCountersWithFlushAndCompaction:BeforeIncCounters"},
      {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
       "testCountersWithFlushAndCompaction::set_options_thread:1"},
      {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
       "testCountersWithFlushAndCompaction:BeforeVerification"},
      {"testCountersWithFlushAndCompaction:AfterGet",
       "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  port::Thread set_options_thread([&]() {
    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
        {{"disable_auto_compactions", "false"}}));
  });
  TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
  port::Thread compact_thread([&]() {
    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
        CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
  });

  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
  // The boolean result of add() is not checked here; a failed write would
  // surface through the final Get/compare below.
  counters.add("test-key", 1);

  // Request a flush without waiting for it to finish.
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db->Flush(flush_opts));

  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
  std::string expected;
  PutFixed64(&expected, 1);
  std::string actual;
  Status s = db->Get(ReadOptions(), "test-key", &actual);
  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
  // Join both helper threads before asserting on the read result.
  set_options_thread.join();
  compact_thread.join();
  ASSERT_OK(s);
  ASSERT_EQ(expected, actual);
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
410
testSuccessiveMerge(Counters & counters,size_t max_num_merges,size_t num_merges)411 void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
412 size_t num_merges) {
413 counters.assert_remove("z");
414 uint64_t sum = 0;
415
416 for (size_t i = 1; i <= num_merges; ++i) {
417 resetNumMergeOperatorCalls();
418 counters.assert_add("z", i);
419 sum += i;
420
421 if (i % (max_num_merges + 1) == 0) {
422 ASSERT_EQ(num_merge_operator_calls, max_num_merges + 1);
423 } else {
424 ASSERT_EQ(num_merge_operator_calls, 0);
425 }
426
427 resetNumMergeOperatorCalls();
428 ASSERT_EQ(counters.assert_get("z"), sum);
429 ASSERT_EQ(num_merge_operator_calls, i % (max_num_merges + 1));
430 }
431 }
432
testPartialMerge(Counters * counters,DB * db,size_t max_merge,size_t min_merge,size_t count)433 void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
434 size_t min_merge, size_t count) {
435 FlushOptions o;
436 o.wait = true;
437
438 // Test case 1: partial merge should be called when the number of merge
439 // operands exceeds the threshold.
440 uint64_t tmp_sum = 0;
441 resetNumPartialMergeCalls();
442 for (size_t i = 1; i <= count; i++) {
443 counters->assert_add("b", i);
444 tmp_sum += i;
445 }
446 ASSERT_OK(db->Flush(o));
447 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
448 ASSERT_EQ(tmp_sum, counters->assert_get("b"));
449 if (count > max_merge) {
450 // in this case, FullMerge should be called instead.
451 ASSERT_EQ(num_partial_merge_calls, 0U);
452 } else {
453 // if count >= min_merge, then partial merge should be called once.
454 ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
455 }
456
457 // Test case 2: partial merge should not be called when a put is found.
458 resetNumPartialMergeCalls();
459 tmp_sum = 0;
460 ASSERT_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "c", "10"));
461 for (size_t i = 1; i <= count; i++) {
462 counters->assert_add("c", i);
463 tmp_sum += i;
464 }
465 ASSERT_OK(db->Flush(o));
466 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
467 ASSERT_EQ(tmp_sum, counters->assert_get("c"));
468 ASSERT_EQ(num_partial_merge_calls, 0U);
469 // NowNanos was previously called in MergeHelper::FilterMerge(), which
470 // harmed performance.
471 ASSERT_EQ(EnvMergeTest::now_nanos_count_, 0U);
472 }
473
testSingleBatchSuccessiveMerge(DB * db,size_t max_num_merges,size_t num_merges)474 void testSingleBatchSuccessiveMerge(DB* db, size_t max_num_merges,
475 size_t num_merges) {
476 ASSERT_GT(num_merges, max_num_merges);
477
478 Slice key("BatchSuccessiveMerge");
479 uint64_t merge_value = 1;
480 char buf[sizeof(merge_value)];
481 EncodeFixed64(buf, merge_value);
482 Slice merge_value_slice(buf, sizeof(merge_value));
483
484 // Create the batch
485 WriteBatch batch;
486 for (size_t i = 0; i < num_merges; ++i) {
487 ASSERT_OK(batch.Merge(key, merge_value_slice));
488 }
489
490 // Apply to memtable and count the number of merges
491 resetNumMergeOperatorCalls();
492 ASSERT_OK(db->Write(WriteOptions(), &batch));
493 ASSERT_EQ(
494 num_merge_operator_calls,
495 static_cast<size_t>(num_merges - (num_merges % (max_num_merges + 1))));
496
497 // Get the value
498 resetNumMergeOperatorCalls();
499 std::string get_value_str;
500 ASSERT_OK(db->Get(ReadOptions(), key, &get_value_str));
501 assert(get_value_str.size() == sizeof(uint64_t));
502 uint64_t get_value = DecodeFixed64(&get_value_str[0]);
503 ASSERT_EQ(get_value, num_merges * merge_value);
504 ASSERT_EQ(num_merge_operator_calls,
505 static_cast<size_t>((num_merges % (max_num_merges + 1))));
506 }
507
runTest(const std::string & dbname,const bool use_ttl=false)508 void runTest(const std::string& dbname, const bool use_ttl = false) {
509 {
510 auto db = OpenDb(dbname, use_ttl);
511
512 {
513 Counters counters(db, 0);
514 testCounters(counters, db.get(), true);
515 }
516
517 {
518 MergeBasedCounters counters(db, 0);
519 testCounters(counters, db.get(), use_compression);
520 }
521 }
522
523 ASSERT_OK(DestroyDB(dbname, Options()));
524
525 {
526 size_t max_merge = 5;
527 auto db = OpenDb(dbname, use_ttl, max_merge);
528 MergeBasedCounters counters(db, 0);
529 testCounters(counters, db.get(), use_compression);
530 testSuccessiveMerge(counters, max_merge, max_merge * 2);
531 testSingleBatchSuccessiveMerge(db.get(), 5, 7);
532 ASSERT_OK(db->Close());
533 ASSERT_OK(DestroyDB(dbname, Options()));
534 }
535
536 {
537 size_t max_merge = 100;
538 // Min merge is hard-coded to 2.
539 uint32_t min_merge = 2;
540 for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
541 auto db = OpenDb(dbname, use_ttl, max_merge);
542 MergeBasedCounters counters(db, 0);
543 testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
544 ASSERT_OK(db->Close());
545 ASSERT_OK(DestroyDB(dbname, Options()));
546 }
547 {
548 auto db = OpenDb(dbname, use_ttl, max_merge);
549 MergeBasedCounters counters(db, 0);
550 testPartialMerge(&counters, db.get(), max_merge, min_merge,
551 min_merge * 10);
552 ASSERT_OK(db->Close());
553 ASSERT_OK(DestroyDB(dbname, Options()));
554 }
555 }
556
557 {
558 {
559 auto db = OpenDb(dbname);
560 MergeBasedCounters counters(db, 0);
561 counters.add("test-key", 1);
562 counters.add("test-key", 1);
563 counters.add("test-key", 1);
564 ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
565 }
566
567 DB* reopen_db;
568 ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
569 std::string value;
570 ASSERT_NOK(reopen_db->Get(ReadOptions(), "test-key", &value));
571 delete reopen_db;
572 ASSERT_OK(DestroyDB(dbname, Options()));
573 }
574
575 /* Temporary remove this test
576 {
577 std::cout << "Test merge-operator not set after reopen (recovery case)\n";
578 {
579 auto db = OpenDb(dbname);
580 MergeBasedCounters counters(db, 0);
581 counters.add("test-key", 1);
582 counters.add("test-key", 1);
583 counters.add("test-key", 1);
584 }
585
586 DB* reopen_db;
587 ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
588 }
589 */
590 }
591
// Runs the full merge test sequence on a regular (non-TTL) database.
TEST_F(MergeTest, MergeDbTest) {
  const std::string dbname = test::PerThreadDBPath("merge_testdb");
  runTest(dbname);
}
595
596 #ifndef ROCKSDB_LITE
// Runs the full merge test sequence on a TTL database.
TEST_F(MergeTest, MergeDbTtlTest) {
  const std::string dbname = test::PerThreadDBPath("merge_testdbttl");
  const bool use_ttl = true;
  runTest(dbname, use_ttl);
}
601
// Runs the concurrent flush/compaction merge scenario on a fresh database
// and destroys the database afterwards.
TEST_F(MergeTest, MergeWithCompactionAndFlush) {
  const std::string dbname =
      test::PerThreadDBPath("merge_with_compaction_and_flush");
  {
    // `counters` and then `db` go out of scope at the closing brace, so the
    // database is released before DestroyDB runs.
    std::shared_ptr<DB> db = OpenDb(dbname);
    MergeBasedCounters counters(db, 0);
    testCountersWithFlushAndCompaction(counters, db.get());
  }
  ASSERT_OK(DestroyDB(dbname, Options()));
}
614 #endif // !ROCKSDB_LITE
615
616 } // namespace ROCKSDB_NAMESPACE
617
main(int argc,char ** argv)618 int main(int argc, char** argv) {
619 ROCKSDB_NAMESPACE::use_compression = false;
620 if (argc > 1) {
621 ROCKSDB_NAMESPACE::use_compression = true;
622 }
623
624 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
625 ::testing::InitGoogleTest(&argc, argv);
626 return RUN_ALL_TESTS();
627 }
628