1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_iter.h"
11 #include <string>
12 #include <iostream>
13 #include <limits>
14 
15 #include "db/dbformat.h"
16 #include "db/merge_context.h"
17 #include "db/merge_helper.h"
18 #include "db/pinned_iterators_manager.h"
19 #include "file/filename.h"
20 #include "logging/logging.h"
21 #include "memory/arena.h"
22 #include "monitoring/perf_context_imp.h"
23 #include "rocksdb/env.h"
24 #include "rocksdb/iterator.h"
25 #include "rocksdb/merge_operator.h"
26 #include "rocksdb/options.h"
27 #include "table/internal_iterator.h"
28 #include "table/iterator_wrapper.h"
29 #include "trace_replay/trace_replay.h"
30 #include "util/mutexlock.h"
31 #include "util/string_util.h"
32 #include "util/user_comparator_wrapper.h"
33 
34 namespace rocksdb {
35 
36 #if 0
37 static void DumpInternalIter(Iterator* iter) {
38   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
39     ParsedInternalKey k;
40     if (!ParseInternalKey(iter->key(), &k)) {
41       fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
42     } else {
43       fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
44     }
45   }
46 }
47 #endif
48 
// Constructs a DBIter wrapping `iter`, exposing only entries visible at
// snapshot sequence number `s`. Most configuration is copied out of the
// read options and (immutable/mutable) CF options at construction time.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool allow_blob)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start only makes sense with a prefix extractor
      // configured; force it off otherwise.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      total_order_seek_(read_options.total_order_seek),
      allow_blob_(allow_blob),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  max_skip_ = max_sequential_skip_in_iterations;
  max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
  // pin_data keeps blocks pinned for the whole lifetime of this iterator
  // instead of releasing them after each operation.
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
}
94 
GetProperty(std::string prop_name,std::string * prop)95 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
96   if (prop == nullptr) {
97     return Status::InvalidArgument("prop is nullptr");
98   }
99   if (prop_name == "rocksdb.iterator.super-version-number") {
100     // First try to pass the value returned from inner iterator.
101     return iter_.iter()->GetProperty(prop_name, prop);
102   } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
103     if (valid_) {
104       *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
105     } else {
106       *prop = "Iterator is not valid.";
107     }
108     return Status::OK();
109   } else if (prop_name == "rocksdb.iterator.internal-key") {
110     *prop = saved_key_.GetUserKey().ToString();
111     return Status::OK();
112   }
113   return Status::InvalidArgument("Unidentified property.");
114 }
115 
ParseKey(ParsedInternalKey * ikey)116 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
117   if (!ParseInternalKey(iter_.key(), ikey)) {
118     status_ = Status::Corruption("corrupted internal key in DBIter");
119     valid_ = false;
120     ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
121                     iter_.key().ToString(true).c_str());
122     return false;
123   } else {
124     return true;
125   }
126 }
127 
// Advances the iterator to the next visible user key. Handles direction
// changes (reverse -> forward) and updates local statistics.
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
  // Fold internal keys skipped while producing the previous entry into the
  // per-iterator stats. NOTE(review): the extra decrement appears to
  // compensate for one advance that is counted elsewhere — confirm against
  // how local_stats_.skip_count_ is consumed before changing it.
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  bool ok = true;
  if (direction_ == kReverse) {
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }

  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    FindNextUserEntry(true /* skipping the current user key */,
                      prefix_same_as_start_ ? &prefix : nullptr);
  } else {
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  // Only count bytes read when we actually landed on a valid entry.
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
173 
174 // PRE: saved_key_ has the current user key if skipping_saved_key
175 // POST: saved_key_ should have the next user key if valid_,
176 //       if the current entry is a result of merge
177 //           current_entry_is_merged_ => true
178 //           saved_value_             => the merged value
179 //
180 // NOTE: In between, saved_key_ can point to a user key that has
181 //       a delete marker or a sequence number higher than sequence_
182 //       saved_key_ MUST have a proper user_key before calling this function
183 //
184 // The prefix parameter, if not null, indicates that we need to iterator
185 // within the prefix, and the iterator needs to be made invalid, if no
186 // more entry for the prefix can be found.
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  // Thin wrapper that scopes the perf timer around the real implementation.
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}
191 
192 // Actual implementation of DBIter::FindNextUserEntry()
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key        : saved_key_ contains the key that we need
  //  to skip,
  //                         and we haven't seen any keys greater than that,
  //  - if num_skipped > 0 : saved_key_ contains the key that we have skipped
  //                         num_skipped times, and we haven't seen any keys
  //                         greater than that,
  //  - none of the above  : saved_key_ can contain anything, it doesn't matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;

  is_blob_ = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }

    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    // MayBeOutOfUpperBound() lets the inner iterator skip the comparison when
    // it can prove the key is still within bounds.
    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
           user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) < 0);
    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
        user_comparator_.Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) {
      break;
    }

    // Stop as soon as the key leaves the requested prefix.
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    if (IsVisible(ikey_.sequence)) {
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey()) <=
              0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               user_comparator_.Compare(ikey_.user_key,
                                        saved_key_.GetUserKey()) > 0);
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            // if iterator specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
            // note that if deletion seqnum is < start_seqnum_ we
            // just skip it like in normal iterator.
            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_)  {
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
            if (start_seqnum_ > 0) {
              // we are taking incremental snapshot here
              // incremental snapshots aren't supported on DB with range deletes
              assert(!(
                (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0)
              ));
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                // this key and all previous versions shouldn't be included,
                // skipping_saved_key
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
              }
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              if (range_del_agg_.ShouldDelete(
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                skipping_saved_key = true;
                num_skipped = 0;
                reseek_done = false;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              } else if (ikey_.type == kTypeBlobIndex) {
                if (!allow_blob_) {
                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
                  status_ = Status::NotSupported(
                      "Encounter unexpected blob index. Please open DB with "
                      "rocksdb::blob_db::BlobDB instead.");
                  valid_ = false;
                  return false;
                }

                is_blob_ = true;
                valid_ = true;
                return true;
              } else {
                valid_ = true;
                return true;
              }
            }
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            if (range_del_agg_.ShouldDelete(
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping_saved_key = true;
              num_skipped = 0;
              reseek_done = false;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              return MergeValuesNewToOld();  // Go to a different state machine
            }
            break;
          default:
            assert(false);
            break;
        }
      }
    } else {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // This key was inserted after our snapshot was taken.
      // If this happens too many times in a row for the same user key, we want
      // to seek to the target sequence number.
      int cmp =
          user_comparator_.Compare(ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp <= 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // than it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                       0, kTypeDeletion));
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        AppendInternalKey(&last_key,
                          ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                            kValueTypeForSeek));
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  // Ran off the end of the internal iterator without finding an entry.
  // Propagate any underlying error via the return value.
  valid_ = false;
  return iter_.status().ok();
}
427 
428 // Merge values of the same user key starting from the current iter_ position
429 // Scan from the newer entries to older entries.
430 // PRE: iter_.key() points to the first merge type entry
431 //      saved_key_ stores the user key
432 // POST: saved_value_ has the merged value for the user key
433 //       iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
  // A merge entry cannot be resolved without a merge operator configured.
  if (!merge_operator_) {
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  Status s;
  // Walk older entries of the same user key, collecting merge operands until
  // we hit a base value, a deletion, or the next user key.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      // Blob indexes cannot participate in a merge chain.
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "rocksdb::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
                                  &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  assert(status_.ok());
  return true;
}
532 
Prev()533 void DBIter::Prev() {
534   assert(valid_);
535   assert(status_.ok());
536 
537   PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
538   ReleaseTempPinnedData();
539   ResetInternalKeysSkippedCounter();
540   bool ok = true;
541   if (direction_ == kForward) {
542     if (!ReverseToBackward()) {
543       ok = false;
544     }
545   }
546   if (ok) {
547     Slice prefix;
548     if (prefix_same_as_start_) {
549       assert(prefix_extractor_ != nullptr);
550       prefix = prefix_.GetUserKey();
551     }
552     PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
553   }
554 
555   if (statistics_ != nullptr) {
556     local_stats_.prev_count_++;
557     if (valid_) {
558       local_stats_.prev_found_count_++;
559       local_stats_.bytes_read_ += (key().size() + value().size());
560     }
561   }
562 }
563 
ReverseToForward()564 bool DBIter::ReverseToForward() {
565   assert(iter_.status().ok());
566 
567   // When moving backwards, iter_ is positioned on _previous_ key, which may
568   // not exist or may have different prefix than the current key().
569   // If that's the case, seek iter_ to current key.
570   if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
571     IterKey last_key;
572     last_key.SetInternalKey(ParsedInternalKey(
573         saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
574     iter_.Seek(last_key.GetInternalKey());
575   }
576 
577   direction_ = kForward;
578   // Skip keys less than the current key() (a.k.a. saved_key_).
579   while (iter_.Valid()) {
580     ParsedInternalKey ikey;
581     if (!ParseKey(&ikey)) {
582       return false;
583     }
584     if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
585       return true;
586     }
587     iter_.Next();
588   }
589 
590   if (!iter_.status().ok()) {
591     valid_ = false;
592     return false;
593   }
594 
595   return true;
596 }
597 
598 // Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have prefix different from current.
  // If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      ((prefix_extractor_ != nullptr && !total_order_seek_) ||
       !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (prefix_extractor_ != nullptr && !total_order_seek_) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        // Seek() ran past the end without error: start the backward scan
        // from the very last entry instead.
        iter_.SeekToLast();
      }
    }
  }

  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}
