1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/merging_iterator.h"
11 #include <string>
12 #include <vector>
13 #include "db/dbformat.h"
14 #include "db/pinned_iterators_manager.h"
15 #include "memory/arena.h"
16 #include "monitoring/perf_context_imp.h"
17 #include "rocksdb/comparator.h"
18 #include "rocksdb/iterator.h"
19 #include "rocksdb/options.h"
20 #include "table/internal_iterator.h"
21 #include "table/iter_heap.h"
22 #include "table/iterator_wrapper.h"
23 #include "test_util/sync_point.h"
24 #include "util/autovector.h"
25 #include "util/heap.h"
26 #include "util/stop_watch.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30 namespace {
31 typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32 typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33 }  // namespace
34 
35 const size_t kNumIterReserve = 4;
36 
37 class MergingIterator : public InternalIterator {
38  public:
MergingIterator(const InternalKeyComparator * comparator,InternalIterator ** children,int n,bool is_arena_mode,bool prefix_seek_mode)39   MergingIterator(const InternalKeyComparator* comparator,
40                   InternalIterator** children, int n, bool is_arena_mode,
41                   bool prefix_seek_mode)
42       : is_arena_mode_(is_arena_mode),
43         comparator_(comparator),
44         current_(nullptr),
45         direction_(kForward),
46         minHeap_(comparator_),
47         prefix_seek_mode_(prefix_seek_mode),
48         pinned_iters_mgr_(nullptr) {
49     children_.resize(n);
50     for (int i = 0; i < n; i++) {
51       children_[i].Set(children[i]);
52     }
53     for (auto& child : children_) {
54       AddToMinHeapOrCheckStatus(&child);
55     }
56     current_ = CurrentForward();
57   }
58 
considerStatus(Status s)59   void considerStatus(Status s) {
60     if (!s.ok() && status_.ok()) {
61       status_ = s;
62     }
63   }
64 
AddIterator(InternalIterator * iter)65   virtual void AddIterator(InternalIterator* iter) {
66     assert(direction_ == kForward);
67     children_.emplace_back(iter);
68     if (pinned_iters_mgr_) {
69       iter->SetPinnedItersMgr(pinned_iters_mgr_);
70     }
71     auto new_wrapper = children_.back();
72     AddToMinHeapOrCheckStatus(&new_wrapper);
73     if (new_wrapper.Valid()) {
74       current_ = CurrentForward();
75     }
76   }
77 
~MergingIterator()78   ~MergingIterator() override {
79     for (auto& child : children_) {
80       child.DeleteIter(is_arena_mode_);
81     }
82     status_.PermitUncheckedError();
83   }
84 
Valid() const85   bool Valid() const override { return current_ != nullptr && status_.ok(); }
86 
status() const87   Status status() const override { return status_; }
88 
SeekToFirst()89   void SeekToFirst() override {
90     ClearHeaps();
91     status_ = Status::OK();
92     for (auto& child : children_) {
93       child.SeekToFirst();
94       AddToMinHeapOrCheckStatus(&child);
95     }
96     direction_ = kForward;
97     current_ = CurrentForward();
98   }
99 
SeekToLast()100   void SeekToLast() override {
101     ClearHeaps();
102     InitMaxHeap();
103     status_ = Status::OK();
104     for (auto& child : children_) {
105       child.SeekToLast();
106       AddToMaxHeapOrCheckStatus(&child);
107     }
108     direction_ = kReverse;
109     current_ = CurrentReverse();
110   }
111 
Seek(const Slice & target)112   void Seek(const Slice& target) override {
113     ClearHeaps();
114     status_ = Status::OK();
115     for (auto& child : children_) {
116       {
117         PERF_TIMER_GUARD(seek_child_seek_time);
118         child.Seek(target);
119       }
120 
121       PERF_COUNTER_ADD(seek_child_seek_count, 1);
122       {
123         // Strictly, we timed slightly more than min heap operation,
124         // but these operations are very cheap.
125         PERF_TIMER_GUARD(seek_min_heap_time);
126         AddToMinHeapOrCheckStatus(&child);
127       }
128     }
129     direction_ = kForward;
130     {
131       PERF_TIMER_GUARD(seek_min_heap_time);
132       current_ = CurrentForward();
133     }
134   }
135 
SeekForPrev(const Slice & target)136   void SeekForPrev(const Slice& target) override {
137     ClearHeaps();
138     InitMaxHeap();
139     status_ = Status::OK();
140 
141     for (auto& child : children_) {
142       {
143         PERF_TIMER_GUARD(seek_child_seek_time);
144         child.SeekForPrev(target);
145       }
146       PERF_COUNTER_ADD(seek_child_seek_count, 1);
147 
148       {
149         PERF_TIMER_GUARD(seek_max_heap_time);
150         AddToMaxHeapOrCheckStatus(&child);
151       }
152     }
153     direction_ = kReverse;
154     {
155       PERF_TIMER_GUARD(seek_max_heap_time);
156       current_ = CurrentReverse();
157     }
158   }
159 
Next()160   void Next() override {
161     assert(Valid());
162 
163     // Ensure that all children are positioned after key().
164     // If we are moving in the forward direction, it is already
165     // true for all of the non-current children since current_ is
166     // the smallest child and key() == current_->key().
167     if (direction_ != kForward) {
168       SwitchToForward();
169       // The loop advanced all non-current children to be > key() so current_
170       // should still be strictly the smallest key.
171     }
172 
173     // For the heap modifications below to be correct, current_ must be the
174     // current top of the heap.
175     assert(current_ == CurrentForward());
176 
177     // as the current points to the current record. move the iterator forward.
178     current_->Next();
179     if (current_->Valid()) {
180       // current is still valid after the Next() call above.  Call
181       // replace_top() to restore the heap property.  When the same child
182       // iterator yields a sequence of keys, this is cheap.
183       assert(current_->status().ok());
184       minHeap_.replace_top(current_);
185     } else {
186       // current stopped being valid, remove it from the heap.
187       considerStatus(current_->status());
188       minHeap_.pop();
189     }
190     current_ = CurrentForward();
191   }
192 
NextAndGetResult(IterateResult * result)193   bool NextAndGetResult(IterateResult* result) override {
194     Next();
195     bool is_valid = Valid();
196     if (is_valid) {
197       result->key = key();
198       result->bound_check_result = UpperBoundCheckResult();
199       result->value_prepared = current_->IsValuePrepared();
200     }
201     return is_valid;
202   }
203 
Prev()204   void Prev() override {
205     assert(Valid());
206     // Ensure that all children are positioned before key().
207     // If we are moving in the reverse direction, it is already
208     // true for all of the non-current children since current_ is
209     // the largest child and key() == current_->key().
210     if (direction_ != kReverse) {
211       // Otherwise, retreat the non-current children.  We retreat current_
212       // just after the if-block.
213       SwitchToBackward();
214     }
215 
216     // For the heap modifications below to be correct, current_ must be the
217     // current top of the heap.
218     assert(current_ == CurrentReverse());
219 
220     current_->Prev();
221     if (current_->Valid()) {
222       // current is still valid after the Prev() call above.  Call
223       // replace_top() to restore the heap property.  When the same child
224       // iterator yields a sequence of keys, this is cheap.
225       assert(current_->status().ok());
226       maxHeap_->replace_top(current_);
227     } else {
228       // current stopped being valid, remove it from the heap.
229       considerStatus(current_->status());
230       maxHeap_->pop();
231     }
232     current_ = CurrentReverse();
233   }
234 
key() const235   Slice key() const override {
236     assert(Valid());
237     return current_->key();
238   }
239 
value() const240   Slice value() const override {
241     assert(Valid());
242     return current_->value();
243   }
244 
PrepareValue()245   bool PrepareValue() override {
246     assert(Valid());
247     if (current_->PrepareValue()) {
248       return true;
249     }
250 
251     considerStatus(current_->status());
252     assert(!status_.ok());
253     return false;
254   }
255 
256   // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
257   // from current child iterator. Potentially as long as one of child iterator
258   // report out of bound is not possible, we know current key is within bound.
259 
MayBeOutOfLowerBound()260   bool MayBeOutOfLowerBound() override {
261     assert(Valid());
262     return current_->MayBeOutOfLowerBound();
263   }
264 
UpperBoundCheckResult()265   IterBoundCheck UpperBoundCheckResult() override {
266     assert(Valid());
267     return current_->UpperBoundCheckResult();
268   }
269 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)270   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
271     pinned_iters_mgr_ = pinned_iters_mgr;
272     for (auto& child : children_) {
273       child.SetPinnedItersMgr(pinned_iters_mgr);
274     }
275   }
276 
IsKeyPinned() const277   bool IsKeyPinned() const override {
278     assert(Valid());
279     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
280            current_->IsKeyPinned();
281   }
282 
IsValuePinned() const283   bool IsValuePinned() const override {
284     assert(Valid());
285     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
286            current_->IsValuePinned();
287   }
288 
289  private:
290   // Clears heaps for both directions, used when changing direction or seeking
291   void ClearHeaps();
292   // Ensures that maxHeap_ is initialized when starting to go in the reverse
293   // direction
294   void InitMaxHeap();
295 
296   bool is_arena_mode_;
297   const InternalKeyComparator* comparator_;
298   autovector<IteratorWrapper, kNumIterReserve> children_;
299 
300   // Cached pointer to child iterator with the current key, or nullptr if no
301   // child iterators are valid.  This is the top of minHeap_ or maxHeap_
302   // depending on the direction.
303   IteratorWrapper* current_;
304   // If any of the children have non-ok status, this is one of them.
305   Status status_;
306   // Which direction is the iterator moving?
307   enum Direction {
308     kForward,
309     kReverse
310   };
311   Direction direction_;
312   MergerMinIterHeap minHeap_;
313   bool prefix_seek_mode_;
314 
315   // Max heap is used for reverse iteration, which is way less common than
316   // forward.  Lazily initialize it to save memory.
317   std::unique_ptr<MergerMaxIterHeap> maxHeap_;
318   PinnedIteratorsManager* pinned_iters_mgr_;
319 
320   // In forward direction, process a child that is not in the min heap.
321   // If valid, add to the min heap. Otherwise, check status.
322   void AddToMinHeapOrCheckStatus(IteratorWrapper*);
323 
324   // In backward direction, process a child that is not in the max heap.
325   // If valid, add to the min heap. Otherwise, check status.
326   void AddToMaxHeapOrCheckStatus(IteratorWrapper*);
327 
328   void SwitchToForward();
329 
330   // Switch the direction from forward to backward without changing the
331   // position. Iterator should still be valid.
332   void SwitchToBackward();
333 
CurrentForward() const334   IteratorWrapper* CurrentForward() const {
335     assert(direction_ == kForward);
336     return !minHeap_.empty() ? minHeap_.top() : nullptr;
337   }
338 
CurrentReverse() const339   IteratorWrapper* CurrentReverse() const {
340     assert(direction_ == kReverse);
341     assert(maxHeap_);
342     return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
343   }
344 };
345 
AddToMinHeapOrCheckStatus(IteratorWrapper * child)346 void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) {
347   if (child->Valid()) {
348     assert(child->status().ok());
349     minHeap_.push(child);
350   } else {
351     considerStatus(child->status());
352   }
353 }
354 
AddToMaxHeapOrCheckStatus(IteratorWrapper * child)355 void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) {
356   if (child->Valid()) {
357     assert(child->status().ok());
358     maxHeap_->push(child);
359   } else {
360     considerStatus(child->status());
361   }
362 }
363 
SwitchToForward()364 void MergingIterator::SwitchToForward() {
365   // Otherwise, advance the non-current children.  We advance current_
366   // just after the if-block.
367   ClearHeaps();
368   Slice target = key();
369   for (auto& child : children_) {
370     if (&child != current_) {
371       child.Seek(target);
372       if (child.Valid() && comparator_->Equal(target, child.key())) {
373         assert(child.status().ok());
374         child.Next();
375       }
376     }
377     AddToMinHeapOrCheckStatus(&child);
378   }
379   direction_ = kForward;
380 }
381 
// Turns a forward-moving iterator into a backward-moving one at the same key.
// - Every child other than current_ is repositioned with SeekForPrev(key());
//   if it lands exactly on key(), it is stepped back once with Prev().
// - All children, including current_, then pass through
//   AddToMaxHeapOrCheckStatus() into the max-heap, which InitMaxHeap()
//   allocates on first use.
// current_ itself is not moved here; Prev() retreats it after this returns.
void MergingIterator::SwitchToBackward() {
  ClearHeaps();
  InitMaxHeap();
  // `target` is current_'s key; current_ is skipped in the loop below, so the
  // slice presumably stays valid while the other children are repositioned.
  Slice target = key();
  for (auto& child : children_) {
    if (&child != current_) {
      child.SeekForPrev(target);
      TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
      if (child.Valid() && comparator_->Equal(target, child.key())) {
        assert(child.status().ok());
        child.Prev();
      }
    }
    AddToMaxHeapOrCheckStatus(&child);
  }
  direction_ = kReverse;
  if (!prefix_seek_mode_) {
    // Note that we don't do assert(current_ == CurrentReverse()) here
    // because it is possible to have some keys larger than the seek-key
    // inserted between Seek() and SeekToLast(), which makes current_ not
    // equal to CurrentReverse().
    current_ = CurrentReverse();
  }
  // NOTE(review): in prefix-seek mode current_ is not refreshed above. This
  // debug-only check therefore requires it to already be the max-heap top.
  assert(current_ == CurrentReverse());
}
407 
ClearHeaps()408 void MergingIterator::ClearHeaps() {
409   minHeap_.clear();
410   if (maxHeap_) {
411     maxHeap_->clear();
412   }
413 }
414 
InitMaxHeap()415 void MergingIterator::InitMaxHeap() {
416   if (!maxHeap_) {
417     maxHeap_.reset(new MergerMaxIterHeap(comparator_));
418   }
419 }
420 
NewMergingIterator(const InternalKeyComparator * cmp,InternalIterator ** list,int n,Arena * arena,bool prefix_seek_mode)421 InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
422                                      InternalIterator** list, int n,
423                                      Arena* arena, bool prefix_seek_mode) {
424   assert(n >= 0);
425   if (n == 0) {
426     return NewEmptyInternalIterator<Slice>(arena);
427   } else if (n == 1) {
428     return list[0];
429   } else {
430     if (arena == nullptr) {
431       return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
432     } else {
433       auto mem = arena->AllocateAligned(sizeof(MergingIterator));
434       return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
435     }
436   }
437 }
438 
MergeIteratorBuilder(const InternalKeyComparator * comparator,Arena * a,bool prefix_seek_mode)439 MergeIteratorBuilder::MergeIteratorBuilder(
440     const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
441     : first_iter(nullptr), use_merging_iter(false), arena(a) {
442   auto mem = arena->AllocateAligned(sizeof(MergingIterator));
443   merge_iter =
444       new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
445 }
446 
~MergeIteratorBuilder()447 MergeIteratorBuilder::~MergeIteratorBuilder() {
448   if (first_iter != nullptr) {
449     first_iter->~InternalIterator();
450   }
451   if (merge_iter != nullptr) {
452     merge_iter->~MergingIterator();
453   }
454 }
455 
AddIterator(InternalIterator * iter)456 void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
457   if (!use_merging_iter && first_iter != nullptr) {
458     merge_iter->AddIterator(first_iter);
459     use_merging_iter = true;
460     first_iter = nullptr;
461   }
462   if (use_merging_iter) {
463     merge_iter->AddIterator(iter);
464   } else {
465     first_iter = iter;
466   }
467 }
468 
Finish()469 InternalIterator* MergeIteratorBuilder::Finish() {
470   InternalIterator* ret = nullptr;
471   if (!use_merging_iter) {
472     ret = first_iter;
473     first_iter = nullptr;
474   } else {
475     ret = merge_iter;
476     merge_iter = nullptr;
477   }
478   return ret;
479 }
480 
481 }  // namespace ROCKSDB_NAMESPACE
482