//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27 
28 namespace rocksdb {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 }  // namespace
34 
// Second template argument of the autovector that holds MergingIterator's
// child wrappers.
constexpr size_t kNumIterReserve = 4;
36 
37 class MergingIterator : public InternalIterator {
38  public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39   MergingIterator(const InternalKeyComparator* comparator,
40                   InternalIterator** children, int n, bool is_arena_mode,
41                   bool prefix_seek_mode)
42       : is_arena_mode_(is_arena_mode),
43         comparator_(comparator),
44         current_(nullptr),
45         direction_(kForward),
46         minHeap_(comparator_),
47         prefix_seek_mode_(prefix_seek_mode),
48         pinned_iters_mgr_(nullptr) {
49     children_.resize(n);
50     for (int i = 0; i < n; i++) {
51       children_[i].Set(children[i]);
52     }
53     for (auto& child : children_) {
54       AddToMinHeapOrCheckStatus(&child);
55     }
56     current_ = CurrentForward();
57   }
58 
considerStatus(Status s)59   void considerStatus(Status s) {
60     if (!s.ok() && status_.ok()) {
61       status_ = s;
62     }
63   }
64 
AddIterator(InternalIterator * iter)65   virtual void AddIterator(InternalIterator* iter) {
66     assert(direction_ == kForward);
67     children_.emplace_back(iter);
68     if (pinned_iters_mgr_) {
69       iter->SetPinnedItersMgr(pinned_iters_mgr_);
70     }
71     auto new_wrapper = children_.back();
72     AddToMinHeapOrCheckStatus(&new_wrapper);
73     if (new_wrapper.Valid()) {
74       current_ = CurrentForward();
75     }
76   }
77 
~MergingIterator()78   ~MergingIterator() override {
79     for (auto& child : children_) {
80       child.DeleteIter(is_arena_mode_);
81     }
82   }
83 
Valid() const84   bool Valid() const override { return current_ != nullptr && status_.ok(); }
85 
status() const86   Status status() const override { return status_; }
87 
SeekToFirst()88   void SeekToFirst() override {
89     ClearHeaps();
90     status_ = Status::OK();
91     for (auto& child : children_) {
92       child.SeekToFirst();
93       AddToMinHeapOrCheckStatus(&child);
94     }
95     direction_ = kForward;
96     current_ = CurrentForward();
97   }
98 
SeekToLast()99   void SeekToLast() override {
100     ClearHeaps();
101     InitMaxHeap();
102     status_ = Status::OK();
103     for (auto& child : children_) {
104       child.SeekToLast();
105       AddToMaxHeapOrCheckStatus(&child);
106     }
107     direction_ = kReverse;
108     current_ = CurrentReverse();
109   }
110 
Seek(const Slice & target)111   void Seek(const Slice& target) override {
112     ClearHeaps();
113     status_ = Status::OK();
114     for (auto& child : children_) {
115       {
116         PERF_TIMER_GUARD(seek_child_seek_time);
117         child.Seek(target);
118       }
119 
120       PERF_COUNTER_ADD(seek_child_seek_count, 1);
121       {
122         // Strictly, we timed slightly more than min heap operation,
123         // but these operations are very cheap.
124         PERF_TIMER_GUARD(seek_min_heap_time);
125         AddToMinHeapOrCheckStatus(&child);
126       }
127     }
128     direction_ = kForward;
129     {
130       PERF_TIMER_GUARD(seek_min_heap_time);
131       current_ = CurrentForward();
132     }
133   }
134 
SeekForPrev(const Slice & target)135   void SeekForPrev(const Slice& target) override {
136     ClearHeaps();
137     InitMaxHeap();
138     status_ = Status::OK();
139 
140     for (auto& child : children_) {
141       {
142         PERF_TIMER_GUARD(seek_child_seek_time);
143         child.SeekForPrev(target);
144       }
145       PERF_COUNTER_ADD(seek_child_seek_count, 1);
146 
147       {
148         PERF_TIMER_GUARD(seek_max_heap_time);
149         AddToMaxHeapOrCheckStatus(&child);
150       }
151     }
152     direction_ = kReverse;
153     {
154       PERF_TIMER_GUARD(seek_max_heap_time);
155       current_ = CurrentReverse();
156     }
157   }
158 
Next()159   void Next() override {
160     assert(Valid());
161 
162     // Ensure that all children are positioned after key().
163     // If we are moving in the forward direction, it is already
164     // true for all of the non-current children since current_ is
165     // the smallest child and key() == current_->key().
166     if (direction_ != kForward) {
167       SwitchToForward();
168       // The loop advanced all non-current children to be > key() so current_
169       // should still be strictly the smallest key.
170       assert(current_ == CurrentForward());
171     }
172 
173     // For the heap modifications below to be correct, current_ must be the
174     // current top of the heap.
175     assert(current_ == CurrentForward());
176 
177     // as the current points to the current record. move the iterator forward.
178     current_->Next();
179     if (current_->Valid()) {
180       // current is still valid after the Next() call above.  Call
181       // replace_top() to restore the heap property.  When the same child
182       // iterator yields a sequence of keys, this is cheap.
183       assert(current_->status().ok());
184       minHeap_.replace_top(current_);
185     } else {
186       // current stopped being valid, remove it from the heap.
187       considerStatus(current_->status());
188       minHeap_.pop();
189     }
190     current_ = CurrentForward();
191   }
192 
NextAndGetResult(IterateResult * result)193   bool NextAndGetResult(IterateResult* result) override {
194     Next();
195     bool is_valid = Valid();
196     if (is_valid) {
197       result->key = key();
198       result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
199     }
200     return is_valid;
201   }
202 
Prev()203   void Prev() override {
204     assert(Valid());
205     // Ensure that all children are positioned before key().
206     // If we are moving in the reverse direction, it is already
207     // true for all of the non-current children since current_ is
208     // the largest child and key() == current_->key().
209     if (direction_ != kReverse) {
210       // Otherwise, retreat the non-current children.  We retreat current_
211       // just after the if-block.
212       SwitchToBackward();
213     }
214 
215     // For the heap modifications below to be correct, current_ must be the
216     // current top of the heap.
217     assert(current_ == CurrentReverse());
218 
219     current_->Prev();
220     if (current_->Valid()) {
221       // current is still valid after the Prev() call above.  Call
222       // replace_top() to restore the heap property.  When the same child
223       // iterator yields a sequence of keys, this is cheap.
224       assert(current_->status().ok());
225       maxHeap_->replace_top(current_);
226     } else {
227       // current stopped being valid, remove it from the heap.
228       considerStatus(current_->status());
229       maxHeap_->pop();
230     }
231     current_ = CurrentReverse();
232   }
233 
key() const234   Slice key() const override {
235     assert(Valid());
236     return current_->key();
237   }
238 
value() const239   Slice value() const override {
240     assert(Valid());
241     return current_->value();
242   }
243 
244   // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
245   // from current child iterator. Potentially as long as one of child iterator
246   // report out of bound is not possible, we know current key is within bound.
247 
MayBeOutOfLowerBound()248   bool MayBeOutOfLowerBound() override {
249     assert(Valid());
250     return current_->MayBeOutOfLowerBound();
251   }
252 
MayBeOutOfUpperBound()253   bool MayBeOutOfUpperBound() override {
254     assert(Valid());
255     return current_->MayBeOutOfUpperBound();
256   }
257 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)258   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
259     pinned_iters_mgr_ = pinned_iters_mgr;
260     for (auto& child : children_) {
261       child.SetPinnedItersMgr(pinned_iters_mgr);
262     }
263   }
264 
IsKeyPinned() const265   bool IsKeyPinned() const override {
266     assert(Valid());
267     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
268            current_->IsKeyPinned();
269   }
270 
IsValuePinned() const271   bool IsValuePinned() const override {
272     assert(Valid());
273     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
274            current_->IsValuePinned();
275   }
276 
277  private:
278   // Clears heaps for both directions, used when changing direction or seeking
279   void ClearHeaps();
280   // Ensures that maxHeap_ is initialized when starting to go in the reverse
281   // direction
282   void InitMaxHeap();
283 
284   bool is_arena_mode_;
285   const InternalKeyComparator* comparator_;
286   autovector<IteratorWrapper, kNumIterReserve> children_;
287 
288   // Cached pointer to child iterator with the current key, or nullptr if no
289   // child iterators are valid.  This is the top of minHeap_ or maxHeap_
290   // depending on the direction.
291   IteratorWrapper* current_;
292   // If any of the children have non-ok status, this is one of them.
293   Status status_;
294   // Which direction is the iterator moving?
295   enum Direction {
296     kForward,
297     kReverse
298   };
299   Direction direction_;
300   MergerMinIterHeap minHeap_;
301   bool prefix_seek_mode_;
302 
303   // Max heap is used for reverse iteration, which is way less common than
304   // forward.  Lazily initialize it to save memory.
305   std::unique_ptr<MergerMaxIterHeap> maxHeap_;
306   PinnedIteratorsManager* pinned_iters_mgr_;
307 
308   // In forward direction, process a child that is not in the min heap.
309   // If valid, add to the min heap. Otherwise, check status.
310   void AddToMinHeapOrCheckStatus(IteratorWrapper*);
311 
312   // In backward direction, process a child that is not in the max heap.
313   // If valid, add to the min heap. Otherwise, check status.
314   void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
315 
316   void SwitchToForward();
317 
318   // Switch the direction from forward to backward without changing the
319   // position. Iterator should still be valid.
320   void SwitchToBackward();
321 
CurrentForward() const322   IteratorWrapper* CurrentForward() const {
323     assert(direction_ == kForward);
324     return !minHeap_.empty() ? minHeap_.top() : nullptr;
325   }
326 
CurrentReverse() const327   IteratorWrapper* CurrentReverse() const {
328     assert(direction_ == kReverse);
329     assert(maxHeap_);
330     return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
331   }
332 };
333 
AddToMinHeapOrCheckStatus(IteratorWrapper * child)334 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
335   if (child->Valid()) {
336     assert(child->status().ok());
337     minHeap_.push(child);
338   } else {
339     considerStatus(child->status());
340   }
341 }
342 
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)343 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
344   if (child->Valid()) {
345     assert(child->status().ok());
346     maxHeap_->push(child);
347   } else {
348     considerStatus(child->status());
349   }
350 }
351 
SwitchToForward()352 void MergingIterator::SwitchToForward() {
353   // Otherwise, advance the non-current children.  We advance current_
354   // just after the if-block.
355   ClearHeaps();
356   Slice target = key();
357   for (auto& child : children_) {
358     if (&child != current_) {
359       child.Seek(target);
360       if (child.Valid() && comparator_->Equal(target, child.key())) {
361         assert(child.status().ok());
362         child.Next();
363       }
364     }
365     AddToMinHeapOrCheckStatus(&child);
366   }
367   direction_ = kForward;
368 }
369 
SwitchToBackward()370 void MergingIterator::SwitchToBackward() {
371   ClearHeaps();
372   InitMaxHeap();
373   Slice target = key();
374   for (auto& child : children_) {
375     if (&child != current_) {
376       child.SeekForPrev(target);
377       TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
378       if (child.Valid() && comparator_->Equal(target, child.key())) {
379         assert(child.status().ok());
380         child.Prev();
381       }
382     }
383     AddToMaxHeapOrCheckStatus(&child);
384   }
385   direction_ = kReverse;
386   if (!prefix_seek_mode_) {
387     // Note that we don't do assert(current_ == CurrentReverse()) here
388     // because it is possible to have some keys larger than the seek-key
389     // inserted between Seek() and SeekToLast(), which makes current_ not
390     // equal to CurrentReverse().
391     current_ = CurrentReverse();
392   }
393   assert(current_ == CurrentReverse());
394 }
395 
ClearHeaps()396 void MergingIterator::ClearHeaps() {
397   minHeap_.clear();
398   if (maxHeap_) {
399     maxHeap_->clear();
400   }
401 }
402 
InitMaxHeap()403 void MergingIterator::InitMaxHeap() {
404   if (!maxHeap_) {
405     maxHeap_.reset(new MergerMaxIterHeap(comparator_));
406   }
407 }
408 
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)409 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
410                                      InternalIterator** list, int n,
411                                      Arena* arena, bool prefix_seek_mode) {
412   assert(n >= 0);
413   if (n == 0) {
414     return NewEmptyInternalIterator<Slice>(arena);
415   } else if (n == 1) {
416     return list[0];
417   } else {
418     if (arena == nullptr) {
419       return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
420     } else {
421       auto mem = arena->AllocateAligned(sizeof(MergingIterator));
422       return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
423     }
424   }
425 }
426 
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)427 MergeIteratorBuilder::MergeIteratorBuilder(
428     const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
429     : first_iter(nullptr), use_merging_iter(false), arena(a) {
430   auto mem = arena->AllocateAligned(sizeof(MergingIterator));
431   merge_iter =
432       new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
433 }
434 
~MergeIteratorBuilder()435 MergeIteratorBuilder::~MergeIteratorBuilder() {
436   if (first_iter != nullptr) {
437     first_iter->~InternalIterator();
438   }
439   if (merge_iter != nullptr) {
440     merge_iter->~MergingIterator();
441   }
442 }
443 
AddIterator(InternalIterator * iter)444 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
445   if (!use_merging_iter && first_iter != nullptr) {
446     merge_iter->AddIterator(first_iter);
447     use_merging_iter = true;
448     first_iter = nullptr;
449   }
450   if (use_merging_iter) {
451     merge_iter->AddIterator(iter);
452   } else {
453     first_iter = iter;
454   }
455 }
456 
Finish()457 InternalIterator* MergeIteratorBuilder::Finish() {
458   InternalIterator* ret = nullptr;
459   if (!use_merging_iter) {
460     ret = first_iter;
461     first_iter = nullptr;
462   } else {
463     ret = merge_iter;
464     merge_iter = nullptr;
465   }
466   return ret;
467 }
468 
469 }  // namespace rocksdb
470