631 
// Backward-iteration driver: for each candidate user key (walking backwards
// through iter_), resolve its visible value via FindValueForCurrentKey(),
// stopping when the prefix or the lower bound is crossed, when a value is
// found, or when the skip budget is exhausted.
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(saved_key_.GetUserKey())
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    // MayBeOutOfLowerBound() lets the inner iterator skip the comparison
    // when it can prove the key is still within bounds.
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}
682 
683 // Used for backwards iteration.
684 // Looks at the entries with user key saved_key_ and finds the most up-to-date
685 // value for it, or executes a merge, or determines that the value was deleted.
686 // Sets valid_ to true if the value is found and is ready to be presented to
687 // the user through value().
688 // Sets valid_ to false if the value was deleted, and we should try another key.
689 // Returns false if an error occurred, and !status().ok() and !valid_.
690 //
691 // PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
692 // POST: iter_ is positioned on one of the entries equal to saved_key_, or on
693 //       the entry just before them, or on the entry just after them.
FindValueForCurrentKey()694 bool DBIter::FindValueForCurrentKey() {
695   assert(iter_.Valid());
696   merge_context_.Clear();
697   current_entry_is_merged_ = false;
698   // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
699   // kTypeValue)
700   ValueType last_not_merge_type = kTypeDeletion;
701   ValueType last_key_entry_type = kTypeDeletion;
702 
703   // Temporarily pin blocks that hold (merge operands / the value)
704   ReleaseTempPinnedData();
705   TempPinData();
706   size_t num_skipped = 0;
707   while (iter_.Valid()) {
708     ParsedInternalKey ikey;
709     if (!ParseKey(&ikey)) {
710       return false;
711     }
712 
713     if (!IsVisible(ikey.sequence) ||
714         !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
715       break;
716     }
717     if (TooManyInternalKeysSkipped()) {
718       return false;
719     }
720 
721     // This user key has lots of entries.
722     // We're going from old to new, and it's taking too long. Let's do a Seek()
723     // and go from new to old. This helps when a key was overwritten many times.
724     if (num_skipped >= max_skip_) {
725       return FindValueForCurrentKeyUsingSeek();
726     }
727 
728     last_key_entry_type = ikey.type;
729     switch (last_key_entry_type) {
730       case kTypeValue:
731       case kTypeBlobIndex:
732         if (range_del_agg_.ShouldDelete(
733                 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
734           last_key_entry_type = kTypeRangeDeletion;
735           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
736         } else {
737           assert(iter_.iter()->IsValuePinned());
738           pinned_value_ = iter_.value();
739         }
740         merge_context_.Clear();
741         last_not_merge_type = last_key_entry_type;
742         break;
743       case kTypeDeletion:
744       case kTypeSingleDeletion:
745         merge_context_.Clear();
746         last_not_merge_type = last_key_entry_type;
747         PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
748         break;
749       case kTypeMerge:
750         if (range_del_agg_.ShouldDelete(
751                 ikey, RangeDelPositioningMode::kBackwardTraversal)) {
752           merge_context_.Clear();
753           last_key_entry_type = kTypeRangeDeletion;
754           last_not_merge_type = last_key_entry_type;
755           PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
756         } else {
757           assert(merge_operator_ != nullptr);
758           merge_context_.PushOperandBack(
759               iter_.value(),
760               iter_.iter()->IsValuePinned() /* operand_pinned */);
761           PERF_COUNTER_ADD(internal_merge_count, 1);
762         }
763         break;
764       default:
765         assert(false);
766     }
767 
768     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
769     iter_.Prev();
770     ++num_skipped;
771   }
772 
773   if (!iter_.status().ok()) {
774     valid_ = false;
775     return false;
776   }
777 
778   Status s;
779   is_blob_ = false;
780   switch (last_key_entry_type) {
781     case kTypeDeletion:
782     case kTypeSingleDeletion:
783     case kTypeRangeDeletion:
784       valid_ = false;
785       return true;
786     case kTypeMerge:
787       current_entry_is_merged_ = true;
788       if (last_not_merge_type == kTypeDeletion ||
789           last_not_merge_type == kTypeSingleDeletion ||
790           last_not_merge_type == kTypeRangeDeletion) {
791         s = MergeHelper::TimedFullMerge(
792             merge_operator_, saved_key_.GetUserKey(), nullptr,
793             merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
794             env_, &pinned_value_, true);
795       } else if (last_not_merge_type == kTypeBlobIndex) {
796         if (!allow_blob_) {
797           ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
798           status_ = Status::NotSupported(
799               "Encounter unexpected blob index. Please open DB with "
800               "rocksdb::blob_db::BlobDB instead.");
801         } else {
802           status_ =
803               Status::NotSupported("Blob DB does not support merge operator.");
804         }
805         valid_ = false;
806         return false;
807       } else {
808         assert(last_not_merge_type == kTypeValue);
809         s = MergeHelper::TimedFullMerge(
810             merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
811             merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
812             env_, &pinned_value_, true);
813       }
814       break;
815     case kTypeValue:
816       // do nothing - we already have the value in pinned_value_
817       break;
818     case kTypeBlobIndex:
819       if (!allow_blob_) {
820         ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
821         status_ = Status::NotSupported(
822             "Encounter unexpected blob index. Please open DB with "
823             "rocksdb::blob_db::BlobDB instead.");
824         valid_ = false;
825         return false;
826       }
827       is_blob_ = true;
828       break;
829     default:
830       assert(false);
831       break;
832   }
833   if (!s.ok()) {
834     valid_ = false;
835     status_ = s;
836     return false;
837   }
838   valid_ = true;
839   return true;
840 }
841 
842 // This function is used in FindValueForCurrentKey.
843 // We use the Seek() function instead of Prev() to find the necessary value.
844 // TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
845 //       Would be nice to reuse some code.
FindValueForCurrentKeyUsingSeek()846 bool DBIter::FindValueForCurrentKeyUsingSeek() {
847   // FindValueForCurrentKey will enable pinning before calling
848   // FindValueForCurrentKeyUsingSeek()
849   assert(pinned_iters_mgr_.PinningEnabled());
      // Re-seek directly to the newest entry of saved_key_ that could be
      // visible at this iterator's snapshot (sequence_), instead of
      // continuing to step the internal iterator backwards one entry at a
      // time.
850   std::string last_key;
851   AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
852                                                  sequence_, kValueTypeForSeek));
853   iter_.Seek(last_key);
854   RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
855 
856   // In case read_callback presents, the value we seek to may not be visible.
857   // Find the next value that's visible.
858   ParsedInternalKey ikey;
859   is_blob_ = false;
860   while (true) {
861     if (!iter_.Valid()) {
          // Ran off the end without finding a visible entry. Report success
          // only if the underlying iterator has no error.
862       valid_ = false;
863       return iter_.status().ok();
864     }
865 
866     if (!ParseKey(&ikey)) {
          // Corrupt internal key; ParseKey() presumably records the error
          // state itself (defined elsewhere in this file) -- just bail out.
867       return false;
868     }
869     if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
870       // No visible values for this key, even though FindValueForCurrentKey()
871       // has seen some. This is possible if we're using a tailing iterator, and
872       // the entries were discarded in a compaction.
873       valid_ = false;
874       return true;
875     }
876 
877     if (IsVisible(ikey.sequence)) {
878       break;
879     }
880 
881     iter_.Next();
882   }
883 
      // ikey is now the newest visible entry for saved_key_. A point
      // deletion, or coverage by a range tombstone, hides the key entirely.
