1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/merge_helper.h"
7 
8 #include <string>
9 
10 #include "db/dbformat.h"
11 #include "monitoring/perf_context_imp.h"
12 #include "monitoring/statistics.h"
13 #include "port/likely.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/merge_operator.h"
17 #include "rocksdb/system_clock.h"
18 #include "table/format.h"
19 #include "table/internal_iterator.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
MergeHelper(Env * env,const Comparator * user_comparator,const MergeOperator * user_merge_operator,const CompactionFilter * compaction_filter,Logger * logger,bool assert_valid_internal_key,SequenceNumber latest_snapshot,const SnapshotChecker * snapshot_checker,int level,Statistics * stats,const std::atomic<bool> * shutting_down)23 MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
24                          const MergeOperator* user_merge_operator,
25                          const CompactionFilter* compaction_filter,
26                          Logger* logger, bool assert_valid_internal_key,
27                          SequenceNumber latest_snapshot,
28                          const SnapshotChecker* snapshot_checker, int level,
29                          Statistics* stats,
30                          const std::atomic<bool>* shutting_down)
31     : env_(env),
32       clock_(env->GetSystemClock().get()),
33       user_comparator_(user_comparator),
34       user_merge_operator_(user_merge_operator),
35       compaction_filter_(compaction_filter),
36       shutting_down_(shutting_down),
37       logger_(logger),
38       assert_valid_internal_key_(assert_valid_internal_key),
39       allow_single_operand_(false),
40       latest_snapshot_(latest_snapshot),
41       snapshot_checker_(snapshot_checker),
42       level_(level),
43       keys_(),
44       filter_timer_(clock_),
45       total_filter_time_(0U),
46       stats_(stats) {
47   assert(user_comparator_ != nullptr);
48   if (user_merge_operator_) {
49     allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
50   }
51 }
52 
TimedFullMerge(const MergeOperator * merge_operator,const Slice & key,const Slice * value,const std::vector<Slice> & operands,std::string * result,Logger * logger,Statistics * statistics,SystemClock * clock,Slice * result_operand,bool update_num_ops_stats)53 Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
54                                    const Slice& key, const Slice* value,
55                                    const std::vector<Slice>& operands,
56                                    std::string* result, Logger* logger,
57                                    Statistics* statistics, SystemClock* clock,
58                                    Slice* result_operand,
59                                    bool update_num_ops_stats) {
60   assert(merge_operator != nullptr);
61 
62   if (operands.size() == 0) {
63     assert(value != nullptr && result != nullptr);
64     result->assign(value->data(), value->size());
65     return Status::OK();
66   }
67 
68   if (update_num_ops_stats) {
69     RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
70                       static_cast<uint64_t>(operands.size()));
71   }
72 
73   bool success;
74   Slice tmp_result_operand(nullptr, 0);
75   const MergeOperator::MergeOperationInput merge_in(key, value, operands,
76                                                     logger);
77   MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand);
78   {
79     // Setup to time the merge
80     StopWatchNano timer(clock, statistics != nullptr);
81     PERF_TIMER_GUARD(merge_operator_time_nanos);
82 
83     // Do the merge
84     success = merge_operator->FullMergeV2(merge_in, &merge_out);
85 
86     if (tmp_result_operand.data()) {
87       // FullMergeV2 result is an existing operand
88       if (result_operand != nullptr) {
89         *result_operand = tmp_result_operand;
90       } else {
91         result->assign(tmp_result_operand.data(), tmp_result_operand.size());
92       }
93     } else if (result_operand) {
94       *result_operand = Slice(nullptr, 0);
95     }
96 
97     RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
98                statistics ? timer.ElapsedNanos() : 0);
99   }
100 
101   if (!success) {
102     RecordTick(statistics, NUMBER_MERGE_FAILURES);
103     return Status::Corruption("Error: Could not perform merge.");
104   }
105 
106   return Status::OK();
107 }
108 
109 // PRE:  iter points to the first merge type entry.
110 // POST: iter points to the first entry beyond the merge process (or the end).
111 //       keys_ and merge_context_ are updated to reflect the merge result.
112 //       keys_ stores the list of keys encountered while merging.
113 //       merge_context_ stores the merge operands encountered while merging.
114 //       keys_[i] corresponds to the i-th operand in merge_context_.
115 //
116 // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
117 // and just pass the StripeRep corresponding to the stripe being merged.
// Collects the merge operands of the user key at `iter` into keys_ /
// merge_context_ and, when possible, collapses them into a single entry.
//
// Parameters:
//   iter                 - positioned at the first merge entry of the user
//                          key; advanced as entries are consumed.
//   range_del_agg        - optional; an operand (or a base Put) it reports as
//                          covered by a range tombstone is treated as deleted.
//   stop_before          - when > 0, scanning stops (before consuming) at the
//                          first entry after the first one whose sequence is
//                          <= stop_before, unless snapshot_checker_ reports it
//                          as kNotInSnapshot.
//   at_bottom            - caller guarantees no older versions of the key
//                          exist below this input; allows a full merge with a
//                          nullptr base value once the key's history ends.
//   allow_data_in_errors - forwarded to ParseInternalKey.
//
// Returns:
//   OK                 - nothing to merge (the first non-merge entry was hit
//                        with no operands collected, all operands were
//                        filtered, or the filter returned kRemoveAndSkipUntil,
//                        which also sets has_compaction_filter_skip_until_),
//                        or a full merge succeeded; in that case keys_ and
//                        merge_context_ hold exactly one kTypeValue entry.
//   MergeInProgress    - no base value was reached; keys_ / merge_context_
//                        hold the operands, partially merged into one when
//                        PartialMergeMulti succeeded.
//   ShutdownInProgress - shutdown was requested (IsShuttingDown()).
//   other errors       - the ParseInternalKey error for a corrupted key when
//                        assert_valid_internal_key_ is set, or the Corruption
//                        status from a failed full merge.
MergeUntil(InternalIterator * iter,CompactionRangeDelAggregator * range_del_agg,const SequenceNumber stop_before,const bool at_bottom,const bool allow_data_in_errors)118 Status MergeHelper::MergeUntil(InternalIterator* iter,
119                                CompactionRangeDelAggregator* range_del_agg,
120                                const SequenceNumber stop_before,
121                                const bool at_bottom,
122                                const bool allow_data_in_errors) {
123   // Get a copy of the internal key, before it's invalidated by iter->Next()
124   // Also maintain the list of merge operands seen.
125   assert(HasOperator());
126   keys_.clear();
127   merge_context_.Clear();
128   has_compaction_filter_skip_until_ = false;
129   assert(user_merge_operator_);
130   bool first_key = true;
131
132   // We need to parse the internal key again as the parsed key is
133   // backed by the internal key!
134   // Assume no internal key corruption as it has been successfully parsed
135   // by the caller.
136   // original_key_is_iter variable is just caching the information:
137   // original_key_is_iter == (iter->key().ToString() == original_key)
138   bool original_key_is_iter = true;
139   std::string original_key = iter->key().ToString();
140   // Important:
141   // orig_ikey is backed by original_key if keys_.empty()
142   // orig_ikey is backed by keys_.back() if !keys_.empty()
143   ParsedInternalKey orig_ikey;
144
145   Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors);
146   assert(s.ok());
147   if (!s.ok()) return s;
148
149   bool hit_the_next_user_key = false;
  // original_key_is_iter is cleared after the first iteration: only the
  // first entry's key is the one already copied into original_key.
