1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #pragma once
7 
8 #include <deque>
9 #include <string>
10 #include <vector>
11 
12 #include "db/dbformat.h"
13 #include "db/merge_context.h"
14 #include "db/range_del_aggregator.h"
15 #include "db/snapshot_checker.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/slice.h"
19 #include "util/stop_watch.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
// Forward declarations. Within this header Comparator, Logger, MergeOperator
// and Statistics are only named through pointers, so incomplete types are
// sufficient here.
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class Statistics;
28 
29 class MergeHelper {
30  public:
31   MergeHelper(Env* env, const Comparator* user_comparator,
32               const MergeOperator* user_merge_operator,
33               const CompactionFilter* compaction_filter, Logger* logger,
34               bool assert_valid_internal_key, SequenceNumber latest_snapshot,
35               const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
36               Statistics* stats = nullptr,
37               const std::atomic<bool>* shutting_down = nullptr);
38 
39   // Wrapper around MergeOperator::FullMergeV2() that records perf statistics.
40   // Result of merge will be written to result if status returned is OK.
41   // If operands is empty, the value will simply be copied to result.
42   // Set `update_num_ops_stats` to true if it is from a user read, so that
43   // the latency is sensitive.
44   // Returns one of the following statuses:
45   // - OK: Entries were successfully merged.
46   // - Corruption: Merge operator reported unsuccessful merge.
47   static Status TimedFullMerge(const MergeOperator* merge_operator,
48                                const Slice& key, const Slice* value,
49                                const std::vector<Slice>& operands,
50                                std::string* result, Logger* logger,
51                                Statistics* statistics, Env* env,
52                                Slice* result_operand = nullptr,
53                                bool update_num_ops_stats = false);
54 
55   // Merge entries until we hit
56   //     - a corrupted key
57   //     - a Put/Delete,
58   //     - a different user key,
59   //     - a specific sequence number (snapshot boundary),
60   //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
61   //  or - the end of iteration
62   // iter: (IN)  points to the first merge type entry
63   //       (OUT) points to the first entry not included in the merge process
64   // range_del_agg: (IN) filters merge operands covered by range tombstones.
65   // stop_before: (IN) a sequence number that merge should not cross.
66   //                   0 means no restriction
67   // at_bottom:   (IN) true if the iterator covers the bottem level, which means
68   //                   we could reach the start of the history of this user key.
69   //
70   // Returns one of the following statuses:
71   // - OK: Entries were successfully merged.
72   // - MergeInProgress: Put/Delete not encountered, and didn't reach the start
73   //   of key's history. Output consists of merge operands only.
74   // - Corruption: Merge operator reported unsuccessful merge or a corrupted
75   //   key has been encountered and not expected (applies only when compiling
76   //   with asserts removed).
77   // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
78   //
79   // REQUIRED: The first key in the input is not corrupted.
80   Status MergeUntil(InternalIterator* iter,
81                     CompactionRangeDelAggregator* range_del_agg = nullptr,
82                     const SequenceNumber stop_before = 0,
83                     const bool at_bottom = false);
84 
85   // Filters a merge operand using the compaction filter specified
86   // in the constructor. Returns the decision that the filter made.
87   // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
88   // optional outputs of compaction filter.
89   CompactionFilter::Decision FilterMerge(const Slice& user_key,
90                                          const Slice& value_slice);
91 
92   // Query the merge result
93   // These are valid until the next MergeUntil call
94   // If the merging was successful:
95   //   - keys() contains a single element with the latest sequence number of
96   //     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
97   //   - values() contains a single element with the result of merging all the
98   //     operands together
99   //
100   //   IMPORTANT 1: the key type could change after the MergeUntil call.
101   //        Put/Delete + Merge + ... + Merge => Put
102   //        Merge + ... + Merge => Merge
103   //
104   // If the merge operator is not associative, and if a Put/Delete is not found
105   // then the merging will be unsuccessful. In this case:
106   //   - keys() contains the list of internal keys seen in order of iteration.
107   //   - values() contains the list of values (merges) seen in the same order.
108   //              values() is parallel to keys() so that the first entry in
109   //              keys() is the key associated with the first entry in values()
110   //              and so on. These lists will be the same length.
111   //              All of these pairs will be merges over the same user key.
112   //              See IMPORTANT 2 note below.
113   //
114   //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
115   //                So keys().back() was the first key seen by iterator.
116   // TODO: Re-style this comment to be like the first one
keys()117   const std::deque<std::string>& keys() const { return keys_; }
values()118   const std::vector<Slice>& values() const {
119     return merge_context_.GetOperands();
120   }
TotalFilterTime()121   uint64_t TotalFilterTime() const { return total_filter_time_; }
HasOperator()122   bool HasOperator() const { return user_merge_operator_ != nullptr; }
123 
124   // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
125   // return true and fill *until with the key to which we should skip.
126   // If true, keys() and values() are empty.
FilteredUntil(Slice * skip_until)127   bool FilteredUntil(Slice* skip_until) const {
128     if (!has_compaction_filter_skip_until_) {
129       return false;
130     }
131     assert(compaction_filter_ != nullptr);
132     assert(skip_until != nullptr);
133     assert(compaction_filter_skip_until_.Valid());
134     *skip_until = compaction_filter_skip_until_.Encode();
135     return true;
136   }
137 
138  private:
139   Env* env_;
140   const Comparator* user_comparator_;
141   const MergeOperator* user_merge_operator_;
142   const CompactionFilter* compaction_filter_;
143   const std::atomic<bool>* shutting_down_;
144   Logger* logger_;
145   bool assert_valid_internal_key_; // enforce no internal key corruption?
146   bool allow_single_operand_;
147   SequenceNumber latest_snapshot_;
148   const SnapshotChecker* const snapshot_checker_;
149   int level_;
150 
151   // the scratch area that holds the result of MergeUntil
152   // valid up to the next MergeUntil call
153 
154   // Keeps track of the sequence of keys seen
155   std::deque<std::string> keys_;
156   // Parallel with keys_; stores the operands
157   mutable MergeContext merge_context_;
158 
159   StopWatchNano filter_timer_;
160   uint64_t total_filter_time_;
161   Statistics* stats_;
162 
163   bool has_compaction_filter_skip_until_ = false;
164   std::string compaction_filter_value_;
165   InternalKey compaction_filter_skip_until_;
166 
IsShuttingDown()167   bool IsShuttingDown() {
168     // This is a best-effort facility, so memory_order_relaxed is sufficient.
169     return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
170   }
171 };
172 
173 // MergeOutputIterator can be used to iterate over the result of a merge.
174 class MergeOutputIterator {
175  public:
176   // The MergeOutputIterator is bound to a MergeHelper instance.
177   explicit MergeOutputIterator(const MergeHelper* merge_helper);
178 
179   // Seeks to the first record in the output.
180   void SeekToFirst();
181   // Advances to the next record in the output.
182   void Next();
183 
key()184   Slice key() { return Slice(*it_keys_); }
value()185   Slice value() { return Slice(*it_values_); }
Valid()186   bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
187 
188  private:
189   const MergeHelper* merge_helper_;
190   std::deque<std::string>::const_reverse_iterator it_keys_;
191   std::vector<Slice>::const_reverse_iterator it_values_;
192 };
193 
194 }  // namespace ROCKSDB_NAMESPACE
195