1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27
28 namespace ROCKSDB_NAMESPACE {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 } // namespace
34
// Size argument for the children_ autovector of MergingIterator; presumably
// the number of child wrappers kept in inline storage before spilling.
constexpr size_t kNumIterReserve = 4;
36
37 class MergingIterator : public InternalIterator {
38 public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 AddToMinHeapOrCheckStatus(&child);
55 }
56 current_ = CurrentForward();
57 }
58
considerStatus(Status s)59 void considerStatus(Status s) {
60 if (!s.ok() && status_.ok()) {
61 status_ = s;
62 }
63 }
64
AddIterator(InternalIterator * iter)65 virtual void AddIterator(InternalIterator* iter) {
66 assert(direction_ == kForward);
67 children_.emplace_back(iter);
68 if (pinned_iters_mgr_) {
69 iter->SetPinnedItersMgr(pinned_iters_mgr_);
70 }
71 auto new_wrapper = children_.back();
72 AddToMinHeapOrCheckStatus(&new_wrapper);
73 if (new_wrapper.Valid()) {
74 current_ = CurrentForward();
75 }
76 }
77
~MergingIterator()78 ~MergingIterator() override {
79 for (auto& child : children_) {
80 child.DeleteIter(is_arena_mode_);
81 }
82 }
83
Valid() const84 bool Valid() const override { return current_ != nullptr && status_.ok(); }
85
status() const86 Status status() const override { return status_; }
87
SeekToFirst()88 void SeekToFirst() override {
89 ClearHeaps();
90 status_ = Status::OK();
91 for (auto& child : children_) {
92 child.SeekToFirst();
93 AddToMinHeapOrCheckStatus(&child);
94 }
95 direction_ = kForward;
96 current_ = CurrentForward();
97 }
98
SeekToLast()99 void SeekToLast() override {
100 ClearHeaps();
101 InitMaxHeap();
102 status_ = Status::OK();
103 for (auto& child : children_) {
104 child.SeekToLast();
105 AddToMaxHeapOrCheckStatus(&child);
106 }
107 direction_ = kReverse;
108 current_ = CurrentReverse();
109 }
110
Seek(const Slice & target)111 void Seek(const Slice& target) override {
112 ClearHeaps();
113 status_ = Status::OK();
114 for (auto& child : children_) {
115 {
116 PERF_TIMER_GUARD(seek_child_seek_time);
117 child.Seek(target);
118 }
119
120 PERF_COUNTER_ADD(seek_child_seek_count, 1);
121 {
122 // Strictly, we timed slightly more than min heap operation,
123 // but these operations are very cheap.
124 PERF_TIMER_GUARD(seek_min_heap_time);
125 AddToMinHeapOrCheckStatus(&child);
126 }
127 }
128 direction_ = kForward;
129 {
130 PERF_TIMER_GUARD(seek_min_heap_time);
131 current_ = CurrentForward();
132 }
133 }
134
SeekForPrev(const Slice & target)135 void SeekForPrev(const Slice& target) override {
136 ClearHeaps();
137 InitMaxHeap();
138 status_ = Status::OK();
139
140 for (auto& child : children_) {
141 {
142 PERF_TIMER_GUARD(seek_child_seek_time);
143 child.SeekForPrev(target);
144 }
145 PERF_COUNTER_ADD(seek_child_seek_count, 1);
146
147 {
148 PERF_TIMER_GUARD(seek_max_heap_time);
149 AddToMaxHeapOrCheckStatus(&child);
150 }
151 }
152 direction_ = kReverse;
153 {
154 PERF_TIMER_GUARD(seek_max_heap_time);
155 current_ = CurrentReverse();
156 }
157 }
158
Next()159 void Next() override {
160 assert(Valid());
161
162 // Ensure that all children are positioned after key().
163 // If we are moving in the forward direction, it is already
164 // true for all of the non-current children since current_ is
165 // the smallest child and key() == current_->key().
166 if (direction_ != kForward) {
167 SwitchToForward();
168 // The loop advanced all non-current children to be > key() so current_
169 // should still be strictly the smallest key.
170 }
171
172 // For the heap modifications below to be correct, current_ must be the
173 // current top of the heap.
174 assert(current_ == CurrentForward());
175
176 // as the current points to the current record. move the iterator forward.
177 current_->Next();
178 if (current_->Valid()) {
179 // current is still valid after the Next() call above. Call
180 // replace_top() to restore the heap property. When the same child
181 // iterator yields a sequence of keys, this is cheap.
182 assert(current_->status().ok());
183 minHeap_.replace_top(current_);
184 } else {
185 // current stopped being valid, remove it from the heap.
186 considerStatus(current_->status());
187 minHeap_.pop();
188 }
189 current_ = CurrentForward();
190 }
191
NextAndGetResult(IterateResult * result)192 bool NextAndGetResult(IterateResult* result) override {
193 Next();
194 bool is_valid = Valid();
195 if (is_valid) {
196 result->key = key();
197 result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
198 }
199 return is_valid;
200 }
201
Prev()202 void Prev() override {
203 assert(Valid());
204 // Ensure that all children are positioned before key().
205 // If we are moving in the reverse direction, it is already
206 // true for all of the non-current children since current_ is
207 // the largest child and key() == current_->key().
208 if (direction_ != kReverse) {
209 // Otherwise, retreat the non-current children. We retreat current_
210 // just after the if-block.
211 SwitchToBackward();
212 }
213
214 // For the heap modifications below to be correct, current_ must be the
215 // current top of the heap.
216 assert(current_ == CurrentReverse());
217
218 current_->Prev();
219 if (current_->Valid()) {
220 // current is still valid after the Prev() call above. Call
221 // replace_top() to restore the heap property. When the same child
222 // iterator yields a sequence of keys, this is cheap.
223 assert(current_->status().ok());
224 maxHeap_->replace_top(current_);
225 } else {
226 // current stopped being valid, remove it from the heap.
227 considerStatus(current_->status());
228 maxHeap_->pop();
229 }
230 current_ = CurrentReverse();
231 }
232
key() const233 Slice key() const override {
234 assert(Valid());
235 return current_->key();
236 }
237
value() const238 Slice value() const override {
239 assert(Valid());
240 return current_->value();
241 }
242
243 // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
244 // from current child iterator. Potentially as long as one of child iterator
245 // report out of bound is not possible, we know current key is within bound.
246
MayBeOutOfLowerBound()247 bool MayBeOutOfLowerBound() override {
248 assert(Valid());
249 return current_->MayBeOutOfLowerBound();
250 }
251
MayBeOutOfUpperBound()252 bool MayBeOutOfUpperBound() override {
253 assert(Valid());
254 return current_->MayBeOutOfUpperBound();
255 }
256
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)257 void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
258 pinned_iters_mgr_ = pinned_iters_mgr;
259 for (auto& child : children_) {
260 child.SetPinnedItersMgr(pinned_iters_mgr);
261 }
262 }
263
IsKeyPinned() const264 bool IsKeyPinned() const override {
265 assert(Valid());
266 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
267 current_->IsKeyPinned();
268 }
269
IsValuePinned() const270 bool IsValuePinned() const override {
271 assert(Valid());
272 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
273 current_->IsValuePinned();
274 }
275
276 private:
277 // Clears heaps for both directions, used when changing direction or seeking
278 void ClearHeaps();
279 // Ensures that maxHeap_ is initialized when starting to go in the reverse
280 // direction
281 void InitMaxHeap();
282
283 bool is_arena_mode_;
284 const InternalKeyComparator* comparator_;
285 autovector<IteratorWrapper, kNumIterReserve> children_;
286
287 // Cached pointer to child iterator with the current key, or nullptr if no
288 // child iterators are valid. This is the top of minHeap_ or maxHeap_
289 // depending on the direction.
290 IteratorWrapper* current_;
291 // If any of the children have non-ok status, this is one of them.
292 Status status_;
293 // Which direction is the iterator moving?
294 enum Direction {
295 kForward,
296 kReverse
297 };
298 Direction direction_;
299 MergerMinIterHeap minHeap_;
300 bool prefix_seek_mode_;
301
302 // Max heap is used for reverse iteration, which is way less common than
303 // forward. Lazily initialize it to save memory.
304 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
305 PinnedIteratorsManager* pinned_iters_mgr_;
306
307 // In forward direction, process a child that is not in the min heap.
308 // If valid, add to the min heap. Otherwise, check status.
309 void AddToMinHeapOrCheckStatus(IteratorWrapper*);
310
311 // In backward direction, process a child that is not in the max heap.
312 // If valid, add to the min heap. Otherwise, check status.
313 void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
314
315 void SwitchToForward();
316
317 // Switch the direction from forward to backward without changing the
318 // position. Iterator should still be valid.
319 void SwitchToBackward();
320
CurrentForward() const321 IteratorWrapper* CurrentForward() const {
322 assert(direction_ == kForward);
323 return !minHeap_.empty() ? minHeap_.top() : nullptr;
324 }
325
CurrentReverse() const326 IteratorWrapper* CurrentReverse() const {
327 assert(direction_ == kReverse);
328 assert(maxHeap_);
329 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
330 }
331 };
332
AddToMinHeapOrCheckStatus(IteratorWrapper * child)333 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
334 if (child->Valid()) {
335 assert(child->status().ok());
336 minHeap_.push(child);
337 } else {
338 considerStatus(child->status());
339 }
340 }
341
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)342 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
343 if (child->Valid()) {
344 assert(child->status().ok());
345 maxHeap_->push(child);
346 } else {
347 considerStatus(child->status());
348 }
349 }
350
SwitchToForward()351 void MergingIterator::SwitchToForward() {
352 // Otherwise, advance the non-current children. We advance current_
353 // just after the if-block.
354 ClearHeaps();
355 Slice target = key();
356 for (auto& child : children_) {
357 if (&child != current_) {
358 child.Seek(target);
359 if (child.Valid() && comparator_->Equal(target, child.key())) {
360 assert(child.status().ok());
361 child.Next();
362 }
363 }
364 AddToMinHeapOrCheckStatus(&child);
365 }
366 direction_ = kForward;
367 }
368
SwitchToBackward()369 void MergingIterator::SwitchToBackward() {
370 ClearHeaps();
371 InitMaxHeap();
372 Slice target = key();
373 for (auto& child : children_) {
374 if (&child != current_) {
375 child.SeekForPrev(target);
376 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
377 if (child.Valid() && comparator_->Equal(target, child.key())) {
378 assert(child.status().ok());
379 child.Prev();
380 }
381 }
382 AddToMaxHeapOrCheckStatus(&child);
383 }
384 direction_ = kReverse;
385 if (!prefix_seek_mode_) {
386 // Note that we don't do assert(current_ == CurrentReverse()) here
387 // because it is possible to have some keys larger than the seek-key
388 // inserted between Seek() and SeekToLast(), which makes current_ not
389 // equal to CurrentReverse().
390 current_ = CurrentReverse();
391 }
392 assert(current_ == CurrentReverse());
393 }
394
ClearHeaps()395 void MergingIterator::ClearHeaps() {
396 minHeap_.clear();
397 if (maxHeap_) {
398 maxHeap_->clear();
399 }
400 }
401
InitMaxHeap()402 void MergingIterator::InitMaxHeap() {
403 if (!maxHeap_) {
404 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
405 }
406 }
407
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)408 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
409 InternalIterator** list, int n,
410 Arena* arena, bool prefix_seek_mode) {
411 assert(n >= 0);
412 if (n == 0) {
413 return NewEmptyInternalIterator<Slice>(arena);
414 } else if (n == 1) {
415 return list[0];
416 } else {
417 if (arena == nullptr) {
418 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
419 } else {
420 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
421 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
422 }
423 }
424 }
425
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)426 MergeIteratorBuilder::MergeIteratorBuilder(
427 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
428 : first_iter(nullptr), use_merging_iter(false), arena(a) {
429 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
430 merge_iter =
431 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
432 }
433
~MergeIteratorBuilder()434 MergeIteratorBuilder::~MergeIteratorBuilder() {
435 if (first_iter != nullptr) {
436 first_iter->~InternalIterator();
437 }
438 if (merge_iter != nullptr) {
439 merge_iter->~MergingIterator();
440 }
441 }
442
AddIterator(InternalIterator * iter)443 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
444 if (!use_merging_iter && first_iter != nullptr) {
445 merge_iter->AddIterator(first_iter);
446 use_merging_iter = true;
447 first_iter = nullptr;
448 }
449 if (use_merging_iter) {
450 merge_iter->AddIterator(iter);
451 } else {
452 first_iter = iter;
453 }
454 }
455
Finish()456 InternalIterator* MergeIteratorBuilder::Finish() {
457 InternalIterator* ret = nullptr;
458 if (!use_merging_iter) {
459 ret = first_iter;
460 first_iter = nullptr;
461 } else {
462 ret = merge_iter;
463 merge_iter = nullptr;
464 }
465 return ret;
466 }
467
468 } // namespace ROCKSDB_NAMESPACE
469