1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27
28 namespace ROCKSDB_NAMESPACE {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 } // namespace
34
35 const size_t kNumIterReserve = 4;
36
37 class MergingIterator : public InternalIterator {
38 public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 AddToMinHeapOrCheckStatus(&child);
55 }
56 current_ = CurrentForward();
57 }
58
considerStatus(Status s)59 void considerStatus(Status s) {
60 if (!s.ok() && status_.ok()) {
61 status_ = s;
62 }
63 }
64
AddIterator(InternalIterator * iter)65 virtual void AddIterator(InternalIterator* iter) {
66 assert(direction_ == kForward);
67 children_.emplace_back(iter);
68 if (pinned_iters_mgr_) {
69 iter->SetPinnedItersMgr(pinned_iters_mgr_);
70 }
71 auto new_wrapper = children_.back();
72 AddToMinHeapOrCheckStatus(&new_wrapper);
73 if (new_wrapper.Valid()) {
74 current_ = CurrentForward();
75 }
76 }
77
~MergingIterator()78 ~MergingIterator() override {
79 for (auto& child : children_) {
80 child.DeleteIter(is_arena_mode_);
81 }
82 status_.PermitUncheckedError();
83 }
84
Valid() const85 bool Valid() const override { return current_ != nullptr && status_.ok(); }
86
status() const87 Status status() const override { return status_; }
88
SeekToFirst()89 void SeekToFirst() override {
90 ClearHeaps();
91 status_ = Status::OK();
92 for (auto& child : children_) {
93 child.SeekToFirst();
94 AddToMinHeapOrCheckStatus(&child);
95 }
96 direction_ = kForward;
97 current_ = CurrentForward();
98 }
99
SeekToLast()100 void SeekToLast() override {
101 ClearHeaps();
102 InitMaxHeap();
103 status_ = Status::OK();
104 for (auto& child : children_) {
105 child.SeekToLast();
106 AddToMaxHeapOrCheckStatus(&child);
107 }
108 direction_ = kReverse;
109 current_ = CurrentReverse();
110 }
111
Seek(const Slice & target)112 void Seek(const Slice& target) override {
113 ClearHeaps();
114 status_ = Status::OK();
115 for (auto& child : children_) {
116 {
117 PERF_TIMER_GUARD(seek_child_seek_time);
118 child.Seek(target);
119 }
120
121 PERF_COUNTER_ADD(seek_child_seek_count, 1);
122 {
123 // Strictly, we timed slightly more than min heap operation,
124 // but these operations are very cheap.
125 PERF_TIMER_GUARD(seek_min_heap_time);
126 AddToMinHeapOrCheckStatus(&child);
127 }
128 }
129 direction_ = kForward;
130 {
131 PERF_TIMER_GUARD(seek_min_heap_time);
132 current_ = CurrentForward();
133 }
134 }
135
SeekForPrev(const Slice & target)136 void SeekForPrev(const Slice& target) override {
137 ClearHeaps();
138 InitMaxHeap();
139 status_ = Status::OK();
140
141 for (auto& child : children_) {
142 {
143 PERF_TIMER_GUARD(seek_child_seek_time);
144 child.SeekForPrev(target);
145 }
146 PERF_COUNTER_ADD(seek_child_seek_count, 1);
147
148 {
149 PERF_TIMER_GUARD(seek_max_heap_time);
150 AddToMaxHeapOrCheckStatus(&child);
151 }
152 }
153 direction_ = kReverse;
154 {
155 PERF_TIMER_GUARD(seek_max_heap_time);
156 current_ = CurrentReverse();
157 }
158 }
159
Next()160 void Next() override {
161 assert(Valid());
162
163 // Ensure that all children are positioned after key().
164 // If we are moving in the forward direction, it is already
165 // true for all of the non-current children since current_ is
166 // the smallest child and key() == current_->key().
167 if (direction_ != kForward) {
168 SwitchToForward();
169 // The loop advanced all non-current children to be > key() so current_
170 // should still be strictly the smallest key.
171 }
172
173 // For the heap modifications below to be correct, current_ must be the
174 // current top of the heap.
175 assert(current_ == CurrentForward());
176
177 // as the current points to the current record. move the iterator forward.
178 current_->Next();
179 if (current_->Valid()) {
180 // current is still valid after the Next() call above. Call
181 // replace_top() to restore the heap property. When the same child
182 // iterator yields a sequence of keys, this is cheap.
183 assert(current_->status().ok());
184 minHeap_.replace_top(current_);
185 } else {
186 // current stopped being valid, remove it from the heap.
187 considerStatus(current_->status());
188 minHeap_.pop();
189 }
190 current_ = CurrentForward();
191 }
192
NextAndGetResult(IterateResult * result)193 bool NextAndGetResult(IterateResult* result) override {
194 Next();
195 bool is_valid = Valid();
196 if (is_valid) {
197 result->key = key();
198 result->bound_check_result = UpperBoundCheckResult();
199 result->value_prepared = current_->IsValuePrepared();
200 }
201 return is_valid;
202 }
203
Prev()204 void Prev() override {
205 assert(Valid());
206 // Ensure that all children are positioned before key().
207 // If we are moving in the reverse direction, it is already
208 // true for all of the non-current children since current_ is
209 // the largest child and key() == current_->key().
210 if (direction_ != kReverse) {
211 // Otherwise, retreat the non-current children. We retreat current_
212 // just after the if-block.
213 SwitchToBackward();
214 }
215
216 // For the heap modifications below to be correct, current_ must be the
217 // current top of the heap.
218 assert(current_ == CurrentReverse());
219
220 current_->Prev();
221 if (current_->Valid()) {
222 // current is still valid after the Prev() call above. Call
223 // replace_top() to restore the heap property. When the same child
224 // iterator yields a sequence of keys, this is cheap.
225 assert(current_->status().ok());
226 maxHeap_->replace_top(current_);
227 } else {
228 // current stopped being valid, remove it from the heap.
229 considerStatus(current_->status());
230 maxHeap_->pop();
231 }
232 current_ = CurrentReverse();
233 }
234
key() const235 Slice key() const override {
236 assert(Valid());
237 return current_->key();
238 }
239
value() const240 Slice value() const override {
241 assert(Valid());
242 return current_->value();
243 }
244
PrepareValue()245 bool PrepareValue() override {
246 assert(Valid());
247 if (current_->PrepareValue()) {
248 return true;
249 }
250
251 considerStatus(current_->status());
252 assert(!status_.ok());
253 return false;
254 }
255
256 // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
257 // from current child iterator. Potentially as long as one of child iterator
258 // report out of bound is not possible, we know current key is within bound.
259
MayBeOutOfLowerBound()260 bool MayBeOutOfLowerBound() override {
261 assert(Valid());
262 return current_->MayBeOutOfLowerBound();
263 }
264
UpperBoundCheckResult()265 IterBoundCheck UpperBoundCheckResult() override {
266 assert(Valid());
267 return current_->UpperBoundCheckResult();
268 }
269
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)270 void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
271 pinned_iters_mgr_ = pinned_iters_mgr;
272 for (auto& child : children_) {
273 child.SetPinnedItersMgr(pinned_iters_mgr);
274 }
275 }
276
IsKeyPinned() const277 bool IsKeyPinned() const override {
278 assert(Valid());
279 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
280 current_->IsKeyPinned();
281 }
282
IsValuePinned() const283 bool IsValuePinned() const override {
284 assert(Valid());
285 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
286 current_->IsValuePinned();
287 }
288
289 private:
290 // Clears heaps for both directions, used when changing direction or seeking
291 void ClearHeaps();
292 // Ensures that maxHeap_ is initialized when starting to go in the reverse
293 // direction
294 void InitMaxHeap();
295
296 bool is_arena_mode_;
297 const InternalKeyComparator* comparator_;
298 autovector<IteratorWrapper, kNumIterReserve> children_;
299
300 // Cached pointer to child iterator with the current key, or nullptr if no
301 // child iterators are valid. This is the top of minHeap_ or maxHeap_
302 // depending on the direction.
303 IteratorWrapper* current_;
304 // If any of the children have non-ok status, this is one of them.
305 Status status_;
306 // Which direction is the iterator moving?
307 enum Direction {
308 kForward,
309 kReverse
310 };
311 Direction direction_;
312 MergerMinIterHeap minHeap_;
313 bool prefix_seek_mode_;
314
315 // Max heap is used for reverse iteration, which is way less common than
316 // forward. Lazily initialize it to save memory.
317 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
318 PinnedIteratorsManager* pinned_iters_mgr_;
319
320 // In forward direction, process a child that is not in the min heap.
321 // If valid, add to the min heap. Otherwise, check status.
322 void AddToMinHeapOrCheckStatus(IteratorWrapper*);
323
324 // In backward direction, process a child that is not in the max heap.
325 // If valid, add to the min heap. Otherwise, check status.
326 void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
327
328 void SwitchToForward();
329
330 // Switch the direction from forward to backward without changing the
331 // position. Iterator should still be valid.
332 void SwitchToBackward();
333
CurrentForward() const334 IteratorWrapper* CurrentForward() const {
335 assert(direction_ == kForward);
336 return !minHeap_.empty() ? minHeap_.top() : nullptr;
337 }
338
CurrentReverse() const339 IteratorWrapper* CurrentReverse() const {
340 assert(direction_ == kReverse);
341 assert(maxHeap_);
342 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
343 }
344 };
345
AddToMinHeapOrCheckStatus(IteratorWrapper * child)346 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
347 if (child->Valid()) {
348 assert(child->status().ok());
349 minHeap_.push(child);
350 } else {
351 considerStatus(child->status());
352 }
353 }
354
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)355 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
356 if (child->Valid()) {
357 assert(child->status().ok());
358 maxHeap_->push(child);
359 } else {
360 considerStatus(child->status());
361 }
362 }
363
SwitchToForward()364 void MergingIterator::SwitchToForward() {
365 // Otherwise, advance the non-current children. We advance current_
366 // just after the if-block.
367 ClearHeaps();
368 Slice target = key();
369 for (auto& child : children_) {
370 if (&child != current_) {
371 child.Seek(target);
372 if (child.Valid() && comparator_->Equal(target, child.key())) {
373 assert(child.status().ok());
374 child.Next();
375 }
376 }
377 AddToMinHeapOrCheckStatus(&child);
378 }
379 direction_ = kForward;
380 }
381
SwitchToBackward()382 void MergingIterator::SwitchToBackward() {
383 ClearHeaps();
384 InitMaxHeap();
385 Slice target = key();
386 for (auto& child : children_) {
387 if (&child != current_) {
388 child.SeekForPrev(target);
389 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
390 if (child.Valid() && comparator_->Equal(target, child.key())) {
391 assert(child.status().ok());
392 child.Prev();
393 }
394 }
395 AddToMaxHeapOrCheckStatus(&child);
396 }
397 direction_ = kReverse;
398 if (!prefix_seek_mode_) {
399 // Note that we don't do assert(current_ == CurrentReverse()) here
400 // because it is possible to have some keys larger than the seek-key
401 // inserted between Seek() and SeekToLast(), which makes current_ not
402 // equal to CurrentReverse().
403 current_ = CurrentReverse();
404 }
405 assert(current_ == CurrentReverse());
406 }
407
ClearHeaps()408 void MergingIterator::ClearHeaps() {
409 minHeap_.clear();
410 if (maxHeap_) {
411 maxHeap_->clear();
412 }
413 }
414
InitMaxHeap()415 void MergingIterator::InitMaxHeap() {
416 if (!maxHeap_) {
417 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
418 }
419 }
420
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)421 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
422 InternalIterator** list, int n,
423 Arena* arena, bool prefix_seek_mode) {
424 assert(n >= 0);
425 if (n == 0) {
426 return NewEmptyInternalIterator<Slice>(arena);
427 } else if (n == 1) {
428 return list[0];
429 } else {
430 if (arena == nullptr) {
431 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
432 } else {
433 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
434 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
435 }
436 }
437 }
438
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)439 MergeIteratorBuilder::MergeIteratorBuilder(
440 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
441 : first_iter(nullptr), use_merging_iter(false), arena(a) {
442 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
443 merge_iter =
444 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
445 }
446
~MergeIteratorBuilder()447 MergeIteratorBuilder::~MergeIteratorBuilder() {
448 if (first_iter != nullptr) {
449 first_iter->~InternalIterator();
450 }
451 if (merge_iter != nullptr) {
452 merge_iter->~MergingIterator();
453 }
454 }
455
AddIterator(InternalIterator * iter)456 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
457 if (!use_merging_iter && first_iter != nullptr) {
458 merge_iter->AddIterator(first_iter);
459 use_merging_iter = true;
460 first_iter = nullptr;
461 }
462 if (use_merging_iter) {
463 merge_iter->AddIterator(iter);
464 } else {
465 first_iter = iter;
466 }
467 }
468
Finish()469 InternalIterator* MergeIteratorBuilder::Finish() {
470 InternalIterator* ret = nullptr;
471 if (!use_merging_iter) {
472 ret = first_iter;
473 first_iter = nullptr;
474 } else {
475 ret = merge_iter;
476 merge_iter = nullptr;
477 }
478 return ret;
479 }
480
481 } // namespace ROCKSDB_NAMESPACE
482