150   for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
151     if (IsShuttingDown()) {
152       s = Status::ShutdownInProgress();
153       return s;
154     }
155
156     ParsedInternalKey ikey;
157     assert(keys_.size() == merge_context_.GetNumOperands());
158
159     Status pik_status =
160         ParseInternalKey(iter->key(), &ikey, allow_data_in_errors);
161     if (!pik_status.ok()) {
162       // stop at corrupted key
163       if (assert_valid_internal_key_) {
164         return pik_status;
165       }
166       break;
167     } else if (first_key) {
168       assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
169       first_key = false;
170     } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
171       // hit a different user key, stop right here
172       hit_the_next_user_key = true;
173       break;
174     } else if (stop_before > 0 && ikey.sequence <= stop_before &&
175                LIKELY(snapshot_checker_ == nullptr ||
176                       snapshot_checker_->CheckInSnapshot(ikey.sequence,
177                                                          stop_before) !=
178                           SnapshotCheckerResult::kNotInSnapshot)) {
179       // hit an entry that's possibly visible by the previous snapshot, can't
180       // touch that
181       break;
182     }
183
184     // At this point we are guaranteed that we need to process this key.
185
186     assert(IsValueType(ikey.type));
187     if (ikey.type != kTypeMerge) {
188       // hit a put/delete/single delete
189       //   => merge the put value (or nullptr) with the collected operands
190       //   => replace keys_/merge_context_ with a single entry for the result
191       //   => change the entry type to kTypeValue for keys_.back()
192       // We are done! Success!
193
194       // If there are no operands, just return the Status::OK(). That will cause
195       // the compaction iterator to write out the key we're currently at, which
196       // is the put/delete we just encountered.
197       if (keys_.empty()) {
198         return s;
199       }
200
201       // TODO(noetzli) If the merge operator returns false, we are currently
202       // (almost) silently dropping the put/delete. That's probably not what we
203       // want. Also if we're in compaction and it's a put, it would be nice to
204       // run compaction filter on it.
      // Only a kTypeValue entry not covered by a range tombstone supplies a
      // base value; any other type (or a covered Put) merges onto nullptr.
205       const Slice val = iter->value();
206       const Slice* val_ptr;
207       if (kTypeValue == ikey.type &&
208           (range_del_agg == nullptr ||
209            !range_del_agg->ShouldDelete(
210                ikey, RangeDelPositioningMode::kForwardTraversal))) {
211         val_ptr = &val;
212       } else {
213         val_ptr = nullptr;
214       }
215       std::string merge_result;
216       s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
217                          merge_context_.GetOperands(), &merge_result, logger_,
218                          stats_, clock_);
219
220       // Replace keys_ and merge_context_ with a single entry (keyed by
221       // keys_.back()) holding the result, if the merge succeeded
222       if (s.ok()) {
223         // The original key encountered
224         original_key = std::move(keys_.back());
225         orig_ikey.type = kTypeValue;
226         UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
227         keys_.clear();
228         merge_context_.Clear();
229         keys_.emplace_front(std::move(original_key));
230         merge_context_.PushOperand(merge_result);
231       }
232
233       // move iter to the next entry
234       iter->Next();
235       return s;
236     } else {
237       // hit a merge
238       //   => if there is a compaction filter, apply it.
239       //   => check for range tombstones covering the operand
240       //   => add the operand to keys_ and merge_context_
241       //      if not filtered
242       //   => then continue because we haven't yet seen a Put/Delete.
243       //
244       // Keep queuing keys and operands until we either meet a put / delete
245       // request or later did a partial merge.
246
247       Slice value_slice = iter->value();
248       // add an operand to the list if:
249       // 1) it's included in one of the snapshots. in that case we *must* write
250       // it out, no matter what compaction filter says
251       // 2) it's not filtered by a compaction filter
252       CompactionFilter::Decision filter =
253           ikey.sequence <= latest_snapshot_
254               ? CompactionFilter::Decision::kKeep
255               : FilterMerge(orig_ikey.user_key, value_slice);
      // A range tombstone covering this operand removes it, unless the filter
      // already asked to drop the whole key via kRemoveAndSkipUntil.
256       if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
257           range_del_agg != nullptr &&
258           range_del_agg->ShouldDelete(
259               iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
260         filter = CompactionFilter::Decision::kRemove;
261       }
262       if (filter == CompactionFilter::Decision::kKeep ||
263           filter == CompactionFilter::Decision::kChangeValue) {
264         if (original_key_is_iter) {
265           // this is just an optimization that saves us one memcpy
266           keys_.push_front(std::move(original_key));
267         } else {
268           keys_.push_front(iter->key().ToString());
269         }
270         if (keys_.size() == 1) {
271           // we need to re-anchor the orig_ikey because it was anchored by
272           // original_key before
273           pik_status =
274               ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors);
275           pik_status.PermitUncheckedError();
276           assert(pik_status.ok());
277         }
278         if (filter == CompactionFilter::Decision::kKeep) {
279           merge_context_.PushOperand(
280               value_slice, iter->IsValuePinned() /* operand_pinned */);
281         } else {  // kChangeValue
282           // Compaction filter asked us to change the operand from value_slice
283           // to compaction_filter_value_.
284           merge_context_.PushOperand(compaction_filter_value_, false);
285         }
286       } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
287         // Compaction filter asked us to remove this key altogether
288         // (not just this operand), along with some keys following it.
289         keys_.clear();
290         merge_context_.Clear();
291         has_compaction_filter_skip_until_ = true;
292         return s;
293       }
      // Any other decision (kRemove) drops just this operand and the scan
      // continues with the next entry.
