//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <limits>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/range_del_aggregator.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
#include "logging/event_logger.h"
#include "options/cf_options.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "table/scoped_arena_iterator.h"
#include "util/autovector.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"

namespace ROCKSDB_NAMESPACE {

class Arena;
class ErrorHandler;
class MemTable;
class SnapshotChecker;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;

// CompactionJob is responsible for executing the compaction. Each (manual or
// automated) compaction corresponds to a CompactionJob object, and usually
// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
// will divide the compaction into subcompactions and execute them in parallel
// if needed.
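//
// A typical call sequence is sketched below. The locking objects and options
// (`db_mutex`, `mutable_cf_options`, the constructor arguments) are
// illustrative placeholders owned by the caller (normally DBImpl), not
// members of this class:
//
//   CompactionJob job(job_id, compaction, db_options, file_options,
//                     /* remaining constructor arguments */ ...);
//   db_mutex.Lock();
//   job.Prepare();                        // requires: DB mutex held
//   db_mutex.Unlock();
//   Status s = job.Run();                 // requires: DB mutex NOT held
//   db_mutex.Lock();
//   s = job.Install(mutable_cf_options);  // requires: DB mutex held
//   db_mutex.Unlock();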
class CompactionJob {
 public:
  CompactionJob(int job_id, Compaction* compaction,
                const ImmutableDBOptions& db_options,
                const FileOptions& file_options, VersionSet* versions,
                const std::atomic<bool>* shutting_down,
                const SequenceNumber preserve_deletes_seqnum,
                LogBuffer* log_buffer, Directory* db_directory,
                Directory* output_directory, Statistics* stats,
                InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
                std::vector<SequenceNumber> existing_snapshots,
                SequenceNumber earliest_write_conflict_snapshot,
                const SnapshotChecker* snapshot_checker,
                std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
                bool paranoid_file_checks, bool measure_io_stats,
                const std::string& dbname,
                CompactionJobStats* compaction_job_stats,
                Env::Priority thread_pri,
                const std::atomic<bool>* manual_compaction_paused = nullptr);

  ~CompactionJob();

  // no copy/move
  CompactionJob(CompactionJob&& job) = delete;
  CompactionJob(const CompactionJob& job) = delete;
  CompactionJob& operator=(const CompactionJob& job) = delete;

  // REQUIRED: mutex held
  // Prepare for the compaction by setting up boundaries for each subcompaction
  void Prepare();
  // REQUIRED: mutex not held
  // Launch threads for each subcompaction and wait for them to finish. After
  // that, verify that the output tables are usable, and finally do the
  // bookkeeping needed to unify the subcompaction results.
  Status Run();

  // REQUIRED: mutex held
  // Add the compaction inputs/outputs to the current version
  Status Install(const MutableCFOptions& mutable_cf_options);

 private:
  struct SubcompactionState;

  void AggregateStatistics();

  // Generates a histogram representing potential divisions of key ranges from
  // the input. It adds the starting and/or ending keys of certain input files
  // to the working set and then finds the approximate size of data in between
  // each consecutive pair of slices. Then it divides these ranges into
  // consecutive groups such that each group has a similar size.
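  //
  // Illustrative example (hypothetical keys and sizes): if the candidate
  // boundary keys are {"d", "k", "r"} and roughly 25MB of data is estimated
  // to lie in each of the four resulting ranges, then a target of two
  // subcompactions would keep only "k" as a boundary, producing two
  // subcompactions of roughly 50MB each (recorded in boundaries_ and sizes_).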
  void GenSubcompactionBoundaries();

  // Update the thread status to indicate the start of a compaction.
  void ReportStartedCompaction(Compaction* compaction);
  void AllocateCompactionOutputFileNumbers();
  // Iterate through the input and compact the kv-pairs, invoking the
  // compaction filter on them along the way.
  void ProcessKeyValueCompaction(SubcompactionState* sub_compact);

  Status FinishCompactionOutputFile(
      const Status& input_status, SubcompactionState* sub_compact,
      CompactionRangeDelAggregator* range_del_agg,
      CompactionIterationStats* range_del_out_stats,
      const Slice* next_table_min_key = nullptr);
  Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
  void RecordCompactionIOStats();
  Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
  void CleanupCompaction();
  void UpdateCompactionJobStats(
      const InternalStats::CompactionStats& stats) const;
  void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
                         CompactionJobStats* compaction_job_stats = nullptr);

  void UpdateCompactionStats();
  void UpdateCompactionInputStatsHelper(
      int* num_files, uint64_t* bytes_read, int input_level);

  void LogCompaction();

  int job_id_;

  // CompactionJob state
  struct CompactionState;
  CompactionState* compact_;
  CompactionJobStats* compaction_job_stats_;
  InternalStats::CompactionStats compaction_stats_;

  // DBImpl state
  const std::string& dbname_;
  const ImmutableDBOptions& db_options_;
  const FileOptions file_options_;

  Env* env_;
  FileSystem* fs_;
  // FileOptions optimized for compaction table reads
  FileOptions file_options_for_read_;
  VersionSet* versions_;
  const std::atomic<bool>* shutting_down_;
  const std::atomic<bool>* manual_compaction_paused_;
  const SequenceNumber preserve_deletes_seqnum_;
  LogBuffer* log_buffer_;
  Directory* db_directory_;
  Directory* output_directory_;
  Statistics* stats_;
  InstrumentedMutex* db_mutex_;
  ErrorHandler* db_error_handler_;
  // If there were two snapshots with seq numbers s1 and s2, with s1 < s2, and
  // we find two versions of a key k1 whose sequence numbers both lie between
  // s1 and s2, then the earlier version of k1 can be safely deleted because
  // that version is not visible in any snapshot.
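  // For example (hypothetical sequence numbers): with snapshots at s1 = 10
  // and s2 = 20, and two versions of key k1 at sequence numbers 12 and 15,
  // no snapshot can see the version at 12 (a reader at s2 = 20 sees the newer
  // version at 15 instead), so the version at 12 can be dropped.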
  std::vector<SequenceNumber> existing_snapshots_;

  // This is the earliest snapshot that could be used for write-conflict
  // checking by a transaction.  For any user-key newer than this snapshot, we
  // should make sure not to remove evidence that a write occurred.
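  // For example (hypothetical numbers): if this snapshot is at sequence 50,
  // evidence of a write to a key at sequence 60 must not be collapsed away,
  // so a transaction checking for conflicts above sequence 50 can still
  // detect that the write happened.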
  SequenceNumber earliest_write_conflict_snapshot_;

  const SnapshotChecker* const snapshot_checker_;

  std::shared_ptr<Cache> table_cache_;

  EventLogger* event_logger_;

  // Is this compaction creating a file in the bottommost level?
  bool bottommost_level_;
  bool paranoid_file_checks_;
  bool measure_io_stats_;
  // Stores the Slices that designate the boundaries for each subcompaction
  std::vector<Slice> boundaries_;
  // Stores the approx size of keys covered in the range of each subcompaction
  std::vector<uint64_t> sizes_;
  Env::WriteLifeTimeHint write_hint_;
  Env::Priority thread_pri_;
};

}  // namespace ROCKSDB_NAMESPACE