884   if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
885       range_del_agg_.ShouldDelete(
886           ikey, RangeDelPositioningMode::kBackwardTraversal)) {
887     valid_ = false;
888     return true;
889   }
      // Blob indexes can only be surfaced when the iterator was opened with
      // allow_blob (i.e. through BlobDB).
890   if (ikey.type == kTypeBlobIndex && !allow_blob_) {
891     ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
892     status_ = Status::NotSupported(
893         "Encounter unexpected blob index. Please open DB with "
894         "rocksdb::blob_db::BlobDB instead.");
895     valid_ = false;
896     return false;
897   }
898   if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
        // Plain value (or permitted blob index): pin it and we are done.
899     assert(iter_.iter()->IsValuePinned());
900     pinned_value_ = iter_.value();
901     is_blob_ = (ikey.type == kTypeBlobIndex);
902     valid_ = true;
903     return true;
904   }
905 
906   // kTypeMerge. We need to collect all kTypeMerge values and save them
907   // in operands
908   assert(ikey.type == kTypeMerge);
909   current_entry_is_merged_ = true;
910   merge_context_.Clear();
911   merge_context_.PushOperand(
912       iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      // Advancing forward moves to progressively older entries of the same
      // user key (internal keys sort by descending sequence). Keep collecting
      // merge operands until a base value, a deletion, a different user key,
      // or the end of data terminates the list.
913   while (true) {
914     iter_.Next();
915 
916     if (!iter_.Valid()) {
917       if (!iter_.status().ok()) {
918         valid_ = false;
919         return false;
920       }
921       break;
922     }
923     if (!ParseKey(&ikey)) {
924       return false;
925     }
926     if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
          // Ran past the last entry of this user key; merge with no base.
927       break;
928     }
929 
930     if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
931         range_del_agg_.ShouldDelete(
932             ikey, RangeDelPositioningMode::kForwardTraversal)) {
          // A (range) deletion terminates the operand list: merge with no
          // base value.
933       break;
934     } else if (ikey.type == kTypeValue) {
          // Found the base value: fold all collected operands into it.
935       const Slice val = iter_.value();
936       Status s = MergeHelper::TimedFullMerge(
937           merge_operator_, saved_key_.GetUserKey(), &val,
938           merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
939           env_, &pinned_value_, true);
940       if (!s.ok()) {
941         valid_ = false;
942         status_ = s;
943         return false;
944       }
945       valid_ = true;
946       return true;
947     } else if (ikey.type == kTypeMerge) {
948       merge_context_.PushOperand(
949           iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
950       PERF_COUNTER_ADD(internal_merge_count, 1);
951     } else if (ikey.type == kTypeBlobIndex) {
          // Merging on top of a blob value is not supported.
952       if (!allow_blob_) {
953         ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
954         status_ = Status::NotSupported(
955             "Encounter unexpected blob index. Please open DB with "
956             "rocksdb::blob_db::BlobDB instead.");
957       } else {
958         status_ =
959             Status::NotSupported("Blob DB does not support merge operator.");
960       }
961       valid_ = false;
962       return false;
963     } else {
964       assert(false);
965     }
966   }
967 
      // No base value was found before the operand list ended: perform a
      // full merge with nullptr as the existing value.
