// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/merge_helper.h"

#include <string>

#include "db/dbformat.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/likely.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "table/format.h"
#include "table/internal_iterator.h"

namespace ROCKSDB_NAMESPACE {

MergeHelper(Env * env,const Comparator * user_comparator,const MergeOperator * user_merge_operator,const CompactionFilter * compaction_filter,Logger * logger,bool assert_valid_internal_key,SequenceNumber latest_snapshot,const SnapshotChecker * snapshot_checker,int level,Statistics * stats,const std::atomic<bool> * shutting_down)23 MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
24 const MergeOperator* user_merge_operator,
25 const CompactionFilter* compaction_filter,
26 Logger* logger, bool assert_valid_internal_key,
27 SequenceNumber latest_snapshot,
28 const SnapshotChecker* snapshot_checker, int level,
29 Statistics* stats,
30 const std::atomic<bool>* shutting_down)
31 : env_(env),
32 clock_(env->GetSystemClock().get()),
33 user_comparator_(user_comparator),
34 user_merge_operator_(user_merge_operator),
35 compaction_filter_(compaction_filter),
36 shutting_down_(shutting_down),
37 logger_(logger),
38 assert_valid_internal_key_(assert_valid_internal_key),
39 allow_single_operand_(false),
40 latest_snapshot_(latest_snapshot),
41 snapshot_checker_(snapshot_checker),
42 level_(level),
43 keys_(),
44 filter_timer_(clock_),
45 total_filter_time_(0U),
46 stats_(stats) {
47 assert(user_comparator_ != nullptr);
48 if (user_merge_operator_) {
49 allow_single_operand_ = user_merge_operator_->AllowSingleOperand();
50 }
51 }
52
TimedFullMerge(const MergeOperator * merge_operator,const Slice & key,const Slice * value,const std::vector<Slice> & operands,std::string * result,Logger * logger,Statistics * statistics,SystemClock * clock,Slice * result_operand,bool update_num_ops_stats)53 Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
54 const Slice& key, const Slice* value,
55 const std::vector<Slice>& operands,
56 std::string* result, Logger* logger,
57 Statistics* statistics, SystemClock* clock,
58 Slice* result_operand,
59 bool update_num_ops_stats) {
60 assert(merge_operator != nullptr);
61
62 if (operands.size() == 0) {
63 assert(value != nullptr && result != nullptr);
64 result->assign(value->data(), value->size());
65 return Status::OK();
66 }
67
68 if (update_num_ops_stats) {
69 RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS,
70 static_cast<uint64_t>(operands.size()));
71 }
72
73 bool success;
74 Slice tmp_result_operand(nullptr, 0);
75 const MergeOperator::MergeOperationInput merge_in(key, value, operands,
76 logger);
77 MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand);
78 {
79 // Setup to time the merge
80 StopWatchNano timer(clock, statistics != nullptr);
81 PERF_TIMER_GUARD(merge_operator_time_nanos);
82
83 // Do the merge
84 success = merge_operator->FullMergeV2(merge_in, &merge_out);
85
86 if (tmp_result_operand.data()) {
87 // FullMergeV2 result is an existing operand
88 if (result_operand != nullptr) {
89 *result_operand = tmp_result_operand;
90 } else {
91 result->assign(tmp_result_operand.data(), tmp_result_operand.size());
92 }
93 } else if (result_operand) {
94 *result_operand = Slice(nullptr, 0);
95 }
96
97 RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
98 statistics ? timer.ElapsedNanos() : 0);
99 }
100
101 if (!success) {
102 RecordTick(statistics, NUMBER_MERGE_FAILURES);
103 return Status::Corruption("Error: Could not perform merge.");
104 }
105
106 return Status::OK();
107 }
108
109 // PRE: iter points to the first merge type entry
110 // POST: iter points to the first entry beyond the merge process (or the end)
111 // keys_, operands_ are updated to reflect the merge result.
112 // keys_ stores the list of keys encountered while merging.
113 // operands_ stores the list of merge operands encountered while merging.
114 // keys_[i] corresponds to operands_[i] for each i.
115 //
116 // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator
117 // and just pass the StripeRep corresponding to the stripe being merged.
MergeUntil(InternalIterator * iter,CompactionRangeDelAggregator * range_del_agg,const SequenceNumber stop_before,const bool at_bottom,const bool allow_data_in_errors)118 Status MergeHelper::MergeUntil(InternalIterator* iter,
119 CompactionRangeDelAggregator* range_del_agg,
120 const SequenceNumber stop_before,
121 const bool at_bottom,
122 const bool allow_data_in_errors) {
123 // Get a copy of the internal key, before it's invalidated by iter->Next()
124 // Also maintain the list of merge operands seen.
125 assert(HasOperator());
126 keys_.clear();
127 merge_context_.Clear();
128 has_compaction_filter_skip_until_ = false;
129 assert(user_merge_operator_);
130 bool first_key = true;
131
132 // We need to parse the internal key again as the parsed key is
133 // backed by the internal key!
134 // Assume no internal key corruption as it has been successfully parsed
135 // by the caller.
136 // original_key_is_iter variable is just caching the information:
137 // original_key_is_iter == (iter->key().ToString() == original_key)
138 bool original_key_is_iter = true;
139 std::string original_key = iter->key().ToString();
140 // Important:
141 // orig_ikey is backed by original_key if keys_.empty()
142 // orig_ikey is backed by keys_.back() if !keys_.empty()
143 ParsedInternalKey orig_ikey;
144
145 Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors);
146 assert(s.ok());
147 if (!s.ok()) return s;
148
149 bool hit_the_next_user_key = false;
150 for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
151 if (IsShuttingDown()) {
152 s = Status::ShutdownInProgress();
153 return s;
154 }
155
156 ParsedInternalKey ikey;
157 assert(keys_.size() == merge_context_.GetNumOperands());
158
159 Status pik_status =
160 ParseInternalKey(iter->key(), &ikey, allow_data_in_errors);
161 if (!pik_status.ok()) {
162 // stop at corrupted key
163 if (assert_valid_internal_key_) {
164 return pik_status;
165 }
166 break;
167 } else if (first_key) {
168 assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
169 first_key = false;
170 } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
171 // hit a different user key, stop right here
172 hit_the_next_user_key = true;
173 break;
174 } else if (stop_before > 0 && ikey.sequence <= stop_before &&
175 LIKELY(snapshot_checker_ == nullptr ||
176 snapshot_checker_->CheckInSnapshot(ikey.sequence,
177 stop_before) !=
178 SnapshotCheckerResult::kNotInSnapshot)) {
179 // hit an entry that's possibly visible by the previous snapshot, can't
180 // touch that
181 break;
182 }
183
184 // At this point we are guaranteed that we need to process this key.
185
186 assert(IsValueType(ikey.type));
187 if (ikey.type != kTypeMerge) {
188 // hit a put/delete/single delete
189 // => merge the put value or a nullptr with operands_
190 // => store result in operands_.back() (and update keys_.back())
191 // => change the entry type to kTypeValue for keys_.back()
192 // We are done! Success!
193
194 // If there are no operands, just return the Status::OK(). That will cause
195 // the compaction iterator to write out the key we're currently at, which
196 // is the put/delete we just encountered.
197 if (keys_.empty()) {
198 return s;
199 }
200
201 // TODO(noetzli) If the merge operator returns false, we are currently
202 // (almost) silently dropping the put/delete. That's probably not what we
203 // want. Also if we're in compaction and it's a put, it would be nice to
204 // run compaction filter on it.
205 const Slice val = iter->value();
206 const Slice* val_ptr;
207 if (kTypeValue == ikey.type &&
208 (range_del_agg == nullptr ||
209 !range_del_agg->ShouldDelete(
210 ikey, RangeDelPositioningMode::kForwardTraversal))) {
211 val_ptr = &val;
212 } else {
213 val_ptr = nullptr;
214 }
215 std::string merge_result;
216 s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr,
217 merge_context_.GetOperands(), &merge_result, logger_,
218 stats_, clock_);
219
220 // We store the result in keys_.back() and operands_.back()
221 // if nothing went wrong (i.e.: no operand corruption on disk)
222 if (s.ok()) {
223 // The original key encountered
224 original_key = std::move(keys_.back());
225 orig_ikey.type = kTypeValue;
226 UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
227 keys_.clear();
228 merge_context_.Clear();
229 keys_.emplace_front(std::move(original_key));
230 merge_context_.PushOperand(merge_result);
231 }
232
233 // move iter to the next entry
234 iter->Next();
235 return s;
236 } else {
237 // hit a merge
238 // => if there is a compaction filter, apply it.
239 // => check for range tombstones covering the operand
240 // => merge the operand into the front of the operands_ list
241 // if not filtered
242 // => then continue because we haven't yet seen a Put/Delete.
243 //
244 // Keep queuing keys and operands until we either meet a put / delete
245 // request or later did a partial merge.
246
247 Slice value_slice = iter->value();
248 // add an operand to the list if:
249 // 1) it's included in one of the snapshots. in that case we *must* write
250 // it out, no matter what compaction filter says
251 // 2) it's not filtered by a compaction filter
252 CompactionFilter::Decision filter =
253 ikey.sequence <= latest_snapshot_
254 ? CompactionFilter::Decision::kKeep
255 : FilterMerge(orig_ikey.user_key, value_slice);
256 if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil &&
257 range_del_agg != nullptr &&
258 range_del_agg->ShouldDelete(
259 iter->key(), RangeDelPositioningMode::kForwardTraversal)) {
260 filter = CompactionFilter::Decision::kRemove;
261 }
262 if (filter == CompactionFilter::Decision::kKeep ||
263 filter == CompactionFilter::Decision::kChangeValue) {
264 if (original_key_is_iter) {
265 // this is just an optimization that saves us one memcpy
266 keys_.push_front(std::move(original_key));
267 } else {
268 keys_.push_front(iter->key().ToString());
269 }
270 if (keys_.size() == 1) {
271 // we need to re-anchor the orig_ikey because it was anchored by
272 // original_key before
273 pik_status =
274 ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors);
275 pik_status.PermitUncheckedError();
276 assert(pik_status.ok());
277 }
278 if (filter == CompactionFilter::Decision::kKeep) {
279 merge_context_.PushOperand(
280 value_slice, iter->IsValuePinned() /* operand_pinned */);
281 } else { // kChangeValue
282 // Compaction filter asked us to change the operand from value_slice
283 // to compaction_filter_value_.
284 merge_context_.PushOperand(compaction_filter_value_, false);
285 }
286 } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
287 // Compaction filter asked us to remove this key altogether
288 // (not just this operand), along with some keys following it.
289 keys_.clear();
290 merge_context_.Clear();
291 has_compaction_filter_skip_until_ = true;
292 return s;
293 }
294 }
295 }
296
297 if (merge_context_.GetNumOperands() == 0) {
298 // we filtered out all the merge operands
299 return s;
300 }
301
302 // We are sure we have seen this key's entire history if:
303 // at_bottom == true (this does not necessarily mean it is the bottommost
304 // layer, but rather that we are confident the key does not appear on any of
305 // the lower layers, at_bottom == false doesn't mean it does appear, just
306 // that we can't be sure, see Compaction::IsBottommostLevel for details)
307 // AND
308 // we have either encountered another key or end of key history on this
309 // layer.
310 //
311 // When these conditions are true we are able to merge all the keys
312 // using full merge.
313 //
314 // For these cases we are not sure about, we simply miss the opportunity
315 // to combine the keys. Since VersionSet::SetupOtherInputs() always makes
316 // sure that all merge-operands on the same level get compacted together,
317 // this will simply lead to these merge operands moving to the next level.
318 bool surely_seen_the_beginning =
319 (hit_the_next_user_key || !iter->Valid()) && at_bottom;
320 if (surely_seen_the_beginning) {
321 // do a final merge with nullptr as the existing value and say
322 // bye to the merge type (it's now converted to a Put)
323 assert(kTypeMerge == orig_ikey.type);
324 assert(merge_context_.GetNumOperands() >= 1);
325 assert(merge_context_.GetNumOperands() == keys_.size());
326 std::string merge_result;
327 s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr,
328 merge_context_.GetOperands(), &merge_result, logger_,
329 stats_, clock_);
330 if (s.ok()) {
331 // The original key encountered
332 // We are certain that keys_ is not empty here (see assertions couple of
333 // lines before).
334 original_key = std::move(keys_.back());
335 orig_ikey.type = kTypeValue;
336 UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type);
337 keys_.clear();
338 merge_context_.Clear();
339 keys_.emplace_front(std::move(original_key));
340 merge_context_.PushOperand(merge_result);
341 }
342 } else {
343 // We haven't seen the beginning of the key nor a Put/Delete.
344 // Attempt to use the user's associative merge function to
345 // merge the stacked merge operands into a single operand.
346 s = Status::MergeInProgress();
347 if (merge_context_.GetNumOperands() >= 2 ||
348 (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) {
349 bool merge_success = false;
350 std::string merge_result;
351 {
352 StopWatchNano timer(clock_, stats_ != nullptr);
353 PERF_TIMER_GUARD(merge_operator_time_nanos);
354 merge_success = user_merge_operator_->PartialMergeMulti(
355 orig_ikey.user_key,
356 std::deque<Slice>(merge_context_.GetOperands().begin(),
357 merge_context_.GetOperands().end()),
358 &merge_result, logger_);
359 RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME,
360 stats_ ? timer.ElapsedNanosSafe() : 0);
361 }
362 if (merge_success) {
363 // Merging of operands (associative merge) was successful.
364 // Replace operands with the merge result
365 merge_context_.Clear();
366 merge_context_.PushOperand(merge_result);
367 keys_.erase(keys_.begin(), keys_.end() - 1);
368 }
369 }
370 }
371
372 return s;
373 }
374
MergeOutputIterator(const MergeHelper * merge_helper)375 MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
376 : merge_helper_(merge_helper) {
377 it_keys_ = merge_helper_->keys().rend();
378 it_values_ = merge_helper_->values().rend();
379 }
380
SeekToFirst()381 void MergeOutputIterator::SeekToFirst() {
382 const auto& keys = merge_helper_->keys();
383 const auto& values = merge_helper_->values();
384 assert(keys.size() == values.size());
385 it_keys_ = keys.rbegin();
386 it_values_ = values.rbegin();
387 }
388
Next()389 void MergeOutputIterator::Next() {
390 ++it_keys_;
391 ++it_values_;
392 }
393
FilterMerge(const Slice & user_key,const Slice & value_slice)394 CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
395 const Slice& value_slice) {
396 if (compaction_filter_ == nullptr) {
397 return CompactionFilter::Decision::kKeep;
398 }
399 if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
400 filter_timer_.Start();
401 }
402 compaction_filter_value_.clear();
403 compaction_filter_skip_until_.Clear();
404 auto ret = compaction_filter_->FilterV2(
405 level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
406 &compaction_filter_value_, compaction_filter_skip_until_.rep());
407 if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
408 if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
409 user_key) <= 0) {
410 // Invalid skip_until returned from compaction filter.
411 // Keep the key as per FilterV2 documentation.
412 ret = CompactionFilter::Decision::kKeep;
413 } else {
414 compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
415 kValueTypeForSeek);
416 }
417 }
418 if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) {
419 total_filter_time_ += filter_timer_.ElapsedNanosSafe();
420 }
421 return ret;
422 }

}  // namespace ROCKSDB_NAMESPACE
