1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10
11 namespace ROCKSDB_NAMESPACE {
12
13 class MyTestCompactionService : public CompactionService {
14 public:
MyTestCompactionService(const std::string & db_path,std::shared_ptr<FileSystem> fs,Options & options)15 MyTestCompactionService(const std::string& db_path,
16 std::shared_ptr<FileSystem> fs, Options& options)
17 : db_path_(db_path), fs_(fs), options_(options) {}
18
Start(const std::string & compaction_service_input,int job_id)19 CompactionServiceJobStatus Start(const std::string& compaction_service_input,
20 int job_id) override {
21 InstrumentedMutexLock l(&mutex_);
22 jobs_.emplace(job_id, compaction_service_input);
23 CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
24 TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s);
25 return s;
26 }
27
WaitForComplete(int job_id,std::string * compaction_service_result)28 CompactionServiceJobStatus WaitForComplete(
29 int job_id, std::string* compaction_service_result) override {
30 std::string compaction_input;
31 {
32 InstrumentedMutexLock l(&mutex_);
33 auto i = jobs_.find(job_id);
34 if (i == jobs_.end()) {
35 return CompactionServiceJobStatus::kFailure;
36 }
37 compaction_input = std::move(i->second);
38 jobs_.erase(i);
39 }
40
41 CompactionServiceOptionsOverride options_override;
42 options_override.env = options_.env;
43 options_override.file_checksum_gen_factory =
44 options_.file_checksum_gen_factory;
45 options_override.comparator = options_.comparator;
46 options_override.merge_operator = options_.merge_operator;
47 options_override.compaction_filter = options_.compaction_filter;
48 options_override.compaction_filter_factory =
49 options_.compaction_filter_factory;
50 options_override.prefix_extractor = options_.prefix_extractor;
51 options_override.table_factory = options_.table_factory;
52 options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
53
54 Status s = DB::OpenAndCompact(db_path_, db_path_ + "/" + ToString(job_id),
55 compaction_input, compaction_service_result,
56 options_override);
57 TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End",
58 compaction_service_result);
59 compaction_num_.fetch_add(1);
60 if (s.ok()) {
61 return CompactionServiceJobStatus::kSuccess;
62 } else {
63 return CompactionServiceJobStatus::kFailure;
64 }
65 }
66
GetCompactionNum()67 int GetCompactionNum() { return compaction_num_.load(); }
68
69 private:
70 InstrumentedMutex mutex_;
71 std::atomic_int compaction_num_{0};
72 std::map<int, std::string> jobs_;
73 const std::string db_path_;
74 std::shared_ptr<FileSystem> fs_;
75 Options options_;
76 };
77
78 class CompactionServiceTest : public DBTestBase {
79 public:
CompactionServiceTest()80 explicit CompactionServiceTest()
81 : DBTestBase("compaction_service_test", true) {}
82
83 protected:
GenerateTestData()84 void GenerateTestData() {
85 // Generate 20 files @ L2
86 for (int i = 0; i < 20; i++) {
87 for (int j = 0; j < 10; j++) {
88 int key_id = i * 10 + j;
89 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
90 }
91 ASSERT_OK(Flush());
92 }
93 MoveFilesToLevel(2);
94
95 // Generate 10 files @ L1 overlap with all 20 files @ L2
96 for (int i = 0; i < 10; i++) {
97 for (int j = 0; j < 10; j++) {
98 int key_id = i * 20 + j * 2;
99 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
100 }
101 ASSERT_OK(Flush());
102 }
103 MoveFilesToLevel(1);
104 ASSERT_EQ(FilesPerLevel(), "0,10,20");
105 }
106
VerifyTestData()107 void VerifyTestData() {
108 for (int i = 0; i < 200; i++) {
109 auto result = Get(Key(i));
110 if (i % 2) {
111 ASSERT_EQ(result, "value" + ToString(i));
112 } else {
113 ASSERT_EQ(result, "value_new" + ToString(i));
114 }
115 }
116 }
117 };
118
TEST_F(CompactionServiceTest,BasicCompactions)119 TEST_F(CompactionServiceTest, BasicCompactions) {
120 Options options = CurrentOptions();
121 options.env = env_;
122 options.compaction_service = std::make_shared<MyTestCompactionService>(
123 dbname_, env_->GetFileSystem(), options);
124
125 DestroyAndReopen(options);
126
127 for (int i = 0; i < 20; i++) {
128 for (int j = 0; j < 10; j++) {
129 int key_id = i * 10 + j;
130 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
131 }
132 ASSERT_OK(Flush());
133 }
134
135 for (int i = 0; i < 10; i++) {
136 for (int j = 0; j < 10; j++) {
137 int key_id = i * 20 + j * 2;
138 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
139 }
140 ASSERT_OK(Flush());
141 }
142 ASSERT_OK(dbfull()->TEST_WaitForCompact());
143
144 // verify result
145 for (int i = 0; i < 200; i++) {
146 auto result = Get(Key(i));
147 if (i % 2) {
148 ASSERT_EQ(result, "value" + ToString(i));
149 } else {
150 ASSERT_EQ(result, "value_new" + ToString(i));
151 }
152 }
153 auto my_cs =
154 dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
155 ASSERT_GE(my_cs->GetCompactionNum(), 1);
156
157 // Test failed compaction
158 SyncPoint::GetInstance()->SetCallBack(
159 "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
160 // override job status
161 Status* s = static_cast<Status*>(status);
162 *s = Status::Aborted("MyTestCompactionService failed to compact!");
163 });
164 SyncPoint::GetInstance()->EnableProcessing();
165
166 Status s;
167 for (int i = 0; i < 10; i++) {
168 for (int j = 0; j < 10; j++) {
169 int key_id = i * 20 + j * 2;
170 s = Put(Key(key_id), "value_new" + ToString(key_id));
171 if (s.IsAborted()) {
172 break;
173 }
174 }
175 if (s.IsAborted()) {
176 break;
177 }
178 s = Flush();
179 if (s.IsAborted()) {
180 break;
181 }
182 s = dbfull()->TEST_WaitForCompact();
183 if (s.IsAborted()) {
184 break;
185 }
186 }
187 ASSERT_TRUE(s.IsAborted());
188 }
189
TEST_F(CompactionServiceTest,ManualCompaction)190 TEST_F(CompactionServiceTest, ManualCompaction) {
191 Options options = CurrentOptions();
192 options.env = env_;
193 options.disable_auto_compactions = true;
194 options.compaction_service = std::make_shared<MyTestCompactionService>(
195 dbname_, env_->GetFileSystem(), options);
196 DestroyAndReopen(options);
197 GenerateTestData();
198
199 auto my_cs =
200 dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
201
202 std::string start_str = Key(15);
203 std::string end_str = Key(45);
204 Slice start(start_str);
205 Slice end(end_str);
206 uint64_t comp_num = my_cs->GetCompactionNum();
207 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
208 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
209 VerifyTestData();
210
211 start_str = Key(120);
212 start = start_str;
213 comp_num = my_cs->GetCompactionNum();
214 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
215 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
216 VerifyTestData();
217
218 end_str = Key(92);
219 end = end_str;
220 comp_num = my_cs->GetCompactionNum();
221 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
222 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
223 VerifyTestData();
224
225 comp_num = my_cs->GetCompactionNum();
226 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
227 ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
228 VerifyTestData();
229 }
230
TEST_F(CompactionServiceTest,FailedToStart)231 TEST_F(CompactionServiceTest, FailedToStart) {
232 Options options = CurrentOptions();
233 options.env = env_;
234 options.disable_auto_compactions = true;
235 options.compaction_service = std::make_shared<MyTestCompactionService>(
236 dbname_, env_->GetFileSystem(), options);
237 DestroyAndReopen(options);
238 GenerateTestData();
239
240 SyncPoint::GetInstance()->SetCallBack(
241 "MyTestCompactionService::Start::End", [&](void* status) {
242 // override job status
243 auto s = static_cast<CompactionServiceJobStatus*>(status);
244 *s = CompactionServiceJobStatus::kFailure;
245 });
246 SyncPoint::GetInstance()->EnableProcessing();
247
248 std::string start_str = Key(15);
249 std::string end_str = Key(45);
250 Slice start(start_str);
251 Slice end(end_str);
252 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
253 ASSERT_TRUE(s.IsIncomplete());
254 }
255
TEST_F(CompactionServiceTest,InvalidResult)256 TEST_F(CompactionServiceTest, InvalidResult) {
257 Options options = CurrentOptions();
258 options.env = env_;
259 options.disable_auto_compactions = true;
260 options.compaction_service = std::make_shared<MyTestCompactionService>(
261 dbname_, env_->GetFileSystem(), options);
262 DestroyAndReopen(options);
263 GenerateTestData();
264
265 SyncPoint::GetInstance()->SetCallBack(
266 "MyTestCompactionService::WaitForComplete::End", [&](void* result) {
267 // override job status
268 auto result_str = static_cast<std::string*>(result);
269 *result_str = "Invalid Str";
270 });
271 SyncPoint::GetInstance()->EnableProcessing();
272
273 std::string start_str = Key(15);
274 std::string end_str = Key(45);
275 Slice start(start_str);
276 Slice end(end_str);
277 Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
278 ASSERT_FALSE(s.ok());
279 }
280
281 // TODO: support sub-compaction
TEST_F(CompactionServiceTest,DISABLED_SubCompaction)282 TEST_F(CompactionServiceTest, DISABLED_SubCompaction) {
283 Options options = CurrentOptions();
284 options.env = env_;
285 options.max_subcompactions = 10;
286 options.target_file_size_base = 1 << 10; // 1KB
287 options.disable_auto_compactions = true;
288 options.compaction_service = std::make_shared<MyTestCompactionService>(
289 dbname_, env_->GetFileSystem(), options);
290
291 DestroyAndReopen(options);
292 GenerateTestData();
293
294 auto cro = CompactRangeOptions();
295 cro.max_subcompactions = 10;
296 db_->CompactRange(cro, nullptr, nullptr);
297 }
298
299 class PartialDeleteCompactionFilter : public CompactionFilter {
300 public:
FilterV2(int,const Slice & key,ValueType,const Slice &,std::string *,std::string *) const301 CompactionFilter::Decision FilterV2(
302 int /*level*/, const Slice& key, ValueType /*value_type*/,
303 const Slice& /*existing_value*/, std::string* /*new_value*/,
304 std::string* /*skip_until*/) const override {
305 int i = std::stoi(key.ToString().substr(3));
306 if (i > 5 && i <= 105) {
307 return CompactionFilter::Decision::kRemove;
308 }
309 return CompactionFilter::Decision::kKeep;
310 }
311
Name() const312 const char* Name() const override { return "PartialDeleteCompactionFilter"; }
313 };
314
TEST_F(CompactionServiceTest,CompactionFilter)315 TEST_F(CompactionServiceTest, CompactionFilter) {
316 Options options = CurrentOptions();
317 options.env = env_;
318 auto delete_comp_filter = PartialDeleteCompactionFilter();
319 options.compaction_filter = &delete_comp_filter;
320 options.compaction_service = std::make_shared<MyTestCompactionService>(
321 dbname_, env_->GetFileSystem(), options);
322
323 DestroyAndReopen(options);
324
325 for (int i = 0; i < 20; i++) {
326 for (int j = 0; j < 10; j++) {
327 int key_id = i * 10 + j;
328 ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
329 }
330 ASSERT_OK(Flush());
331 }
332
333 for (int i = 0; i < 10; i++) {
334 for (int j = 0; j < 10; j++) {
335 int key_id = i * 20 + j * 2;
336 ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
337 }
338 ASSERT_OK(Flush());
339 }
340 ASSERT_OK(dbfull()->TEST_WaitForCompact());
341
342 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
343
344 // verify result
345 for (int i = 0; i < 200; i++) {
346 auto result = Get(Key(i));
347 if (i > 5 && i <= 105) {
348 ASSERT_EQ(result, "NOT_FOUND");
349 } else if (i % 2) {
350 ASSERT_EQ(result, "value" + ToString(i));
351 } else {
352 ASSERT_EQ(result, "value_new" + ToString(i));
353 }
354 }
355 auto my_cs =
356 dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
357 ASSERT_GE(my_cs->GetCompactionNum(), 1);
358 }
359
TEST_F(CompactionServiceTest,Snapshot)360 TEST_F(CompactionServiceTest, Snapshot) {
361 Options options = CurrentOptions();
362 options.env = env_;
363 options.compaction_service = std::make_shared<MyTestCompactionService>(
364 dbname_, env_->GetFileSystem(), options);
365
366 DestroyAndReopen(options);
367
368 ASSERT_OK(Put(Key(1), "value1"));
369 ASSERT_OK(Put(Key(2), "value1"));
370 const Snapshot* s1 = db_->GetSnapshot();
371 ASSERT_OK(Flush());
372
373 ASSERT_OK(Put(Key(1), "value2"));
374 ASSERT_OK(Put(Key(3), "value2"));
375 ASSERT_OK(Flush());
376
377 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
378 auto my_cs =
379 dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
380 ASSERT_GE(my_cs->GetCompactionNum(), 1);
381 ASSERT_EQ("value1", Get(Key(1), s1));
382 ASSERT_EQ("value2", Get(Key(1)));
383 db_->ReleaseSnapshot(s1);
384 }
385
TEST_F(CompactionServiceTest,ConcurrentCompaction)386 TEST_F(CompactionServiceTest, ConcurrentCompaction) {
387 Options options = CurrentOptions();
388 options.level0_file_num_compaction_trigger = 100;
389 options.env = env_;
390 options.compaction_service = std::make_shared<MyTestCompactionService>(
391 dbname_, env_->GetFileSystem(), options);
392 options.max_background_jobs = 20;
393
394 DestroyAndReopen(options);
395 GenerateTestData();
396
397 ColumnFamilyMetaData meta;
398 db_->GetColumnFamilyMetaData(&meta);
399
400 std::vector<std::thread> threads;
401 for (const auto& file : meta.levels[1].files) {
402 threads.push_back(std::thread([&]() {
403 std::string fname = file.db_path + "/" + file.name;
404 ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
405 }));
406 }
407
408 for (auto& thread : threads) {
409 thread.join();
410 }
411 ASSERT_OK(dbfull()->TEST_WaitForCompact());
412
413 // verify result
414 for (int i = 0; i < 200; i++) {
415 auto result = Get(Key(i));
416 if (i % 2) {
417 ASSERT_EQ(result, "value" + ToString(i));
418 } else {
419 ASSERT_EQ(result, "value_new" + ToString(i));
420 }
421 }
422 auto my_cs =
423 dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
424 ASSERT_EQ(my_cs->GetCompactionNum(), 10);
425 ASSERT_EQ(FilesPerLevel(), "0,0,10");
426 }
427
428 } // namespace ROCKSDB_NAMESPACE
429
430 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
431 extern "C" {
432 void RegisterCustomObjects(int argc, char** argv);
433 }
434 #else
RegisterCustomObjects(int,char **)435 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
436 #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
437
main(int argc,char ** argv)438 int main(int argc, char** argv) {
439 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
440 ::testing::InitGoogleTest(&argc, argv);
441 RegisterCustomObjects(argc, argv);
442 return RUN_ALL_TESTS();
443 }
444
445 #else
446 #include <stdio.h>
447
main(int,char **)448 int main(int /*argc*/, char** /*argv*/) {
449 fprintf(stderr,
450 "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n");
451 return 0;
452 }
453
454 #endif // ROCKSDB_LITE
455