968   Status s = MergeHelper::TimedFullMerge(
969       merge_operator_, saved_key_.GetUserKey(), nullptr,
970       merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
971       &pinned_value_, true);
972   if (!s.ok()) {
973     valid_ = false;
974     status_ = s;
975     return false;
976   }
977 
978   // Make sure we leave iter_ in a good state. If it's valid and we don't care
979   // about prefixes, that's already good enough. Otherwise it needs to be
980   // seeked to the current key.
981   if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_.Valid()) {
982     if (prefix_extractor_ != nullptr && !total_order_seek_) {
983       iter_.SeekForPrev(last_key);
984     } else {
985       iter_.Seek(last_key);
986       if (!iter_.Valid() && iter_.status().ok()) {
987         iter_.SeekToLast();
988       }
989     }
990     RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
991   }
992 
993   valid_ = true;
994   return true;
995 }
996 
997 // Move backwards until reaching a key smaller than saved_key_.
998 // Changes valid_ only if the return value is false.
FindUserKeyBeforeSavedKey()999 bool DBIter::FindUserKeyBeforeSavedKey() {
1000   assert(status_.ok());
1001   size_t num_skipped = 0;
       // Step the internal iterator backwards until it lands on an entry whose
       // user key sorts strictly before saved_key_.
1002   while (iter_.Valid()) {
1003     ParsedInternalKey ikey;
1004     if (!ParseKey(&ikey)) {
1005       return false;
1006     }
1007 
1008     if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
1009       return true;
1010     }
1011 
1012     if (TooManyInternalKeysSkipped()) {
1013       return false;
1014     }
1015 
1016     assert(ikey.sequence != kMaxSequenceNumber);
         // Account the skipped entry under the appropriate perf counter,
         // depending on whether it was visible at our snapshot.
1017     if (!IsVisible(ikey.sequence)) {
1018       PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
1019     } else {
1020       PERF_COUNTER_ADD(internal_key_skipped_count, 1);
1021     }
1022 
         // If one user key has very many entries, stepping back one at a time
         // gets expensive; reseek to just before saved_key_ instead.
1023     if (num_skipped >= max_skip_) {
1024       num_skipped = 0;
1025       IterKey last_key;
1026       last_key.SetInternalKey(ParsedInternalKey(
1027           saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
1028       // It would be more efficient to use SeekForPrev() here, but some
1029       // iterators may not support it.
1030       iter_.Seek(last_key.GetInternalKey());
1031       RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
1032       if (!iter_.Valid()) {
1033         break;
1034       }
1035     } else {
1036       ++num_skipped;
1037     }
1038 
         // After either the reseek or a plain skip, move one entry back and
         // re-check the loop condition.
1039     iter_.Prev();
1040   }
1041 
1042   if (!iter_.status().ok()) {
1043     valid_ = false;
1044     return false;
1045   }
1046 
       // Internal iterator exhausted without error; there is no key before
       // saved_key_.
1047   return true;
1048 }
1049 
TooManyInternalKeysSkipped(bool increment)1050 bool DBIter::TooManyInternalKeysSkipped(bool increment) {
1051   if ((max_skippable_internal_keys_ > 0) &&
1052       (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
1053     valid_ = false;
1054     status_ = Status::Incomplete("Too many internal keys skipped.");
1055     return true;
1056   } else if (increment) {
1057     num_internal_keys_skipped_++;
1058   }
1059   return false;
1060 }
1061 
IsVisible(SequenceNumber sequence)1062 bool DBIter::IsVisible(SequenceNumber sequence) {
1063   if (read_callback_ == nullptr) {
1064     return sequence <= sequence_;
1065   } else {
1066     return read_callback_->IsVisible(sequence);
1067   }
1068 }
1069 
SetSavedKeyToSeekTarget(const Slice & target)1070 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1071   is_key_seqnum_zero_ = false;
1072   SequenceNumber seq = sequence_;
1073   saved_key_.Clear();
1074   saved_key_.SetInternalKey(target, seq);
1075 
1076   if (iterate_lower_bound_ != nullptr &&
1077       user_comparator_.Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) <
1078           0) {
1079     // Seek key is smaller than the lower bound.
1080     saved_key_.Clear();
1081     saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
1082   }
1083 }
1084 
SetSavedKeyToSeekForPrevTarget(const Slice & target)1085 void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
1086   is_key_seqnum_zero_ = false;
1087   saved_key_.Clear();
1088   // now saved_key is used to store internal key.
1089   saved_key_.SetInternalKey(target, 0 /* sequence_number */,
1090                             kValueTypeForSeekForPrev);
1091 
1092   if (iterate_upper_bound_ != nullptr &&
1093       user_comparator_.Compare(saved_key_.GetUserKey(),
1094                                *iterate_upper_bound_) >= 0) {
1095     saved_key_.Clear();
1096     saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
1097   }
1098 }
1099 
Seek(const Slice & target)1100 void DBIter::Seek(const Slice& target) {
       // Positions the iterator at the first user key >= target (clamped to
       // iterate_lower_bound_) that is visible at this iterator's snapshot.
1101   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
1102   StopWatch sw(env_, statistics_, DB_SEEK);
1103 
1104 #ifndef ROCKSDB_LITE
1105   if (db_impl_ != nullptr && cfd_ != nullptr) {
         // Record the seek if tracing is enabled.
1106     db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
1107   }
1108 #endif  // ROCKSDB_LITE
1109 
       // Reset per-seek state before repositioning.
1110   status_ = Status::OK();
1111   ReleaseTempPinnedData();
1112   ResetInternalKeysSkippedCounter();
1113 
1114   // Seek the inner iterator based on the target key.
1115   {
1116     PERF_TIMER_GUARD(seek_internal_seek_time);
1117 
1118     SetSavedKeyToSeekTarget(target);
1119     iter_.Seek(saved_key_.GetInternalKey());
1120 
1121     range_del_agg_.InvalidateRangeDelMapPositions();
1122     RecordTick(statistics_, NUMBER_DB_SEEK);
1123   }
1124   if (!iter_.Valid()) {
1125     valid_ = false;
1126     return;
1127   }
1128   direction_ = kForward;
1129 
1130   // Now the inner iterator is placed to the target position. From there,
1131   // we need to find out the next key that is visible to the user.
1132   //
1133   ClearSavedValue();
1134   if (prefix_same_as_start_) {
1135     // The case where the iterator needs to be invalidated if it has exhausted
1136     // keys within the same prefix of the seek key.
1137     assert(prefix_extractor_ != nullptr);
1138     Slice target_prefix;
1139     target_prefix = prefix_extractor_->Transform(target);
1140     FindNextUserEntry(false /* not skipping saved_key */,
1141                       &target_prefix /* prefix */);
1142     if (valid_) {
1143       // Remember the prefix of the seek key for the future Prev() call to
1144       // check.
1145       prefix_.SetUserKey(target_prefix);
1146     }
1147   } else {
1148     FindNextUserEntry(false /* not skipping saved_key */, nullptr);
1149   }
1150   if (!valid_) {
1151     return;
1152   }
1153 
1154   // Updating stats and perf context counters.
1155   if (statistics_ != nullptr) {
1156     // Found an entry: count the successful seek and the bytes read.
1157     RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1158     RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1159   }
1160   PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
1161 }
1162 
SeekForPrev(const Slice & target)1163 void DBIter::SeekForPrev(const Slice& target) {
       // Positions the iterator at the last user key <= target (clamped to
       // iterate_upper_bound_) that is visible at this iterator's snapshot.
1164   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
1165   StopWatch sw(env_, statistics_, DB_SEEK);
1166 
1167 #ifndef ROCKSDB_LITE
1168   if (db_impl_ != nullptr && cfd_ != nullptr) {
         // Record the seek if tracing is enabled.
1169     db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
1170   }
1171 #endif  // ROCKSDB_LITE
1172 
       // Reset per-seek state before repositioning.
1173   status_ = Status::OK();
1174   ReleaseTempPinnedData();
1175   ResetInternalKeysSkippedCounter();
1176 
1177   // Seek the inner iterator based on the target key.
1178   {
1179     PERF_TIMER_GUARD(seek_internal_seek_time);
1180     SetSavedKeyToSeekForPrevTarget(target);
1181     iter_.SeekForPrev(saved_key_.GetInternalKey());
1182     range_del_agg_.InvalidateRangeDelMapPositions();
1183     RecordTick(statistics_, NUMBER_DB_SEEK);
1184   }
1185   if (!iter_.Valid()) {
1186     valid_ = false;
1187     return;
1188   }
1189   direction_ = kReverse;
1190 
1191   // Now the inner iterator is placed to the target position. From there,
1192   // we need to find out the first key that is visible to the user in the
1193   // backward direction.
1194   ClearSavedValue();
1195   if (prefix_same_as_start_) {
1196     // The case where the iterator needs to be invalidated if it has exhausted
1197     // keys within the same prefix of the seek key.
1198     assert(prefix_extractor_ != nullptr);
1199     Slice target_prefix;
1200     target_prefix = prefix_extractor_->Transform(target);
1201     PrevInternal(&target_prefix);
1202     if (valid_) {
1203       // Remember the prefix of the seek key for the future Prev() call to
1204       // check.
1205       prefix_.SetUserKey(target_prefix);
1206     }
1207   } else {
1208     PrevInternal(nullptr);
1209   }
1210 
1211   // Report stats and perf context.
1212   if (statistics_ != nullptr && valid_) {
1213     RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1214     RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1215     PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
1216   }
1217 }
1218 
SeekToFirst()1219 void DBIter::SeekToFirst() {
       // With a lower bound configured, the first key is simply the first key
       // at or above that bound.
1220   if (iterate_lower_bound_ != nullptr) {
1221     Seek(*iterate_lower_bound_);
1222     return;
1223   }
1224   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
1225   // Don't use iter_::Seek() if we set a prefix extractor
1226   // because prefix seek will be used.
1227   if (prefix_extractor_ != nullptr && !total_order_seek_) {
         // Setting max_skip_ to the maximum effectively disables the
         // skip-count-triggered reseek optimization.
1228     max_skip_ = std::numeric_limits<uint64_t>::max();
1229   }
       // Reset per-seek state before repositioning.
1230   status_ = Status::OK();
1231   direction_ = kForward;
1232   ReleaseTempPinnedData();
1233   ResetInternalKeysSkippedCounter();
1234   ClearSavedValue();
1235   is_key_seqnum_zero_ = false;
1236 
1237   {
1238     PERF_TIMER_GUARD(seek_internal_seek_time);
1239     iter_.SeekToFirst();
1240     range_del_agg_.InvalidateRangeDelMapPositions();
1241   }
1242 
1243   RecordTick(statistics_, NUMBER_DB_SEEK);
1244   if (iter_.Valid()) {
         // Remember the first internal key's user key, then advance to the
         // first entry actually visible to the user.
1245     saved_key_.SetUserKey(
1246         ExtractUserKey(iter_.key()),
1247         !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
1248     FindNextUserEntry(false /* not skipping saved_key */,
1249                       nullptr /* no prefix check */);
1250     if (statistics_ != nullptr) {
1251       if (valid_) {
1252         RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1253         RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1254         PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
1255       }
1256     }
1257   } else {
1258     valid_ = false;
1259   }
1260   if (valid_ && prefix_same_as_start_) {
1261     assert(prefix_extractor_ != nullptr);
1262     prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
1263   }
1264 }
1265 
SeekToLast()1266 void DBIter::SeekToLast() {
1267   if (iterate_upper_bound_ != nullptr) {
1268     // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
1269     SeekForPrev(*iterate_upper_bound_);
         // The upper bound is exclusive: if we landed exactly on it, step
         // back one more user key.
1270     if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
1271       ReleaseTempPinnedData();
1272       PrevInternal(nullptr);
1273     }
1274     return;
1275   }
1276 
1277   PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
1278   // Don't use iter_::Seek() if we set a prefix extractor
1279   // because prefix seek will be used.
1280   if (prefix_extractor_ != nullptr && !total_order_seek_) {
         // Setting max_skip_ to the maximum effectively disables the
         // skip-count-triggered reseek optimization.
1281     max_skip_ = std::numeric_limits<uint64_t>::max();
1282   }
       // Reset per-seek state before repositioning.
1283   status_ = Status::OK();
1284   direction_ = kReverse;
1285   ReleaseTempPinnedData();
1286   ResetInternalKeysSkippedCounter();
1287   ClearSavedValue();
1288   is_key_seqnum_zero_ = false;
1289 
1290   {
1291     PERF_TIMER_GUARD(seek_internal_seek_time);
1292     iter_.SeekToLast();
1293     range_del_agg_.InvalidateRangeDelMapPositions();
1294   }
       // Walk backwards to the last entry visible to the user.
1295   PrevInternal(nullptr);
1296   if (statistics_ != nullptr) {
1297     RecordTick(statistics_, NUMBER_DB_SEEK);
1298     if (valid_) {
1299       RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
1300       RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
1301       PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
1302     }
1303   }
1304   if (valid_ && prefix_same_as_start_) {
1305     assert(prefix_extractor_ != nullptr);
1306     prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
1307   }
1308 }
1309 
NewDBIterator(Env * env,const ReadOptions & read_options,const ImmutableCFOptions & cf_options,const MutableCFOptions & mutable_cf_options,const Comparator * user_key_comparator,InternalIterator * internal_iter,const SequenceNumber & sequence,uint64_t max_sequential_skip_in_iterations,ReadCallback * read_callback,DBImpl * db_impl,ColumnFamilyData * cfd,bool allow_blob)1310 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
1311                         const ImmutableCFOptions& cf_options,
1312                         const MutableCFOptions& mutable_cf_options,
1313                         const Comparator* user_key_comparator,
1314                         InternalIterator* internal_iter,
1315                         const SequenceNumber& sequence,
1316                         uint64_t max_sequential_skip_in_iterations,
1317                         ReadCallback* read_callback, DBImpl* db_impl,
1318                         ColumnFamilyData* cfd, bool allow_blob) {
1319   DBIter* db_iter = new DBIter(
1320       env, read_options, cf_options, mutable_cf_options, user_key_comparator,
1321       internal_iter, sequence, false, max_sequential_skip_in_iterations,
1322       read_callback, db_impl, cfd, allow_blob);
1323   return db_iter;
1324 }
1325 
1326 }  // namespace rocksdb
1327