1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7 #include "db/forward_iterator.h"
8
9 #include <limits>
10 #include <string>
11 #include <utility>
12
13 #include "db/column_family.h"
14 #include "db/db_impl/db_impl.h"
15 #include "db/db_iter.h"
16 #include "db/dbformat.h"
17 #include "db/job_context.h"
18 #include "db/range_del_aggregator.h"
19 #include "db/range_tombstone_fragmenter.h"
20 #include "rocksdb/env.h"
21 #include "rocksdb/slice.h"
22 #include "rocksdb/slice_transform.h"
23 #include "table/merging_iterator.h"
24 #include "test_util/sync_point.h"
25 #include "util/string_util.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
// Usage:
//     ForwardLevelIterator iter;
//     iter.SetFileIndex(file_index);
//     iter.Seek(target); // or iter.SeekToFirst();
//     iter.Next()
//
// Forward-only iterator over all SST files of a single level. It iterates one
// file at a time via a table-cache iterator (file_iter_) and, in Next(),
// transparently advances to the next file in files_ when the current file is
// exhausted. Backward operations (SeekToLast/Prev/SeekForPrev) are
// intentionally unsupported and set a NotSupported status.
class ForwardLevelIterator : public InternalIterator {
 public:
  // `files` and `prefix_extractor` must outlive this iterator; the caller is
  // expected to keep the SuperVersion that owns them referenced.
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
                       const ReadOptions& read_options,
                       const std::vector<FileMetaData*>& files,
                       const SliceTransform* prefix_extractor)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        // Sentinel: no file selected yet; SetFileIndex() must be called
        // before Seek()/SeekToFirst().
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        prefix_extractor_(prefix_extractor) {}

  ~ForwardLevelIterator() override {
    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      // Transfer ownership to the pinning manager so key/value slices that
      // point into the table stay valid until ReleasePinnedData().
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  // Selects which file of the level to iterate. Clears any previous error
  // status; rebuilds file_iter_ only if the index actually changed.
  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }

  // Recreates file_iter_ for files_[file_index_]. Invalidates the iterator;
  // the caller must Seek()/SeekToFirst() afterwards.
  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    // Local aggregator used only to detect range tombstones in this file;
    // ForwardIterator does not support them (see status check below).
    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        prefix_extractor_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }

  // Backward iteration is not supported by design.
  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }
  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }

  bool Valid() const override {
    return valid_;
  }

  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }

  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }

  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }

  // Advances within the current file; when exhausted, moves on to the next
  // file(s) in the level until a valid entry, an error, or end-of-level.
  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        // Exhausted the last file in the level.
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }

  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }
  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }

  // Reports this iterator's own error first (e.g. range-tombstone rejection),
  // then the underlying file iterator's status.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  // All files of this level; owned by the caller's SuperVersion.
  const std::vector<FileMetaData*>& files_;

  bool valid_;
  // Index into files_ of the file file_iter_ currently covers.
  uint32_t file_index_;
  Status status_;
  // Iterator over files_[file_index_]; owned here (or by pinned_iters_mgr_).
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  const SliceTransform* prefix_extractor_;
};
201
// If current_sv is non-null, the caller transfers its reference on the
// SuperVersion to this iterator and child iterators are built immediately;
// otherwise they are built lazily on the first Seek()/SeekToFirst().
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    // false: keep the SuperVersion we were handed rather than acquiring a
    // fresh reference.
    RebuildIterators(false);
  }
}
226
ForwardIterator::~ForwardIterator() {
  // Delete (or pin) all child iterators and release the SuperVersion.
  Cleanup(true);
}
230
// Static helper that drops one reference on `sv` and, if that was the last
// reference, cleans it up. Takes and releases db->mutex_ internally; must be
// called without the mutex held. With background_purge enabled the
// SuperVersion deletion and file purging are handed off to a background
// thread instead of being done inline.
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    if (!background_purge_on_iterator_cleanup) {
      // Background purge takes ownership of sv via the free queue above;
      // otherwise delete it here.
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}
255
namespace {
// Type-erased argument bundle for ForwardIterator::DeferredSVCleanup(),
// which is invoked later through PinnedIteratorsManager::PinPtr().
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // namespace
263
264 // Used in PinnedIteratorsManager to release pinned SuperVersion
DeferredSVCleanup(void * arg)265 void ForwardIterator::DeferredSVCleanup(void* arg) {
266 auto d = reinterpret_cast<SVCleanupParams*>(arg);
267 ForwardIterator::SVCleanup(
268 d->db, d->sv, d->background_purge_on_iterator_cleanup);
269 delete d;
270 }
271
// Releases this iterator's reference on sv_, either immediately or deferred
// through pinned_iters_mgr_ when pinning is active (pinned slices may point
// into sv_'s memtables). Does not null out sv_; callers overwrite it.
void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
    // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to keep
    // sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
}
290
// Deletes (or pins) every child iterator. Memtable iterators are
// arena-allocated; file/level iterators are heap-allocated. If release_sv is
// true, also drops the reference on sv_ (see SVCleanup()).
void ForwardIterator::Cleanup(bool release_sv) {
  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }

  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();

  if (release_sv) {
    SVCleanup();
  }
}
315
Valid() const316 bool ForwardIterator::Valid() const {
317 // See UpdateCurrent().
318 return valid_ ? !current_over_upper_bound_ : false;
319 }
320
// Positions at the smallest key. Before seeking, refreshes child iterators:
// builds them if absent, renews them if the SuperVersion changed, or retries
// any that previously returned Status::Incomplete (e.g. non-blocking reads).
void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  // Empty target + seek_to_first=true means "seek all children to first".
  SeekInternal(Slice(), true);
}
331
IsOverUpperBound(const Slice & internal_key) const332 bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
333 return !(read_options_.iterate_upper_bound == nullptr ||
334 cfd_->internal_comparator().user_comparator()->Compare(
335 ExtractUserKey(internal_key),
336 *read_options_.iterate_upper_bound) < 0);
337 }
338
// Positions at the first entry >= internal_key. Refreshes child iterators
// first, using the same three-way policy as SeekToFirst().
void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }
  SeekInternal(internal_key, false);
}
349
// Core seek: positions the mutable memtable iterator, and — only when
// necessary (see NeedToSeekImmutable()) — rebuilds immutable_min_heap_ by
// seeking every immutable child (imm memtables, L0 files, L1+ levels).
// Children that can never yield a key below iterate_upper_bound are deleted
// ("trimmed") to keep later merging cheap; has_iter_trimmed_for_upper_bound_
// records that trimming happened so a later backward-looking Seek knows it
// must rebuild. When seek_to_first is true, internal_key is ignored.
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first) {
  assert(mutable_iter_);
  // mutable
  seek_to_first ? mutable_iter_->SeekToFirst() :
                  mutable_iter_->Seek(internal_key);

  // immutable
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns to need to seek immutable often. We probably want to have
  // an option to turn it off.
  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
    immutable_status_ = Status::OK();
    if (has_iter_trimmed_for_upper_bound_ &&
        (
            // prev_ is not set yet
            is_prev_set_ == false ||
            // We are doing SeekToFirst() and internal_key.size() = 0
            seek_to_first ||
            // prev_key_ > internal_key
            cfd_->internal_comparator().InternalKeyComparator::Compare(
                prev_key_.GetInternalKey(), internal_key) > 0)) {
      // Some iterators are trimmed. Need to rebuild.
      RebuildIterators(true);
      // Already seeked mutable iter, so seek again
      seek_to_first ? mutable_iter_->SeekToFirst()
                    : mutable_iter_->Seek(internal_key);
    }
    {
      // Clear the heap by swapping with a freshly-constructed empty one.
      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
      immutable_min_heap_.swap(tmp);
    }
    // Immutable memtables: push every valid iterator onto the min-heap.
    for (size_t i = 0; i < imm_iters_.size(); i++) {
      auto* m = imm_iters_[i];
      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
      if (!m->status().ok()) {
        immutable_status_ = m->status();
      } else if (m->Valid()) {
        immutable_min_heap_.push(m);
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    // L0 files: each has its own iterator since L0 files may overlap.
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        continue;
      }
      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key passes over the larget key, we are sure Next()
        // won't go over this file.
        if (user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        // Exhausted or past the upper bound: never needed again.
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    // L1+ levels: one ForwardLevelIterator per non-empty level.
    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        continue;
      }
      uint32_t f_idx = 0;
      if (!seek_to_first) {
        // Binary search for the file that may contain internal_key.
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      // Seek
      if (f_idx < level_files.size()) {
        level_iters_[level - 1]->SetFileIndex(f_idx);
        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
                        level_iters_[level - 1]->Seek(internal_key);

        if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    // Remember the seek target so NeedToSeekImmutable() can skip the next
    // immutable seek when the new target falls in (prev_key_, heap top).
    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of immutable iterators, push it back to the heap
    immutable_min_heap_.push(current_);
  }

  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
483
// Advances to the next entry. If the SuperVersion changed under us, rebuilds
// or renews the children and re-seeks to the current key; if that re-seek no
// longer lands on the same key, the seek itself already advanced us and we
// return early. Otherwise advances current_, maintaining the min-heap of
// immutable children and prev_key_ bookkeeping for NeedToSeekImmutable().
void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

  if (sv_ == nullptr ||
      sv_->version_number != cfd_->GetSuperVersionNumber()) {
    // Copy the key out before rebuilding: the slice returned by key() points
    // into iterators we are about to destroy.
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }
    SeekInternal(old_key, false);
    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
  } else if (current_ != mutable_iter_) {
    // It is going to advance immutable iterator

    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }


    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      // Exclusive: the interval that needs no immutable re-seek starts
      // strictly after prev_key_ (contrast with SeekInternal()).
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      // Still useful: back onto the heap for the next UpdateCurrent().
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}
542
// Current entry's internal key; only valid while Valid() is true.
Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}
547
// Current entry's value; only valid while Valid() is true.
Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}
552
status() const553 Status ForwardIterator::status() const {
554 if (!status_.ok()) {
555 return status_;
556 } else if (!mutable_iter_->status().ok()) {
557 return mutable_iter_->status();
558 }
559
560 return immutable_status_;
561 }
562
GetProperty(std::string prop_name,std::string * prop)563 Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
564 assert(prop != nullptr);
565 if (prop_name == "rocksdb.iterator.super-version-number") {
566 *prop = ToString(sv_->version_number);
567 return Status::OK();
568 }
569 return Status::InvalidArgument();
570 }
571
// Stores the pinning manager and pushes it down to all child iterators.
void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}
577
UpdateChildrenPinnedItersMgr()578 void ForwardIterator::UpdateChildrenPinnedItersMgr() {
579 // Set PinnedIteratorsManager for mutable memtable iterator.
580 if (mutable_iter_) {
581 mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
582 }
583
584 // Set PinnedIteratorsManager for immutable memtable iterators.
585 for (InternalIterator* child_iter : imm_iters_) {
586 if (child_iter) {
587 child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
588 }
589 }
590
591 // Set PinnedIteratorsManager for L0 files iterators.
592 for (InternalIterator* child_iter : l0_iters_) {
593 if (child_iter) {
594 child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
595 }
596 }
597
598 // Set PinnedIteratorsManager for L1+ levels iterators.
599 for (ForwardLevelIterator* child_iter : level_iters_) {
600 if (child_iter) {
601 child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
602 }
603 }
604 }
605
IsKeyPinned() const606 bool ForwardIterator::IsKeyPinned() const {
607 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
608 current_->IsKeyPinned();
609 }
610
IsValuePinned() const611 bool ForwardIterator::IsValuePinned() const {
612 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
613 current_->IsValuePinned();
614 }
615
// Discards all child iterators and rebuilds them from scratch. When
// refresh_sv is true also releases the old SuperVersion and acquires a fresh
// reference; when false, keeps using sv_ as-is (constructor path). Any range
// tombstone found in the memtables makes the iterator error out, since
// ForwardIterator does not support range deletions.
void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  // Local aggregator used only to detect (and reject) range tombstones.
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
  sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence()));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                         &range_del_agg);
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr));
  }
  BuildLevelIterators(vstorage);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