294     }
295   }
296
297   if (merge_context_.GetNumOperands() == 0) {
298     // we filtered out all the merge operands
299     return s;
300   }
301
302   // We are sure we have seen this key's entire history if:
303   // at_bottom == true (this does not necessarily mean it is the bottommost
304   // layer, but rather that we are confident the key does not appear on any of
305   // the lower layers, at_bottom == false doesn't mean it does appear, just
306   // that we can't be sure, see Compaction::IsBottommostLevel for details)
307   // AND
308   // we have either encountered another key or end of key history on this
309   // layer.
310   //
311   // When these conditions are true we are able to merge all the keys
312   // using full merge.
313   //
314   // For these cases we are not sure about, we simply miss the opportunity
315   // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
316   // sure that all merge-operands on the same level get compacted together,
317   // this will simply lead to these merge operands moving to the next level.
318   bool surely_seen_the_beginning =
319       (hit_the_next_user_key || !iter->Valid()) && at_bottom;
320   if (surely_seen_the_beginning) {
321     // do a final merge with nullptr as the existing value and say
322     // bye to the merge type (it's now converted to a Put)
323     assert(kTypeMerge == orig_ikey.type);
324     assert(merge_context_.GetNumOperands() >= 1);
325     assert(merge_context_.GetNumOperands() == keys_.size());
326     std::string merge_result;
327     s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
328                        merge_context_.GetOperands(), &merge_result, logger_,
329                        stats_, clock_);
330     if (s.ok()) {
331       // The original key encountered
332       // We are certain that keys_ is not empty here (see assertions couple of
333       // lines before).
334       original_key = std::move(keys_.back());
335       orig_ikey.type = kTypeValue;
336       UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
337       keys_.clear();
338       merge_context_.Clear();
339       keys_.emplace_front(std::move(original_key));
340       merge_context_.PushOperand(merge_result);
341     }
342   } else {
343     // We haven't seen the beginning of the key nor a Put/Delete.
344     // Attempt to use the user's associative merge function to
345     // merge the stacked merge operands into a single operand.
346     s = Status::MergeInProgress();
    // A lone operand is only partially merged if the merge operator opted in
    // (allow_single_operand_ comes from MergeOperator::AllowSingleOperand()).
347     if (merge_context_.GetNumOperands() >= 2 ||
348         (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
349       bool merge_success = false;
350       std::string merge_result;
351       {
352         StopWatchNano timer(clock_, stats_ != nullptr);
353         PERF_TIMER_GUARD(merge_operator_time_nanos);
354         merge_success = user_merge_operator_->PartialMergeMulti(
355             orig_ikey.user_key,
356             std::deque<Slice>(merge_context_.GetOperands().begin(),
357                               merge_context_.GetOperands().end()),
358             &merge_result, logger_);
359         RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
360                    stats_ ? timer.ElapsedNanosSafe() : 0);
361       }
362       if (merge_success) {
363         // Merging of operands (associative merge) was successful.
364         // Replace operands with the merge result
365         merge_context_.Clear();
366         merge_context_.PushOperand(merge_result);
        // Keep only keys_.back() (the earliest-encountered kept key) so the
        // single merged operand stays paired with one key.
367         keys_.erase(keys_.begin(), keys_.end() - 1);
368       }
369     }
370   }
371
372   return s;
373 }
374 
MergeOutputIterator(const MergeHelper * merge_helper)375 MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
376     : merge_helper_(merge_helper) {
377   it_keys_ = merge_helper_->keys().rend();
378   it_values_ = merge_helper_->values().rend();
379 }
380 
SeekToFirst()381 void MergeOutputIterator::SeekToFirst() {
382   const auto& keys = merge_helper_->keys();
383   const auto& values = merge_helper_->values();
384   assert(keys.size() == values.size());
385   it_keys_ = keys.rbegin();
386   it_values_ = values.rbegin();
387 }
388 
Next()389 void MergeOutputIterator::Next() {
390   ++it_keys_;
391   ++it_values_;
392 }
393 
FilterMerge(const Slice & user_key,const Slice & value_slice)394 CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
395                                                     const Slice& value_slice) {
396   if (compaction_filter_ == nullptr) {
397     return CompactionFilter::Decision::kKeep;
398   }
399   if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
400     filter_timer_.Start();
401   }
402   compaction_filter_value_.clear();
403   compaction_filter_skip_until_.Clear();
404   auto ret = compaction_filter_->FilterV2(
405       level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
406       &compaction_filter_value_, compaction_filter_skip_until_.rep());
407   if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
408     if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
409                                   user_key) <= 0) {
410       // Invalid skip_until returned from compaction filter.
411       // Keep the key as per FilterV2 documentation.
412       ret = CompactionFilter::Decision::kKeep;
413     } else {
414       compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
415                                                        kValueTypeForSeek);
416     }
417   }
418   if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
419     total_filter_time_ += filter_timer_.ElapsedNanosSafe();
420   }
421   return ret;
422 }
423 
424 }  // namespace ROCKSDB_NAMESPACE
425