1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_iter.h"
11 #include <string>
12 #include <iostream>
13 #include <limits>
14
15 #include "db/dbformat.h"
16 #include "db/merge_context.h"
17 #include "db/merge_helper.h"
18 #include "db/pinned_iterators_manager.h"
19 #include "file/filename.h"
20 #include "logging/logging.h"
21 #include "memory/arena.h"
22 #include "monitoring/perf_context_imp.h"
23 #include "rocksdb/env.h"
24 #include "rocksdb/iterator.h"
25 #include "rocksdb/merge_operator.h"
26 #include "rocksdb/options.h"
27 #include "table/internal_iterator.h"
28 #include "table/iterator_wrapper.h"
29 #include "trace_replay/trace_replay.h"
30 #include "util/mutexlock.h"
31 #include "util/string_util.h"
32 #include "util/user_comparator_wrapper.h"
33
34 namespace rocksdb {
35
36 #if 0
37 static void DumpInternalIter(Iterator* iter) {
38 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
39 ParsedInternalKey k;
40 if (!ParseInternalKey(iter->key(), &k)) {
41 fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
42 } else {
43 fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
44 }
45 }
46 }
47 #endif
48
// Builds a user-facing iterator on top of the merged internal iterator
// `iter`, bound to snapshot sequence number `s`. Most configuration is
// copied out of the read options / CF options up front so the hot
// iteration paths don't have to dereference them again.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool allow_blob)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start only takes effect when the column family has a
      // prefix extractor configured.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      total_order_seek_(read_options.total_order_seek),
      allow_blob_(allow_blob),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  max_skip_ = max_sequential_skip_in_iterations;
  max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
  if (pin_thru_lifetime_) {
    // read_options.pin_data: keep data blocks pinned for the iterator's whole
    // lifetime so returned keys/values remain valid across Next()/Prev().
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
}
94
GetProperty(std::string prop_name,std::string * prop)95 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
96 if (prop == nullptr) {
97 return Status::InvalidArgument("prop is nullptr");
98 }
99 if (prop_name == "rocksdb.iterator.super-version-number") {
100 // First try to pass the value returned from inner iterator.
101 return iter_.iter()->GetProperty(prop_name, prop);
102 } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
103 if (valid_) {
104 *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
105 } else {
106 *prop = "Iterator is not valid.";
107 }
108 return Status::OK();
109 } else if (prop_name == "rocksdb.iterator.internal-key") {
110 *prop = saved_key_.GetUserKey().ToString();
111 return Status::OK();
112 }
113 return Status::InvalidArgument("Unidentified property.");
114 }
115
ParseKey(ParsedInternalKey * ikey)116 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
117 if (!ParseInternalKey(iter_.key(), ikey)) {
118 status_ = Status::Corruption("corrupted internal key in DBIter");
119 valid_ = false;
120 ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
121 iter_.key().ToString(true).c_str());
122 return false;
123 } else {
124 return true;
125 }
126 }
127
// Advances the iterator to the next visible user key. Requires valid_.
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
  // Fold the per-entry skip counter into local stats. The extra decrement
  // appears to exclude the entry currently being returned from the skipped
  // count -- NOTE(review): confirm against ResetInternalKeysSkippedCounter(),
  // which guards an equivalent decrement on valid_.
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  bool ok = true;
  if (direction_ == kReverse) {
    // We were iterating backwards; flip iter_ back to forward direction
    // before advancing.
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }

  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    FindNextUserEntry(true /* skipping the current user key */,
                      prefix_same_as_start_ ? &prefix : nullptr);
  } else {
    // Direction flip failed or the internal iterator is exhausted.
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
173
174 // PRE: saved_key_ has the current user key if skipping_saved_key
175 // POST: saved_key_ should have the next user key if valid_,
176 // if the current entry is a result of merge
177 // current_entry_is_merged_ => true
178 // saved_value_ => the merged value
179 //
180 // NOTE: In between, saved_key_ can point to a user key that has
181 // a delete marker or a sequence number higher than sequence_
182 // saved_key_ MUST have a proper user_key before calling this function
183 //
// The prefix parameter, if not null, indicates that we need to iterate
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  // Thin wrapper whose only job is to scope the perf timer around the
  // actual search implementation.
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}
191
192 // Actual implementation of DBIter::FindNextUserEntry()
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have
  //                            skipped num_skipped times, and we haven't seen
  //                            any keys greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;

  is_blob_ = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }

    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
           user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
        user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
      // Crossed the user-supplied upper bound; stop (iterator invalid).
      break;
    }

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
      // Left the prefix we were asked to stay within.
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    if (IsVisible(ikey_.sequence)) {
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
              0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               user_comparator_.Compare(ikey_.user_key,
                                        saved_key_.GetUserKey()) > 0);
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            // if iterator specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
            // note that if deletion seqnum is < start_seqnum_ we
            // just skip it like in normal iterator.
            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
            if (start_seqnum_ > 0) {
              // we are taking incremental snapshot here
              // incremental snapshots aren't supported on DB with range deletes
              assert(!(
                (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
              ));
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                // this key and all previous versions shouldn't be included,
                // skipping_saved_key
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
              }
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              if (range_del_agg_.ShouldDelete(
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                skipping_saved_key = true;
                num_skipped = 0;
                reseek_done = false;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              } else if (ikey_.type == kTypeBlobIndex) {
                if (!allow_blob_) {
                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
                  status_ = Status::NotSupported(
                      "Encounter unexpected blob index. Please open DB with "
                      "rocksdb::blob_db::BlobDB instead.");
                  valid_ = false;
                  return false;
                }

                is_blob_ = true;
                valid_ = true;
                return true;
              } else {
                // Plain value that is visible and not range-deleted: yield it.
                valid_ = true;
                return true;
              }
            }
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            if (range_del_agg_.ShouldDelete(
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping_saved_key = true;
              num_skipped = 0;
              reseek_done = false;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              return MergeValuesNewToOld();  // Go to a different state machine
            }
            break;
          default:
            assert(false);
            break;
        }
      }
    } else {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // This key was inserted after our snapshot was taken.
      // If this happens too many times in a row for the same user key, we want
      // to seek to the target sequence number.
      int cmp =
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp <= 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // than it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                       0, kTypeDeletion));
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        AppendInternalKey(&last_key,
                          ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                            kValueTypeForSeek));
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  // Exhausted the internal iterator (or broke out at a bound/prefix edge)
  // without finding an entry to yield.
  valid_ = false;
  return iter_.status().ok();
}
427
428 // Merge values of the same user key starting from the current iter_ position
429 // Scan from the newer entries to older entries.
430 // PRE: iter_.key() points to the first merge type entry
431 // saved_key_ stores the user key
432 // POST: saved_value_ has the merged value for the user key
433 // iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    // Merge entries exist but no operator configured: hard error.
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  Status s;
  // Walk newer-to-older entries of the same user key, accumulating merge
  // operands until we hit a base value, a deletion, or a different user key.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      // Blob values cannot participate in merges through this iterator.
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
                                  &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  assert(status_.ok());
  return true;
}
532
// Moves the iterator to the previous visible user key. Requires valid_.
void DBIter::Prev() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
  // Release blocks pinned for the previously returned entry and reset the
  // per-entry skipped-key accounting.
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  bool ok = true;
  if (direction_ == kForward) {
    // We were iterating forward; reposition iter_ for backward iteration
    // before scanning.
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
  }

  if (statistics_ != nullptr) {
    local_stats_.prev_count_++;
    if (valid_) {
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
    }
  }
}
563
ReverseToForward()564 bool DBIter::ReverseToForward() {
565 assert(iter_.status().ok());
566
567 // When moving backwards, iter_ is positioned on _previous_ key, which may
568 // not exist or may have different prefix than the current key().
569 // If that's the case, seek iter_ to current key.
570 if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
571 IterKey last_key;
572 last_key.SetInternalKey(ParsedInternalKey(
573 saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
574 iter_.Seek(last_key.GetInternalKey());
575 }
576
577 direction_ = kForward;
578 // Skip keys less than the current key() (a.k.a. saved_key_).
579 while (iter_.Valid()) {
580 ParsedInternalKey ikey;
581 if (!ParseKey(&ikey)) {
582 return false;
583 }
584 if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
585 return true;
586 }
587 iter_.Next();
588 }
589
590 if (!iter_.status().ok()) {
591 valid_ = false;
592 return false;
593 }
594
595 return true;
596 }
597
598 // Move iter_ to the key before saved_key_.
// Switches the iterator from forward to backward iteration and moves iter_
// to the first entry before saved_key_.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      ((prefix_extractor_ != nullptr && !total_order_seek_) ||
       !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (prefix_extractor_ != nullptr && !total_order_seek_) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seek went past the last entry; fall back to the tail of the DB.
        iter_.SeekToLast();
      }
    }
  }

  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}
