1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 }  // namespace
34 
// Second template argument of the autovector that stores the child iterator
// wrappers (see MergingIterator::children_).
// NOTE(review): presumably the number of elements the autovector can hold
// before it needs additional storage -- confirm against autovector.
const size_t kNumIterReserve = 4;
36 
37 class MergingIterator : public InternalIterator {
38  public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39   MergingIterator(const InternalKeyComparator* comparator,
40                   InternalIterator** children, int n, bool is_arena_mode,
41                   bool prefix_seek_mode)
42       : is_arena_mode_(is_arena_mode),
43         comparator_(comparator),
44         current_(nullptr),
45         direction_(kForward),
46         minHeap_(comparator_),
47         prefix_seek_mode_(prefix_seek_mode),
48         pinned_iters_mgr_(nullptr) {
49     children_.resize(n);
50     for (int i = 0; i < n; i++) {
51       children_[i].Set(children[i]);
52     }
53     for (auto& child : children_) {
54       AddToMinHeapOrCheckStatus(&child);
55     }
56     current_ = CurrentForward();
57   }
58 
considerStatus(Status s)59   void considerStatus(Status s) {
60     if (!s.ok() && status_.ok()) {
61       status_ = s;
62     }
63   }
64 
AddIterator(InternalIterator * iter)65   virtual void AddIterator(InternalIterator* iter) {
66     assert(direction_ == kForward);
67     children_.emplace_back(iter);
68     if (pinned_iters_mgr_) {
69       iter->SetPinnedItersMgr(pinned_iters_mgr_);
70     }
71     auto new_wrapper = children_.back();
72     AddToMinHeapOrCheckStatus(&new_wrapper);
73     if (new_wrapper.Valid()) {
74       current_ = CurrentForward();
75     }
76   }
77 
~MergingIterator()78   ~MergingIterator() override {
79     for (auto& child : children_) {
80       child.DeleteIter(is_arena_mode_);
81     }
82   }
83 
Valid() const84   bool Valid() const override { return current_ != nullptr && status_.ok(); }
85 
status() const86   Status status() const override { return status_; }
87 
SeekToFirst()88   void SeekToFirst() override {
89     ClearHeaps();
90     status_ = Status::OK();
91     for (auto& child : children_) {
92       child.SeekToFirst();
93       AddToMinHeapOrCheckStatus(&child);
94     }
95     direction_ = kForward;
96     current_ = CurrentForward();
97   }
98 
SeekToLast()99   void SeekToLast() override {
100     ClearHeaps();
101     InitMaxHeap();
102     status_ = Status::OK();
103     for (auto& child : children_) {
104       child.SeekToLast();
105       AddToMaxHeapOrCheckStatus(&child);
106     }
107     direction_ = kReverse;
108     current_ = CurrentReverse();
109   }
110 
Seek(const Slice & target)111   void Seek(const Slice& target) override {
112     ClearHeaps();
113     status_ = Status::OK();
114     for (auto& child : children_) {
115       {
116         PERF_TIMER_GUARD(seek_child_seek_time);
117         child.Seek(target);
118       }
119 
120       PERF_COUNTER_ADD(seek_child_seek_count, 1);
121       {
122         // Strictly, we timed slightly more than min heap operation,
123         // but these operations are very cheap.
124         PERF_TIMER_GUARD(seek_min_heap_time);
125         AddToMinHeapOrCheckStatus(&child);
126       }
127     }
128     direction_ = kForward;
129     {
130       PERF_TIMER_GUARD(seek_min_heap_time);
131       current_ = CurrentForward();
132     }
133   }
134 
SeekForPrev(const Slice & target)135   void SeekForPrev(const Slice& target) override {
136     ClearHeaps();
137     InitMaxHeap();
138     status_ = Status::OK();
139 
140     for (auto& child : children_) {
141       {
142         PERF_TIMER_GUARD(seek_child_seek_time);
143         child.SeekForPrev(target);
144       }
145       PERF_COUNTER_ADD(seek_child_seek_count, 1);
146 
147       {
148         PERF_TIMER_GUARD(seek_max_heap_time);
149         AddToMaxHeapOrCheckStatus(&child);
150       }
151     }
152     direction_ = kReverse;
153     {
154       PERF_TIMER_GUARD(seek_max_heap_time);
155       current_ = CurrentReverse();
156     }
157   }
158 
Next()159   void Next() override {
160     assert(Valid());
161 
162     // Ensure that all children are positioned after key().
163     // If we are moving in the forward direction, it is already
164     // true for all of the non-current children since current_ is
165     // the smallest child and key() == current_->key().
166     if (direction_ != kForward) {
167       SwitchToForward();
168       // The loop advanced all non-current children to be > key() so current_
169       // should still be strictly the smallest key.
170     }
171 
172     // For the heap modifications below to be correct, current_ must be the
173     // current top of the heap.
174     assert(current_ == CurrentForward());
175 
176     // as the current points to the current record. move the iterator forward.
177     current_->Next();
178     if (current_->Valid()) {
179       // current is still valid after the Next() call above.  Call
180       // replace_top() to restore the heap property.  When the same child
181       // iterator yields a sequence of keys, this is cheap.
182       assert(current_->status().ok());
183       minHeap_.replace_top(current_);
184     } else {
185       // current stopped being valid, remove it from the heap.
186       considerStatus(current_->status());
187       minHeap_.pop();
188     }
189     current_ = CurrentForward();
190   }
191 
NextAndGetResult(IterateResult * result)192   bool NextAndGetResult(IterateResult* result) override {
193     Next();
194     bool is_valid = Valid();
195     if (is_valid) {
196       result->key = key();
197       result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
198     }
199     return is_valid;
200   }
201 
Prev()202   void Prev() override {
203     assert(Valid());
204     // Ensure that all children are positioned before key().
205     // If we are moving in the reverse direction, it is already
206     // true for all of the non-current children since current_ is
207     // the largest child and key() == current_->key().
208     if (direction_ != kReverse) {
209       // Otherwise, retreat the non-current children.  We retreat current_
210       // just after the if-block.
211       SwitchToBackward();
212     }
213 
214     // For the heap modifications below to be correct, current_ must be the
215     // current top of the heap.
216     assert(current_ == CurrentReverse());
217 
218     current_->Prev();
219     if (current_->Valid()) {
220       // current is still valid after the Prev() call above.  Call
221       // replace_top() to restore the heap property.  When the same child
222       // iterator yields a sequence of keys, this is cheap.
223       assert(current_->status().ok());
224       maxHeap_->replace_top(current_);
225     } else {
226       // current stopped being valid, remove it from the heap.
227       considerStatus(current_->status());
228       maxHeap_->pop();
229     }
230     current_ = CurrentReverse();
231   }
232 
key() const233   Slice key() const override {
234     assert(Valid());
235     return current_->key();
236   }
237 
value() const238   Slice value() const override {
239     assert(Valid());
240     return current_->value();
241   }
242 
243   // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
244   // from current child iterator. Potentially as long as one of child iterator
245   // report out of bound is not possible, we know current key is within bound.
246 
MayBeOutOfLowerBound()247   bool MayBeOutOfLowerBound() override {
248     assert(Valid());
249     return current_->MayBeOutOfLowerBound();
250   }
251 
MayBeOutOfUpperBound()252   bool MayBeOutOfUpperBound() override {
253     assert(Valid());
254     return current_->MayBeOutOfUpperBound();
255   }
256 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)257   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
258     pinned_iters_mgr_ = pinned_iters_mgr;
259     for (auto& child : children_) {
260       child.SetPinnedItersMgr(pinned_iters_mgr);
261     }
262   }
263 
IsKeyPinned() const264   bool IsKeyPinned() const override {
265     assert(Valid());
266     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
267            current_->IsKeyPinned();
268   }
269 
IsValuePinned() const270   bool IsValuePinned() const override {
271     assert(Valid());
272     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
273            current_->IsValuePinned();
274   }
275 
276  private:
277   // Clears heaps for both directions, used when changing direction or seeking
278   void ClearHeaps();
279   // Ensures that maxHeap_ is initialized when starting to go in the reverse
280   // direction
281   void InitMaxHeap();
282 
283   bool is_arena_mode_;
284   const InternalKeyComparator* comparator_;
285   autovector<IteratorWrapper, kNumIterReserve> children_;
286 
287   // Cached pointer to child iterator with the current key, or nullptr if no
288   // child iterators are valid.  This is the top of minHeap_ or maxHeap_
289   // depending on the direction.
290   IteratorWrapper* current_;
291   // If any of the children have non-ok status, this is one of them.
292   Status status_;
293   // Which direction is the iterator moving?
294   enum Direction {
295     kForward,
296     kReverse
297   };
298   Direction direction_;
299   MergerMinIterHeap minHeap_;
300   bool prefix_seek_mode_;
301 
302   // Max heap is used for reverse iteration, which is way less common than
303   // forward.  Lazily initialize it to save memory.
304   std::unique_ptr<MergerMaxIterHeap> maxHeap_;
305   PinnedIteratorsManager* pinned_iters_mgr_;
306 
307   // In forward direction, process a child that is not in the min heap.
308   // If valid, add to the min heap. Otherwise, check status.
309   void AddToMinHeapOrCheckStatus(IteratorWrapper*);
310 
311   // In backward direction, process a child that is not in the max heap.
312   // If valid, add to the min heap. Otherwise, check status.
313   void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
314 
315   void SwitchToForward();
316 
317   // Switch the direction from forward to backward without changing the
318   // position. Iterator should still be valid.
319   void SwitchToBackward();
320 
CurrentForward() const321   IteratorWrapper* CurrentForward() const {
322     assert(direction_ == kForward);
323     return !minHeap_.empty() ? minHeap_.top() : nullptr;
324   }
325 
CurrentReverse() const326   IteratorWrapper* CurrentReverse() const {
327     assert(direction_ == kReverse);
328     assert(maxHeap_);
329     return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
330   }
331 };
332 
AddToMinHeapOrCheckStatus(IteratorWrapper * child)333 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
334   if (child->Valid()) {
335     assert(child->status().ok());
336     minHeap_.push(child);
337   } else {
338     considerStatus(child->status());
339   }
340 }
341 
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)342 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
343   if (child->Valid()) {
344     assert(child->status().ok());
345     maxHeap_->push(child);
346   } else {
347     considerStatus(child->status());
348   }
349 }
350 
SwitchToForward()351 void MergingIterator::SwitchToForward() {
352   // Otherwise, advance the non-current children.  We advance current_
353   // just after the if-block.
354   ClearHeaps();
355   Slice target = key();
356   for (auto& child : children_) {
357     if (&child != current_) {
358       child.Seek(target);
359       if (child.Valid() && comparator_->Equal(target, child.key())) {
360         assert(child.status().ok());
361         child.Next();
362       }
363     }
364     AddToMinHeapOrCheckStatus(&child);
365   }
366   direction_ = kForward;
367 }
368 
// Re-positions the iterator for reverse movement while staying on the current
// key: every child other than current_ is moved with SeekForPrev() to the
// current key and, if it lands exactly on that key, stepped once before it.
// All children (including current_, which is left untouched) are then
// inserted into the max heap.
void MergingIterator::SwitchToBackward() {
  ClearHeaps();
  InitMaxHeap();
  Slice target = key();
  for (auto& child : children_) {
    if (&child != current_) {
      child.SeekForPrev(target);
      // Test hook invoked after the seek and before the optional Prev().
      TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
      if (child.Valid() && comparator_->Equal(target, child.key())) {
        assert(child.status().ok());
        child.Prev();
      }
    }
    AddToMaxHeapOrCheckStatus(&child);
  }
  direction_ = kReverse;
  if (!prefix_seek_mode_) {
    // Outside prefix-seek mode, current_ is re-read from the heap rather than
    // asserted to already equal CurrentReverse(): it is possible to have some
    // keys larger than the seek-key inserted between Seek() and SeekToLast(),
    // which makes current_ not equal to CurrentReverse().
    current_ = CurrentReverse();
  }
  // In prefix-seek mode current_ was not reassigned above, so this checks
  // that the heap top is still the child we were positioned on.
  assert(current_ == CurrentReverse());
}
394 
ClearHeaps()395 void MergingIterator::ClearHeaps() {
396   minHeap_.clear();
397   if (maxHeap_) {
398     maxHeap_->clear();
399   }
400 }
401 
InitMaxHeap()402 void MergingIterator::InitMaxHeap() {
403   if (!maxHeap_) {
404     maxHeap_.reset(new MergerMaxIterHeap(comparator_));
405   }
406 }
407 
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)408 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
409                                      InternalIterator** list, int n,
410                                      Arena* arena, bool prefix_seek_mode) {
411   assert(n >= 0);
412   if (n == 0) {
413     return NewEmptyInternalIterator<Slice>(arena);
414   } else if (n == 1) {
415     return list[0];
416   } else {
417     if (arena == nullptr) {
418       return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
419     } else {
420       auto mem = arena->AllocateAligned(sizeof(MergingIterator));
421       return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
422     }
423   }
424 }
425 
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)426 MergeIteratorBuilder::MergeIteratorBuilder(
427     const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
428     : first_iter(nullptr), use_merging_iter(false), arena(a) {
429   auto mem = arena->AllocateAligned(sizeof(MergingIterator));
430   merge_iter =
431       new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
432 }
433 
~MergeIteratorBuilder()434 MergeIteratorBuilder::~MergeIteratorBuilder() {
435   if (first_iter != nullptr) {
436     first_iter->~InternalIterator();
437   }
438   if (merge_iter != nullptr) {
439     merge_iter->~MergingIterator();
440   }
441 }
442 
AddIterator(InternalIterator * iter)443 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
444   if (!use_merging_iter && first_iter != nullptr) {
445     merge_iter->AddIterator(first_iter);
446     use_merging_iter = true;
447     first_iter = nullptr;
448   }
449   if (use_merging_iter) {
450     merge_iter->AddIterator(iter);
451   } else {
452     first_iter = iter;
453   }
454 }
455 
Finish()456 InternalIterator* MergeIteratorBuilder::Finish() {
457   InternalIterator* ret = nullptr;
458   if (!use_merging_iter) {
459     ret = first_iter;
460     first_iter = nullptr;
461   } else {
462     ret = merge_iter;
463     merge_iter = nullptr;
464   }
465   return ret;
466 }
467 
468 }  // namespace ROCKSDB_NAMESPACE
469