1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 10 #pragma once 11 #include <stdint.h> 12 #include <string> 13 #include "db/db_impl/db_impl.h" 14 #include "db/dbformat.h" 15 #include "db/range_del_aggregator.h" 16 #include "memory/arena.h" 17 #include "options/cf_options.h" 18 #include "rocksdb/db.h" 19 #include "rocksdb/iterator.h" 20 #include "table/iterator_wrapper.h" 21 #include "util/autovector.h" 22 23 namespace rocksdb { 24 25 // This file declares the factory functions of DBIter, in its original form 26 // or a wrapped form with class ArenaWrappedDBIter, which is defined here. 27 // Class DBIter, which is declared and implemented inside db_iter.cc, is 28 // a iterator that converts internal keys (yielded by an InternalIterator) 29 // that were live at the specified sequence number into appropriate user 30 // keys. 31 // Each internal key is consist of a user key, a sequence number, and a value 32 // type. DBIter deals with multiple key versions, tombstones, merge operands, 33 // etc, and exposes an Iterator. 34 // For example, DBIter may wrap following InternalIterator: 35 // user key: AAA value: v3 seqno: 100 type: Put 36 // user key: AAA value: v2 seqno: 97 type: Put 37 // user key: AAA value: v1 seqno: 95 type: Put 38 // user key: BBB value: v1 seqno: 90 type: Put 39 // user key: BBC value: N/A seqno: 98 type: Delete 40 // user key: BBC value: v1 seqno: 95 type: Put 41 // If the snapshot passed in is 102, then the DBIter is expected to 42 // expose the following iterator: 43 // key: AAA value: v3 44 // key: BBB value: v1 45 // If the snapshot passed in is 96, then it should expose: 46 // key: AAA value: v1 47 // key: BBB value: v1 48 // key: BBC value: v1 49 // 50 51 // Memtables and sstables that make the DB representation contain 52 // (userkey,seq,type) => uservalue entries. DBIter 53 // combines multiple entries for the same userkey found in the DB 54 // representation into a single entry while accounting for sequence 55 // numbers, deletion markers, overwrites, etc. 56 class DBIter final : public Iterator { 57 public: 58 // The following is grossly complicated. TODO: clean it up 59 // Which direction is the iterator currently moving? 60 // (1) When moving forward: 61 // (1a) if current_entry_is_merged_ = false, the internal iterator is 62 // positioned at the exact entry that yields this->key(), this->value() 63 // (1b) if current_entry_is_merged_ = true, the internal iterator is 64 // positioned immediately after the last entry that contributed to the 65 // current this->value(). That entry may or may not have key equal to 66 // this->key(). 67 // (2) When moving backwards, the internal iterator is positioned 68 // just before all entries whose user key == this->key(). 69 enum Direction { kForward, kReverse }; 70 71 // LocalStatistics contain Statistics counters that will be aggregated per 72 // each iterator instance and then will be sent to the global statistics when 73 // the iterator is destroyed. 74 // 75 // The purpose of this approach is to avoid perf regression happening 76 // when multiple threads bump the atomic counters from a DBIter::Next(). 77 struct LocalStatistics { LocalStatisticsLocalStatistics78 explicit LocalStatistics() { ResetCounters(); } 79 ResetCountersLocalStatistics80 void ResetCounters() { 81 next_count_ = 0; 82 next_found_count_ = 0; 83 prev_count_ = 0; 84 prev_found_count_ = 0; 85 bytes_read_ = 0; 86 skip_count_ = 0; 87 } 88 BumpGlobalStatisticsLocalStatistics89 void BumpGlobalStatistics(Statistics* global_statistics) { 90 RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); 91 RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); 92 RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); 93 RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); 94 RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); 95 RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_); 96 PERF_COUNTER_ADD(iter_read_bytes, bytes_read_); 97 ResetCounters(); 98 } 99 100 // Map to Tickers::NUMBER_DB_NEXT 101 uint64_t next_count_; 102 // Map to Tickers::NUMBER_DB_NEXT_FOUND 103 uint64_t next_found_count_; 104 // Map to Tickers::NUMBER_DB_PREV 105 uint64_t prev_count_; 106 // Map to Tickers::NUMBER_DB_PREV_FOUND 107 uint64_t prev_found_count_; 108 // Map to Tickers::ITER_BYTES_READ 109 uint64_t bytes_read_; 110 // Map to Tickers::NUMBER_ITER_SKIP 111 uint64_t skip_count_; 112 }; 113 114 DBIter(Env* _env, const ReadOptions& read_options, 115 const ImmutableCFOptions& cf_options, 116 const MutableCFOptions& mutable_cf_options, const Comparator* cmp, 117 InternalIterator* iter, SequenceNumber s, bool arena_mode, 118 uint64_t max_sequential_skip_in_iterations, 119 ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, 120 bool allow_blob); 121 122 // No copying allowed 123 DBIter(const DBIter&) = delete; 124 void operator=(const DBIter&) = delete; 125 ~DBIter()126 ~DBIter() override { 127 // Release pinned data if any 128 if (pinned_iters_mgr_.PinningEnabled()) { 129 pinned_iters_mgr_.ReleasePinnedData(); 130 } 131 RecordTick(statistics_, NO_ITERATOR_DELETED); 132 ResetInternalKeysSkippedCounter(); 133 local_stats_.BumpGlobalStatistics(statistics_); 134 iter_.DeleteIter(arena_mode_); 135 } SetIter(InternalIterator * iter)136 virtual void SetIter(InternalIterator* iter) { 137 assert(iter_.iter() == nullptr); 138 iter_.Set(iter); 139 iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); 140 } GetRangeDelAggregator()141 virtual ReadRangeDelAggregator* GetRangeDelAggregator() { 142 return &range_del_agg_; 143 } 144 Valid()145 bool Valid() const override { return valid_; } key()146 Slice key() const override { 147 assert(valid_); 148 if (start_seqnum_ > 0) { 149 return saved_key_.GetInternalKey(); 150 } else { 151 return saved_key_.GetUserKey(); 152 } 153 } value()154 Slice value() const override { 155 assert(valid_); 156 if (current_entry_is_merged_) { 157 // If pinned_value_ is set then the result of merge operator is one of 158 // the merge operands and we should return it. 159 return pinned_value_.data() ? pinned_value_ : saved_value_; 160 } else if (direction_ == kReverse) { 161 return pinned_value_; 162 } else { 163 return iter_.value(); 164 } 165 } status()166 Status status() const override { 167 if (status_.ok()) { 168 return iter_.status(); 169 } else { 170 assert(!valid_); 171 return status_; 172 } 173 } IsBlob()174 bool IsBlob() const { 175 assert(valid_ && (allow_blob_ || !is_blob_)); 176 return is_blob_; 177 } 178 179 Status GetProperty(std::string prop_name, std::string* prop) override; 180 181 void Next() final override; 182 void Prev() final override; 183 void Seek(const Slice& target) final override; 184 void SeekForPrev(const Slice& target) final override; 185 void SeekToFirst() final override; 186 void SeekToLast() final override; env()187 Env* env() { return env_; } set_sequence(uint64_t s)188 void set_sequence(uint64_t s) { 189 sequence_ = s; 190 if (read_callback_) { 191 read_callback_->Refresh(s); 192 } 193 } set_valid(bool v)194 void set_valid(bool v) { valid_ = v; } 195 196 private: 197 // For all methods in this block: 198 // PRE: iter_->Valid() && status_.ok() 199 // Return false if there was an error, and status() is non-ok, valid_ = false; 200 // in this case callers would usually stop what they were doing and return. 201 bool ReverseToForward(); 202 bool ReverseToBackward(); 203 // Set saved_key_ to the seek key to target, with proper sequence number set. 204 // It might get adjusted if the seek key is smaller than iterator lower bound. 205 void SetSavedKeyToSeekTarget(const Slice& /*target*/); 206 // Set saved_key_ to the seek key to target, with proper sequence number set. 207 // It might get adjusted if the seek key is larger than iterator upper bound. 208 void SetSavedKeyToSeekForPrevTarget(const Slice& /*target*/); 209 bool FindValueForCurrentKey(); 210 bool FindValueForCurrentKeyUsingSeek(); 211 bool FindUserKeyBeforeSavedKey(); 212 // If `skipping_saved_key` is true, the function will keep iterating until it 213 // finds a user key that is larger than `saved_key_`. 214 // If `prefix` is not null, the iterator needs to stop when all keys for the 215 // prefix are exhausted and the interator is set to invalid. 216 bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix); 217 // Internal implementation of FindNextUserEntry(). 218 bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix); 219 bool ParseKey(ParsedInternalKey* key); 220 bool MergeValuesNewToOld(); 221 222 // If prefix is not null, we need to set the iterator to invalid if no more 223 // entry can be found within the prefix. 224 void PrevInternal(const Slice* /*prefix*/); 225 bool TooManyInternalKeysSkipped(bool increment = true); 226 bool IsVisible(SequenceNumber sequence); 227 228 // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() 229 // is called TempPinData()230 void TempPinData() { 231 if (!pin_thru_lifetime_) { 232 pinned_iters_mgr_.StartPinning(); 233 } 234 } 235 236 // Release blocks pinned by TempPinData() ReleaseTempPinnedData()237 void ReleaseTempPinnedData() { 238 if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { 239 pinned_iters_mgr_.ReleasePinnedData(); 240 } 241 } 242 ClearSavedValue()243 inline void ClearSavedValue() { 244 if (saved_value_.capacity() > 1048576) { 245 std::string empty; 246 swap(empty, saved_value_); 247 } else { 248 saved_value_.clear(); 249 } 250 } 251 ResetInternalKeysSkippedCounter()252 inline void ResetInternalKeysSkippedCounter() { 253 local_stats_.skip_count_ += num_internal_keys_skipped_; 254 if (valid_) { 255 local_stats_.skip_count_--; 256 } 257 num_internal_keys_skipped_ = 0; 258 } 259 260 const SliceTransform* prefix_extractor_; 261 Env* const env_; 262 Logger* logger_; 263 UserComparatorWrapper user_comparator_; 264 const MergeOperator* const merge_operator_; 265 IteratorWrapper iter_; 266 ReadCallback* read_callback_; 267 // Max visible sequence number. It is normally the snapshot seq unless we have 268 // uncommitted data in db as in WriteUnCommitted. 269 SequenceNumber sequence_; 270 271 IterKey saved_key_; 272 // Reusable internal key data structure. This is only used inside one function 273 // and should not be used across functions. Reusing this object can reduce 274 // overhead of calling construction of the function if creating it each time. 275 ParsedInternalKey ikey_; 276 std::string saved_value_; 277 Slice pinned_value_; 278 // for prefix seek mode to support prev() 279 Statistics* statistics_; 280 uint64_t max_skip_; 281 uint64_t max_skippable_internal_keys_; 282 uint64_t num_internal_keys_skipped_; 283 const Slice* iterate_lower_bound_; 284 const Slice* iterate_upper_bound_; 285 286 // The prefix of the seek key. It is only used when prefix_same_as_start_ 287 // is true and prefix extractor is not null. In Next() or Prev(), current keys 288 // will be checked against this prefix, so that the iterator can be 289 // invalidated if the keys in this prefix has been exhausted. Set it using 290 // SetUserKey() and use it using GetUserKey(). 291 IterKey prefix_; 292 293 Status status_; 294 Direction direction_; 295 bool valid_; 296 bool current_entry_is_merged_; 297 // True if we know that the current entry's seqnum is 0. 298 // This information is used as that the next entry will be for another 299 // user key. 300 bool is_key_seqnum_zero_; 301 const bool prefix_same_as_start_; 302 // Means that we will pin all data blocks we read as long the Iterator 303 // is not deleted, will be true if ReadOptions::pin_data is true 304 const bool pin_thru_lifetime_; 305 const bool total_order_seek_; 306 bool allow_blob_; 307 bool is_blob_; 308 bool arena_mode_; 309 // List of operands for merge operator. 310 MergeContext merge_context_; 311 ReadRangeDelAggregator range_del_agg_; 312 LocalStatistics local_stats_; 313 PinnedIteratorsManager pinned_iters_mgr_; 314 #ifdef ROCKSDB_LITE 315 ROCKSDB_FIELD_UNUSED 316 #endif 317 DBImpl* db_impl_; 318 #ifdef ROCKSDB_LITE 319 ROCKSDB_FIELD_UNUSED 320 #endif 321 ColumnFamilyData* cfd_; 322 // for diff snapshots we want the lower bound on the seqnum; 323 // if this value > 0 iterator will return internal keys 324 SequenceNumber start_seqnum_; 325 }; 326 // Return a new iterator that converts internal keys (yielded by 327 // "*internal_iter") that were live at the specified `sequence` number 328 // into appropriate user keys. 329 extern Iterator* NewDBIterator( 330 Env* env, const ReadOptions& read_options, 331 const ImmutableCFOptions& cf_options, 332 const MutableCFOptions& mutable_cf_options, 333 const Comparator* user_key_comparator, InternalIterator* internal_iter, 334 const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, 335 ReadCallback* read_callback, DBImpl* db_impl = nullptr, 336 ColumnFamilyData* cfd = nullptr, bool allow_blob = false); 337 338 } // namespace rocksdb 339