1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 10 #pragma once 11 #include <cstdint> 12 #include <string> 13 14 #include "db/db_impl/db_impl.h" 15 #include "db/range_del_aggregator.h" 16 #include "memory/arena.h" 17 #include "options/cf_options.h" 18 #include "rocksdb/db.h" 19 #include "rocksdb/iterator.h" 20 #include "table/iterator_wrapper.h" 21 #include "util/autovector.h" 22 23 namespace ROCKSDB_NAMESPACE { 24 class Version; 25 26 // This file declares the factory functions of DBIter, in its original form 27 // or a wrapped form with class ArenaWrappedDBIter, which is defined here. 28 // Class DBIter, which is declared and implemented inside db_iter.cc, is 29 // an iterator that converts internal keys (yielded by an InternalIterator) 30 // that were live at the specified sequence number into appropriate user 31 // keys. 32 // Each internal key consists of a user key, a sequence number, and a value 33 // type. DBIter deals with multiple key versions, tombstones, merge operands, 34 // etc, and exposes an Iterator. 35 // For example, DBIter may wrap following InternalIterator: 36 // user key: AAA value: v3 seqno: 100 type: Put 37 // user key: AAA value: v2 seqno: 97 type: Put 38 // user key: AAA value: v1 seqno: 95 type: Put 39 // user key: BBB value: v1 seqno: 90 type: Put 40 // user key: BBC value: N/A seqno: 98 type: Delete 41 // user key: BBC value: v1 seqno: 95 type: Put 42 // If the snapshot passed in is 102, then the DBIter is expected to 43 // expose the following iterator: 44 // key: AAA value: v3 45 // key: BBB value: v1 46 // If the snapshot passed in is 96, then it should expose: 47 // key: AAA value: v1 48 // key: BBB value: v1 49 // key: BBC value: v1 50 // 51 52 // Memtables and sstables that make the DB representation contain 53 // (userkey,seq,type) => uservalue entries. DBIter 54 // combines multiple entries for the same userkey found in the DB 55 // representation into a single entry while accounting for sequence 56 // numbers, deletion markers, overwrites, etc. 57 class DBIter final : public Iterator { 58 public: 59 // The following is grossly complicated. TODO: clean it up 60 // Which direction is the iterator currently moving? 61 // (1) When moving forward: 62 // (1a) if current_entry_is_merged_ = false, the internal iterator is 63 // positioned at the exact entry that yields this->key(), this->value() 64 // (1b) if current_entry_is_merged_ = true, the internal iterator is 65 // positioned immediately after the last entry that contributed to the 66 // current this->value(). That entry may or may not have key equal to 67 // this->key(). 68 // (2) When moving backwards, the internal iterator is positioned 69 // just before all entries whose user key == this->key(). 70 enum Direction : uint8_t { kForward, kReverse }; 71 72 // LocalStatistics contain Statistics counters that will be aggregated per 73 // each iterator instance and then will be sent to the global statistics when 74 // the iterator is destroyed. 75 // 76 // The purpose of this approach is to avoid perf regression happening 77 // when multiple threads bump the atomic counters from a DBIter::Next(). 78 struct LocalStatistics { LocalStatisticsLocalStatistics79 explicit LocalStatistics() { ResetCounters(); } 80 ResetCountersLocalStatistics81 void ResetCounters() { 82 next_count_ = 0; 83 next_found_count_ = 0; 84 prev_count_ = 0; 85 prev_found_count_ = 0; 86 bytes_read_ = 0; 87 skip_count_ = 0; 88 } 89 BumpGlobalStatisticsLocalStatistics90 void BumpGlobalStatistics(Statistics* global_statistics) { 91 RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); 92 RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); 93 RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); 94 RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); 95 RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); 96 RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_); 97 PERF_COUNTER_ADD(iter_read_bytes, bytes_read_); 98 ResetCounters(); 99 } 100 101 // Map to Tickers::NUMBER_DB_NEXT 102 uint64_t next_count_; 103 // Map to Tickers::NUMBER_DB_NEXT_FOUND 104 uint64_t next_found_count_; 105 // Map to Tickers::NUMBER_DB_PREV 106 uint64_t prev_count_; 107 // Map to Tickers::NUMBER_DB_PREV_FOUND 108 uint64_t prev_found_count_; 109 // Map to Tickers::ITER_BYTES_READ 110 uint64_t bytes_read_; 111 // Map to Tickers::NUMBER_ITER_SKIP 112 uint64_t skip_count_; 113 }; 114 115 DBIter(Env* _env, const ReadOptions& read_options, 116 const ImmutableOptions& ioptions, 117 const MutableCFOptions& mutable_cf_options, const Comparator* cmp, 118 InternalIterator* iter, const Version* version, SequenceNumber s, 119 bool arena_mode, uint64_t max_sequential_skip_in_iterations, 120 ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, 121 bool expose_blob_index); 122 123 // No copying allowed 124 DBIter(const DBIter&) = delete; 125 void operator=(const DBIter&) = delete; 126 ~DBIter()127 ~DBIter() override { 128 // Release pinned data if any 129 if (pinned_iters_mgr_.PinningEnabled()) { 130 pinned_iters_mgr_.ReleasePinnedData(); 131 } 132 RecordTick(statistics_, NO_ITERATOR_DELETED); 133 ResetInternalKeysSkippedCounter(); 134 local_stats_.BumpGlobalStatistics(statistics_); 135 iter_.DeleteIter(arena_mode_); 136 } SetIter(InternalIterator * iter)137 void SetIter(InternalIterator* iter) { 138 assert(iter_.iter() == nullptr); 139 iter_.Set(iter); 140 iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); 141 } GetRangeDelAggregator()142 ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; } 143 Valid()144 bool Valid() const override { 145 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED 146 if (valid_) { 147 status_.PermitUncheckedError(); 148 } 149 #endif // ROCKSDB_ASSERT_STATUS_CHECKED 150 return valid_; 151 } key()152 Slice key() const override { 153 assert(valid_); 154 if (start_seqnum_ > 0 || timestamp_lb_) { 155 return saved_key_.GetInternalKey(); 156 } else { 157 const Slice ukey_and_ts = saved_key_.GetUserKey(); 158 return Slice(ukey_and_ts.data(), ukey_and_ts.size() - timestamp_size_); 159 } 160 } value()161 Slice value() const override { 162 assert(valid_); 163 164 if (!expose_blob_index_ && is_blob_) { 165 return blob_value_; 166 } else if (current_entry_is_merged_) { 167 // If pinned_value_ is set then the result of merge operator is one of 168 // the merge operands and we should return it. 169 return pinned_value_.data() ? pinned_value_ : saved_value_; 170 } else if (direction_ == kReverse) { 171 return pinned_value_; 172 } else { 173 return iter_.value(); 174 } 175 } status()176 Status status() const override { 177 if (status_.ok()) { 178 return iter_.status(); 179 } else { 180 assert(!valid_); 181 return status_; 182 } 183 } timestamp()184 Slice timestamp() const override { 185 assert(valid_); 186 assert(timestamp_size_ > 0); 187 if (direction_ == kReverse) { 188 return saved_timestamp_; 189 } 190 const Slice ukey_and_ts = saved_key_.GetUserKey(); 191 assert(timestamp_size_ < ukey_and_ts.size()); 192 return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_); 193 } IsBlob()194 bool IsBlob() const { 195 assert(valid_); 196 return is_blob_; 197 } 198 199 Status GetProperty(std::string prop_name, std::string* prop) override; 200 201 void Next() final override; 202 void Prev() final override; 203 // 'target' does not contain timestamp, even if user timestamp feature is 204 // enabled. 205 void Seek(const Slice& target) final override; 206 void SeekForPrev(const Slice& target) final override; 207 void SeekToFirst() final override; 208 void SeekToLast() final override; env()209 Env* env() const { return env_; } set_sequence(uint64_t s)210 void set_sequence(uint64_t s) { 211 sequence_ = s; 212 if (read_callback_) { 213 read_callback_->Refresh(s); 214 } 215 } set_valid(bool v)216 void set_valid(bool v) { valid_ = v; } 217 218 private: 219 // For all methods in this block: 220 // PRE: iter_->Valid() && status_.ok() 221 // Return false if there was an error, and status() is non-ok, valid_ = false; 222 // in this case callers would usually stop what they were doing and return. 223 bool ReverseToForward(); 224 bool ReverseToBackward(); 225 // Set saved_key_ to the seek key to target, with proper sequence number set. 226 // It might get adjusted if the seek key is smaller than iterator lower bound. 227 void SetSavedKeyToSeekTarget(const Slice& target); 228 // Set saved_key_ to the seek key to target, with proper sequence number set. 229 // It might get adjusted if the seek key is larger than iterator upper bound. 230 void SetSavedKeyToSeekForPrevTarget(const Slice& target); 231 bool FindValueForCurrentKey(); 232 bool FindValueForCurrentKeyUsingSeek(); 233 bool FindUserKeyBeforeSavedKey(); 234 // If `skipping_saved_key` is true, the function will keep iterating until it 235 // finds a user key that is larger than `saved_key_`. 236 // If `prefix` is not null, the iterator needs to stop when all keys for the 237 // prefix are exhausted and the iterator is set to invalid. 238 bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix); 239 // Internal implementation of FindNextUserEntry(). 240 bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix); 241 bool ParseKey(ParsedInternalKey* key); 242 bool MergeValuesNewToOld(); 243 244 // If prefix is not null, we need to set the iterator to invalid if no more 245 // entry can be found within the prefix. 246 void PrevInternal(const Slice* prefix); 247 bool TooManyInternalKeysSkipped(bool increment = true); 248 bool IsVisible(SequenceNumber sequence, const Slice& ts, 249 bool* more_recent = nullptr); 250 251 // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() 252 // is called TempPinData()253 void TempPinData() { 254 if (!pin_thru_lifetime_) { 255 pinned_iters_mgr_.StartPinning(); 256 } 257 } 258 259 // Release blocks pinned by TempPinData() ReleaseTempPinnedData()260 void ReleaseTempPinnedData() { 261 if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { 262 pinned_iters_mgr_.ReleasePinnedData(); 263 } 264 } 265 ClearSavedValue()266 inline void ClearSavedValue() { 267 if (saved_value_.capacity() > 1048576) { 268 std::string empty; 269 swap(empty, saved_value_); 270 } else { 271 saved_value_.clear(); 272 } 273 } 274 ResetInternalKeysSkippedCounter()275 inline void ResetInternalKeysSkippedCounter() { 276 local_stats_.skip_count_ += num_internal_keys_skipped_; 277 if (valid_) { 278 local_stats_.skip_count_--; 279 } 280 num_internal_keys_skipped_ = 0; 281 } 282 expect_total_order_inner_iter()283 bool expect_total_order_inner_iter() { 284 assert(expect_total_order_inner_iter_ || prefix_extractor_ != nullptr); 285 return expect_total_order_inner_iter_; 286 } 287 288 // If lower bound of timestamp is given by ReadOptions.iter_start_ts, we need 289 // to return versions of the same key. We cannot just skip if the key value 290 // is the same but timestamps are different but fall in timestamp range. CompareKeyForSkip(const Slice & a,const Slice & b)291 inline int CompareKeyForSkip(const Slice& a, const Slice& b) { 292 return timestamp_lb_ != nullptr 293 ? user_comparator_.Compare(a, b) 294 : user_comparator_.CompareWithoutTimestamp(a, b); 295 } 296 297 // Retrieves the blob value for the specified user key using the given blob 298 // index when using the integrated BlobDB implementation. 299 bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index); 300 301 Status Merge(const Slice* val, const Slice& user_key); 302 303 const SliceTransform* prefix_extractor_; 304 Env* const env_; 305 SystemClock* clock_; 306 Logger* logger_; 307 UserComparatorWrapper user_comparator_; 308 const MergeOperator* const merge_operator_; 309 IteratorWrapper iter_; 310 const Version* version_; 311 ReadCallback* read_callback_; 312 // Max visible sequence number. It is normally the snapshot seq unless we have 313 // uncommitted data in db as in WriteUnCommitted. 314 SequenceNumber sequence_; 315 316 IterKey saved_key_; 317 // Reusable internal key data structure. This is only used inside one function 318 // and should not be used across functions. Reusing this object can reduce 319 // overhead of calling construction of the function if creating it each time. 320 ParsedInternalKey ikey_; 321 std::string saved_value_; 322 Slice pinned_value_; 323 // for prefix seek mode to support prev() 324 PinnableSlice blob_value_; 325 Statistics* statistics_; 326 uint64_t max_skip_; 327 uint64_t max_skippable_internal_keys_; 328 uint64_t num_internal_keys_skipped_; 329 const Slice* iterate_lower_bound_; 330 const Slice* iterate_upper_bound_; 331 332 // The prefix of the seek key. It is only used when prefix_same_as_start_ 333 // is true and prefix extractor is not null. In Next() or Prev(), current keys 334 // will be checked against this prefix, so that the iterator can be 335 // invalidated if the keys in this prefix has been exhausted. Set it using 336 // SetUserKey() and use it using GetUserKey(). 337 IterKey prefix_; 338 339 Status status_; 340 Direction direction_; 341 bool valid_; 342 bool current_entry_is_merged_; 343 // True if we know that the current entry's seqnum is 0. 344 // This information is used as that the next entry will be for another 345 // user key. 346 bool is_key_seqnum_zero_; 347 const bool prefix_same_as_start_; 348 // Means that we will pin all data blocks we read as long the Iterator 349 // is not deleted, will be true if ReadOptions::pin_data is true 350 const bool pin_thru_lifetime_; 351 // Expect the inner iterator to maintain a total order. 352 // prefix_extractor_ must be non-NULL if the value is false. 353 const bool expect_total_order_inner_iter_; 354 ReadTier read_tier_; 355 bool verify_checksums_; 356 // Whether the iterator is allowed to expose blob references. Set to true when 357 // the stacked BlobDB implementation is used, false otherwise. 358 bool expose_blob_index_; 359 bool is_blob_; 360 bool arena_mode_; 361 // List of operands for merge operator. 362 MergeContext merge_context_; 363 ReadRangeDelAggregator range_del_agg_; 364 LocalStatistics local_stats_; 365 PinnedIteratorsManager pinned_iters_mgr_; 366 #ifdef ROCKSDB_LITE 367 ROCKSDB_FIELD_UNUSED 368 #endif 369 DBImpl* db_impl_; 370 #ifdef ROCKSDB_LITE 371 ROCKSDB_FIELD_UNUSED 372 #endif 373 ColumnFamilyData* cfd_; 374 // for diff snapshots we want the lower bound on the seqnum; 375 // if this value > 0 iterator will return internal keys 376 SequenceNumber start_seqnum_; 377 const Slice* const timestamp_ub_; 378 const Slice* const timestamp_lb_; 379 const size_t timestamp_size_; 380 std::string saved_timestamp_; 381 }; 382 383 // Return a new iterator that converts internal keys (yielded by 384 // "*internal_iter") that were live at the specified `sequence` number 385 // into appropriate user keys. 386 extern Iterator* NewDBIterator( 387 Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions, 388 const MutableCFOptions& mutable_cf_options, 389 const Comparator* user_key_comparator, InternalIterator* internal_iter, 390 const Version* version, const SequenceNumber& sequence, 391 uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback, 392 DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, 393 bool expose_blob_index = false); 394 395 } // namespace ROCKSDB_NAMESPACE 396