1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 #pragma once 7 8 #include <deque> 9 #include <string> 10 #include <vector> 11 12 #include "db/dbformat.h" 13 #include "db/merge_context.h" 14 #include "db/range_del_aggregator.h" 15 #include "db/snapshot_checker.h" 16 #include "rocksdb/compaction_filter.h" 17 #include "rocksdb/env.h" 18 #include "rocksdb/slice.h" 19 #include "util/stop_watch.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 23 class Comparator; 24 class Iterator; 25 class Logger; 26 class MergeOperator; 27 class Statistics; 28 29 class MergeHelper { 30 public: 31 MergeHelper(Env* env, const Comparator* user_comparator, 32 const MergeOperator* user_merge_operator, 33 const CompactionFilter* compaction_filter, Logger* logger, 34 bool assert_valid_internal_key, SequenceNumber latest_snapshot, 35 const SnapshotChecker* snapshot_checker = nullptr, int level = 0, 36 Statistics* stats = nullptr, 37 const std::atomic<bool>* shutting_down = nullptr); 38 39 // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. 40 // Result of merge will be written to result if status returned is OK. 41 // If operands is empty, the value will simply be copied to result. 42 // Set `update_num_ops_stats` to true if it is from a user read, so that 43 // the latency is sensitive. 44 // Returns one of the following statuses: 45 // - OK: Entries were successfully merged. 46 // - Corruption: Merge operator reported unsuccessful merge. 47 static Status TimedFullMerge(const MergeOperator* merge_operator, 48 const Slice& key, const Slice* value, 49 const std::vector<Slice>& operands, 50 std::string* result, Logger* logger, 51 Statistics* statistics, Env* env, 52 Slice* result_operand = nullptr, 53 bool update_num_ops_stats = false); 54 55 // Merge entries until we hit 56 // - a corrupted key 57 // - a Put/Delete, 58 // - a different user key, 59 // - a specific sequence number (snapshot boundary), 60 // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, 61 // or - the end of iteration 62 // iter: (IN) points to the first merge type entry 63 // (OUT) points to the first entry not included in the merge process 64 // range_del_agg: (IN) filters merge operands covered by range tombstones. 65 // stop_before: (IN) a sequence number that merge should not cross. 66 // 0 means no restriction 67 // at_bottom: (IN) true if the iterator covers the bottem level, which means 68 // we could reach the start of the history of this user key. 69 // 70 // Returns one of the following statuses: 71 // - OK: Entries were successfully merged. 72 // - MergeInProgress: Put/Delete not encountered, and didn't reach the start 73 // of key's history. Output consists of merge operands only. 74 // - Corruption: Merge operator reported unsuccessful merge or a corrupted 75 // key has been encountered and not expected (applies only when compiling 76 // with asserts removed). 77 // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). 78 // 79 // REQUIRED: The first key in the input is not corrupted. 80 Status MergeUntil(InternalIterator* iter, 81 CompactionRangeDelAggregator* range_del_agg = nullptr, 82 const SequenceNumber stop_before = 0, 83 const bool at_bottom = false); 84 85 // Filters a merge operand using the compaction filter specified 86 // in the constructor. Returns the decision that the filter made. 87 // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the 88 // optional outputs of compaction filter. 89 CompactionFilter::Decision FilterMerge(const Slice& user_key, 90 const Slice& value_slice); 91 92 // Query the merge result 93 // These are valid until the next MergeUntil call 94 // If the merging was successful: 95 // - keys() contains a single element with the latest sequence number of 96 // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. 97 // - values() contains a single element with the result of merging all the 98 // operands together 99 // 100 // IMPORTANT 1: the key type could change after the MergeUntil call. 101 // Put/Delete + Merge + ... + Merge => Put 102 // Merge + ... + Merge => Merge 103 // 104 // If the merge operator is not associative, and if a Put/Delete is not found 105 // then the merging will be unsuccessful. In this case: 106 // - keys() contains the list of internal keys seen in order of iteration. 107 // - values() contains the list of values (merges) seen in the same order. 108 // values() is parallel to keys() so that the first entry in 109 // keys() is the key associated with the first entry in values() 110 // and so on. These lists will be the same length. 111 // All of these pairs will be merges over the same user key. 112 // See IMPORTANT 2 note below. 113 // 114 // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. 115 // So keys().back() was the first key seen by iterator. 116 // TODO: Re-style this comment to be like the first one keys()117 const std::deque<std::string>& keys() const { return keys_; } values()118 const std::vector<Slice>& values() const { 119 return merge_context_.GetOperands(); 120 } TotalFilterTime()121 uint64_t TotalFilterTime() const { return total_filter_time_; } HasOperator()122 bool HasOperator() const { return user_merge_operator_ != nullptr; } 123 124 // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will 125 // return true and fill *until with the key to which we should skip. 126 // If true, keys() and values() are empty. FilteredUntil(Slice * skip_until)127 bool FilteredUntil(Slice* skip_until) const { 128 if (!has_compaction_filter_skip_until_) { 129 return false; 130 } 131 assert(compaction_filter_ != nullptr); 132 assert(skip_until != nullptr); 133 assert(compaction_filter_skip_until_.Valid()); 134 *skip_until = compaction_filter_skip_until_.Encode(); 135 return true; 136 } 137 138 private: 139 Env* env_; 140 const Comparator* user_comparator_; 141 const MergeOperator* user_merge_operator_; 142 const CompactionFilter* compaction_filter_; 143 const std::atomic<bool>* shutting_down_; 144 Logger* logger_; 145 bool assert_valid_internal_key_; // enforce no internal key corruption? 146 bool allow_single_operand_; 147 SequenceNumber latest_snapshot_; 148 const SnapshotChecker* const snapshot_checker_; 149 int level_; 150 151 // the scratch area that holds the result of MergeUntil 152 // valid up to the next MergeUntil call 153 154 // Keeps track of the sequence of keys seen 155 std::deque<std::string> keys_; 156 // Parallel with keys_; stores the operands 157 mutable MergeContext merge_context_; 158 159 StopWatchNano filter_timer_; 160 uint64_t total_filter_time_; 161 Statistics* stats_; 162 163 bool has_compaction_filter_skip_until_ = false; 164 std::string compaction_filter_value_; 165 InternalKey compaction_filter_skip_until_; 166 IsShuttingDown()167 bool IsShuttingDown() { 168 // This is a best-effort facility, so memory_order_relaxed is sufficient. 169 return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); 170 } 171 }; 172 173 // MergeOutputIterator can be used to iterate over the result of a merge. 174 class MergeOutputIterator { 175 public: 176 // The MergeOutputIterator is bound to a MergeHelper instance. 177 explicit MergeOutputIterator(const MergeHelper* merge_helper); 178 179 // Seeks to the first record in the output. 180 void SeekToFirst(); 181 // Advances to the next record in the output. 182 void Next(); 183 key()184 Slice key() { return Slice(*it_keys_); } value()185 Slice value() { return Slice(*it_values_); } Valid()186 bool Valid() { return it_keys_ != merge_helper_->keys().rend(); } 187 188 private: 189 const MergeHelper* merge_helper_; 190 std::deque<std::string>::const_reverse_iterator it_keys_; 191 std::vector<Slice>::const_reverse_iterator it_values_; 192 }; 193 194 } // namespace ROCKSDB_NAMESPACE 195