1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 
11 namespace ROCKSDB_NAMESPACE {
12 
13 class TestCompactionServiceBase {
14  public:
15   virtual int GetCompactionNum() = 0;
16 
OverrideStartStatus(CompactionServiceJobStatus s)17   void OverrideStartStatus(CompactionServiceJobStatus s) {
18     is_override_start_status = true;
19     override_start_status = s;
20   }
21 
OverrideWaitStatus(CompactionServiceJobStatus s)22   void OverrideWaitStatus(CompactionServiceJobStatus s) {
23     is_override_wait_status = true;
24     override_wait_status = s;
25   }
26 
OverrideWaitResult(std::string str)27   void OverrideWaitResult(std::string str) {
28     is_override_wait_result = true;
29     override_wait_result = std::move(str);
30   }
31 
ResetOverride()32   void ResetOverride() {
33     is_override_wait_result = false;
34     is_override_start_status = false;
35     is_override_wait_status = false;
36   }
37 
38   virtual ~TestCompactionServiceBase() = default;
39 
40  protected:
41   bool is_override_start_status = false;
42   CompactionServiceJobStatus override_start_status =
43       CompactionServiceJobStatus::kFailure;
44   bool is_override_wait_status = false;
45   CompactionServiceJobStatus override_wait_status =
46       CompactionServiceJobStatus::kFailure;
47   bool is_override_wait_result = false;
48   std::string override_wait_result;
49 };
50 
51 class MyTestCompactionServiceLegacy : public CompactionService,
52                                       public TestCompactionServiceBase {
53  public:
MyTestCompactionServiceLegacy(std::string db_path,Options & options,std::shared_ptr<Statistics> & statistics)54   MyTestCompactionServiceLegacy(std::string db_path, Options& options,
55                                 std::shared_ptr<Statistics>& statistics)
56       : db_path_(std::move(db_path)),
57         options_(options),
58         statistics_(statistics) {}
59 
kClassName()60   static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
61 
Name() const62   const char* Name() const override { return kClassName(); }
63 
Start(const std::string & compaction_service_input,uint64_t job_id)64   CompactionServiceJobStatus Start(const std::string& compaction_service_input,
65                                    uint64_t job_id) override {
66     InstrumentedMutexLock l(&mutex_);
67     jobs_.emplace(job_id, compaction_service_input);
68     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
69     if (is_override_start_status) {
70       return override_start_status;
71     }
72     return s;
73   }
74 
WaitForComplete(uint64_t job_id,std::string * compaction_service_result)75   CompactionServiceJobStatus WaitForComplete(
76       uint64_t job_id, std::string* compaction_service_result) override {
77     std::string compaction_input;
78     {
79       InstrumentedMutexLock l(&mutex_);
80       auto i = jobs_.find(job_id);
81       if (i == jobs_.end()) {
82         return CompactionServiceJobStatus::kFailure;
83       }
84       compaction_input = std::move(i->second);
85       jobs_.erase(i);
86     }
87 
88     if (is_override_wait_status) {
89       return override_wait_status;
90     }
91 
92     CompactionServiceOptionsOverride options_override;
93     options_override.env = options_.env;
94     options_override.file_checksum_gen_factory =
95         options_.file_checksum_gen_factory;
96     options_override.comparator = options_.comparator;
97     options_override.merge_operator = options_.merge_operator;
98     options_override.compaction_filter = options_.compaction_filter;
99     options_override.compaction_filter_factory =
100         options_.compaction_filter_factory;
101     options_override.prefix_extractor = options_.prefix_extractor;
102     options_override.table_factory = options_.table_factory;
103     options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
104     options_override.statistics = statistics_;
105 
106     Status s = DB::OpenAndCompact(
107         db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
108         compaction_input, compaction_service_result, options_override);
109     if (is_override_wait_result) {
110       *compaction_service_result = override_wait_result;
111     }
112     compaction_num_.fetch_add(1);
113     if (s.ok()) {
114       return CompactionServiceJobStatus::kSuccess;
115     } else {
116       return CompactionServiceJobStatus::kFailure;
117     }
118   }
119 
GetCompactionNum()120   int GetCompactionNum() override { return compaction_num_.load(); }
121 
122  private:
123   InstrumentedMutex mutex_;
124   std::atomic_int compaction_num_{0};
125   std::map<uint64_t, std::string> jobs_;
126   const std::string db_path_;
127   Options options_;
128   std::shared_ptr<Statistics> statistics_;
129 };
130 
131 class MyTestCompactionService : public CompactionService,
132                                 public TestCompactionServiceBase {
133  public:
MyTestCompactionService(std::string db_path,Options & options,std::shared_ptr<Statistics> & statistics)134   MyTestCompactionService(std::string db_path, Options& options,
135                           std::shared_ptr<Statistics>& statistics)
136       : db_path_(std::move(db_path)),
137         options_(options),
138         statistics_(statistics),
139         start_info_("na", "na", "na", 0, Env::TOTAL),
140         wait_info_("na", "na", "na", 0, Env::TOTAL) {}
141 
kClassName()142   static const char* kClassName() { return "MyTestCompactionService"; }
143 
Name() const144   const char* Name() const override { return kClassName(); }
145 
StartV2(const CompactionServiceJobInfo & info,const std::string & compaction_service_input)146   CompactionServiceJobStatus StartV2(
147       const CompactionServiceJobInfo& info,
148       const std::string& compaction_service_input) override {
149     InstrumentedMutexLock l(&mutex_);
150     start_info_ = info;
151     assert(info.db_name == db_path_);
152     jobs_.emplace(info.job_id, compaction_service_input);
153     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
154     if (is_override_start_status) {
155       return override_start_status;
156     }
157     return s;
158   }
159 
WaitForCompleteV2(const CompactionServiceJobInfo & info,std::string * compaction_service_result)160   CompactionServiceJobStatus WaitForCompleteV2(
161       const CompactionServiceJobInfo& info,
162       std::string* compaction_service_result) override {
163     std::string compaction_input;
164     assert(info.db_name == db_path_);
165     {
166       InstrumentedMutexLock l(&mutex_);
167       wait_info_ = info;
168       auto i = jobs_.find(info.job_id);
169       if (i == jobs_.end()) {
170         return CompactionServiceJobStatus::kFailure;
171       }
172       compaction_input = std::move(i->second);
173       jobs_.erase(i);
174     }
175 
176     if (is_override_wait_status) {
177       return override_wait_status;
178     }
179 
180     CompactionServiceOptionsOverride options_override;
181     options_override.env = options_.env;
182     options_override.file_checksum_gen_factory =
183         options_.file_checksum_gen_factory;
184     options_override.comparator = options_.comparator;
185     options_override.merge_operator = options_.merge_operator;
186     options_override.compaction_filter = options_.compaction_filter;
187     options_override.compaction_filter_factory =
188         options_.compaction_filter_factory;
189     options_override.prefix_extractor = options_.prefix_extractor;
190     options_override.table_factory = options_.table_factory;
191     options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
192     options_override.statistics = statistics_;
193 
194     Status s = DB::OpenAndCompact(
195         db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
196         compaction_input, compaction_service_result, options_override);
197     if (is_override_wait_result) {
198       *compaction_service_result = override_wait_result;
199     }
200     compaction_num_.fetch_add(1);
201     if (s.ok()) {
202       return CompactionServiceJobStatus::kSuccess;
203     } else {
204       return CompactionServiceJobStatus::kFailure;
205     }
206   }
207 
GetCompactionNum()208   int GetCompactionNum() override { return compaction_num_.load(); }
209 
GetCompactionInfoForStart()210   CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
GetCompactionInfoForWait()211   CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
212 
213  private:
214   InstrumentedMutex mutex_;
215   std::atomic_int compaction_num_{0};
216   std::map<uint64_t, std::string> jobs_;
217   const std::string db_path_;
218   Options options_;
219   std::shared_ptr<Statistics> statistics_;
220   CompactionServiceJobInfo start_info_;
221   CompactionServiceJobInfo wait_info_;
222 };
223 
224 // This is only for listing test classes
225 enum TestCompactionServiceType {
226   MyTestCompactionServiceType,
227   MyTestCompactionServiceLegacyType,
228 };
229 
230 class CompactionServiceTest
231     : public DBTestBase,
232       public testing::WithParamInterface<TestCompactionServiceType> {
233  public:
CompactionServiceTest()234   explicit CompactionServiceTest()
235       : DBTestBase("compaction_service_test", true) {}
236 
237  protected:
ReopenWithCompactionService(Options * options)238   void ReopenWithCompactionService(Options* options) {
239     options->env = env_;
240     primary_statistics_ = CreateDBStatistics();
241     options->statistics = primary_statistics_;
242     compactor_statistics_ = CreateDBStatistics();
243     TestCompactionServiceType cs_type = GetParam();
244     switch (cs_type) {
245       case MyTestCompactionServiceType:
246         compaction_service_ = std::make_shared<MyTestCompactionService>(
247             dbname_, *options, compactor_statistics_);
248         break;
249       case MyTestCompactionServiceLegacyType:
250         compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
251             dbname_, *options, compactor_statistics_);
252         break;
253       default:
254         assert(false);
255     }
256     options->compaction_service = compaction_service_;
257     DestroyAndReopen(*options);
258   }
259 
GetCompactorStatistics()260   Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
261 
GetPrimaryStatistics()262   Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
263 
GetCompactionService()264   TestCompactionServiceBase* GetCompactionService() {
265     CompactionService* cs = compaction_service_.get();
266     return dynamic_cast<TestCompactionServiceBase*>(cs);
267   }
268 
GenerateTestData()269   void GenerateTestData() {
270     // Generate 20 files @ L2
271     for (int i = 0; i < 20; i++) {
272       for (int j = 0; j < 10; j++) {
273         int key_id = i * 10 + j;
274         ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
275       }
276       ASSERT_OK(Flush());
277     }
278     MoveFilesToLevel(2);
279 
280     // Generate 10 files @ L1 overlap with all 20 files @ L2
281     for (int i = 0; i < 10; i++) {
282       for (int j = 0; j < 10; j++) {
283         int key_id = i * 20 + j * 2;
284         ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
285       }
286       ASSERT_OK(Flush());
287     }
288     MoveFilesToLevel(1);
289     ASSERT_EQ(FilesPerLevel(), "0,10,20");
290   }
291 
VerifyTestData()292   void VerifyTestData() {
293     for (int i = 0; i < 200; i++) {
294       auto result = Get(Key(i));
295       if (i % 2) {
296         ASSERT_EQ(result, "value" + ToString(i));
297       } else {
298         ASSERT_EQ(result, "value_new" + ToString(i));
299       }
300     }
301   }
302 
303  private:
304   std::shared_ptr<Statistics> compactor_statistics_;
305   std::shared_ptr<Statistics> primary_statistics_;
306   std::shared_ptr<CompactionService> compaction_service_;
307 };
308 
TEST_P(CompactionServiceTest,BasicCompactions)309 TEST_P(CompactionServiceTest, BasicCompactions) {
310   Options options = CurrentOptions();
311   ReopenWithCompactionService(&options);
312 
313   Statistics* primary_statistics = GetPrimaryStatistics();
314   Statistics* compactor_statistics = GetCompactorStatistics();
315 
316   for (int i = 0; i < 20; i++) {
317     for (int j = 0; j < 10; j++) {
318       int key_id = i * 10 + j;
319       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
320     }
321     ASSERT_OK(Flush());
322   }
323 
324   for (int i = 0; i < 10; i++) {
325     for (int j = 0; j < 10; j++) {
326       int key_id = i * 20 + j * 2;
327       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
328     }
329     ASSERT_OK(Flush());
330   }
331   ASSERT_OK(dbfull()->TEST_WaitForCompact());
332 
333   // verify result
334   for (int i = 0; i < 200; i++) {
335     auto result = Get(Key(i));
336     if (i % 2) {
337       ASSERT_EQ(result, "value" + ToString(i));
338     } else {
339       ASSERT_EQ(result, "value_new" + ToString(i));
340     }
341   }
342   auto my_cs = GetCompactionService();
343   ASSERT_GE(my_cs->GetCompactionNum(), 1);
344 
345   // make sure the compaction statistics is only recorded on the remote side
346   ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
347   ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
348   ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
349   // even with remote compaction, primary host still needs to read SST files to
350   // `verify_table()`.
351   ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
352   // all the compaction write happens on the remote side
353   ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
354             compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
355   ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
356   ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
357             primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
358   // compactor is already the remote side, which doesn't have remote
359   ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
360   ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
361             0);
362 
363   // Test failed compaction
364   SyncPoint::GetInstance()->SetCallBack(
365       "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
366         // override job status
367         auto s = static_cast<Status*>(status);
368         *s = Status::Aborted("MyTestCompactionService failed to compact!");
369       });
370   SyncPoint::GetInstance()->EnableProcessing();
371 
372   Status s;
373   for (int i = 0; i < 10; i++) {
374     for (int j = 0; j < 10; j++) {
375       int key_id = i * 20 + j * 2;
376       s = Put(Key(key_id), "value_new" + ToString(key_id));
377       if (s.IsAborted()) {
378         break;
379       }
380     }
381     if (s.IsAborted()) {
382       break;
383     }
384     s = Flush();
385     if (s.IsAborted()) {
386       break;
387     }
388     s = dbfull()->TEST_WaitForCompact();
389     if (s.IsAborted()) {
390       break;
391     }
392   }
393   ASSERT_TRUE(s.IsAborted());
394 }
395 
TEST_P(CompactionServiceTest,ManualCompaction)396 TEST_P(CompactionServiceTest, ManualCompaction) {
397   Options options = CurrentOptions();
398   options.disable_auto_compactions = true;
399   ReopenWithCompactionService(&options);
400   GenerateTestData();
401 
402   auto my_cs = GetCompactionService();
403 
404   std::string start_str = Key(15);
405   std::string end_str = Key(45);
406   Slice start(start_str);
407   Slice end(end_str);
408   uint64_t comp_num = my_cs->GetCompactionNum();
409   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
410   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
411   VerifyTestData();
412 
413   start_str = Key(120);
414   start = start_str;
415   comp_num = my_cs->GetCompactionNum();
416   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
417   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
418   VerifyTestData();
419 
420   end_str = Key(92);
421   end = end_str;
422   comp_num = my_cs->GetCompactionNum();
423   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
424   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
425   VerifyTestData();
426 
427   comp_num = my_cs->GetCompactionNum();
428   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
429   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
430   VerifyTestData();
431 }
432 
TEST_P(CompactionServiceTest,FailedToStart)433 TEST_P(CompactionServiceTest, FailedToStart) {
434   Options options = CurrentOptions();
435   options.disable_auto_compactions = true;
436   ReopenWithCompactionService(&options);
437 
438   GenerateTestData();
439 
440   auto my_cs = GetCompactionService();
441   my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
442 
443   std::string start_str = Key(15);
444   std::string end_str = Key(45);
445   Slice start(start_str);
446   Slice end(end_str);
447   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
448   ASSERT_TRUE(s.IsIncomplete());
449 }
450 
TEST_P(CompactionServiceTest,InvalidResult)451 TEST_P(CompactionServiceTest, InvalidResult) {
452   Options options = CurrentOptions();
453   options.disable_auto_compactions = true;
454   ReopenWithCompactionService(&options);
455 
456   GenerateTestData();
457 
458   auto my_cs = GetCompactionService();
459   my_cs->OverrideWaitResult("Invalid Str");
460 
461   std::string start_str = Key(15);
462   std::string end_str = Key(45);
463   Slice start(start_str);
464   Slice end(end_str);
465   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
466   ASSERT_FALSE(s.ok());
467 }
468 
TEST_P(CompactionServiceTest,SubCompaction)469 TEST_P(CompactionServiceTest, SubCompaction) {
470   Options options = CurrentOptions();
471   options.max_subcompactions = 10;
472   options.target_file_size_base = 1 << 10;  // 1KB
473   options.disable_auto_compactions = true;
474   ReopenWithCompactionService(&options);
475 
476   GenerateTestData();
477   VerifyTestData();
478 
479   auto my_cs = GetCompactionService();
480   int compaction_num_before = my_cs->GetCompactionNum();
481 
482   auto cro = CompactRangeOptions();
483   cro.max_subcompactions = 10;
484   Status s = db_->CompactRange(cro, nullptr, nullptr);
485   ASSERT_OK(s);
486   VerifyTestData();
487   int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
488   // make sure there's sub-compaction by checking the compaction number
489   ASSERT_GE(compaction_num, 2);
490 }
491 
492 class PartialDeleteCompactionFilter : public CompactionFilter {
493  public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const494   CompactionFilter::Decision FilterV2(
495       int /*level*/, const Slice& key, ValueType /*value_type*/,
496       const Slice& /*existing_value*/, std::string* /*new_value*/,
497       std::string* /*skip_until*/) const override {
498     int i = std::stoi(key.ToString().substr(3));
499     if (i > 5 && i <= 105) {
500       return CompactionFilter::Decision::kRemove;
501     }
502     return CompactionFilter::Decision::kKeep;
503   }
504 
Name() const505   const char* Name() const override { return "PartialDeleteCompactionFilter"; }
506 };
507 
TEST_P(CompactionServiceTest,CompactionFilter)508 TEST_P(CompactionServiceTest, CompactionFilter) {
509   Options options = CurrentOptions();
510   std::unique_ptr<CompactionFilter> delete_comp_filter(
511       new PartialDeleteCompactionFilter());
512   options.compaction_filter = delete_comp_filter.get();
513   ReopenWithCompactionService(&options);
514 
515   for (int i = 0; i < 20; i++) {
516     for (int j = 0; j < 10; j++) {
517       int key_id = i * 10 + j;
518       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
519     }
520     ASSERT_OK(Flush());
521   }
522 
523   for (int i = 0; i < 10; i++) {
524     for (int j = 0; j < 10; j++) {
525       int key_id = i * 20 + j * 2;
526       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
527     }
528     ASSERT_OK(Flush());
529   }
530   ASSERT_OK(dbfull()->TEST_WaitForCompact());
531 
532   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
533 
534   // verify result
535   for (int i = 0; i < 200; i++) {
536     auto result = Get(Key(i));
537     if (i > 5 && i <= 105) {
538       ASSERT_EQ(result, "NOT_FOUND");
539     } else if (i % 2) {
540       ASSERT_EQ(result, "value" + ToString(i));
541     } else {
542       ASSERT_EQ(result, "value_new" + ToString(i));
543     }
544   }
545   auto my_cs = GetCompactionService();
546   ASSERT_GE(my_cs->GetCompactionNum(), 1);
547 }
548 
TEST_P(CompactionServiceTest,Snapshot)549 TEST_P(CompactionServiceTest, Snapshot) {
550   Options options = CurrentOptions();
551   ReopenWithCompactionService(&options);
552 
553   ASSERT_OK(Put(Key(1), "value1"));
554   ASSERT_OK(Put(Key(2), "value1"));
555   const Snapshot* s1 = db_->GetSnapshot();
556   ASSERT_OK(Flush());
557 
558   ASSERT_OK(Put(Key(1), "value2"));
559   ASSERT_OK(Put(Key(3), "value2"));
560   ASSERT_OK(Flush());
561 
562   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
563   auto my_cs = GetCompactionService();
564   ASSERT_GE(my_cs->GetCompactionNum(), 1);
565   ASSERT_EQ("value1", Get(Key(1), s1));
566   ASSERT_EQ("value2", Get(Key(1)));
567   db_->ReleaseSnapshot(s1);
568 }
569 
TEST_P(CompactionServiceTest,ConcurrentCompaction)570 TEST_P(CompactionServiceTest, ConcurrentCompaction) {
571   Options options = CurrentOptions();
572   options.level0_file_num_compaction_trigger = 100;
573   options.max_background_jobs = 20;
574   ReopenWithCompactionService(&options);
575   GenerateTestData();
576 
577   ColumnFamilyMetaData meta;
578   db_->GetColumnFamilyMetaData(&meta);
579 
580   std::vector<std::thread> threads;
581   for (const auto& file : meta.levels[1].files) {
582     threads.push_back(std::thread([&]() {
583       std::string fname = file.db_path + "/" + file.name;
584       ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
585     }));
586   }
587 
588   for (auto& thread : threads) {
589     thread.join();
590   }
591   ASSERT_OK(dbfull()->TEST_WaitForCompact());
592 
593   // verify result
594   for (int i = 0; i < 200; i++) {
595     auto result = Get(Key(i));
596     if (i % 2) {
597       ASSERT_EQ(result, "value" + ToString(i));
598     } else {
599       ASSERT_EQ(result, "value_new" + ToString(i));
600     }
601   }
602   auto my_cs = GetCompactionService();
603   ASSERT_EQ(my_cs->GetCompactionNum(), 10);
604   ASSERT_EQ(FilesPerLevel(), "0,0,10");
605 }
606 
TEST_P(CompactionServiceTest,CompactionInfo)607 TEST_P(CompactionServiceTest, CompactionInfo) {
608   // only test compaction info for new compaction service interface
609   if (GetParam() != MyTestCompactionServiceType) {
610     return;
611   }
612 
613   Options options = CurrentOptions();
614   ReopenWithCompactionService(&options);
615 
616   for (int i = 0; i < 20; i++) {
617     for (int j = 0; j < 10; j++) {
618       int key_id = i * 10 + j;
619       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
620     }
621     ASSERT_OK(Flush());
622   }
623 
624   for (int i = 0; i < 10; i++) {
625     for (int j = 0; j < 10; j++) {
626       int key_id = i * 20 + j * 2;
627       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
628     }
629     ASSERT_OK(Flush());
630   }
631   ASSERT_OK(dbfull()->TEST_WaitForCompact());
632   auto my_cs =
633       static_cast_with_check<MyTestCompactionService>(GetCompactionService());
634   uint64_t comp_num = my_cs->GetCompactionNum();
635   ASSERT_GE(comp_num, 1);
636 
637   CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
638   ASSERT_EQ(dbname_, info.db_name);
639   std::string db_id, db_session_id;
640   ASSERT_OK(db_->GetDbIdentity(db_id));
641   ASSERT_EQ(db_id, info.db_id);
642   ASSERT_OK(db_->GetDbSessionId(db_session_id));
643   ASSERT_EQ(db_session_id, info.db_session_id);
644   ASSERT_EQ(Env::LOW, info.priority);
645   info = my_cs->GetCompactionInfoForWait();
646   ASSERT_EQ(dbname_, info.db_name);
647   ASSERT_EQ(db_id, info.db_id);
648   ASSERT_EQ(db_session_id, info.db_session_id);
649   ASSERT_EQ(Env::LOW, info.priority);
650 
651   // Test priority USER
652   ColumnFamilyMetaData meta;
653   db_->GetColumnFamilyMetaData(&meta);
654   SstFileMetaData file = meta.levels[1].files[0];
655   ASSERT_OK(db_->CompactFiles(CompactionOptions(),
656                               {file.db_path + "/" + file.name}, 2));
657   info = my_cs->GetCompactionInfoForStart();
658   ASSERT_EQ(Env::USER, info.priority);
659   info = my_cs->GetCompactionInfoForWait();
660   ASSERT_EQ(Env::USER, info.priority);
661 
662   // Test priority BOTTOM
663   env_->SetBackgroundThreads(1, Env::BOTTOM);
664   options.num_levels = 2;
665   ReopenWithCompactionService(&options);
666   my_cs =
667       static_cast_with_check<MyTestCompactionService>(GetCompactionService());
668 
669   for (int i = 0; i < 20; i++) {
670     for (int j = 0; j < 10; j++) {
671       int key_id = i * 10 + j;
672       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
673     }
674     ASSERT_OK(Flush());
675   }
676 
677   for (int i = 0; i < 4; i++) {
678     for (int j = 0; j < 10; j++) {
679       int key_id = i * 20 + j * 2;
680       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
681     }
682     ASSERT_OK(Flush());
683   }
684   ASSERT_OK(dbfull()->TEST_WaitForCompact());
685   info = my_cs->GetCompactionInfoForStart();
686   ASSERT_EQ(Env::BOTTOM, info.priority);
687   info = my_cs->GetCompactionInfoForWait();
688   ASSERT_EQ(Env::BOTTOM, info.priority);
689 }
690 
TEST_P(CompactionServiceTest,FallbackLocalAuto)691 TEST_P(CompactionServiceTest, FallbackLocalAuto) {
692   Options options = CurrentOptions();
693   ReopenWithCompactionService(&options);
694 
695   auto my_cs = GetCompactionService();
696   Statistics* compactor_statistics = GetCompactorStatistics();
697   Statistics* primary_statistics = GetPrimaryStatistics();
698   uint64_t compactor_write_bytes =
699       compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
700   uint64_t primary_write_bytes =
701       primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
702 
703   my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
704 
705   for (int i = 0; i < 20; i++) {
706     for (int j = 0; j < 10; j++) {
707       int key_id = i * 10 + j;
708       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
709     }
710     ASSERT_OK(Flush());
711   }
712 
713   for (int i = 0; i < 10; i++) {
714     for (int j = 0; j < 10; j++) {
715       int key_id = i * 20 + j * 2;
716       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
717     }
718     ASSERT_OK(Flush());
719   }
720   ASSERT_OK(dbfull()->TEST_WaitForCompact());
721 
722   // verify result
723   for (int i = 0; i < 200; i++) {
724     auto result = Get(Key(i));
725     if (i % 2) {
726       ASSERT_EQ(result, "value" + ToString(i));
727     } else {
728       ASSERT_EQ(result, "value_new" + ToString(i));
729     }
730   }
731 
732   ASSERT_EQ(my_cs->GetCompactionNum(), 0);
733 
734   // make sure the compaction statistics is only recorded on the local side
735   ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
736             compactor_write_bytes);
737   ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
738             primary_write_bytes);
739   ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
740   ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
741 }
742 
TEST_P(CompactionServiceTest,FallbackLocalManual)743 TEST_P(CompactionServiceTest, FallbackLocalManual) {
744   Options options = CurrentOptions();
745   options.disable_auto_compactions = true;
746   ReopenWithCompactionService(&options);
747 
748   GenerateTestData();
749   VerifyTestData();
750 
751   auto my_cs = GetCompactionService();
752   Statistics* compactor_statistics = GetCompactorStatistics();
753   Statistics* primary_statistics = GetPrimaryStatistics();
754   uint64_t compactor_write_bytes =
755       compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
756   uint64_t primary_write_bytes =
757       primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
758 
759   // re-enable remote compaction
760   my_cs->ResetOverride();
761   std::string start_str = Key(15);
762   std::string end_str = Key(45);
763   Slice start(start_str);
764   Slice end(end_str);
765   uint64_t comp_num = my_cs->GetCompactionNum();
766 
767   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
768   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
769   // make sure the compaction statistics is only recorded on the remote side
770   ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
771             compactor_write_bytes);
772   ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
773             compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
774   ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
775             primary_write_bytes);
776 
777   // return run local again with API WaitForComplete
778   my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
779   start_str = Key(120);
780   start = start_str;
781   comp_num = my_cs->GetCompactionNum();
782   compactor_write_bytes =
783       compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
784   primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
785 
786   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
787   ASSERT_EQ(my_cs->GetCompactionNum(),
788             comp_num);  // no remote compaction is run
789   // make sure the compaction statistics is only recorded on the local side
790   ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
791             compactor_write_bytes);
792   ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
793             primary_write_bytes);
794   ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
795             compactor_write_bytes);
796 
797   // verify result after 2 manual compactions
798   VerifyTestData();
799 }
800 
801 INSTANTIATE_TEST_CASE_P(
802     CompactionServiceTest, CompactionServiceTest,
803     ::testing::Values(
804         TestCompactionServiceType::MyTestCompactionServiceType,
805         TestCompactionServiceType::MyTestCompactionServiceLegacyType));
806 
807 }  // namespace ROCKSDB_NAMESPACE
808 
809 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
810 extern "C" {
811 void RegisterCustomObjects(int argc, char** argv);
812 }
813 #else
RegisterCustomObjects(int,char **)814 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
815 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
816 
main(int argc,char ** argv)817 int main(int argc, char** argv) {
818   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
819   ::testing::InitGoogleTest(&argc, argv);
820   RegisterCustomObjects(argc, argv);
821   return RUN_ALL_TESTS();
822 }
823 
824 #else
825 #include <stdio.h>
826 
main(int,char **)827 int main(int /*argc*/, char** /*argv*/) {
828   fprintf(stderr,
829           "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
830   return 0;
831 }
832 
833 #endif  // ROCKSDB_LITE
834