1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27
28 namespace rocksdb {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 } // namespace
34
// Second template argument of the autovector that stores MergingIterator's
// child wrappers.
constexpr size_t kNumIterReserve = 4;
36
37 class MergingIterator : public InternalIterator {
38 public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 AddToMinHeapOrCheckStatus(&child);
55 }
56 current_ = CurrentForward();
57 }
58
considerStatus(Status s)59 void considerStatus(Status s) {
60 if (!s.ok() && status_.ok()) {
61 status_ = s;
62 }
63 }
64
AddIterator(InternalIterator * iter)65 virtual void AddIterator(InternalIterator* iter) {
66 assert(direction_ == kForward);
67 children_.emplace_back(iter);
68 if (pinned_iters_mgr_) {
69 iter->SetPinnedItersMgr(pinned_iters_mgr_);
70 }
71 auto new_wrapper = children_.back();
72 AddToMinHeapOrCheckStatus(&new_wrapper);
73 if (new_wrapper.Valid()) {
74 current_ = CurrentForward();
75 }
76 }
77
~MergingIterator()78 ~MergingIterator() override {
79 for (auto& child : children_) {
80 child.DeleteIter(is_arena_mode_);
81 }
82 }
83
Valid() const84 bool Valid() const override { return current_ != nullptr && status_.ok(); }
85
status() const86 Status status() const override { return status_; }
87
SeekToFirst()88 void SeekToFirst() override {
89 ClearHeaps();
90 status_ = Status::OK();
91 for (auto& child : children_) {
92 child.SeekToFirst();
93 AddToMinHeapOrCheckStatus(&child);
94 }
95 direction_ = kForward;
96 current_ = CurrentForward();
97 }
98
SeekToLast()99 void SeekToLast() override {
100 ClearHeaps();
101 InitMaxHeap();
102 status_ = Status::OK();
103 for (auto& child : children_) {
104 child.SeekToLast();
105 AddToMaxHeapOrCheckStatus(&child);
106 }
107 direction_ = kReverse;
108 current_ = CurrentReverse();
109 }
110
Seek(const Slice & target)111 void Seek(const Slice& target) override {
112 ClearHeaps();
113 status_ = Status::OK();
114 for (auto& child : children_) {
115 {
116 PERF_TIMER_GUARD(seek_child_seek_time);
117 child.Seek(target);
118 }
119
120 PERF_COUNTER_ADD(seek_child_seek_count, 1);
121 {
122 // Strictly, we timed slightly more than min heap operation,
123 // but these operations are very cheap.
124 PERF_TIMER_GUARD(seek_min_heap_time);
125 AddToMinHeapOrCheckStatus(&child);
126 }
127 }
128 direction_ = kForward;
129 {
130 PERF_TIMER_GUARD(seek_min_heap_time);
131 current_ = CurrentForward();
132 }
133 }
134
SeekForPrev(const Slice & target)135 void SeekForPrev(const Slice& target) override {
136 ClearHeaps();
137 InitMaxHeap();
138 status_ = Status::OK();
139
140 for (auto& child : children_) {
141 {
142 PERF_TIMER_GUARD(seek_child_seek_time);
143 child.SeekForPrev(target);
144 }
145 PERF_COUNTER_ADD(seek_child_seek_count, 1);
146
147 {
148 PERF_TIMER_GUARD(seek_max_heap_time);
149 AddToMaxHeapOrCheckStatus(&child);
150 }
151 }
152 direction_ = kReverse;
153 {
154 PERF_TIMER_GUARD(seek_max_heap_time);
155 current_ = CurrentReverse();
156 }
157 }
158
Next()159 void Next() override {
160 assert(Valid());
161
162 // Ensure that all children are positioned after key().
163 // If we are moving in the forward direction, it is already
164 // true for all of the non-current children since current_ is
165 // the smallest child and key() == current_->key().
166 if (direction_ != kForward) {
167 SwitchToForward();
168 // The loop advanced all non-current children to be > key() so current_
169 // should still be strictly the smallest key.
170 assert(current_ == CurrentForward());
171 }
172
173 // For the heap modifications below to be correct, current_ must be the
174 // current top of the heap.
175 assert(current_ == CurrentForward());
176
177 // as the current points to the current record. move the iterator forward.
178 current_->Next();
179 if (current_->Valid()) {
180 // current is still valid after the Next() call above. Call
181 // replace_top() to restore the heap property. When the same child
182 // iterator yields a sequence of keys, this is cheap.
183 assert(current_->status().ok());
184 minHeap_.replace_top(current_);
185 } else {
186 // current stopped being valid, remove it from the heap.
187 considerStatus(current_->status());
188 minHeap_.pop();
189 }
190 current_ = CurrentForward();
191 }
192
NextAndGetResult(IterateResult * result)193 bool NextAndGetResult(IterateResult* result) override {
194 Next();
195 bool is_valid = Valid();
196 if (is_valid) {
197 result->key = key();
198 result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
199 }
200 return is_valid;
201 }
202
Prev()203 void Prev() override {
204 assert(Valid());
205 // Ensure that all children are positioned before key().
206 // If we are moving in the reverse direction, it is already
207 // true for all of the non-current children since current_ is
208 // the largest child and key() == current_->key().
209 if (direction_ != kReverse) {
210 // Otherwise, retreat the non-current children. We retreat current_
211 // just after the if-block.
212 SwitchToBackward();
213 }
214
215 // For the heap modifications below to be correct, current_ must be the
216 // current top of the heap.
217 assert(current_ == CurrentReverse());
218
219 current_->Prev();
220 if (current_->Valid()) {
221 // current is still valid after the Prev() call above. Call
222 // replace_top() to restore the heap property. When the same child
223 // iterator yields a sequence of keys, this is cheap.
224 assert(current_->status().ok());
225 maxHeap_->replace_top(current_);
226 } else {
227 // current stopped being valid, remove it from the heap.
228 considerStatus(current_->status());
229 maxHeap_->pop();
230 }
231 current_ = CurrentReverse();
232 }
233
key() const234 Slice key() const override {
235 assert(Valid());
236 return current_->key();
237 }
238
value() const239 Slice value() const override {
240 assert(Valid());
241 return current_->value();
242 }
243
244 // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
245 // from current child iterator. Potentially as long as one of child iterator
246 // report out of bound is not possible, we know current key is within bound.
247
MayBeOutOfLowerBound()248 bool MayBeOutOfLowerBound() override {
249 assert(Valid());
250 return current_->MayBeOutOfLowerBound();
251 }
252
MayBeOutOfUpperBound()253 bool MayBeOutOfUpperBound() override {
254 assert(Valid());
255 return current_->MayBeOutOfUpperBound();
256 }
257
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)258 void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
259 pinned_iters_mgr_ = pinned_iters_mgr;
260 for (auto& child : children_) {
261 child.SetPinnedItersMgr(pinned_iters_mgr);
262 }
263 }
264
IsKeyPinned() const265 bool IsKeyPinned() const override {
266 assert(Valid());
267 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
268 current_->IsKeyPinned();
269 }
270
IsValuePinned() const271 bool IsValuePinned() const override {
272 assert(Valid());
273 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
274 current_->IsValuePinned();
275 }
276
277 private:
278 // Clears heaps for both directions, used when changing direction or seeking
279 void ClearHeaps();
280 // Ensures that maxHeap_ is initialized when starting to go in the reverse
281 // direction
282 void InitMaxHeap();
283
284 bool is_arena_mode_;
285 const InternalKeyComparator* comparator_;
286 autovector<IteratorWrapper, kNumIterReserve> children_;
287
288 // Cached pointer to child iterator with the current key, or nullptr if no
289 // child iterators are valid. This is the top of minHeap_ or maxHeap_
290 // depending on the direction.
291 IteratorWrapper* current_;
292 // If any of the children have non-ok status, this is one of them.
293 Status status_;
294 // Which direction is the iterator moving?
295 enum Direction {
296 kForward,
297 kReverse
298 };
299 Direction direction_;
300 MergerMinIterHeap minHeap_;
301 bool prefix_seek_mode_;
302
303 // Max heap is used for reverse iteration, which is way less common than
304 // forward. Lazily initialize it to save memory.
305 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
306 PinnedIteratorsManager* pinned_iters_mgr_;
307
308 // In forward direction, process a child that is not in the min heap.
309 // If valid, add to the min heap. Otherwise, check status.
310 void AddToMinHeapOrCheckStatus(IteratorWrapper*);
311
312 // In backward direction, process a child that is not in the max heap.
313 // If valid, add to the min heap. Otherwise, check status.
314 void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
315
316 void SwitchToForward();
317
318 // Switch the direction from forward to backward without changing the
319 // position. Iterator should still be valid.
320 void SwitchToBackward();
321
CurrentForward() const322 IteratorWrapper* CurrentForward() const {
323 assert(direction_ == kForward);
324 return !minHeap_.empty() ? minHeap_.top() : nullptr;
325 }
326
CurrentReverse() const327 IteratorWrapper* CurrentReverse() const {
328 assert(direction_ == kReverse);
329 assert(maxHeap_);
330 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
331 }
332 };
333
AddToMinHeapOrCheckStatus(IteratorWrapper * child)334 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
335 if (child->Valid()) {
336 assert(child->status().ok());
337 minHeap_.push(child);
338 } else {
339 considerStatus(child->status());
340 }
341 }
342
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)343 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
344 if (child->Valid()) {
345 assert(child->status().ok());
346 maxHeap_->push(child);
347 } else {
348 considerStatus(child->status());
349 }
350 }
351
SwitchToForward()352 void MergingIterator::SwitchToForward() {
353 // Otherwise, advance the non-current children. We advance current_
354 // just after the if-block.
355 ClearHeaps();
356 Slice target = key();
357 for (auto& child : children_) {
358 if (&child != current_) {
359 child.Seek(target);
360 if (child.Valid() && comparator_->Equal(target, child.key())) {
361 assert(child.status().ok());
362 child.Next();
363 }
364 }
365 AddToMinHeapOrCheckStatus(&child);
366 }
367 direction_ = kForward;
368 }
369
SwitchToBackward()370 void MergingIterator::SwitchToBackward() {
371 ClearHeaps();
372 InitMaxHeap();
373 Slice target = key();
374 for (auto& child : children_) {
375 if (&child != current_) {
376 child.SeekForPrev(target);
377 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
378 if (child.Valid() && comparator_->Equal(target, child.key())) {
379 assert(child.status().ok());
380 child.Prev();
381 }
382 }
383 AddToMaxHeapOrCheckStatus(&child);
384 }
385 direction_ = kReverse;
386 if (!prefix_seek_mode_) {
387 // Note that we don't do assert(current_ == CurrentReverse()) here
388 // because it is possible to have some keys larger than the seek-key
389 // inserted between Seek() and SeekToLast(), which makes current_ not
390 // equal to CurrentReverse().
391 current_ = CurrentReverse();
392 }
393 assert(current_ == CurrentReverse());
394 }
395
ClearHeaps()396 void MergingIterator::ClearHeaps() {
397 minHeap_.clear();
398 if (maxHeap_) {
399 maxHeap_->clear();
400 }
401 }
402
InitMaxHeap()403 void MergingIterator::InitMaxHeap() {
404 if (!maxHeap_) {
405 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
406 }
407 }
408
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)409 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
410 InternalIterator** list, int n,
411 Arena* arena, bool prefix_seek_mode) {
412 assert(n >= 0);
413 if (n == 0) {
414 return NewEmptyInternalIterator<Slice>(arena);
415 } else if (n == 1) {
416 return list[0];
417 } else {
418 if (arena == nullptr) {
419 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
420 } else {
421 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
422 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
423 }
424 }
425 }
426
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)427 MergeIteratorBuilder::MergeIteratorBuilder(
428 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
429 : first_iter(nullptr), use_merging_iter(false), arena(a) {
430 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
431 merge_iter =
432 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
433 }
434
~MergeIteratorBuilder()435 MergeIteratorBuilder::~MergeIteratorBuilder() {
436 if (first_iter != nullptr) {
437 first_iter->~InternalIterator();
438 }
439 if (merge_iter != nullptr) {
440 merge_iter->~MergingIterator();
441 }
442 }
443
AddIterator(InternalIterator * iter)444 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
445 if (!use_merging_iter && first_iter != nullptr) {
446 merge_iter->AddIterator(first_iter);
447 use_merging_iter = true;
448 first_iter = nullptr;
449 }
450 if (use_merging_iter) {
451 merge_iter->AddIterator(iter);
452 } else {
453 first_iter = iter;
454 }
455 }
456
Finish()457 InternalIterator* MergeIteratorBuilder::Finish() {
458 InternalIterator* ret = nullptr;
459 if (!use_merging_iter) {
460 ret = first_iter;
461 first_iter = nullptr;
462 } else {
463 ret = merge_iter;
464 merge_iter = nullptr;
465 }
466 return ret;
467 }
468
469 } // namespace rocksdb
470