1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 
11 namespace ROCKSDB_NAMESPACE {
12 
13 class MyTestCompactionService : public CompactionService {
14  public:
MyTestCompactionService(const std::string & db_path,std::shared_ptr<FileSystem> fs,Options & options)15   MyTestCompactionService(const std::string& db_path,
16                           std::shared_ptr<FileSystem> fs, Options& options)
17       : db_path_(db_path), fs_(fs), options_(options) {}
18 
Start(const std::string & compaction_service_input,int job_id)19   CompactionServiceJobStatus Start(const std::string& compaction_service_input,
20                                    int job_id) override {
21     InstrumentedMutexLock l(&mutex_);
22     jobs_.emplace(job_id, compaction_service_input);
23     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
24     TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
25     return s;
26   }
27 
WaitForComplete(int job_id,std::string * compaction_service_result)28   CompactionServiceJobStatus WaitForComplete(
29       int job_id, std::string* compaction_service_result) override {
30     std::string compaction_input;
31     {
32       InstrumentedMutexLock l(&mutex_);
33       auto i = jobs_.find(job_id);
34       if (i == jobs_.end()) {
35         return CompactionServiceJobStatus::kFailure;
36       }
37       compaction_input = std::move(i->second);
38       jobs_.erase(i);
39     }
40 
41     CompactionServiceOptionsOverride options_override;
42     options_override.env = options_.env;
43     options_override.file_checksum_gen_factory =
44         options_.file_checksum_gen_factory;
45     options_override.comparator = options_.comparator;
46     options_override.merge_operator = options_.merge_operator;
47     options_override.compaction_filter = options_.compaction_filter;
48     options_override.compaction_filter_factory =
49         options_.compaction_filter_factory;
50     options_override.prefix_extractor = options_.prefix_extractor;
51     options_override.table_factory = options_.table_factory;
52     options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
53 
54     Status s = DB::OpenAndCompact(db_path_, db_path_ + "/" + ToString(job_id),
55                                   compaction_input, compaction_service_result,
56                                   options_override);
57     TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
58                              compaction_service_result);
59     compaction_num_.fetch_add(1);
60     if (s.ok()) {
61       return CompactionServiceJobStatus::kSuccess;
62     } else {
63       return CompactionServiceJobStatus::kFailure;
64     }
65   }
66 
GetCompactionNum()67   int GetCompactionNum() { return compaction_num_.load(); }
68 
69  private:
70   InstrumentedMutex mutex_;
71   std::atomic_int compaction_num_{0};
72   std::map<int, std::string> jobs_;
73   const std::string db_path_;
74   std::shared_ptr<FileSystem> fs_;
75   Options options_;
76 };
77 
78 class CompactionServiceTest : public DBTestBase {
79  public:
CompactionServiceTest()80   explicit CompactionServiceTest()
81       : DBTestBase("compaction_service_test", true) {}
82 
83  protected:
GenerateTestData()84   void GenerateTestData() {
85     // Generate 20 files @ L2
86     for (int i = 0; i < 20; i++) {
87       for (int j = 0; j < 10; j++) {
88         int key_id = i * 10 + j;
89         ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
90       }
91       ASSERT_OK(Flush());
92     }
93     MoveFilesToLevel(2);
94 
95     // Generate 10 files @ L1 overlap with all 20 files @ L2
96     for (int i = 0; i < 10; i++) {
97       for (int j = 0; j < 10; j++) {
98         int key_id = i * 20 + j * 2;
99         ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
100       }
101       ASSERT_OK(Flush());
102     }
103     MoveFilesToLevel(1);
104     ASSERT_EQ(FilesPerLevel(), "0,10,20");
105   }
106 
VerifyTestData()107   void VerifyTestData() {
108     for (int i = 0; i < 200; i++) {
109       auto result = Get(Key(i));
110       if (i % 2) {
111         ASSERT_EQ(result, "value" + ToString(i));
112       } else {
113         ASSERT_EQ(result, "value_new" + ToString(i));
114       }
115     }
116   }
117 };
118 
TEST_F(CompactionServiceTest,BasicCompactions)119 TEST_F(CompactionServiceTest, BasicCompactions) {
120   Options options = CurrentOptions();
121   options.env = env_;
122   options.compaction_service = std::make_shared<MyTestCompactionService>(
123       dbname_, env_->GetFileSystem(), options);
124 
125   DestroyAndReopen(options);
126 
127   for (int i = 0; i < 20; i++) {
128     for (int j = 0; j < 10; j++) {
129       int key_id = i * 10 + j;
130       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
131     }
132     ASSERT_OK(Flush());
133   }
134 
135   for (int i = 0; i < 10; i++) {
136     for (int j = 0; j < 10; j++) {
137       int key_id = i * 20 + j * 2;
138       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
139     }
140     ASSERT_OK(Flush());
141   }
142   ASSERT_OK(dbfull()->TEST_WaitForCompact());
143 
144   // verify result
145   for (int i = 0; i < 200; i++) {
146     auto result = Get(Key(i));
147     if (i % 2) {
148       ASSERT_EQ(result, "value" + ToString(i));
149     } else {
150       ASSERT_EQ(result, "value_new" + ToString(i));
151     }
152   }
153   auto my_cs =
154       dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
155   ASSERT_GE(my_cs->GetCompactionNum(), 1);
156 
157   // Test failed compaction
158   SyncPoint::GetInstance()->SetCallBack(
159       "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
160         // override job status
161         Status* s = static_cast<Status*>(status);
162         *s = Status::Aborted("MyTestCompactionService failed to compact!");
163       });
164   SyncPoint::GetInstance()->EnableProcessing();
165 
166   Status s;
167   for (int i = 0; i < 10; i++) {
168     for (int j = 0; j < 10; j++) {
169       int key_id = i * 20 + j * 2;
170       s = Put(Key(key_id), "value_new" + ToString(key_id));
171       if (s.IsAborted()) {
172         break;
173       }
174     }
175     if (s.IsAborted()) {
176       break;
177     }
178     s = Flush();
179     if (s.IsAborted()) {
180       break;
181     }
182     s = dbfull()->TEST_WaitForCompact();
183     if (s.IsAborted()) {
184       break;
185     }
186   }
187   ASSERT_TRUE(s.IsAborted());
188 }
189 
TEST_F(CompactionServiceTest,ManualCompaction)190 TEST_F(CompactionServiceTest, ManualCompaction) {
191   Options options = CurrentOptions();
192   options.env = env_;
193   options.disable_auto_compactions = true;
194   options.compaction_service = std::make_shared<MyTestCompactionService>(
195       dbname_, env_->GetFileSystem(), options);
196   DestroyAndReopen(options);
197   GenerateTestData();
198 
199   auto my_cs =
200       dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
201 
202   std::string start_str = Key(15);
203   std::string end_str = Key(45);
204   Slice start(start_str);
205   Slice end(end_str);
206   uint64_t comp_num = my_cs->GetCompactionNum();
207   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
208   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
209   VerifyTestData();
210 
211   start_str = Key(120);
212   start = start_str;
213   comp_num = my_cs->GetCompactionNum();
214   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
215   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
216   VerifyTestData();
217 
218   end_str = Key(92);
219   end = end_str;
220   comp_num = my_cs->GetCompactionNum();
221   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
222   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
223   VerifyTestData();
224 
225   comp_num = my_cs->GetCompactionNum();
226   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
227   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
228   VerifyTestData();
229 }
230 
TEST_F(CompactionServiceTest,FailedToStart)231 TEST_F(CompactionServiceTest, FailedToStart) {
232   Options options = CurrentOptions();
233   options.env = env_;
234   options.disable_auto_compactions = true;
235   options.compaction_service = std::make_shared<MyTestCompactionService>(
236       dbname_, env_->GetFileSystem(), options);
237   DestroyAndReopen(options);
238   GenerateTestData();
239 
240   SyncPoint::GetInstance()->SetCallBack(
241       "MyTestCompactionService::Start::End", [&](void* status) {
242         // override job status
243         auto s = static_cast<CompactionServiceJobStatus*>(status);
244         *s = CompactionServiceJobStatus::kFailure;
245       });
246   SyncPoint::GetInstance()->EnableProcessing();
247 
248   std::string start_str = Key(15);
249   std::string end_str = Key(45);
250   Slice start(start_str);
251   Slice end(end_str);
252   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
253   ASSERT_TRUE(s.IsIncomplete());
254 }
255 
TEST_F(CompactionServiceTest,InvalidResult)256 TEST_F(CompactionServiceTest, InvalidResult) {
257   Options options = CurrentOptions();
258   options.env = env_;
259   options.disable_auto_compactions = true;
260   options.compaction_service = std::make_shared<MyTestCompactionService>(
261       dbname_, env_->GetFileSystem(), options);
262   DestroyAndReopen(options);
263   GenerateTestData();
264 
265   SyncPoint::GetInstance()->SetCallBack(
266       "MyTestCompactionService::WaitForComplete::End", [&](void* result) {
267         // override job status
268         auto result_str = static_cast<std::string*>(result);
269         *result_str = "Invalid Str";
270       });
271   SyncPoint::GetInstance()->EnableProcessing();
272 
273   std::string start_str = Key(15);
274   std::string end_str = Key(45);
275   Slice start(start_str);
276   Slice end(end_str);
277   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
278   ASSERT_FALSE(s.ok());
279 }
280 
281 // TODO: support sub-compaction
TEST_F(CompactionServiceTest,DISABLED_SubCompaction)282 TEST_F(CompactionServiceTest, DISABLED_SubCompaction) {
283   Options options = CurrentOptions();
284   options.env = env_;
285   options.max_subcompactions = 10;
286   options.target_file_size_base = 1 << 10;  // 1KB
287   options.disable_auto_compactions = true;
288   options.compaction_service = std::make_shared<MyTestCompactionService>(
289       dbname_, env_->GetFileSystem(), options);
290 
291   DestroyAndReopen(options);
292   GenerateTestData();
293 
294   auto cro = CompactRangeOptions();
295   cro.max_subcompactions = 10;
296   db_->CompactRange(cro, nullptr, nullptr);
297 }
298 
299 class PartialDeleteCompactionFilter : public CompactionFilter {
300  public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const301   CompactionFilter::Decision FilterV2(
302       int /*level*/, const Slice& key, ValueType /*value_type*/,
303       const Slice& /*existing_value*/, std::string* /*new_value*/,
304       std::string* /*skip_until*/) const override {
305     int i = std::stoi(key.ToString().substr(3));
306     if (i > 5 && i <= 105) {
307       return CompactionFilter::Decision::kRemove;
308     }
309     return CompactionFilter::Decision::kKeep;
310   }
311 
Name() const312   const char* Name() const override { return "PartialDeleteCompactionFilter"; }
313 };
314 
TEST_F(CompactionServiceTest,CompactionFilter)315 TEST_F(CompactionServiceTest, CompactionFilter) {
316   Options options = CurrentOptions();
317   options.env = env_;
318   auto delete_comp_filter = PartialDeleteCompactionFilter();
319   options.compaction_filter = &delete_comp_filter;
320   options.compaction_service = std::make_shared<MyTestCompactionService>(
321       dbname_, env_->GetFileSystem(), options);
322 
323   DestroyAndReopen(options);
324 
325   for (int i = 0; i < 20; i++) {
326     for (int j = 0; j < 10; j++) {
327       int key_id = i * 10 + j;
328       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
329     }
330     ASSERT_OK(Flush());
331   }
332 
333   for (int i = 0; i < 10; i++) {
334     for (int j = 0; j < 10; j++) {
335       int key_id = i * 20 + j * 2;
336       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
337     }
338     ASSERT_OK(Flush());
339   }
340   ASSERT_OK(dbfull()->TEST_WaitForCompact());
341 
342   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
343 
344   // verify result
345   for (int i = 0; i < 200; i++) {
346     auto result = Get(Key(i));
347     if (i > 5 && i <= 105) {
348       ASSERT_EQ(result, "NOT_FOUND");
349     } else if (i % 2) {
350       ASSERT_EQ(result, "value" + ToString(i));
351     } else {
352       ASSERT_EQ(result, "value_new" + ToString(i));
353     }
354   }
355   auto my_cs =
356       dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
357   ASSERT_GE(my_cs->GetCompactionNum(), 1);
358 }
359 
TEST_F(CompactionServiceTest,Snapshot)360 TEST_F(CompactionServiceTest, Snapshot) {
361   Options options = CurrentOptions();
362   options.env = env_;
363   options.compaction_service = std::make_shared<MyTestCompactionService>(
364       dbname_, env_->GetFileSystem(), options);
365 
366   DestroyAndReopen(options);
367 
368   ASSERT_OK(Put(Key(1), "value1"));
369   ASSERT_OK(Put(Key(2), "value1"));
370   const Snapshot* s1 = db_->GetSnapshot();
371   ASSERT_OK(Flush());
372 
373   ASSERT_OK(Put(Key(1), "value2"));
374   ASSERT_OK(Put(Key(3), "value2"));
375   ASSERT_OK(Flush());
376 
377   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
378   auto my_cs =
379       dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
380   ASSERT_GE(my_cs->GetCompactionNum(), 1);
381   ASSERT_EQ("value1", Get(Key(1), s1));
382   ASSERT_EQ("value2", Get(Key(1)));
383   db_->ReleaseSnapshot(s1);
384 }
385 
TEST_F(CompactionServiceTest,ConcurrentCompaction)386 TEST_F(CompactionServiceTest, ConcurrentCompaction) {
387   Options options = CurrentOptions();
388   options.level0_file_num_compaction_trigger = 100;
389   options.env = env_;
390   options.compaction_service = std::make_shared<MyTestCompactionService>(
391       dbname_, env_->GetFileSystem(), options);
392   options.max_background_jobs = 20;
393 
394   DestroyAndReopen(options);
395   GenerateTestData();
396 
397   ColumnFamilyMetaData meta;
398   db_->GetColumnFamilyMetaData(&meta);
399 
400   std::vector<std::thread> threads;
401   for (const auto& file : meta.levels[1].files) {
402     threads.push_back(std::thread([&]() {
403       std::string fname = file.db_path + "/" + file.name;
404       ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
405     }));
406   }
407 
408   for (auto& thread : threads) {
409     thread.join();
410   }
411   ASSERT_OK(dbfull()->TEST_WaitForCompact());
412 
413   // verify result
414   for (int i = 0; i < 200; i++) {
415     auto result = Get(Key(i));
416     if (i % 2) {
417       ASSERT_EQ(result, "value" + ToString(i));
418     } else {
419       ASSERT_EQ(result, "value_new" + ToString(i));
420     }
421   }
422   auto my_cs =
423       dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
424   ASSERT_EQ(my_cs->GetCompactionNum(), 10);
425   ASSERT_EQ(FilesPerLevel(), "0,0,10");
426 }
427 
428 }  // namespace ROCKSDB_NAMESPACE
429 
430 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
431 extern "C" {
432 void RegisterCustomObjects(int argc, char** argv);
433 }
434 #else
RegisterCustomObjects(int,char **)435 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
436 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
437 
main(int argc,char ** argv)438 int main(int argc, char** argv) {
439   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
440   ::testing::InitGoogleTest(&argc, argv);
441   RegisterCustomObjects(argc, argv);
442   return RUN_ALL_TESTS();
443 }
444 
445 #else
446 #include <stdio.h>
447 
main(int,char **)448 int main(int /*argc*/, char** /*argv*/) {
449   fprintf(stderr,
450           "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
451   return 0;
452 }
453 
454 #endif  // ROCKSDB_LITE
455