1 // Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
// Example code demonstrating how to use the CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement a custom compaction algorithm.
8 
#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
14 
15 using namespace ROCKSDB_NAMESPACE;
16 std::string kDBPath = "/tmp/rocksdb_compact_files_example";
17 struct CompactionTask;
18 
// This is an example interface of an external-compaction algorithm.
// A compaction algorithm can be implemented outside the core RocksDB
// code by using the pluggable compaction APIs that RocksDB provides.
22 class Compactor : public EventListener {
23  public:
24   // Picks and returns a compaction task given the specified DB
25   // and column family.  It is the caller's responsibility to
26   // destroy the returned CompactionTask.  Returns "nullptr"
27   // if it cannot find a proper compaction task.
28   virtual CompactionTask* PickCompaction(
29       DB* db, const std::string& cf_name) = 0;
30 
31   // Schedule and run the specified compaction task in background.
32   virtual void ScheduleCompaction(CompactionTask *task) = 0;
33 };
34 
35 // Example structure that describes a compaction task.
36 struct CompactionTask {
CompactionTaskCompactionTask37   CompactionTask(
38       DB* _db, Compactor* _compactor,
39       const std::string& _column_family_name,
40       const std::vector<std::string>& _input_file_names,
41       const int _output_level,
42       const CompactionOptions& _compact_options,
43       bool _retry_on_fail)
44           : db(_db),
45             compactor(_compactor),
46             column_family_name(_column_family_name),
47             input_file_names(_input_file_names),
48             output_level(_output_level),
49             compact_options(_compact_options),
50             retry_on_fail(_retry_on_fail) {}
51   DB* db;
52   Compactor* compactor;
53   const std::string& column_family_name;
54   std::vector<std::string> input_file_names;
55   int output_level;
56   CompactionOptions compact_options;
57   bool retry_on_fail;
58 };
59 
60 // A simple compaction algorithm that always compacts everything
61 // to the highest level whenever possible.
62 class FullCompactor : public Compactor {
63  public:
FullCompactor(const Options options)64   explicit FullCompactor(const Options options) : options_(options) {
65     compact_options_.compression = options_.compression;
66     compact_options_.output_file_size_limit =
67         options_.target_file_size_base;
68   }
69 
70   // When flush happens, it determines whether to trigger compaction. If
71   // triggered_writes_stop is true, it will also set the retry flag of
72   // compaction-task to true.
OnFlushCompleted(DB * db,const FlushJobInfo & info)73   void OnFlushCompleted(
74       DB* db, const FlushJobInfo& info) override {
75     CompactionTask* task = PickCompaction(db, info.cf_name);
76     if (task != nullptr) {
77       if (info.triggered_writes_stop) {
78         task->retry_on_fail = true;
79       }
80       // Schedule compaction in a different thread.
81       ScheduleCompaction(task);
82     }
83   }
84 
85   // Always pick a compaction which includes all files whenever possible.
PickCompaction(DB * db,const std::string & cf_name)86   CompactionTask* PickCompaction(
87       DB* db, const std::string& cf_name) override {
88     ColumnFamilyMetaData cf_meta;
89     db->GetColumnFamilyMetaData(&cf_meta);
90 
91     std::vector<std::string> input_file_names;
92     for (auto level : cf_meta.levels) {
93       for (auto file : level.files) {
94         if (file.being_compacted) {
95           return nullptr;
96         }
97         input_file_names.push_back(file.name);
98       }
99     }
100     return new CompactionTask(
101         db, this, cf_name, input_file_names,
102         options_.num_levels - 1, compact_options_, false);
103   }
104 
105   // Schedule the specified compaction task in background.
ScheduleCompaction(CompactionTask * task)106   void ScheduleCompaction(CompactionTask* task) override {
107     options_.env->Schedule(&FullCompactor::CompactFiles, task);
108   }
109 
CompactFiles(void * arg)110   static void CompactFiles(void* arg) {
111     std::unique_ptr<CompactionTask> task(
112         reinterpret_cast<CompactionTask*>(arg));
113     assert(task);
114     assert(task->db);
115     Status s = task->db->CompactFiles(
116         task->compact_options,
117         task->input_file_names,
118         task->output_level);
119     printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
120     if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
121       // If a compaction task with its retry_on_fail=true failed,
122       // try to schedule another compaction in case the reason
123       // is not an IO error.
124       CompactionTask* new_task = task->compactor->PickCompaction(
125           task->db, task->column_family_name);
126       task->compactor->ScheduleCompaction(new_task);
127     }
128   }
129 
130  private:
131   Options options_;
132   CompactionOptions compact_options_;
133 };
134 
main()135 int main() {
136   Options options;
137   options.create_if_missing = true;
138   // Disable RocksDB background compaction.
139   options.compaction_style = kCompactionStyleNone;
140   // Small slowdown and stop trigger for experimental purpose.
141   options.level0_slowdown_writes_trigger = 3;
142   options.level0_stop_writes_trigger = 5;
143   options.IncreaseParallelism(5);
144   options.listeners.emplace_back(new FullCompactor(options));
145 
146   DB* db = nullptr;
147   DestroyDB(kDBPath, options);
148   Status s = DB::Open(options, kDBPath, &db);
149   assert(s.ok());
150   assert(db);
151 
152   // if background compaction is not working, write will stall
153   // because of options.level0_stop_writes_trigger
154   for (int i = 1000; i < 99999; ++i) {
155     db->Put(WriteOptions(), std::to_string(i),
156                             std::string(500, 'a' + (i % 26)));
157   }
158 
159   // verify the values are still there
160   std::string value;
161   for (int i = 1000; i < 99999; ++i) {
162     db->Get(ReadOptions(), std::to_string(i),
163                            &value);
164     assert(value == std::string(500, 'a' + (i % 26)));
165   }
166 
167   // close the db.
168   delete db;
169 
170   return 0;
171 }
172