1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10
11 namespace ROCKSDB_NAMESPACE {
12
13 class TestCompactionServiceBase {
14 public:
15 virtual int GetCompactionNum() = 0;
16
OverrideStartStatus(CompactionServiceJobStatus s)17 void OverrideStartStatus(CompactionServiceJobStatus s) {
18 is_override_start_status = true;
19 override_start_status = s;
20 }
21
OverrideWaitStatus(CompactionServiceJobStatus s)22 void OverrideWaitStatus(CompactionServiceJobStatus s) {
23 is_override_wait_status = true;
24 override_wait_status = s;
25 }
26
OverrideWaitResult(std::string str)27 void OverrideWaitResult(std::string str) {
28 is_override_wait_result = true;
29 override_wait_result = std::move(str);
30 }
31
ResetOverride()32 void ResetOverride() {
33 is_override_wait_result = false;
34 is_override_start_status = false;
35 is_override_wait_status = false;
36 }
37
38 virtual ~TestCompactionServiceBase() = default;
39
40 protected:
41 bool is_override_start_status = false;
42 CompactionServiceJobStatus override_start_status =
43 CompactionServiceJobStatus::kFailure;
44 bool is_override_wait_status = false;
45 CompactionServiceJobStatus override_wait_status =
46 CompactionServiceJobStatus::kFailure;
47 bool is_override_wait_result = false;
48 std::string override_wait_result;
49 };
50
51 class MyTestCompactionServiceLegacy : public CompactionService,
52 public TestCompactionServiceBase {
53 public:
MyTestCompactionServiceLegacy(std::string db_path,Options & options,std::shared_ptr<Statistics> & statistics)54 MyTestCompactionServiceLegacy(std::string db_path, Options& options,
55 std::shared_ptr<Statistics>& statistics)
56 : db_path_(std::move(db_path)),
57 options_(options),
58 statistics_(statistics) {}
59
kClassName()60 static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
61
Name() const62 const char* Name() const override { return kClassName(); }
63
Start(const std::string & compaction_service_input,uint64_t job_id)64 CompactionServiceJobStatus Start(const std::string& compaction_service_input,
65 uint64_t job_id) override {
66 InstrumentedMutexLock l(&mutex_);
67 jobs_.emplace(job_id, compaction_service_input);
68 CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
69 if (is_override_start_status) {
70 return override_start_status;
71 }
72 return s;
73 }
74
WaitForComplete(uint64_t job_id,std::string * compaction_service_result)75 CompactionServiceJobStatus WaitForComplete(
76 uint64_t job_id, std::string* compaction_service_result) override {
77 std::string compaction_input;
78 {
79 InstrumentedMutexLock l(&mutex_);
80 auto i = jobs_.find(job_id);
81 if (i == jobs_.end()) {
82 return CompactionServiceJobStatus::kFailure;
83 }
84 compaction_input = std::move(i->second);
85 jobs_.erase(i);
86 }
87
88 if (is_override_wait_status) {
89 return override_wait_status;
90 }
91
92 CompactionServiceOptionsOverride options_override;
93 options_override.env = options_.env;
94 options_override.file_checksum_gen_factory =
95 options_.file_checksum_gen_factory;
96 options_override.comparator = options_.comparator;
97 options_override.merge_operator = options_.merge_operator;
98 options_override.compaction_filter = options_.compaction_filter;
99 options_override.compaction_filter_factory =
100 options_.compaction_filter_factory;
101 options_override.prefix_extractor = options_.prefix_extractor;
102 options_override.table_factory = options_.table_factory;
103 options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
104 options_override.statistics = statistics_;
105
106 Status s = DB::OpenAndCompact(
107 db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
108 compaction_input, compaction_service_result, options_override);
109 if (is_override_wait_result) {
110 *compaction_service_result = override_wait_result;
111 }
112 compaction_num_.fetch_add(1);
113 if (s.ok()) {
114 return CompactionServiceJobStatus::kSuccess;
115 } else {
116 return CompactionServiceJobStatus::kFailure;
117 }
118 }
119
GetCompactionNum()120 int GetCompactionNum() override { return compaction_num_.load(); }
121
122 private:
123 InstrumentedMutex mutex_;
124 std::atomic_int compaction_num_{0};
125 std::map<uint64_t, std::string> jobs_;
126 const std::string db_path_;
127 Options options_;
128 std::shared_ptr<Statistics> statistics_;
129 };
130
131 class MyTestCompactionService : public CompactionService,
132 public TestCompactionServiceBase {
133 public:
MyTestCompactionService(std::string db_path,Options & options,std::shared_ptr<Statistics> & statistics)134 MyTestCompactionService(std::string db_path, Options& options,
135 std::shared_ptr<Statistics>& statistics)
136 : db_path_(std::move(db_path)),
137 options_(options),
138 statistics_(statistics),
139 start_info_("na", "na", "na", 0, Env::TOTAL),
140 wait_info_("na", "na", "na", 0, Env::TOTAL) {}
141
kClassName()142 static const char* kClassName() { return "MyTestCompactionService"; }
143
Name() const144 const char* Name() const override { return kClassName(); }
145
StartV2(const CompactionServiceJobInfo & info,const std::string & compaction_service_input)146 CompactionServiceJobStatus StartV2(
147 const CompactionServiceJobInfo& info,
148 const std::string& compaction_service_input) override {
149 InstrumentedMutexLock l(&mutex_);
150 start_info_ = info;
151 assert(info.db_name == db_path_);
152 jobs_.emplace(info.job_id, compaction_service_input);
153 CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
154 if (is_override_start_status) {
155 return override_start_status;
156 }
157 return s;
158 }
159
WaitForCompleteV2(const CompactionServiceJobInfo & info,std::string * compaction_service_result)160 CompactionServiceJobStatus WaitForCompleteV2(
161 const CompactionServiceJobInfo& info,
162 std::string* compaction_service_result) override {
163 std::string compaction_input;
164 assert(info.db_name == db_path_);
165 {
166 InstrumentedMutexLock l(&mutex_);
167 wait_info_ = info;
168 auto i = jobs_.find(info.job_id);
169 if (i == jobs_.end()) {
170 return CompactionServiceJobStatus::kFailure;
171 }
172 compaction_input = std::move(i->second);
173 jobs_.erase(i);
174 }
175
176 if (is_override_wait_status) {
177 return override_wait_status;
178 }
179
180 CompactionServiceOptionsOverride options_override;
181 options_override.env = options_.env;
182 options_override.file_checksum_gen_factory =
183 options_.file_checksum_gen_factory;
184 options_override.comparator = options_.comparator;
185 options_override.merge_operator = options_.merge_operator;
186 options_override.compaction_filter = options_.compaction_filter;
187 options_override.compaction_filter_factory =
188 options_.compaction_filter_factory;
189 options_override.prefix_extractor = options_.prefix_extractor;
190 options_override.table_factory = options_.table_factory;
191 options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
192 options_override.statistics = statistics_;
193
194 Status s = DB::OpenAndCompact(
195 db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
196 compaction_input, compaction_service_result, options_override);
197 if (is_override_wait_result) {
198 *compaction_service_result = override_wait_result;
199 }
200 compaction_num_.fetch_add(1);
201 if (s.ok()) {
202 return CompactionServiceJobStatus::kSuccess;
203 } else {
204 return CompactionServiceJobStatus::kFailure;
205 }
206 }
207
GetCompactionNum()208 int GetCompactionNum() override { return compaction_num_.load(); }
209
GetCompactionInfoForStart()210 CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
GetCompactionInfoForWait()211 CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
212
213 private:
214 InstrumentedMutex mutex_;
215 std::atomic_int compaction_num_{0};
216 std::map<uint64_t, std::string> jobs_;
217 const std::string db_path_;
218 Options options_;
219 std::shared_ptr<Statistics> statistics_;
220 CompactionServiceJobInfo start_info_;
221 CompactionServiceJobInfo wait_info_;
222 };
223
224 // This is only for listing test classes
225 enum TestCompactionServiceType {
226 MyTestCompactionServiceType,
227 MyTestCompactionServiceLegacyType,
228 };
229
230 class CompactionServiceTest
231 : public DBTestBase,
232 public testing::WithParamInterface<TestCompactionServiceType> {
233 public:
CompactionServiceTest()234 explicit CompactionServiceTest()
235 : DBTestBase("compaction_service_test", true) {}
236
237 protected:
ReopenWithCompactionService(Options * options)238 void ReopenWithCompactionService(Options* options) {
239 options->env = env_;
240 primary_statistics_ = CreateDBStatistics();
241 options->statistics = primary_statistics_;
242 compactor_statistics_ = CreateDBStatistics();
243 TestCompactionServiceType cs_type = GetParam();
244 switch (cs_type) {
245 case MyTestCompactionServiceType:
246 compaction_service_ = std::make_shared<MyTestCompactionService>(
247 dbname_, *options, compactor_statistics_);
248 break;
249 case MyTestCompactionServiceLegacyType:
250 compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
251 dbname_, *options, compactor_statistics_);
252 break;
253 default:
254 assert(false);
255 }
256 options->compaction_service = compaction_service_;
257 DestroyAndReopen(*options);
258 }
259
GetCompactorStatistics()260 Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
261
GetPrimaryStatistics()262 Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
263
GetCompactionService()264 TestCompactionServiceBase* GetCompactionService() {
265 CompactionService* cs = compaction_service_.get();
266 return dynamic_cast<TestCompactionServiceBase*>(cs);
267 }
268
GenerateTestData()269 void GenerateTestData() {
270 // Generate 20 files @ L2
271 for (int i = 0; i < 20; i++) {
272 for (int j = 0; j < 10; j++) {
273 int key_id = i * 10 + j;
274 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
275 }
276 ASSERT_OK(Flush());
277 }
278 MoveFilesToLevel(2);
279
280 // Generate 10 files @ L1 overlap with all 20 files @ L2
281 for (int i = 0; i < 10; i++) {
282 for (int j = 0; j < 10; j++) {
283 int key_id = i * 20 + j * 2;
284 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
285 }
286 ASSERT_OK(Flush());
287 }
288 MoveFilesToLevel(1);
289 ASSERT_EQ(FilesPerLevel(), "0,10,20");
290 }
291
VerifyTestData()292 void VerifyTestData() {
293 for (int i = 0; i < 200; i++) {
294 auto result = Get(Key(i));
295 if (i % 2) {
296 ASSERT_EQ(result, "value" + ToString(i));
297 } else {
298 ASSERT_EQ(result, "value_new" + ToString(i));
299 }
300 }
301 }
302
303 private:
304 std::shared_ptr<Statistics> compactor_statistics_;
305 std::shared_ptr<Statistics> primary_statistics_;
306 std::shared_ptr<CompactionService> compaction_service_;
307 };
308
TEST_P(CompactionServiceTest,BasicCompactions)309 TEST_P(CompactionServiceTest, BasicCompactions) {
310 Options options = CurrentOptions();
311 ReopenWithCompactionService(&options);
312
313 Statistics* primary_statistics = GetPrimaryStatistics();
314 Statistics* compactor_statistics = GetCompactorStatistics();
315
316 for (int i = 0; i < 20; i++) {
317 for (int j = 0; j < 10; j++) {
318 int key_id = i * 10 + j;
319 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
320 }
321 ASSERT_OK(Flush());
322 }
323
324 for (int i = 0; i < 10; i++) {
325 for (int j = 0; j < 10; j++) {
326 int key_id = i * 20 + j * 2;
327 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
328 }
329 ASSERT_OK(Flush());
330 }
331 ASSERT_OK(dbfull()->TEST_WaitForCompact());
332
333 // verify result
334 for (int i = 0; i < 200; i++) {
335 auto result = Get(Key(i));
336 if (i % 2) {
337 ASSERT_EQ(result, "value" + ToString(i));
338 } else {
339 ASSERT_EQ(result, "value_new" + ToString(i));
340 }
341 }
342 auto my_cs = GetCompactionService();
343 ASSERT_GE(my_cs->GetCompactionNum(), 1);
344
345 // make sure the compaction statistics is only recorded on the remote side
346 ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
347 ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
348 ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
349 // even with remote compaction, primary host still needs to read SST files to
350 // `verify_table()`.
351 ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
352 // all the compaction write happens on the remote side
353 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
354 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
355 ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
356 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
357 primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
358 // compactor is already the remote side, which doesn't have remote
359 ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
360 ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
361 0);
362
363 // Test failed compaction
364 SyncPoint::GetInstance()->SetCallBack(
365 "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
366 // override job status
367 auto s = static_cast<Status*>(status);
368 *s = Status::Aborted("MyTestCompactionService failed to compact!");
369 });
370 SyncPoint::GetInstance()->EnableProcessing();
371
372 Status s;
373 for (int i = 0; i < 10; i++) {
374 for (int j = 0; j < 10; j++) {
375 int key_id = i * 20 + j * 2;
376 s = Put(Key(key_id), "value_new" + ToString(key_id));
377 if (s.IsAborted()) {
378 break;
379 }
380 }
381 if (s.IsAborted()) {
382 break;
383 }
384 s = Flush();
385 if (s.IsAborted()) {
386 break;
387 }
388 s = dbfull()->TEST_WaitForCompact();
389 if (s.IsAborted()) {
390 break;
391 }
392 }
393 ASSERT_TRUE(s.IsAborted());
394 }
395
TEST_P(CompactionServiceTest,ManualCompaction)396 TEST_P(CompactionServiceTest, ManualCompaction) {
397 Options options = CurrentOptions();
398 options.disable_auto_compactions = true;
399 ReopenWithCompactionService(&options);
400 GenerateTestData();
401
402 auto my_cs = GetCompactionService();
403
404 std::string start_str = Key(15);
405 std::string end_str = Key(45);
406 Slice start(start_str);
407 Slice end(end_str);
408 uint64_t comp_num = my_cs->GetCompactionNum();
409 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
410 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
411 VerifyTestData();
412
413 start_str = Key(120);
414 start = start_str;
415 comp_num = my_cs->GetCompactionNum();
416 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
417 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
418 VerifyTestData();
419
420 end_str = Key(92);
421 end = end_str;
422 comp_num = my_cs->GetCompactionNum();
423 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
424 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
425 VerifyTestData();
426
427 comp_num = my_cs->GetCompactionNum();
428 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
429 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
430 VerifyTestData();
431 }
432
TEST_P(CompactionServiceTest,FailedToStart)433 TEST_P(CompactionServiceTest, FailedToStart) {
434 Options options = CurrentOptions();
435 options.disable_auto_compactions = true;
436 ReopenWithCompactionService(&options);
437
438 GenerateTestData();
439
440 auto my_cs = GetCompactionService();
441 my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
442
443 std::string start_str = Key(15);
444 std::string end_str = Key(45);
445 Slice start(start_str);
446 Slice end(end_str);
447 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
448 ASSERT_TRUE(s.IsIncomplete());
449 }
450
TEST_P(CompactionServiceTest,InvalidResult)451 TEST_P(CompactionServiceTest, InvalidResult) {
452 Options options = CurrentOptions();
453 options.disable_auto_compactions = true;
454 ReopenWithCompactionService(&options);
455
456 GenerateTestData();
457
458 auto my_cs = GetCompactionService();
459 my_cs->OverrideWaitResult("Invalid Str");
460
461 std::string start_str = Key(15);
462 std::string end_str = Key(45);
463 Slice start(start_str);
464 Slice end(end_str);
465 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
466 ASSERT_FALSE(s.ok());
467 }
468
TEST_P(CompactionServiceTest,SubCompaction)469 TEST_P(CompactionServiceTest, SubCompaction) {
470 Options options = CurrentOptions();
471 options.max_subcompactions = 10;
472 options.target_file_size_base = 1 << 10; // 1KB
473 options.disable_auto_compactions = true;
474 ReopenWithCompactionService(&options);
475
476 GenerateTestData();
477 VerifyTestData();
478
479 auto my_cs = GetCompactionService();
480 int compaction_num_before = my_cs->GetCompactionNum();
481
482 auto cro = CompactRangeOptions();
483 cro.max_subcompactions = 10;
484 Status s = db_->CompactRange(cro, nullptr, nullptr);
485 ASSERT_OK(s);
486 VerifyTestData();
487 int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
488 // make sure there's sub-compaction by checking the compaction number
489 ASSERT_GE(compaction_num, 2);
490 }
491
492 class PartialDeleteCompactionFilter : public CompactionFilter {
493 public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const494 CompactionFilter::Decision FilterV2(
495 int /*level*/, const Slice& key, ValueType /*value_type*/,
496 const Slice& /*existing_value*/, std::string* /*new_value*/,
497 std::string* /*skip_until*/) const override {
498 int i = std::stoi(key.ToString().substr(3));
499 if (i > 5 && i <= 105) {
500 return CompactionFilter::Decision::kRemove;
501 }
502 return CompactionFilter::Decision::kKeep;
503 }
504
Name() const505 const char* Name() const override { return "PartialDeleteCompactionFilter"; }
506 };
507
TEST_P(CompactionServiceTest,CompactionFilter)508 TEST_P(CompactionServiceTest, CompactionFilter) {
509 Options options = CurrentOptions();
510 std::unique_ptr<CompactionFilter> delete_comp_filter(
511 new PartialDeleteCompactionFilter());
512 options.compaction_filter = delete_comp_filter.get();
513 ReopenWithCompactionService(&options);
514
515 for (int i = 0; i < 20; i++) {
516 for (int j = 0; j < 10; j++) {
517 int key_id = i * 10 + j;
518 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
519 }
520 ASSERT_OK(Flush());
521 }
522
523 for (int i = 0; i < 10; i++) {
524 for (int j = 0; j < 10; j++) {
525 int key_id = i * 20 + j * 2;
526 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
527 }
528 ASSERT_OK(Flush());
529 }
530 ASSERT_OK(dbfull()->TEST_WaitForCompact());
531
532 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
533
534 // verify result
535 for (int i = 0; i < 200; i++) {
536 auto result = Get(Key(i));
537 if (i > 5 && i <= 105) {
538 ASSERT_EQ(result, "NOT_FOUND");
539 } else if (i % 2) {
540 ASSERT_EQ(result, "value" + ToString(i));
541 } else {
542 ASSERT_EQ(result, "value_new" + ToString(i));
543 }
544 }
545 auto my_cs = GetCompactionService();
546 ASSERT_GE(my_cs->GetCompactionNum(), 1);
547 }
548
TEST_P(CompactionServiceTest,Snapshot)549 TEST_P(CompactionServiceTest, Snapshot) {
550 Options options = CurrentOptions();
551 ReopenWithCompactionService(&options);
552
553 ASSERT_OK(Put(Key(1), "value1"));
554 ASSERT_OK(Put(Key(2), "value1"));
555 const Snapshot* s1 = db_->GetSnapshot();
556 ASSERT_OK(Flush());
557
558 ASSERT_OK(Put(Key(1), "value2"));
559 ASSERT_OK(Put(Key(3), "value2"));
560 ASSERT_OK(Flush());
561
562 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
563 auto my_cs = GetCompactionService();
564 ASSERT_GE(my_cs->GetCompactionNum(), 1);
565 ASSERT_EQ("value1", Get(Key(1), s1));
566 ASSERT_EQ("value2", Get(Key(1)));
567 db_->ReleaseSnapshot(s1);
568 }
569
TEST_P(CompactionServiceTest,ConcurrentCompaction)570 TEST_P(CompactionServiceTest, ConcurrentCompaction) {
571 Options options = CurrentOptions();
572 options.level0_file_num_compaction_trigger = 100;
573 options.max_background_jobs = 20;
574 ReopenWithCompactionService(&options);
575 GenerateTestData();
576
577 ColumnFamilyMetaData meta;
578 db_->GetColumnFamilyMetaData(&meta);
579
580 std::vector<std::thread> threads;
581 for (const auto& file : meta.levels[1].files) {
582 threads.push_back(std::thread([&]() {
583 std::string fname = file.db_path + "/" + file.name;
584 ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
585 }));
586 }
587
588 for (auto& thread : threads) {
589 thread.join();
590 }
591 ASSERT_OK(dbfull()->TEST_WaitForCompact());
592
593 // verify result
594 for (int i = 0; i < 200; i++) {
595 auto result = Get(Key(i));
596 if (i % 2) {
597 ASSERT_EQ(result, "value" + ToString(i));
598 } else {
599 ASSERT_EQ(result, "value_new" + ToString(i));
600 }
601 }
602 auto my_cs = GetCompactionService();
603 ASSERT_EQ(my_cs->GetCompactionNum(), 10);
604 ASSERT_EQ(FilesPerLevel(), "0,0,10");
605 }
606
TEST_P(CompactionServiceTest,CompactionInfo)607 TEST_P(CompactionServiceTest, CompactionInfo) {
608 // only test compaction info for new compaction service interface
609 if (GetParam() != MyTestCompactionServiceType) {
610 return;
611 }
612
613 Options options = CurrentOptions();
614 ReopenWithCompactionService(&options);
615
616 for (int i = 0; i < 20; i++) {
617 for (int j = 0; j < 10; j++) {
618 int key_id = i * 10 + j;
619 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
620 }
621 ASSERT_OK(Flush());
622 }
623
624 for (int i = 0; i < 10; i++) {
625 for (int j = 0; j < 10; j++) {
626 int key_id = i * 20 + j * 2;
627 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
628 }
629 ASSERT_OK(Flush());
630 }
631 ASSERT_OK(dbfull()->TEST_WaitForCompact());
632 auto my_cs =
633 static_cast_with_check<MyTestCompactionService>(GetCompactionService());
634 uint64_t comp_num = my_cs->GetCompactionNum();
635 ASSERT_GE(comp_num, 1);
636
637 CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
638 ASSERT_EQ(dbname_, info.db_name);
639 std::string db_id, db_session_id;
640 ASSERT_OK(db_->GetDbIdentity(db_id));
641 ASSERT_EQ(db_id, info.db_id);
642 ASSERT_OK(db_->GetDbSessionId(db_session_id));
643 ASSERT_EQ(db_session_id, info.db_session_id);
644 ASSERT_EQ(Env::LOW, info.priority);
645 info = my_cs->GetCompactionInfoForWait();
646 ASSERT_EQ(dbname_, info.db_name);
647 ASSERT_EQ(db_id, info.db_id);
648 ASSERT_EQ(db_session_id, info.db_session_id);
649 ASSERT_EQ(Env::LOW, info.priority);
650
651 // Test priority USER
652 ColumnFamilyMetaData meta;
653 db_->GetColumnFamilyMetaData(&meta);
654 SstFileMetaData file = meta.levels[1].files[0];
655 ASSERT_OK(db_->CompactFiles(CompactionOptions(),
656 {file.db_path + "/" + file.name}, 2));
657 info = my_cs->GetCompactionInfoForStart();
658 ASSERT_EQ(Env::USER, info.priority);
659 info = my_cs->GetCompactionInfoForWait();
660 ASSERT_EQ(Env::USER, info.priority);
661
662 // Test priority BOTTOM
663 env_->SetBackgroundThreads(1, Env::BOTTOM);
664 options.num_levels = 2;
665 ReopenWithCompactionService(&options);
666 my_cs =
667 static_cast_with_check<MyTestCompactionService>(GetCompactionService());
668
669 for (int i = 0; i < 20; i++) {
670 for (int j = 0; j < 10; j++) {
671 int key_id = i * 10 + j;
672 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
673 }
674 ASSERT_OK(Flush());
675 }
676
677 for (int i = 0; i < 4; i++) {
678 for (int j = 0; j < 10; j++) {
679 int key_id = i * 20 + j * 2;
680 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
681 }
682 ASSERT_OK(Flush());
683 }
684 ASSERT_OK(dbfull()->TEST_WaitForCompact());
685 info = my_cs->GetCompactionInfoForStart();
686 ASSERT_EQ(Env::BOTTOM, info.priority);
687 info = my_cs->GetCompactionInfoForWait();
688 ASSERT_EQ(Env::BOTTOM, info.priority);
689 }
690
TEST_P(CompactionServiceTest,FallbackLocalAuto)691 TEST_P(CompactionServiceTest, FallbackLocalAuto) {
692 Options options = CurrentOptions();
693 ReopenWithCompactionService(&options);
694
695 auto my_cs = GetCompactionService();
696 Statistics* compactor_statistics = GetCompactorStatistics();
697 Statistics* primary_statistics = GetPrimaryStatistics();
698 uint64_t compactor_write_bytes =
699 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
700 uint64_t primary_write_bytes =
701 primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
702
703 my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
704
705 for (int i = 0; i < 20; i++) {
706 for (int j = 0; j < 10; j++) {
707 int key_id = i * 10 + j;
708 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
709 }
710 ASSERT_OK(Flush());
711 }
712
713 for (int i = 0; i < 10; i++) {
714 for (int j = 0; j < 10; j++) {
715 int key_id = i * 20 + j * 2;
716 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
717 }
718 ASSERT_OK(Flush());
719 }
720 ASSERT_OK(dbfull()->TEST_WaitForCompact());
721
722 // verify result
723 for (int i = 0; i < 200; i++) {
724 auto result = Get(Key(i));
725 if (i % 2) {
726 ASSERT_EQ(result, "value" + ToString(i));
727 } else {
728 ASSERT_EQ(result, "value_new" + ToString(i));
729 }
730 }
731
732 ASSERT_EQ(my_cs->GetCompactionNum(), 0);
733
734 // make sure the compaction statistics is only recorded on the local side
735 ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
736 compactor_write_bytes);
737 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
738 primary_write_bytes);
739 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
740 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
741 }
742
TEST_P(CompactionServiceTest,FallbackLocalManual)743 TEST_P(CompactionServiceTest, FallbackLocalManual) {
744 Options options = CurrentOptions();
745 options.disable_auto_compactions = true;
746 ReopenWithCompactionService(&options);
747
748 GenerateTestData();
749 VerifyTestData();
750
751 auto my_cs = GetCompactionService();
752 Statistics* compactor_statistics = GetCompactorStatistics();
753 Statistics* primary_statistics = GetPrimaryStatistics();
754 uint64_t compactor_write_bytes =
755 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
756 uint64_t primary_write_bytes =
757 primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
758
759 // re-enable remote compaction
760 my_cs->ResetOverride();
761 std::string start_str = Key(15);
762 std::string end_str = Key(45);
763 Slice start(start_str);
764 Slice end(end_str);
765 uint64_t comp_num = my_cs->GetCompactionNum();
766
767 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
768 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
769 // make sure the compaction statistics is only recorded on the remote side
770 ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
771 compactor_write_bytes);
772 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
773 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
774 ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
775 primary_write_bytes);
776
777 // return run local again with API WaitForComplete
778 my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
779 start_str = Key(120);
780 start = start_str;
781 comp_num = my_cs->GetCompactionNum();
782 compactor_write_bytes =
783 compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
784 primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
785
786 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
787 ASSERT_EQ(my_cs->GetCompactionNum(),
788 comp_num); // no remote compaction is run
789 // make sure the compaction statistics is only recorded on the local side
790 ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
791 compactor_write_bytes);
792 ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
793 primary_write_bytes);
794 ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
795 compactor_write_bytes);
796
797 // verify result after 2 manual compactions
798 VerifyTestData();
799 }
800
801 INSTANTIATE_TEST_CASE_P(
802 CompactionServiceTest, CompactionServiceTest,
803 ::testing::Values(
804 TestCompactionServiceType::MyTestCompactionServiceType,
805 TestCompactionServiceType::MyTestCompactionServiceLegacyType));
806
807 } // namespace ROCKSDB_NAMESPACE
808
809 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
810 extern "C" {
811 void RegisterCustomObjects(int argc, char** argv);
812 }
813 #else
RegisterCustomObjects(int,char **)814 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
815 #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
816
main(int argc,char ** argv)817 int main(int argc, char** argv) {
818 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
819 ::testing::InitGoogleTest(&argc, argv);
820 RegisterCustomObjects(argc, argv);
821 return RUN_ALL_TESTS();
822 }
823
824 #else
825 #include <stdio.h>
826
main(int,char **)827 int main(int /*argc*/, char** /*argv*/) {
828 fprintf(stderr,
829 "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
830 return 0;
831 }
832
833 #endif // ROCKSDB_LITE
834