631
// Core of backward iteration. Repeatedly takes the user key under iter_,
// resolves its most recent visible value via FindValueForCurrentKey(), and
// steps to an earlier user key until a value is found, a prefix/lower-bound
// edge is crossed, or the internal iterator is exhausted.
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    // Candidate user key for this round.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(saved_key_.GetUserKey())
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}
682
683 // Used for backwards iteration.
684 // Looks at the entries with user key saved_key_ and finds the most up-to-date
685 // value for it, or executes a merge, or determines that the value was deleted.
686 // Sets valid_ to true if the value is found and is ready to be presented to
687 // the user through value().
688 // Sets valid_ to false if the value was deleted, and we should try another key.
689 // Returns false if an error occurred, and !status().ok() and !valid_.
690 //
691 // PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
692 // POST: iter_ is positioned on one of the entries equal to saved_key_, or on
693 // the entry just before them, or on the entry just after them.
bool DBIter::FindValueForCurrentKey() {
  assert(iter_.Valid());
  merge_context_.Clear();
  current_entry_is_merged_ = false;
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;

  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
  size_t num_skipped = 0;
  // Scan backwards (old to new) over all visible entries with user key
  // saved_key_, tracking the newest entry type seen so far.
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!IsVisible(ikey.sequence) ||
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
      case kTypeBlobIndex:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          // Covered by a range tombstone: treat as deleted.
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(iter_.iter()->IsValuePinned());
          pinned_value_ = iter_.value();
        }
        // A newer base value supersedes any older merge operands seen so far.
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        break;
      case kTypeDeletion:
      case kTypeSingleDeletion:
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        break;
      case kTypeMerge:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          // PushOperandBack keeps operands ordered old-to-new even though we
          // encounter them new-to-old relative to forward iteration.
          merge_context_.PushOperandBack(
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
          PERF_COUNTER_ADD(internal_merge_count, 1);
        }
        break;
      default:
        assert(false);
    }

    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    iter_.Prev();
    ++num_skipped;
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // Resolve the final value from the newest entry type observed.
  Status s;
  is_blob_ = false;
  switch (last_key_entry_type) {
    case kTypeDeletion:
    case kTypeSingleDeletion:
    case kTypeRangeDeletion:
      // Key is deleted at this snapshot: not an error, but no value either.
      valid_ = false;
      return true;
    case kTypeMerge:
      current_entry_is_merged_ = true;
      if (last_not_merge_type == kTypeDeletion ||
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
        // No base value beneath the merge chain: merge against nullptr.
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      } else if (last_not_merge_type == kTypeBlobIndex) {
        if (!allow_blob_) {
          ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
          status_ = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "rocksdb::blob_db::BlobDB instead.");
        } else {
          status_ =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        valid_ = false;
        return false;
      } else {
        assert(last_not_merge_type == kTypeValue);
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      }
      break;
    case kTypeValue:
      // do nothing - we already have the value in pinned_value_
      break;
    case kTypeBlobIndex:
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
        valid_ = false;
        return false;
      }
      is_blob_ = true;
      break;
    default:
      assert(false);
      break;
  }
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }
  valid_ = true;
  return true;
}
841
842 // This function is used in FindValueForCurrentKey.
843 // We use Seek() function instead of Prev() to find necessary value
844 // TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
845 // Would be nice to reuse some code.
FindValueForCurrentKeyUsingSeek()846 bool DBIter::FindValueForCurrentKeyUsingSeek() {
847 // FindValueForCurrentKey will enable pinning before calling
848 // FindValueForCurrentKeyUsingSeek()
849 assert(pinned_iters_mgr_.PinningEnabled());
850 std::string last_key;
851 AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
852 sequence_, kValueTypeForSeek));
853 iter_.Seek(last_key);
854 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
855
856 // In case read_callback presents, the value we seek to may not be visible.
857 // Find the next value that's visible.
858 ParsedInternalKey ikey;
859 is_blob_ = false;
860 while (true) {
861 if (!iter_.Valid()) {
862 valid_ = false;
863 return iter_.status().ok();
864 }
865
866 if (!ParseKey(&ikey)) {
867 return false;
868 }
869 if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
870 // No visible values for this key, even though FindValueForCurrentKey()
871 // has seen some. This is possible if we're using a tailing iterator, and
872 // the entries were discarded in a compaction.
873 valid_ = false;
874 return true;
875 }
876
877 if (IsVisible(ikey.sequence)) {
878 break;
879 }
880
881 iter_.Next();
882 }
883
884 if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
885 range_del_agg_.ShouldDelete(
886 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
887 valid_ = false;
888 return true;
889 }
890 if (ikey.type == kTypeBlobIndex && !allow_blob_) {
891 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
892 status_ = Status::NotSupported(
893 "Encounter unexpected blob index. Please open DB with "
894 "rocksdb::blob_db::BlobDB instead.");
895 valid_ = false;
896 return false;
897 }
898 if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
899 assert(iter_.iter()->IsValuePinned());
900 pinned_value_ = iter_.value();
901 is_blob_ = (ikey.type == kTypeBlobIndex);
902 valid_ = true;
903 return true;
904 }
905
906 // kTypeMerge. We need to collect all kTypeMerge values and save them
907 // in operands
908 assert(ikey.type == kTypeMerge);
909 current_entry_is_merged_ = true;
910 merge_context_.Clear();
911 merge_context_.PushOperand(
912 iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
913 while (true) {
914 iter_.Next();
915
916 if (!iter_.Valid()) {
917 if (!iter_.status().ok()) {
918 valid_ = false;
919 return false;
920 }
921 break;
922 }
923 if (!ParseKey(&ikey)) {
924 return false;
925 }
926 if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
927 break;
928 }
929
930 if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
931 range_del_agg_.ShouldDelete(
932 ikey, RangeDelPositioningMode::kForwardTraversal)) {
933 break;
934 } else if (ikey.type == kTypeValue) {
935 const Slice val = iter_.value();
936 Status s = MergeHelper::TimedFullMerge(
937 merge_operator_, saved_key_.GetUserKey(), &val,
938 merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
939 env_, &pinned_value_, true);
940 if (!s.ok()) {
941 valid_ = false;
942 status_ = s;
943 return false;
944 }
945 valid_ = true;
946 return true;
947 } else if (ikey.type == kTypeMerge) {
948 merge_context_.PushOperand(
949 iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
950 PERF_COUNTER_ADD(internal_merge_count, 1);
951 } else if (ikey.type == kTypeBlobIndex) {
952 if (!allow_blob_) {
953 ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
954 status_ = Status::NotSupported(
955 "Encounter unexpected blob index. Please open DB with "
956 "rocksdb::blob_db::BlobDB instead.");
957 } else {
958 status_ =
959 Status::NotSupported("Blob DB does not support merge operator.");
960 }
961 valid_ = false;
962 return false;
963 } else {
964 assert(false);
965 }
966 }
967
968 Status s = MergeHelper::TimedFullMerge(
969 merge_operator_, saved_key_.GetUserKey(), nullptr,
970 merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
971 &pinned_value_, true);
972 if (!s.ok()) {
973 valid_ = false;
974 status_ = s;
975 return false;
976 }
977
978 // Make sure we leave iter_ in a good state. If it's valid and we don't care
979 // about prefixes, that's already good enough. Otherwise it needs to be
980 // seeked to the current key.
981 if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
982 if (prefix_extractor_ != nullptr && !total_order_seek_) {
983 iter_.SeekForPrev(last_key);
984 } else {
985 iter_.Seek(last_key);
986 if (!iter_.Valid() && iter_.status().ok()) {
987 iter_.SeekToLast();
988 }
989 }
990 RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
991 }
992
993 valid_ = true;
994 return true;
995 }
996
// Move the inner iterator backwards until it is positioned on a user key
// strictly smaller than saved_key_. Changes valid_ only if the return value
// is false (i.e. on error / skip-budget exhaustion).
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  // Number of consecutive Prev() steps taken without finding a smaller user
  // key; once it reaches max_skip_ we reseek instead of stepping.
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      // ParseKey() sets status_ on corruption; bail out.
      return false;
    }

    // Done: we are strictly before saved_key_'s user key.
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      // Skip budget exhausted; TooManyInternalKeysSkipped() already set
      // valid_ = false and an Incomplete status.
      return false;
    }

    assert(ikey.sequence != kMaxSequenceNumber);
    if (!IsVisible(ikey.sequence)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }

    if (num_skipped >= max_skip_) {
      // Too many one-step Prev() calls on the same user key: reseek to the
      // newest possible entry of saved_key_'s user key instead.
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key.GetInternalKey());
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }

    iter_.Prev();
  }

  // The inner iterator became invalid; distinguish "ran off the front"
  // (fine) from a real error.
  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}
