1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10 
11 #include <atomic>
12 #include <deque>
13 #include <functional>
14 #include <limits>
15 #include <set>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
20 #include "db/blob/blob_file_completion_callback.h"
21 #include "db/column_family.h"
22 #include "db/compaction/compaction_iterator.h"
23 #include "db/flush_scheduler.h"
24 #include "db/internal_stats.h"
25 #include "db/job_context.h"
26 #include "db/log_writer.h"
27 #include "db/memtable_list.h"
28 #include "db/range_del_aggregator.h"
29 #include "db/version_edit.h"
30 #include "db/write_controller.h"
31 #include "db/write_thread.h"
32 #include "logging/event_logger.h"
33 #include "options/cf_options.h"
34 #include "options/db_options.h"
35 #include "port/port.h"
36 #include "rocksdb/compaction_filter.h"
37 #include "rocksdb/compaction_job_stats.h"
38 #include "rocksdb/db.h"
39 #include "rocksdb/env.h"
40 #include "rocksdb/memtablerep.h"
41 #include "rocksdb/transaction_log.h"
42 #include "table/scoped_arena_iterator.h"
43 #include "util/autovector.h"
44 #include "util/stop_watch.h"
45 #include "util/thread_local.h"
46 
47 namespace ROCKSDB_NAMESPACE {
48 
49 class Arena;
50 class ErrorHandler;
51 class MemTable;
52 class SnapshotChecker;
53 class SystemClock;
54 class TableCache;
55 class Version;
56 class VersionEdit;
57 class VersionSet;
58 
59 // CompactionJob is responsible for executing the compaction. Each (manual or
60 // automated) compaction corresponds to a CompactionJob object, and usually
61 // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
62 // will divide the compaction into subcompactions and execute them in parallel
63 // if needed.
64 class CompactionJob {
65  public:
66   CompactionJob(
67       int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
68       const MutableDBOptions& mutable_db_options,
69       const FileOptions& file_options, VersionSet* versions,
70       const std::atomic<bool>* shutting_down,
71       const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
72       FSDirectory* db_directory, FSDirectory* output_directory,
73       FSDirectory* blob_output_directory, Statistics* stats,
74       InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
75       std::vector<SequenceNumber> existing_snapshots,
76       SequenceNumber earliest_write_conflict_snapshot,
77       const SnapshotChecker* snapshot_checker,
78       std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
79       bool paranoid_file_checks, bool measure_io_stats,
80       const std::string& dbname, CompactionJobStats* compaction_job_stats,
81       Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
82       const std::atomic<int>* manual_compaction_paused = nullptr,
83       const std::atomic<bool>* manual_compaction_canceled = nullptr,
84       const std::string& db_id = "", const std::string& db_session_id = "",
85       std::string full_history_ts_low = "",
86       BlobFileCompletionCallback* blob_callback = nullptr);
87 
88   virtual ~CompactionJob();
89 
90   // no copy/move
91   CompactionJob(CompactionJob&& job) = delete;
92   CompactionJob(const CompactionJob& job) = delete;
93   CompactionJob& operator=(const CompactionJob& job) = delete;
94 
95   // REQUIRED: mutex held
96   // Prepare for the compaction by setting up boundaries for each subcompaction
97   void Prepare();
98   // REQUIRED mutex not held
99   // Launch threads for each subcompaction and wait for them to finish. After
100   // that, verify table is usable and finally do bookkeeping to unify
101   // subcompaction results
102   Status Run();
103 
104   // REQUIRED: mutex held
105   // Add compaction input/output to the current version
106   Status Install(const MutableCFOptions& mutable_cf_options);
107 
108   // Return the IO status
io_status()109   IOStatus io_status() const { return io_status_; }
110 
111  protected:
112   struct SubcompactionState;
113   // CompactionJob state
114   struct CompactionState;
115 
116   void AggregateStatistics();
117   void UpdateCompactionStats();
118   void LogCompaction();
119   virtual void RecordCompactionIOStats();
120   void CleanupCompaction();
121 
122   // Call compaction filter. Then iterate through input and compact the
123   // kv-pairs
124   void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
125 
126   CompactionState* compact_;
127   InternalStats::CompactionStats compaction_stats_;
128   const ImmutableDBOptions& db_options_;
129   const MutableDBOptions mutable_db_options_copy_;
130   LogBuffer* log_buffer_;
131   FSDirectory* output_directory_;
132   Statistics* stats_;
133   // Is this compaction creating a file in the bottom most level?
134   bool bottommost_level_;
135 
136   Env::WriteLifeTimeHint write_hint_;
137 
138   IOStatus io_status_;
139 
140  private:
141   // Generates a histogram representing potential divisions of key ranges from
142   // the input. It adds the starting and/or ending keys of certain input files
143   // to the working set and then finds the approximate size of data in between
144   // each consecutive pair of slices. Then it divides these ranges into
145   // consecutive groups such that each group has a similar size.
146   void GenSubcompactionBoundaries();
147 
148   CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
149       SubcompactionState* sub_compact);
150 
151   // update the thread status for starting a compaction.
152   void ReportStartedCompaction(Compaction* compaction);
153   void AllocateCompactionOutputFileNumbers();
154 
155   Status FinishCompactionOutputFile(
156       const Status& input_status, SubcompactionState* sub_compact,
157       CompactionRangeDelAggregator* range_del_agg,
158       CompactionIterationStats* range_del_out_stats,
159       const Slice* next_table_min_key = nullptr);
160   Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
161   Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
162   void UpdateCompactionJobStats(
163     const InternalStats::CompactionStats& stats) const;
164   void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
165                          CompactionJobStats* compaction_job_stats = nullptr);
166 
167   void UpdateCompactionInputStatsHelper(
168       int* num_files, uint64_t* bytes_read, int input_level);
169 
170   uint32_t job_id_;
171 
172   CompactionJobStats* compaction_job_stats_;
173 
174   // DBImpl state
175   const std::string& dbname_;
176   const std::string db_id_;
177   const std::string db_session_id_;
178   const FileOptions file_options_;
179 
180   Env* env_;
181   std::shared_ptr<IOTracer> io_tracer_;
182   FileSystemPtr fs_;
183   // env_option optimized for compaction table reads
184   FileOptions file_options_for_read_;
185   VersionSet* versions_;
186   const std::atomic<bool>* shutting_down_;
187   const std::atomic<int>* manual_compaction_paused_;
188   const std::atomic<bool>* manual_compaction_canceled_;
189   const SequenceNumber preserve_deletes_seqnum_;
190   FSDirectory* db_directory_;
191   FSDirectory* blob_output_directory_;
192   InstrumentedMutex* db_mutex_;
193   ErrorHandler* db_error_handler_;
194   // If there were two snapshots with seq numbers s1 and
195   // s2 and s1 < s2, and if we find two instances of a key k1 then lies
196   // entirely within s1 and s2, then the earlier version of k1 can be safely
197   // deleted because that version is not visible in any snapshot.
198   std::vector<SequenceNumber> existing_snapshots_;
199 
200   // This is the earliest snapshot that could be used for write-conflict
201   // checking by a transaction.  For any user-key newer than this snapshot, we
202   // should make sure not to remove evidence that a write occurred.
203   SequenceNumber earliest_write_conflict_snapshot_;
204 
205   const SnapshotChecker* const snapshot_checker_;
206 
207   std::shared_ptr<Cache> table_cache_;
208 
209   EventLogger* event_logger_;
210 
211   bool paranoid_file_checks_;
212   bool measure_io_stats_;
213   // Stores the Slices that designate the boundaries for each subcompaction
214   std::vector<Slice> boundaries_;
215   // Stores the approx size of keys covered in the range of each subcompaction
216   std::vector<uint64_t> sizes_;
217   Env::Priority thread_pri_;
218   std::string full_history_ts_low_;
219   BlobFileCompletionCallback* blob_callback_;
220 
221   uint64_t GetCompactionId(SubcompactionState* sub_compact);
222 
223   // Get table file name in where it's outputting to, which should also be in
224   // `output_directory_`.
225   virtual std::string GetTableFileName(uint64_t file_number);
226 };
227 
// CompactionServiceInput is used to pass compaction information between two
// db instances. It contains the information needed to do a compaction. It
// doesn't contain the LSM tree information, which is passed through the
// MANIFEST file.
232 struct CompactionServiceInput {
233   ColumnFamilyDescriptor column_family;
234 
235   DBOptions db_options;
236 
237   std::vector<SequenceNumber> snapshots;
238 
239   // SST files for compaction, it should already be expended to include all the
240   // files needed for this compaction, for both input level files and output
241   // level files.
242   std::vector<std::string> input_files;
243   int output_level;
244 
245   // information for subcompaction
246   bool has_begin = false;
247   std::string begin;
248   bool has_end = false;
249   std::string end;
250   uint64_t approx_size = 0;
251 
252   // serialization interface to read and write the object
253   static Status Read(const std::string& data_str, CompactionServiceInput* obj);
254   Status Write(std::string* output);
255 
256   // Initialize a dummy ColumnFamilyDescriptor
CompactionServiceInputCompactionServiceInput257   CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
258 
259 #ifndef NDEBUG
260   bool TEST_Equals(CompactionServiceInput* other);
261   bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
262 #endif  // NDEBUG
263 };
264 
265 // CompactionServiceOutputFile is the metadata for the output SST file
266 struct CompactionServiceOutputFile {
267   std::string file_name;
268   SequenceNumber smallest_seqno;
269   SequenceNumber largest_seqno;
270   std::string smallest_internal_key;
271   std::string largest_internal_key;
272   uint64_t oldest_ancester_time;
273   uint64_t file_creation_time;
274   uint64_t paranoid_hash;
275   bool marked_for_compaction;
276 
277   CompactionServiceOutputFile() = default;
CompactionServiceOutputFileCompactionServiceOutputFile278   CompactionServiceOutputFile(
279       const std::string& name, SequenceNumber smallest, SequenceNumber largest,
280       std::string _smallest_internal_key, std::string _largest_internal_key,
281       uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
282       uint64_t _paranoid_hash, bool _marked_for_compaction)
283       : file_name(name),
284         smallest_seqno(smallest),
285         largest_seqno(largest),
286         smallest_internal_key(std::move(_smallest_internal_key)),
287         largest_internal_key(std::move(_largest_internal_key)),
288         oldest_ancester_time(_oldest_ancester_time),
289         file_creation_time(_file_creation_time),
290         paranoid_hash(_paranoid_hash),
291         marked_for_compaction(_marked_for_compaction) {}
292 };
293 
// CompactionServiceResult contains the compaction result from a different db
// instance. With this information, the primary db instance with write
// permission is able to install the result to the DB.
297 struct CompactionServiceResult {
298   Status status;
299   std::vector<CompactionServiceOutputFile> output_files;
300   int output_level;
301 
302   // location of the output files
303   std::string output_path;
304 
305   // some statistics about the compaction
306   uint64_t num_output_records = 0;
307   uint64_t total_bytes = 0;
308   uint64_t bytes_read = 0;
309   uint64_t bytes_written = 0;
310   CompactionJobStats stats;
311 
312   // serialization interface to read and write the object
313   static Status Read(const std::string& data_str, CompactionServiceResult* obj);
314   Status Write(std::string* output);
315 
316 #ifndef NDEBUG
317   bool TEST_Equals(CompactionServiceResult* other);
318   bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
319 #endif  // NDEBUG
320 };
321 
// CompactionServiceCompactionJob is a read-only compaction job. It takes
// input information from `compaction_service_input` and puts result
// information in `compaction_service_result`; the SST files are generated in
// `output_path`.
325 class CompactionServiceCompactionJob : private CompactionJob {
326  public:
327   CompactionServiceCompactionJob(
328       int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
329       const MutableDBOptions& mutable_db_options,
330       const FileOptions& file_options, VersionSet* versions,
331       const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
332       FSDirectory* output_directory, Statistics* stats,
333       InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
334       std::vector<SequenceNumber> existing_snapshots,
335       std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
336       const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
337       const std::string& db_id, const std::string& db_session_id,
338       const std::string& output_path,
339       const CompactionServiceInput& compaction_service_input,
340       CompactionServiceResult* compaction_service_result);
341 
342   // Run the compaction in current thread and return the result
343   Status Run();
344 
345   void CleanupCompaction();
346 
io_status()347   IOStatus io_status() const { return CompactionJob::io_status(); }
348 
349  protected:
350   void RecordCompactionIOStats() override;
351 
352  private:
353   // Get table file name in output_path
354   std::string GetTableFileName(uint64_t file_number) override;
355   // Specific the compaction output path, otherwise it uses default DB path
356   const std::string output_path_;
357 
358   // Compaction job input
359   const CompactionServiceInput& compaction_input_;
360 
361   // Compaction job result
362   CompactionServiceResult* compaction_result_;
363 };
364 
365 }  // namespace ROCKSDB_NAMESPACE
366