671
// Switches to the latest SuperVersion while reusing as much state as
// possible: L0 iterators for files present in both the old and new version
// are carried over (preserving their positions); everything else is rebuilt.
// Memtable iterators are always rebuilt since the memtables may have changed.
void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
  svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
  // Local aggregator used only to detect (and reject) range tombstones.
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence()));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                           &range_del_agg);
  }

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

  // For each new L0 file, reuse the old iterator if the same file existed in
  // the previous version (linear scan; L0 file counts are small).
  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      if (l0_iters_[iold] == nullptr) {
        // The old iterator had been trimmed; keep it trimmed.
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        // Transfer ownership of the old iterator (null it out so the cleanup
        // loop below doesn't delete it).
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr));
  }

  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new);
  current_ = nullptr;
  is_prev_set_ = false;
  // Release the old SuperVersion and adopt the new one.
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
761
// Creates one ForwardLevelIterator per L1+ level, stored at
// level_iters_[level - 1]. A level gets a null slot when it is empty, or
// when its smallest key already exceeds iterate_upper_bound (in which case
// has_iter_trimmed_for_upper_bound_ is recorded so Seek knows about it).
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
  level_iters_.reserve(vstorage->num_levels() - 1);
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    const auto& level_files = vstorage->LevelFiles(level);
    if ((level_files.empty()) ||
        ((read_options_.iterate_upper_bound != nullptr) &&
         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
                                    level_files[0]->smallest.user_key()) <
          0))) {
      level_iters_.push_back(nullptr);
      if (!level_files.empty()) {
        has_iter_trimmed_for_upper_bound_ = true;
      }
    } else {
      level_iters_.push_back(new ForwardLevelIterator(
          cfd_, read_options_, level_files,
          sv_->mutable_cf_options.prefix_extractor.get()));
    }
  }
}
782
// Recreates only the child iterators that reported Status::Incomplete
// (e.g. data wasn't in cache during a non-blocking read), leaving healthy
// iterators untouched. The SuperVersion is unchanged here.
void ForwardIterator::ResetIncompleteIterators() {
  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
  for (size_t i = 0; i < l0_iters_.size(); ++i) {
    assert(i < l0_files.size());
    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
      continue;
    }
    DeleteIterator(l0_iters_[i]);
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files[i], /*range_del_agg=*/nullptr,
        sv_->mutable_cf_options.prefix_extractor.get(),
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr);
    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  for (auto* level_iter : level_iters_) {
    if (level_iter && level_iter->status().IsIncomplete()) {
      level_iter->Reset();
    }
  }

  // Positions are lost after recreating iterators; force a full re-seek.
  current_ = nullptr;
  is_prev_set_ = false;
}
812
// Picks current_ as the smaller of the mutable memtable iterator and the top
// of the immutable min-heap (popping the heap only when the heap side wins).
// Also recomputes valid_ and current_over_upper_bound_.
void ForwardIterator::UpdateCurrent() {
  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
    current_ = nullptr;
  } else if (immutable_min_heap_.empty()) {
    current_ = mutable_iter_;
  } else if (!mutable_iter_->Valid()) {
    current_ = immutable_min_heap_.top();
    immutable_min_heap_.pop();
  } else {
    current_ = immutable_min_heap_.top();
    assert(current_ != nullptr);
    assert(current_->Valid());
    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
        mutable_iter_->key(), current_->key());
    // Internal keys contain a sequence number, so two iterators can never
    // hold identical internal keys.
    assert(cmp != 0);
    if (cmp > 0) {
      // Heap top is smaller: keep it as current_ and remove it from the heap.
      immutable_min_heap_.pop();
    } else {
      current_ = mutable_iter_;
    }
  }
  valid_ = current_ != nullptr && immutable_status_.ok();
  // Any sticky error was either surfaced already or superseded; reset it.
  if (!status_.ok()) {
    status_ = Status::OK();
  }

  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
  // return false when all iterators are over iterate_upper_bound, but can't
  // just set valid_ to false, as that would effectively disable the tailing
  // optimization (Seek() would be called on all immutable iterators regardless
  // of whether the target key is greater than prev_key_).
  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
}
846
// Returns true when the immutable children must be re-seeked for `target`;
// false when the tailing optimization applies and the existing heap
// positions can be reused.
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).

  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  // With a prefix extractor, the interval argument only holds within one
  // prefix; a target in a different prefix forces a seek.
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
                               prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  // Target must lie strictly after prev_key_ (or at it, when the previous
  // positioning was an inclusive Seek) for the interval to apply.
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  // Target beyond the smallest immutable key we know about: the heap top is
  // no longer guaranteed to be correctly positioned.
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                            : current_->key()) > 0) {
    return true;
  }
  return false;
}
879
DeleteCurrentIter()880 void ForwardIterator::DeleteCurrentIter() {
881 const VersionStorageInfo* vstorage = sv_->current->storage_info();
882 const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
883 for (size_t i = 0; i < l0.size(); ++i) {
884 if (!l0_iters_[i]) {
885 continue;
886 }
887 if (l0_iters_[i] == current_) {
888 has_iter_trimmed_for_upper_bound_ = true;
889 DeleteIterator(l0_iters_[i]);
890 l0_iters_[i] = nullptr;
891 return;
892 }
893 }
894
895 for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
896 if (level_iters_[level - 1] == nullptr) {
897 continue;
898 }
899 if (level_iters_[level - 1] == current_) {
900 has_iter_trimmed_for_upper_bound_ = true;
901 DeleteIterator(level_iters_[level - 1]);
902 level_iters_[level - 1] = nullptr;
903 }
904 }
905 }
906
// Test-only helper: reports whether any child iterators were trimmed.
// Returns true when at least one L0/level slot is null (or, as a special
// case, when nothing was deleted but at most one iterator exists). Optionally
// outputs the deleted and surviving iterator counts.
bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
                                             int* pnum_iters) {
  bool retval = false;
  int deleted_iters = 0;
  int num_iters = 0;

  const VersionStorageInfo* vstorage = sv_->current->storage_info();
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
  for (size_t i = 0; i < l0.size(); ++i) {
    if (!l0_iters_[i]) {
      retval = true;
      deleted_iters++;
    } else {
      num_iters++;
    }
  }

  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
    // Only non-empty levels count: an empty level's null slot is not a
    // "deleted" iterator.
    if ((level_iters_[level - 1] == nullptr) &&
        (!vstorage->LevelFiles(level).empty())) {
      retval = true;
      deleted_iters++;
    } else if (!vstorage->LevelFiles(level).empty()) {
      num_iters++;
    }
  }
  if ((!retval) && num_iters <= 1) {
    retval = true;
  }
  if (pdeleted_iters) {
    *pdeleted_iters = deleted_iters;
  }
  if (pnum_iters) {
    *pnum_iters = num_iters;
  }
  return retval;
}
944
FindFileInRange(const std::vector<FileMetaData * > & files,const Slice & internal_key,uint32_t left,uint32_t right)945 uint32_t ForwardIterator::FindFileInRange(
946 const std::vector<FileMetaData*>& files, const Slice& internal_key,
947 uint32_t left, uint32_t right) {
948 auto cmp = [&](const FileMetaData* f, const Slice& key) -> bool {
949 return cfd_->internal_comparator().InternalKeyComparator::Compare(
950 f->largest.Encode(), key) < 0;
951 };
952 const auto &b = files.begin();
953 return static_cast<uint32_t>(std::lower_bound(b + left,
954 b + right, internal_key, cmp) - b);
955 }
956
DeleteIterator(InternalIterator * iter,bool is_arena)957 void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
958 if (iter == nullptr) {
959 return;
960 }
961
962 if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
963 pinned_iters_mgr_->PinIterator(iter, is_arena);
964 } else {
965 if (is_arena) {
966 iter->~InternalIterator();
967 } else {
968 delete iter;
969 }
970 }
971 }
972
973 } // namespace ROCKSDB_NAMESPACE
974
975 #endif // ROCKSDB_LITE
976