1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 #include "db/forward_iterator.h"
8 
9 #include <limits>
10 #include <string>
11 #include <utility>
12 
13 #include "db/column_family.h"
14 #include "db/db_impl/db_impl.h"
15 #include "db/db_iter.h"
16 #include "db/dbformat.h"
17 #include "db/job_context.h"
18 #include "db/range_del_aggregator.h"
19 #include "db/range_tombstone_fragmenter.h"
20 #include "rocksdb/env.h"
21 #include "rocksdb/slice.h"
22 #include "rocksdb/slice_transform.h"
23 #include "table/merging_iterator.h"
24 #include "test_util/sync_point.h"
25 #include "util/string_util.h"
26 
27 namespace ROCKSDB_NAMESPACE {
28 
29 // Usage:
30 //     ForwardLevelIterator iter;
31 //     iter.SetFileIndex(file_index);
32 //     iter.Seek(target); // or iter.SeekToFirst();
33 //     iter.Next()
class ForwardLevelIterator : public InternalIterator {
 public:
  // `files` must outlive this iterator; entries are iterated in order, so
  // the vector is expected to be the sorted file list of one LSM level.
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
                       const ReadOptions& read_options,
                       const std::vector<FileMetaData*>& files,
                       const SliceTransform* prefix_extractor)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor) {}

  ~ForwardLevelIterator() override {
    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      // Hand ownership of file_iter_ to the pinning manager so any pinned
      // key/value slices stay valid until ReleasePinnedData() runs.
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  // Selects files_[file_index] as the current file. Always clears any
  // previous error status; rebuilds the table iterator only when the index
  // actually changes.
  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }
  // Recreates the table iterator for the current file (file_index_),
  // releasing (or pinning) the previous one.
  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    // The aggregator is only used to detect range tombstones in the file;
    // ForwardIterator does not support them (see below).
    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        prefix_extractor_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }
  // Backward positioning is not supported by design; these set an error.
  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }
  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }
  bool Valid() const override {
    return valid_;
  }
  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }
  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }
  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }
  // Advances within the current file; when it is exhausted, moves on to the
  // first entry of the next file in the level, skipping empty files.
  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        // Last file of the level is exhausted.
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }
  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }
  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }
  // First error wins: this iterator's own status, then the file iterator's.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }
  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;  // files of the level, in order

  bool valid_;
  uint32_t file_index_;  // index into files_ of the current file
  Status status_;
  // Owned, unless handed over to pinned_iters_mgr_ on destruction/reset.
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  const SliceTransform* prefix_extractor_;
};
201 
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      // NOTE(review): current_sv is dereferenced here even though the body
      // below guards on sv_ being non-null -- confirm callers never pass
      // nullptr, or this is undefined behavior before the guard is reached.
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    // Caller supplied an already-referenced SuperVersion: build all child
    // iterators now without acquiring a new reference (refresh_sv == false).
    RebuildIterators(false);
  }
}
226 
// Destroys all child iterators and drops the SuperVersion reference
// (possibly deferred via the pinning manager -- see SVCleanup()).
ForwardIterator::~ForwardIterator() {
  Cleanup(true);
}
230 
SVCleanup(DBImpl * db,SuperVersion * sv,bool background_purge_on_iterator_cleanup)231 void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
232                                 bool background_purge_on_iterator_cleanup) {
233   if (sv->Unref()) {
234     // Job id == 0 means that this is not our background process, but rather
235     // user thread
236     JobContext job_context(0);
237     db->mutex_.Lock();
238     sv->Cleanup();
239     db->FindObsoleteFiles(&job_context, false, true);
240     if (background_purge_on_iterator_cleanup) {
241       db->ScheduleBgLogWriterClose(&job_context);
242       db->AddSuperVersionsToFreeQueue(sv);
243       db->SchedulePurge();
244     }
245     db->mutex_.Unlock();
246     if (!background_purge_on_iterator_cleanup) {
247       delete sv;
248     }
249     if (job_context.HaveSomethingToDelete()) {
250       db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
251     }
252     job_context.Clean();
253   }
254 }
255 
namespace {
// Bundles the arguments of ForwardIterator::SVCleanup() so the cleanup can
// be deferred through PinnedIteratorsManager's void* callback interface.
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // namespace
263 
264 // Used in PinnedIteratorsManager to release pinned SuperVersion
DeferredSVCleanup(void * arg)265 void ForwardIterator::DeferredSVCleanup(void* arg) {
266   auto d = reinterpret_cast<SVCleanupParams*>(arg);
267   ForwardIterator::SVCleanup(
268     d->db, d->sv, d->background_purge_on_iterator_cleanup);
269   delete d;
270 }
271 
SVCleanup()272 void ForwardIterator::SVCleanup() {
273   if (sv_ == nullptr) {
274     return;
275   }
276   bool background_purge =
277       read_options_.background_purge_on_iterator_cleanup ||
278       db_->immutable_db_options().avoid_unnecessary_blocking_io;
279   if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
280     // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
281     // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
282     // The slices may point into some memtables owned by sv_, so we need to keep
283     // sv_ referenced until pinned_iters_mgr_ unpins everything.
284     auto p = new SVCleanupParams{db_, sv_, background_purge};
285     pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
286   } else {
287     SVCleanup(db_, sv_, background_purge);
288   }
289 }
290 
Cleanup(bool release_sv)291 void ForwardIterator::Cleanup(bool release_sv) {
292   if (mutable_iter_ != nullptr) {
293     DeleteIterator(mutable_iter_, true /* is_arena */);
294   }
295 
296   for (auto* m : imm_iters_) {
297     DeleteIterator(m, true /* is_arena */);
298   }
299   imm_iters_.clear();
300 
301   for (auto* f : l0_iters_) {
302     DeleteIterator(f);
303   }
304   l0_iters_.clear();
305 
306   for (auto* l : level_iters_) {
307     DeleteIterator(l);
308   }
309   level_iters_.clear();
310 
311   if (release_sv) {
312     SVCleanup();
313   }
314 }
315 
Valid() const316 bool ForwardIterator::Valid() const {
317   // See UpdateCurrent().
318   return valid_ ? !current_over_upper_bound_ : false;
319 }
320 
SeekToFirst()321 void ForwardIterator::SeekToFirst() {
322   if (sv_ == nullptr) {
323     RebuildIterators(true);
324   } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
325     RenewIterators();
326   } else if (immutable_status_.IsIncomplete()) {
327     ResetIncompleteIterators();
328   }
329   SeekInternal(Slice(), true);
330 }
331 
IsOverUpperBound(const Slice & internal_key) const332 bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
333   return !(read_options_.iterate_upper_bound == nullptr ||
334            cfd_->internal_comparator().user_comparator()->Compare(
335                ExtractUserKey(internal_key),
336                *read_options_.iterate_upper_bound) < 0);
337 }
338 
Seek(const Slice & internal_key)339 void ForwardIterator::Seek(const Slice& internal_key) {
340   if (sv_ == nullptr) {
341     RebuildIterators(true);
342   } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
343     RenewIterators();
344   } else if (immutable_status_.IsIncomplete()) {
345     ResetIncompleteIterators();
346   }
347   SeekInternal(internal_key, false);
348 }
349 
SeekInternal(const Slice & internal_key,bool seek_to_first)350 void ForwardIterator::SeekInternal(const Slice& internal_key,
351                                    bool seek_to_first) {
352   assert(mutable_iter_);
353   // mutable
354   seek_to_first ? mutable_iter_->SeekToFirst() :
355                   mutable_iter_->Seek(internal_key);
356 
357   // immutable
358   // TODO(ljin): NeedToSeekImmutable has negative impact on performance
359   // if it turns to need to seek immutable often. We probably want to have
360   // an option to turn it off.
361   if (seek_to_first || NeedToSeekImmutable(internal_key)) {
362     immutable_status_ = Status::OK();
363     if (has_iter_trimmed_for_upper_bound_ &&
364         (
365             // prev_ is not set yet
366             is_prev_set_ == false ||
367             // We are doing SeekToFirst() and internal_key.size() = 0
368             seek_to_first ||
369             // prev_key_ > internal_key
370             cfd_->internal_comparator().InternalKeyComparator::Compare(
371                 prev_key_.GetInternalKey(), internal_key) > 0)) {
372       // Some iterators are trimmed. Need to rebuild.
373       RebuildIterators(true);
374       // Already seeked mutable iter, so seek again
375       seek_to_first ? mutable_iter_->SeekToFirst()
376                     : mutable_iter_->Seek(internal_key);
377     }
378     {
379       auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
380       immutable_min_heap_.swap(tmp);
381     }
382     for (size_t i = 0; i < imm_iters_.size(); i++) {
383       auto* m = imm_iters_[i];
384       seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
385       if (!m->status().ok()) {
386         immutable_status_ = m->status();
387       } else if (m->Valid()) {
388         immutable_min_heap_.push(m);
389       }
390     }
391 
392     Slice target_user_key;
393     if (!seek_to_first) {
394       target_user_key = ExtractUserKey(internal_key);
395     }
396     const VersionStorageInfo* vstorage = sv_->current->storage_info();
397     const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
398     for (size_t i = 0; i < l0.size(); ++i) {
399       if (!l0_iters_[i]) {
400         continue;
401       }
402       if (seek_to_first) {
403         l0_iters_[i]->SeekToFirst();
404       } else {
405         // If the target key passes over the larget key, we are sure Next()
406         // won't go over this file.
407         if (user_comparator_->Compare(target_user_key,
408                                       l0[i]->largest.user_key()) > 0) {
409           if (read_options_.iterate_upper_bound != nullptr) {
410             has_iter_trimmed_for_upper_bound_ = true;
411             DeleteIterator(l0_iters_[i]);
412             l0_iters_[i] = nullptr;
413           }
414           continue;
415         }
416         l0_iters_[i]->Seek(internal_key);
417       }
418 
419       if (!l0_iters_[i]->status().ok()) {
420         immutable_status_ = l0_iters_[i]->status();
421       } else if (l0_iters_[i]->Valid() &&
422                  !IsOverUpperBound(l0_iters_[i]->key())) {
423         immutable_min_heap_.push(l0_iters_[i]);
424       } else {
425         has_iter_trimmed_for_upper_bound_ = true;
426         DeleteIterator(l0_iters_[i]);
427         l0_iters_[i] = nullptr;
428       }
429     }
430 
431     for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
432       const std::vector<FileMetaData*>& level_files =
433           vstorage->LevelFiles(level);
434       if (level_files.empty()) {
435         continue;
436       }
437       if (level_iters_[level - 1] == nullptr) {
438         continue;
439       }
440       uint32_t f_idx = 0;
441       if (!seek_to_first) {
442         f_idx = FindFileInRange(level_files, internal_key, 0,
443                                 static_cast<uint32_t>(level_files.size()));
444       }
445 
446       // Seek
447       if (f_idx < level_files.size()) {
448         level_iters_[level - 1]->SetFileIndex(f_idx);
449         seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
450                         level_iters_[level - 1]->Seek(internal_key);
451 
452         if (!level_iters_[level - 1]->status().ok()) {
453           immutable_status_ = level_iters_[level - 1]->status();
454         } else if (level_iters_[level - 1]->Valid() &&
455                    !IsOverUpperBound(level_iters_[level - 1]->key())) {
456           immutable_min_heap_.push(level_iters_[level - 1]);
457         } else {
458           // Nothing in this level is interesting. Remove.
459           has_iter_trimmed_for_upper_bound_ = true;
460           DeleteIterator(level_iters_[level - 1]);
461           level_iters_[level - 1] = nullptr;
462         }
463       }
464     }
465 
466     if (seek_to_first) {
467       is_prev_set_ = false;
468     } else {
469       prev_key_.SetInternalKey(internal_key);
470       is_prev_set_ = true;
471       is_prev_inclusive_ = true;
472     }
473 
474     TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
475   } else if (current_ && current_ != mutable_iter_) {
476     // current_ is one of immutable iterators, push it back to the heap
477     immutable_min_heap_.push(current_);
478   }
479 
480   UpdateCurrent();
481   TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
482 }
483 
Next()484 void ForwardIterator::Next() {
485   assert(valid_);
486   bool update_prev_key = false;
487 
488   if (sv_ == nullptr ||
489       sv_->version_number != cfd_->GetSuperVersionNumber()) {
490     std::string current_key = key().ToString();
491     Slice old_key(current_key.data(), current_key.size());
492 
493     if (sv_ == nullptr) {
494       RebuildIterators(true);
495     } else {
496       RenewIterators();
497     }
498     SeekInternal(old_key, false);
499     if (!valid_ || key().compare(old_key) != 0) {
500       return;
501     }
502   } else if (current_ != mutable_iter_) {
503     // It is going to advance immutable iterator
504 
505     if (is_prev_set_ && prefix_extractor_) {
506       // advance prev_key_ to current_ only if they share the same prefix
507       update_prev_key =
508           prefix_extractor_->Transform(prev_key_.GetUserKey())
509               .compare(prefix_extractor_->Transform(current_->key())) == 0;
510     } else {
511       update_prev_key = true;
512     }
513 
514 
515     if (update_prev_key) {
516       prev_key_.SetInternalKey(current_->key());
517       is_prev_set_ = true;
518       is_prev_inclusive_ = false;
519     }
520   }
521 
522   current_->Next();
523   if (current_ != mutable_iter_) {
524     if (!current_->status().ok()) {
525       immutable_status_ = current_->status();
526     } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
527       immutable_min_heap_.push(current_);
528     } else {
529       if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
530         // remove the current iterator
531         DeleteCurrentIter();
532         current_ = nullptr;
533       }
534       if (update_prev_key) {
535         mutable_iter_->Seek(prev_key_.GetInternalKey());
536       }
537     }
538   }
539   UpdateCurrent();
540   TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
541 }
542 
key() const543 Slice ForwardIterator::key() const {
544   assert(valid_);
545   return current_->key();
546 }
547 
value() const548 Slice ForwardIterator::value() const {
549   assert(valid_);
550   return current_->value();
551 }
552 
status() const553 Status ForwardIterator::status() const {
554   if (!status_.ok()) {
555     return status_;
556   } else if (!mutable_iter_->status().ok()) {
557     return mutable_iter_->status();
558   }
559 
560   return immutable_status_;
561 }
562 
GetProperty(std::string prop_name,std::string * prop)563 Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
564   assert(prop != nullptr);
565   if (prop_name == "rocksdb.iterator.super-version-number") {
566     *prop = ToString(sv_->version_number);
567     return Status::OK();
568   }
569   return Status::InvalidArgument();
570 }
571 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)572 void ForwardIterator::SetPinnedItersMgr(
573     PinnedIteratorsManager* pinned_iters_mgr) {
574   pinned_iters_mgr_ = pinned_iters_mgr;
575   UpdateChildrenPinnedItersMgr();
576 }
577 
UpdateChildrenPinnedItersMgr()578 void ForwardIterator::UpdateChildrenPinnedItersMgr() {
579   // Set PinnedIteratorsManager for mutable memtable iterator.
580   if (mutable_iter_) {
581     mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
582   }
583 
584   // Set PinnedIteratorsManager for immutable memtable iterators.
585   for (InternalIterator* child_iter : imm_iters_) {
586     if (child_iter) {
587       child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
588     }
589   }
590 
591   // Set PinnedIteratorsManager for L0 files iterators.
592   for (InternalIterator* child_iter : l0_iters_) {
593     if (child_iter) {
594       child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
595     }
596   }
597 
598   // Set PinnedIteratorsManager for L1+ levels iterators.
599   for (ForwardLevelIterator* child_iter : level_iters_) {
600     if (child_iter) {
601       child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
602     }
603   }
604 }
605 
IsKeyPinned() const606 bool ForwardIterator::IsKeyPinned() const {
607   return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
608          current_->IsKeyPinned();
609 }
610 
IsValuePinned() const611 bool ForwardIterator::IsValuePinned() const {
612   return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
613          current_->IsValuePinned();
614 }
615 
RebuildIterators(bool refresh_sv)616 void ForwardIterator::RebuildIterators(bool refresh_sv) {
617   // Clean up
618   Cleanup(refresh_sv);
619   if (refresh_sv) {
620     // New
621     sv_ = cfd_->GetReferencedSuperVersion(db_);
622   }
623   ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
624                                        kMaxSequenceNumber /* upper_bound */);
625   mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
626   sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
627   if (!read_options_.ignore_range_deletions) {
628     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
629         sv_->mem->NewRangeTombstoneIterator(
630             read_options_, sv_->current->version_set()->LastSequence()));
631     range_del_agg.AddTombstones(std::move(range_del_iter));
632     sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
633                                          &range_del_agg);
634   }
635   has_iter_trimmed_for_upper_bound_ = false;
636 
637   const auto* vstorage = sv_->current->storage_info();
638   const auto& l0_files = vstorage->LevelFiles(0);
639   l0_iters_.reserve(l0_files.size());
640   for (const auto* l0 : l0_files) {
641     if ((read_options_.iterate_upper_bound != nullptr) &&
642         cfd_->internal_comparator().user_comparator()->Compare(
643             l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
644       // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
645       // will never be interested in files with smallest key above
646       // iterate_upper_bound, since iterate_upper_bound can't be changed.
647       l0_iters_.push_back(nullptr);
648       continue;
649     }
650     l0_iters_.push_back(cfd_->table_cache()->NewIterator(
651         read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
652         read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
653         sv_->mutable_cf_options.prefix_extractor.get(),
654         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
655         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
656         /*skip_filters=*/false, /*level=*/-1,
657         /*smallest_compaction_key=*/nullptr,
658         /*largest_compaction_key=*/nullptr));
659   }
660   BuildLevelIterators(vstorage);
661   current_ = nullptr;
662   is_prev_set_ = false;
663 
664   UpdateChildrenPinnedItersMgr();
665   if (!range_del_agg.IsEmpty()) {
666     status_ = Status::NotSupported(
667         "Range tombstones unsupported with ForwardIterator");
668     valid_ = false;
669   }
670 }
671 
RenewIterators()672 void ForwardIterator::RenewIterators() {
673   SuperVersion* svnew;
674   assert(sv_);
675   svnew = cfd_->GetReferencedSuperVersion(db_);
676 
677   if (mutable_iter_ != nullptr) {
678     DeleteIterator(mutable_iter_, true /* is_arena */);
679   }
680   for (auto* m : imm_iters_) {
681     DeleteIterator(m, true /* is_arena */);
682   }
683   imm_iters_.clear();
684 
685   mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
686   svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
687   ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
688                                        kMaxSequenceNumber /* upper_bound */);
689   if (!read_options_.ignore_range_deletions) {
690     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
691         svnew->mem->NewRangeTombstoneIterator(
692             read_options_, sv_->current->version_set()->LastSequence()));
693     range_del_agg.AddTombstones(std::move(range_del_iter));
694     svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
695                                            &range_del_agg);
696   }
697 
698   const auto* vstorage = sv_->current->storage_info();
699   const auto& l0_files = vstorage->LevelFiles(0);
700   const auto* vstorage_new = svnew->current->storage_info();
701   const auto& l0_files_new = vstorage_new->LevelFiles(0);
702   size_t iold, inew;
703   bool found;
704   std::vector<InternalIterator*> l0_iters_new;
705   l0_iters_new.reserve(l0_files_new.size());
706 
707   for (inew = 0; inew < l0_files_new.size(); inew++) {
708     found = false;
709     for (iold = 0; iold < l0_files.size(); iold++) {
710       if (l0_files[iold] == l0_files_new[inew]) {
711         found = true;
712         break;
713       }
714     }
715     if (found) {
716       if (l0_iters_[iold] == nullptr) {
717         l0_iters_new.push_back(nullptr);
718         TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
719       } else {
720         l0_iters_new.push_back(l0_iters_[iold]);
721         l0_iters_[iold] = nullptr;
722         TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
723       }
724       continue;
725     }
726     l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
727         read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
728         *l0_files_new[inew],
729         read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
730         svnew->mutable_cf_options.prefix_extractor.get(),
731         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
732         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
733         /*skip_filters=*/false, /*level=*/-1,
734         /*smallest_compaction_key=*/nullptr,
735         /*largest_compaction_key=*/nullptr));
736   }
737 
738   for (auto* f : l0_iters_) {
739     DeleteIterator(f);
740   }
741   l0_iters_.clear();
742   l0_iters_ = l0_iters_new;
743 
744   for (auto* l : level_iters_) {
745     DeleteIterator(l);
746   }
747   level_iters_.clear();
748   BuildLevelIterators(vstorage_new);
749   current_ = nullptr;
750   is_prev_set_ = false;
751   SVCleanup();
752   sv_ = svnew;
753 
754   UpdateChildrenPinnedItersMgr();
755   if (!range_del_agg.IsEmpty()) {
756     status_ = Status::NotSupported(
757         "Range tombstones unsupported with ForwardIterator");
758     valid_ = false;
759   }
760 }
761 
BuildLevelIterators(const VersionStorageInfo * vstorage)762 void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
763   level_iters_.reserve(vstorage->num_levels() - 1);
764   for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
765     const auto& level_files = vstorage->LevelFiles(level);
766     if ((level_files.empty()) ||
767         ((read_options_.iterate_upper_bound != nullptr) &&
768          (user_comparator_->Compare(*read_options_.iterate_upper_bound,
769                                     level_files[0]->smallest.user_key()) <
770           0))) {
771       level_iters_.push_back(nullptr);
772       if (!level_files.empty()) {
773         has_iter_trimmed_for_upper_bound_ = true;
774       }
775     } else {
776       level_iters_.push_back(new ForwardLevelIterator(
777           cfd_, read_options_, level_files,
778           sv_->mutable_cf_options.prefix_extractor.get()));
779     }
780   }
781 }
782 
ResetIncompleteIterators()783 void ForwardIterator::ResetIncompleteIterators() {
784   const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
785   for (size_t i = 0; i < l0_iters_.size(); ++i) {
786     assert(i < l0_files.size());
787     if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
788       continue;
789     }
790     DeleteIterator(l0_iters_[i]);
791     l0_iters_[i] = cfd_->table_cache()->NewIterator(
792         read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
793         *l0_files[i], /*range_del_agg=*/nullptr,
794         sv_->mutable_cf_options.prefix_extractor.get(),
795         /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
796         TableReaderCaller::kUserIterator, /*arena=*/nullptr,
797         /*skip_filters=*/false, /*level=*/-1,
798         /*smallest_compaction_key=*/nullptr,
799         /*largest_compaction_key=*/nullptr);
800     l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
801   }
802 
803   for (auto* level_iter : level_iters_) {
804     if (level_iter && level_iter->status().IsIncomplete()) {
805       level_iter->Reset();
806     }
807   }
808 
809   current_ = nullptr;
810   is_prev_set_ = false;
811 }
812 
UpdateCurrent()813 void ForwardIterator::UpdateCurrent() {
814   if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
815     current_ = nullptr;
816   } else if (immutable_min_heap_.empty()) {
817     current_ = mutable_iter_;
818   } else if (!mutable_iter_->Valid()) {
819     current_ = immutable_min_heap_.top();
820     immutable_min_heap_.pop();
821   } else {
822     current_ = immutable_min_heap_.top();
823     assert(current_ != nullptr);
824     assert(current_->Valid());
825     int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
826         mutable_iter_->key(), current_->key());
827     assert(cmp != 0);
828     if (cmp > 0) {
829       immutable_min_heap_.pop();
830     } else {
831       current_ = mutable_iter_;
832     }
833   }
834   valid_ = current_ != nullptr && immutable_status_.ok();
835   if (!status_.ok()) {
836     status_ = Status::OK();
837   }
838 
839   // Upper bound doesn't apply to the memtable iterator. We want Valid() to
840   // return false when all iterators are over iterate_upper_bound, but can't
841   // just set valid_ to false, as that would effectively disable the tailing
842   // optimization (Seek() would be called on all immutable iterators regardless
843   // of whether the target key is greater than prev_key_).
844   current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
845 }
846 
// Returns true if the immutable iterators must be re-seeked for 'target',
// false if the tailing optimization lets us skip the seek entirely.
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).

  // Without a valid current position and a recorded prev_key_, the interval
  // invariant doesn't hold — must seek.
  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  // The no-seek shortcut is only sound within a single prefix: crossing a
  // prefix boundary may expose keys the prefix-filtered iterators skipped.
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
    prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  // target must be above prev_key_: strictly above if prev_key_ itself was
  // consumed (inclusive), or at-or-above otherwise. Compare returns <0/0/>0,
  // so the threshold 1 vs 0 encodes ">" vs ">=".
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
        prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  // target must not be beyond the smallest key the immutable iterators are
  // currently positioned at, or that position is stale — must seek.
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
        target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                          : current_->key()) > 0) {
    return true;
  }
  return false;
}
879 
DeleteCurrentIter()880 void ForwardIterator::DeleteCurrentIter() {
881   const VersionStorageInfo* vstorage = sv_->current->storage_info();
882   const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
883   for (size_t i = 0; i < l0.size(); ++i) {
884     if (!l0_iters_[i]) {
885       continue;
886     }
887     if (l0_iters_[i] == current_) {
888       has_iter_trimmed_for_upper_bound_ = true;
889       DeleteIterator(l0_iters_[i]);
890       l0_iters_[i] = nullptr;
891       return;
892     }
893   }
894 
895   for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
896     if (level_iters_[level - 1] == nullptr) {
897       continue;
898     }
899     if (level_iters_[level - 1] == current_) {
900       has_iter_trimmed_for_upper_bound_ = true;
901       DeleteIterator(level_iters_[level - 1]);
902       level_iters_[level - 1] = nullptr;
903     }
904   }
905 }
906 
TEST_CheckDeletedIters(int * pdeleted_iters,int * pnum_iters)907 bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
908                                              int* pnum_iters) {
909   bool retval = false;
910   int deleted_iters = 0;
911   int num_iters = 0;
912 
913   const VersionStorageInfo* vstorage = sv_->current->storage_info();
914   const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
915   for (size_t i = 0; i < l0.size(); ++i) {
916     if (!l0_iters_[i]) {
917       retval = true;
918       deleted_iters++;
919     } else {
920       num_iters++;
921     }
922   }
923 
924   for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
925     if ((level_iters_[level - 1] == nullptr) &&
926         (!vstorage->LevelFiles(level).empty())) {
927       retval = true;
928       deleted_iters++;
929     } else if (!vstorage->LevelFiles(level).empty()) {
930       num_iters++;
931     }
932   }
933   if ((!retval) && num_iters <= 1) {
934     retval = true;
935   }
936   if (pdeleted_iters) {
937     *pdeleted_iters = deleted_iters;
938   }
939   if (pnum_iters) {
940     *pnum_iters = num_iters;
941   }
942   return retval;
943 }
944 
FindFileInRange(const std::vector<FileMetaData * > & files,const Slice & internal_key,uint32_t left,uint32_t right)945 uint32_t ForwardIterator::FindFileInRange(
946     const std::vector<FileMetaData*>& files, const Slice& internal_key,
947     uint32_t left, uint32_t right) {
948   auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
949     return cfd_->internal_comparator().InternalKeyComparator::Compare(
950             f->largest.Encode(), key) < 0;
951   };
952   const auto &b = files.begin();
953   return static_cast<uint32_t>(std::lower_bound(b + left,
954                                  b + right, internal_key, cmp) - b);
955 }
956 
DeleteIterator(InternalIterator * iter,bool is_arena)957 void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
958   if (iter == nullptr) {
959     return;
960   }
961 
962   if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
963     pinned_iters_mgr_->PinIterator(iter, is_arena);
964   } else {
965     if (is_arena) {
966       iter->~InternalIterator();
967     } else {
968       delete iter;
969     }
970   }
971 }
972 
973 }  // namespace ROCKSDB_NAMESPACE
974 
975 #endif  // ROCKSDB_LITE
976