1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cinttypes>
#include <deque>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
25 class BlobFileBuilder;
26 
27 // A wrapper of internal iterator whose purpose is to count how
28 // many entries there are in the iterator.
29 class SequenceIterWrapper : public InternalIterator {
30  public:
SequenceIterWrapper(InternalIterator * iter,const Comparator * cmp,bool need_count_entries)31   SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
32                       bool need_count_entries)
33       : icmp_(cmp, /*named=*/false),
34         inner_iter_(iter),
35         need_count_entries_(need_count_entries) {}
Valid()36   bool Valid() const override { return inner_iter_->Valid(); }
status()37   Status status() const override { return inner_iter_->status(); }
Next()38   void Next() override {
39     num_itered_++;
40     inner_iter_->Next();
41   }
Seek(const Slice & target)42   void Seek(const Slice& target) override {
43     if (!need_count_entries_) {
44       inner_iter_->Seek(target);
45     } else {
46       // For flush cases, we need to count total number of entries, so we
47       // do Next() rather than Seek().
48       while (inner_iter_->Valid() &&
49              icmp_.Compare(inner_iter_->key(), target) < 0) {
50         Next();
51       }
52     }
53   }
key()54   Slice key() const override { return inner_iter_->key(); }
value()55   Slice value() const override { return inner_iter_->value(); }
56 
57   // Unused InternalIterator methods
SeekToFirst()58   void SeekToFirst() override { assert(false); }
Prev()59   void Prev() override { assert(false); }
SeekForPrev(const Slice &)60   void SeekForPrev(const Slice& /* target */) override { assert(false); }
SeekToLast()61   void SeekToLast() override { assert(false); }
62 
num_itered()63   uint64_t num_itered() const { return num_itered_; }
64 
65  private:
66   InternalKeyComparator icmp_;
67   InternalIterator* inner_iter_;  // not owned
68   uint64_t num_itered_ = 0;
69   bool need_count_entries_;
70 };
71 
72 class CompactionIterator {
73  public:
74   // A wrapper around Compaction. Has a much smaller interface, only what
75   // CompactionIterator uses. Tests can override it.
76   class CompactionProxy {
77    public:
78     virtual ~CompactionProxy() = default;
79 
80     virtual int level() const = 0;
81 
82     virtual bool KeyNotExistsBeyondOutputLevel(
83         const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0;
84 
85     virtual bool bottommost_level() const = 0;
86 
87     virtual int number_levels() const = 0;
88 
89     virtual Slice GetLargestUserKey() const = 0;
90 
91     virtual bool allow_ingest_behind() const = 0;
92 
93     virtual bool preserve_deletes() const = 0;
94 
95     virtual bool enable_blob_garbage_collection() const = 0;
96 
97     virtual double blob_garbage_collection_age_cutoff() const = 0;
98 
99     virtual Version* input_version() const = 0;
100   };
101 
102   class RealCompaction : public CompactionProxy {
103    public:
RealCompaction(const Compaction * compaction)104     explicit RealCompaction(const Compaction* compaction)
105         : compaction_(compaction) {
106       assert(compaction_);
107       assert(compaction_->immutable_cf_options());
108       assert(compaction_->mutable_cf_options());
109     }
110 
level()111     int level() const override { return compaction_->level(); }
112 
KeyNotExistsBeyondOutputLevel(const Slice & user_key,std::vector<size_t> * level_ptrs)113     bool KeyNotExistsBeyondOutputLevel(
114         const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
115       return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
116     }
117 
bottommost_level()118     bool bottommost_level() const override {
119       return compaction_->bottommost_level();
120     }
121 
number_levels()122     int number_levels() const override { return compaction_->number_levels(); }
123 
GetLargestUserKey()124     Slice GetLargestUserKey() const override {
125       return compaction_->GetLargestUserKey();
126     }
127 
allow_ingest_behind()128     bool allow_ingest_behind() const override {
129       return compaction_->immutable_cf_options()->allow_ingest_behind;
130     }
131 
preserve_deletes()132     bool preserve_deletes() const override {
133       return compaction_->immutable_cf_options()->preserve_deletes;
134     }
135 
enable_blob_garbage_collection()136     bool enable_blob_garbage_collection() const override {
137       return compaction_->mutable_cf_options()->enable_blob_garbage_collection;
138     }
139 
blob_garbage_collection_age_cutoff()140     double blob_garbage_collection_age_cutoff() const override {
141       return compaction_->mutable_cf_options()
142           ->blob_garbage_collection_age_cutoff;
143     }
144 
input_version()145     Version* input_version() const override {
146       return compaction_->input_version();
147     }
148 
149    private:
150     const Compaction* compaction_;
151   };
152 
153   CompactionIterator(InternalIterator* input, const Comparator* cmp,
154                      MergeHelper* merge_helper, SequenceNumber last_sequence,
155                      std::vector<SequenceNumber>* snapshots,
156                      SequenceNumber earliest_write_conflict_snapshot,
157                      const SnapshotChecker* snapshot_checker, Env* env,
158                      bool report_detailed_time, bool expect_valid_internal_key,
159                      CompactionRangeDelAggregator* range_del_agg,
160                      BlobFileBuilder* blob_file_builder,
161                      bool allow_data_in_errors,
162                      const Compaction* compaction = nullptr,
163                      const CompactionFilter* compaction_filter = nullptr,
164                      const std::atomic<bool>* shutting_down = nullptr,
165                      const SequenceNumber preserve_deletes_seqnum = 0,
166                      const std::atomic<int>* manual_compaction_paused = nullptr,
167                      const std::shared_ptr<Logger> info_log = nullptr,
168                      const std::string* full_history_ts_low = nullptr);
169 
170   // Constructor with custom CompactionProxy, used for tests.
171   CompactionIterator(InternalIterator* input, const Comparator* cmp,
172                      MergeHelper* merge_helper, SequenceNumber last_sequence,
173                      std::vector<SequenceNumber>* snapshots,
174                      SequenceNumber earliest_write_conflict_snapshot,
175                      const SnapshotChecker* snapshot_checker, Env* env,
176                      bool report_detailed_time, bool expect_valid_internal_key,
177                      CompactionRangeDelAggregator* range_del_agg,
178                      BlobFileBuilder* blob_file_builder,
179                      bool allow_data_in_errors,
180                      std::unique_ptr<CompactionProxy> compaction,
181                      const CompactionFilter* compaction_filter = nullptr,
182                      const std::atomic<bool>* shutting_down = nullptr,
183                      const SequenceNumber preserve_deletes_seqnum = 0,
184                      const std::atomic<int>* manual_compaction_paused = nullptr,
185                      const std::shared_ptr<Logger> info_log = nullptr,
186                      const std::string* full_history_ts_low = nullptr);
187 
188   ~CompactionIterator();
189 
190   void ResetRecordCounts();
191 
192   // Seek to the beginning of the compaction iterator output.
193   //
194   // REQUIRED: Call only once.
195   void SeekToFirst();
196 
197   // Produces the next record in the compaction.
198   //
199   // REQUIRED: SeekToFirst() has been called.
200   void Next();
201 
202   // Getters
key()203   const Slice& key() const { return key_; }
value()204   const Slice& value() const { return value_; }
status()205   const Status& status() const { return status_; }
ikey()206   const ParsedInternalKey& ikey() const { return ikey_; }
Valid()207   bool Valid() const { return valid_; }
user_key()208   const Slice& user_key() const { return current_user_key_; }
iter_stats()209   const CompactionIterationStats& iter_stats() const { return iter_stats_; }
num_input_entry_scanned()210   uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
211 
212  private:
213   // Processes the input stream to find the next output
214   void NextFromInput();
215 
216   // Do final preparations before presenting the output to the callee.
217   void PrepareOutput();
218 
219   // Passes the output value to the blob file builder (if any), and replaces it
220   // with the corresponding blob reference if it has been actually written to a
221   // blob file (i.e. if it passed the value size check). Returns true if the
222   // value got extracted to a blob file, false otherwise.
223   bool ExtractLargeValueIfNeededImpl();
224 
225   // Extracts large values as described above, and updates the internal key's
226   // type to kTypeBlobIndex if the value got extracted. Should only be called
227   // for regular values (kTypeValue).
228   void ExtractLargeValueIfNeeded();
229 
230   // Relocates valid blobs residing in the oldest blob files if garbage
231   // collection is enabled. Relocated blobs are written to new blob files or
232   // inlined in the LSM tree depending on the current settings (i.e.
233   // enable_blob_files and min_blob_size). Should only be called for blob
234   // references (kTypeBlobIndex).
235   //
236   // Note: the stacked BlobDB implementation's compaction filter based GC
237   // algorithm is also called from here.
238   void GarbageCollectBlobIfNeeded();
239 
240   // Invoke compaction filter if needed.
241   // Return true on success, false on failures (e.g.: kIOError).
242   bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
243 
244   // Given a sequence number, return the sequence number of the
245   // earliest snapshot that this sequence number is visible in.
246   // The snapshots themselves are arranged in ascending order of
247   // sequence numbers.
248   // Employ a sequential search because the total number of
249   // snapshots are typically small.
250   inline SequenceNumber findEarliestVisibleSnapshot(
251       SequenceNumber in, SequenceNumber* prev_snapshot);
252 
253   // Checks whether the currently seen ikey_ is needed for
254   // incremental (differential) snapshot and hence can't be dropped
255   // or seqnum be zero-ed out even if all other conditions for it are met.
256   inline bool ikeyNotNeededForIncrementalSnapshot();
257 
KeyCommitted(SequenceNumber sequence)258   inline bool KeyCommitted(SequenceNumber sequence) {
259     return snapshot_checker_ == nullptr ||
260            snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
261                SnapshotCheckerResult::kInSnapshot;
262   }
263 
264   bool IsInEarliestSnapshot(SequenceNumber sequence);
265 
266   // Extract user-defined timestamp from user key if possible and compare it
267   // with *full_history_ts_low_ if applicable.
UpdateTimestampAndCompareWithFullHistoryLow()268   inline void UpdateTimestampAndCompareWithFullHistoryLow() {
269     if (!timestamp_size_) {
270       return;
271     }
272     Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
273     curr_ts_.assign(ts.data(), ts.size());
274     if (full_history_ts_low_) {
275       cmp_with_history_ts_low_ =
276           cmp_->CompareTimestamp(ts, *full_history_ts_low_);
277     }
278   }
279 
280   static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
281       const CompactionProxy* compaction);
282 
283   SequenceIterWrapper input_;
284   const Comparator* cmp_;
285   MergeHelper* merge_helper_;
286   const std::vector<SequenceNumber>* snapshots_;
287   // List of snapshots released during compaction.
288   // findEarliestVisibleSnapshot() find them out from return of
289   // snapshot_checker, and make sure they will not be returned as
290   // earliest visible snapshot of an older value.
291   // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
292   std::unordered_set<SequenceNumber> released_snapshots_;
293   std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
294   const SequenceNumber earliest_write_conflict_snapshot_;
295   const SnapshotChecker* const snapshot_checker_;
296   Env* env_;
297   SystemClock* clock_;
298   bool report_detailed_time_;
299   bool expect_valid_internal_key_;
300   CompactionRangeDelAggregator* range_del_agg_;
301   BlobFileBuilder* blob_file_builder_;
302   std::unique_ptr<CompactionProxy> compaction_;
303   const CompactionFilter* compaction_filter_;
304   const std::atomic<bool>* shutting_down_;
305   const std::atomic<int>* manual_compaction_paused_;
306   const SequenceNumber preserve_deletes_seqnum_;
307   bool bottommost_level_;
308   bool valid_ = false;
309   bool visible_at_tip_;
310   SequenceNumber earliest_snapshot_;
311   SequenceNumber latest_snapshot_;
312 
313   std::shared_ptr<Logger> info_log_;
314 
315   bool allow_data_in_errors_;
316 
317   // Comes from comparator.
318   const size_t timestamp_size_;
319 
320   // Lower bound timestamp to retain full history in terms of user-defined
321   // timestamp. If a key's timestamp is older than full_history_ts_low_, then
322   // the key *may* be eligible for garbage collection (GC). The skipping logic
323   // is in `NextFromInput()` and `PrepareOutput()`.
324   // If nullptr, NO GC will be performed and all history will be preserved.
325   const std::string* const full_history_ts_low_;
326 
327   // State
328   //
329   // Points to a copy of the current compaction iterator output (current_key_)
330   // if valid_.
331   Slice key_;
332   // Points to the value in the underlying iterator that corresponds to the
333   // current output.
334   Slice value_;
335   // The status is OK unless compaction iterator encounters a merge operand
336   // while not having a merge operator defined.
337   Status status_;
338   // Stores the user key, sequence number and type of the current compaction
339   // iterator output (or current key in the underlying iterator during
340   // NextFromInput()).
341   ParsedInternalKey ikey_;
342   // Stores whether ikey_.user_key is valid. If set to false, the user key is
343   // not compared against the current key in the underlying iterator.
344   bool has_current_user_key_ = false;
345   // If false, the iterator holds a copy of the current compaction iterator
346   // output (or current key in the underlying iterator during NextFromInput()).
347   bool at_next_ = false;
348 
349   IterKey current_key_;
350   Slice current_user_key_;
351   std::string curr_ts_;
352   SequenceNumber current_user_key_sequence_;
353   SequenceNumber current_user_key_snapshot_;
354 
355   // True if the iterator has already returned a record for the current key.
356   bool has_outputted_key_ = false;
357 
358   // truncated the value of the next key and output it without applying any
359   // compaction rules.  This is used for outputting a put after a single delete.
360   bool clear_and_output_next_key_ = false;
361 
362   MergeOutputIterator merge_out_iter_;
363   // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
364   // merge operands and then releasing them after consuming them.
365   PinnedIteratorsManager pinned_iters_mgr_;
366 
367   uint64_t blob_garbage_collection_cutoff_file_number_;
368 
369   std::string blob_index_;
370   PinnableSlice blob_value_;
371   std::string compaction_filter_value_;
372   InternalKey compaction_filter_skip_until_;
373   // "level_ptrs" holds indices that remember which file of an associated
374   // level we were last checking during the last call to compaction->
375   // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
376   // to pick off where it left off since each subcompaction's key range is
377   // increasing so a later call to the function must be looking for a key that
378   // is in or beyond the last file checked during the previous call
379   std::vector<size_t> level_ptrs_;
380   CompactionIterationStats iter_stats_;
381 
382   // Used to avoid purging uncommitted values. The application can specify
383   // uncommitted values by providing a SnapshotChecker object.
384   bool current_key_committed_;
385 
386   // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
387   int cmp_with_history_ts_low_;
388 
389   const int level_;
390 
AdvanceInputIter()391   void AdvanceInputIter() { input_.Next(); }
392 
SkipUntil(const Slice & skip_until)393   void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
394 
IsShuttingDown()395   bool IsShuttingDown() {
396     // This is a best-effort facility, so memory_order_relaxed is sufficient.
397     return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
398   }
399 
IsPausingManualCompaction()400   bool IsPausingManualCompaction() {
401     // This is a best-effort facility, so memory_order_relaxed is sufficient.
402     return manual_compaction_paused_ &&
403            manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
404   }
405 };
406 }  // namespace ROCKSDB_NAMESPACE
407