1049
TooManyInternalKeysSkipped(bool increment)1050 bool DBIter::TooManyInternalKeysSkipped(bool increment) {
1051 if ((max_skippable_internal_keys_ > 0) &&
1052 (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
1053 valid_ = false;
1054 status_ = Status::Incomplete("Too many internal keys skipped.");
1055 return true;
1056 } else if (increment) {
1057 num_internal_keys_skipped_++;
1058 }
1059 return false;
1060 }
1061
IsVisible(SequenceNumber sequence)1062 bool DBIter::IsVisible(SequenceNumber sequence) {
1063 if (read_callback_ == nullptr) {
1064 return sequence <= sequence_;
1065 } else {
1066 return read_callback_->IsVisible(sequence);
1067 }
1068 }
1069
SetSavedKeyToSeekTarget(const Slice & target)1070 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1071 is_key_seqnum_zero_ = false;
1072 SequenceNumber seq = sequence_;
1073 saved_key_.Clear();
1074 saved_key_.SetInternalKey(target, seq);
1075
1076 if (iterate_lower_bound_ != nullptr &&
1077 user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
1078 0) {
1079 // Seek key is smaller than the lower bound.
1080 saved_key_.Clear();
1081 saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
1082 }
1083 }
1084
SetSavedKeyToSeekForPrevTarget(const Slice & target)1085 void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
1086 is_key_seqnum_zero_ = false;
1087 saved_key_.Clear();
1088 // now saved_key is used to store internal key.
1089 saved_key_.SetInternalKey(target, 0 /* sequence_number */,
1090 kValueTypeForSeekForPrev);
1091
1092 if (iterate_upper_bound_ != nullptr &&
1093 user_comparator_.Compare(saved_key_.GetUserKey(),
1094 *iterate_upper_bound_) >= 0) {
1095 saved_key_.Clear();
1096 saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
1097 }
1098 }
1099
// Positions the iterator at the first visible user entry at or after target
// (clamped to iterate_lower_bound_ if set).
void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // Record this seek in the trace, when tracing is enabled.
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);

    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());

    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix;
    target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }

  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}
1162
// Positions the iterator at the last visible user entry at or before target
// (clamped below iterate_upper_bound_ if set).
void DBIter::SeekForPrev(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    // Record this seek in the trace, when tracing is enabled.
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix;
    target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}
1218
// Positions the iterator at the first visible user entry in the database
// (or at iterate_lower_bound_, when one is configured).
void DBIter::SeekToFirst() {
  if (iterate_lower_bound_ != nullptr) {
    // With a lower bound configured, SeekToFirst() is just a Seek() to it.
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_.Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    // Setting max_skip_ to the maximum disables the "too many skips =>
    // reseek" heuristic while prefix seek is in effect.
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kForward;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    // Copy the user key only when the inner iterator does not keep it pinned
    // for our lifetime.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    // Remember the prefix of the first key for future Next()/Prev() checks.
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
1265
// Positions the iterator at the last visible user entry in the database
// (strictly below iterate_upper_bound_, when one is configured).
void DBIter::SeekToLast() {
  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
    // SeekForPrev() may land exactly on the bound key; the bound is
    // exclusive, so step back once more in that case.
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
      ReleaseTempPinnedData();
      PrevInternal(nullptr);
    }
    return;
  }

  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_.Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (prefix_extractor_ != nullptr && !total_order_seek_) {
    // Setting max_skip_ to the maximum disables the "too many skips =>
    // reseek" heuristic while prefix seek is in effect.
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kReverse;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToLast();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }
  // Walk backward to the first visible user entry from the end.
  PrevInternal(nullptr);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
    }
  }
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    // Remember the prefix of the last key for future Next()/Prev() checks.
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
1309
NewDBIterator(Env * env,const ReadOptions & read_options,const ImmutableCFOptions & cf_options,const MutableCFOptions & mutable_cf_options,const Comparator * user_key_comparator,InternalIterator * internal_iter,const SequenceNumber & sequence,uint64_t max_sequential_skip_in_iterations,ReadCallback * read_callback,DBImpl * db_impl,ColumnFamilyData * cfd,bool allow_blob)1310 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
1311 const ImmutableCFOptions& cf_options,
1312 const MutableCFOptions& mutable_cf_options,
1313 const Comparator* user_key_comparator,
1314 InternalIterator* internal_iter,
1315 const SequenceNumber& sequence,
1316 uint64_t max_sequential_skip_in_iterations,
1317 ReadCallback* read_callback, DBImpl* db_impl,
1318 ColumnFamilyData* cfd, bool allow_blob) {
1319 DBIter* db_iter = new DBIter(
1320 env, read_options, cf_options, mutable_cf_options, user_key_comparator,
1321 internal_iter, sequence, false, max_sequential_skip_in_iterations,
1322 read_callback, db_impl, cfd, allow_blob);
1323 return db_iter;
1324 }
1325
1326 } // namespace rocksdb
1327