1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <cstdint>
12 #include <string>
13 
14 #include "db/db_impl/db_impl.h"
15 #include "db/range_del_aggregator.h"
16 #include "memory/arena.h"
17 #include "options/cf_options.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/iterator.h"
20 #include "table/iterator_wrapper.h"
21 #include "util/autovector.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 class Version;
25 
// This file declares class DBIter and its factory function NewDBIterator().
// (The arena-wrapped form, class ArenaWrappedDBIter, is not defined in this
// file.) Class DBIter, which is declared here and implemented inside
// db_iter.cc, is an iterator that converts internal keys (yielded by an
// InternalIterator) that were live at the specified sequence number into
// appropriate user keys.
32 // Each internal key consists of a user key, a sequence number, and a value
33 // type. DBIter deals with multiple key versions, tombstones, merge operands,
34 // etc, and exposes an Iterator.
35 // For example, DBIter may wrap following InternalIterator:
36 //    user key: AAA  value: v3   seqno: 100    type: Put
37 //    user key: AAA  value: v2   seqno: 97     type: Put
38 //    user key: AAA  value: v1   seqno: 95     type: Put
39 //    user key: BBB  value: v1   seqno: 90     type: Put
40 //    user key: BBC  value: N/A  seqno: 98     type: Delete
41 //    user key: BBC  value: v1   seqno: 95     type: Put
42 // If the snapshot passed in is 102, then the DBIter is expected to
43 // expose the following iterator:
44 //    key: AAA  value: v3
45 //    key: BBB  value: v1
46 // If the snapshot passed in is 96, then it should expose:
47 //    key: AAA  value: v1
48 //    key: BBB  value: v1
49 //    key: BBC  value: v1
50 //
51 
52 // Memtables and sstables that make the DB representation contain
53 // (userkey,seq,type) => uservalue entries.  DBIter
54 // combines multiple entries for the same userkey found in the DB
55 // representation into a single entry while accounting for sequence
56 // numbers, deletion markers, overwrites, etc.
57 class DBIter final : public Iterator {
58  public:
59   // The following is grossly complicated. TODO: clean it up
60   // Which direction is the iterator currently moving?
61   // (1) When moving forward:
62   //   (1a) if current_entry_is_merged_ = false, the internal iterator is
63   //        positioned at the exact entry that yields this->key(), this->value()
64   //   (1b) if current_entry_is_merged_ = true, the internal iterator is
65   //        positioned immediately after the last entry that contributed to the
66   //        current this->value(). That entry may or may not have key equal to
67   //        this->key().
68   // (2) When moving backwards, the internal iterator is positioned
69   //     just before all entries whose user key == this->key().
70   enum Direction : uint8_t { kForward, kReverse };
71 
72   // LocalStatistics contain Statistics counters that will be aggregated per
73   // each iterator instance and then will be sent to the global statistics when
74   // the iterator is destroyed.
75   //
76   // The purpose of this approach is to avoid perf regression happening
77   // when multiple threads bump the atomic counters from a DBIter::Next().
78   struct LocalStatistics {
LocalStatisticsLocalStatistics79     explicit LocalStatistics() { ResetCounters(); }
80 
ResetCountersLocalStatistics81     void ResetCounters() {
82       next_count_ = 0;
83       next_found_count_ = 0;
84       prev_count_ = 0;
85       prev_found_count_ = 0;
86       bytes_read_ = 0;
87       skip_count_ = 0;
88     }
89 
BumpGlobalStatisticsLocalStatistics90     void BumpGlobalStatistics(Statistics* global_statistics) {
91       RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
92       RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
93       RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
94       RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
95       RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
96       RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
97       PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
98       ResetCounters();
99     }
100 
101     // Map to Tickers::NUMBER_DB_NEXT
102     uint64_t next_count_;
103     // Map to Tickers::NUMBER_DB_NEXT_FOUND
104     uint64_t next_found_count_;
105     // Map to Tickers::NUMBER_DB_PREV
106     uint64_t prev_count_;
107     // Map to Tickers::NUMBER_DB_PREV_FOUND
108     uint64_t prev_found_count_;
109     // Map to Tickers::ITER_BYTES_READ
110     uint64_t bytes_read_;
111     // Map to Tickers::NUMBER_ITER_SKIP
112     uint64_t skip_count_;
113   };
114 
115   DBIter(Env* _env, const ReadOptions& read_options,
116          const ImmutableOptions& ioptions,
117          const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
118          InternalIterator* iter, const Version* version, SequenceNumber s,
119          bool arena_mode, uint64_t max_sequential_skip_in_iterations,
120          ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
121          bool expose_blob_index);
122 
123   // No copying allowed
124   DBIter(const DBIter&) = delete;
125   void operator=(const DBIter&) = delete;
126 
~DBIter()127   ~DBIter() override {
128     // Release pinned data if any
129     if (pinned_iters_mgr_.PinningEnabled()) {
130       pinned_iters_mgr_.ReleasePinnedData();
131     }
132     RecordTick(statistics_, NO_ITERATOR_DELETED);
133     ResetInternalKeysSkippedCounter();
134     local_stats_.BumpGlobalStatistics(statistics_);
135     iter_.DeleteIter(arena_mode_);
136   }
SetIter(InternalIterator * iter)137   void SetIter(InternalIterator* iter) {
138     assert(iter_.iter() == nullptr);
139     iter_.Set(iter);
140     iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
141   }
GetRangeDelAggregator()142   ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; }
143 
Valid()144   bool Valid() const override {
145 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
146     if (valid_) {
147       status_.PermitUncheckedError();
148     }
149 #endif  // ROCKSDB_ASSERT_STATUS_CHECKED
150     return valid_;
151   }
key()152   Slice key() const override {
153     assert(valid_);
154     if (start_seqnum_ > 0 || timestamp_lb_) {
155       return saved_key_.GetInternalKey();
156     } else {
157       const Slice ukey_and_ts = saved_key_.GetUserKey();
158       return Slice(ukey_and_ts.data(), ukey_and_ts.size() - timestamp_size_);
159     }
160   }
value()161   Slice value() const override {
162     assert(valid_);
163 
164     if (!expose_blob_index_ && is_blob_) {
165       return blob_value_;
166     } else if (current_entry_is_merged_) {
167       // If pinned_value_ is set then the result of merge operator is one of
168       // the merge operands and we should return it.
169       return pinned_value_.data() ? pinned_value_ : saved_value_;
170     } else if (direction_ == kReverse) {
171       return pinned_value_;
172     } else {
173       return iter_.value();
174     }
175   }
status()176   Status status() const override {
177     if (status_.ok()) {
178       return iter_.status();
179     } else {
180       assert(!valid_);
181       return status_;
182     }
183   }
timestamp()184   Slice timestamp() const override {
185     assert(valid_);
186     assert(timestamp_size_ > 0);
187     if (direction_ == kReverse) {
188       return saved_timestamp_;
189     }
190     const Slice ukey_and_ts = saved_key_.GetUserKey();
191     assert(timestamp_size_ < ukey_and_ts.size());
192     return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_);
193   }
IsBlob()194   bool IsBlob() const {
195     assert(valid_);
196     return is_blob_;
197   }
198 
199   Status GetProperty(std::string prop_name, std::string* prop) override;
200 
201   void Next() final override;
202   void Prev() final override;
203   // 'target' does not contain timestamp, even if user timestamp feature is
204   // enabled.
205   void Seek(const Slice& target) final override;
206   void SeekForPrev(const Slice& target) final override;
207   void SeekToFirst() final override;
208   void SeekToLast() final override;
env()209   Env* env() const { return env_; }
set_sequence(uint64_t s)210   void set_sequence(uint64_t s) {
211     sequence_ = s;
212     if (read_callback_) {
213       read_callback_->Refresh(s);
214     }
215   }
set_valid(bool v)216   void set_valid(bool v) { valid_ = v; }
217 
218  private:
219   // For all methods in this block:
220   // PRE: iter_->Valid() && status_.ok()
221   // Return false if there was an error, and status() is non-ok, valid_ = false;
222   // in this case callers would usually stop what they were doing and return.
223   bool ReverseToForward();
224   bool ReverseToBackward();
225   // Set saved_key_ to the seek key to target, with proper sequence number set.
226   // It might get adjusted if the seek key is smaller than iterator lower bound.
227   void SetSavedKeyToSeekTarget(const Slice& target);
228   // Set saved_key_ to the seek key to target, with proper sequence number set.
229   // It might get adjusted if the seek key is larger than iterator upper bound.
230   void SetSavedKeyToSeekForPrevTarget(const Slice& target);
231   bool FindValueForCurrentKey();
232   bool FindValueForCurrentKeyUsingSeek();
233   bool FindUserKeyBeforeSavedKey();
234   // If `skipping_saved_key` is true, the function will keep iterating until it
235   // finds a user key that is larger than `saved_key_`.
236   // If `prefix` is not null, the iterator needs to stop when all keys for the
237   // prefix are exhausted and the iterator is set to invalid.
238   bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix);
239   // Internal implementation of FindNextUserEntry().
240   bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix);
241   bool ParseKey(ParsedInternalKey* key);
242   bool MergeValuesNewToOld();
243 
244   // If prefix is not null, we need to set the iterator to invalid if no more
245   // entry can be found within the prefix.
246   void PrevInternal(const Slice* prefix);
247   bool TooManyInternalKeysSkipped(bool increment = true);
248   bool IsVisible(SequenceNumber sequence, const Slice& ts,
249                  bool* more_recent = nullptr);
250 
251   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
252   // is called
TempPinData()253   void TempPinData() {
254     if (!pin_thru_lifetime_) {
255       pinned_iters_mgr_.StartPinning();
256     }
257   }
258 
259   // Release blocks pinned by TempPinData()
ReleaseTempPinnedData()260   void ReleaseTempPinnedData() {
261     if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
262       pinned_iters_mgr_.ReleasePinnedData();
263     }
264   }
265 
ClearSavedValue()266   inline void ClearSavedValue() {
267     if (saved_value_.capacity() > 1048576) {
268       std::string empty;
269       swap(empty, saved_value_);
270     } else {
271       saved_value_.clear();
272     }
273   }
274 
ResetInternalKeysSkippedCounter()275   inline void ResetInternalKeysSkippedCounter() {
276     local_stats_.skip_count_ += num_internal_keys_skipped_;
277     if (valid_) {
278       local_stats_.skip_count_--;
279     }
280     num_internal_keys_skipped_ = 0;
281   }
282 
expect_total_order_inner_iter()283   bool expect_total_order_inner_iter() {
284     assert(expect_total_order_inner_iter_ || prefix_extractor_ != nullptr);
285     return expect_total_order_inner_iter_;
286   }
287 
288   // If lower bound of timestamp is given by ReadOptions.iter_start_ts, we need
289   // to return versions of the same key. We cannot just skip if the key value
290   // is the same but timestamps are different but fall in timestamp range.
CompareKeyForSkip(const Slice & a,const Slice & b)291   inline int CompareKeyForSkip(const Slice& a, const Slice& b) {
292     return timestamp_lb_ != nullptr
293                ? user_comparator_.Compare(a, b)
294                : user_comparator_.CompareWithoutTimestamp(a, b);
295   }
296 
297   // Retrieves the blob value for the specified user key using the given blob
298   // index when using the integrated BlobDB implementation.
299   bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
300 
301   Status Merge(const Slice* val, const Slice& user_key);
302 
303   const SliceTransform* prefix_extractor_;
304   Env* const env_;
305   SystemClock* clock_;
306   Logger* logger_;
307   UserComparatorWrapper user_comparator_;
308   const MergeOperator* const merge_operator_;
309   IteratorWrapper iter_;
310   const Version* version_;
311   ReadCallback* read_callback_;
312   // Max visible sequence number. It is normally the snapshot seq unless we have
313   // uncommitted data in db as in WriteUnCommitted.
314   SequenceNumber sequence_;
315 
316   IterKey saved_key_;
317   // Reusable internal key data structure. This is only used inside one function
318   // and should not be used across functions. Reusing this object can reduce
319   // overhead of calling construction of the function if creating it each time.
320   ParsedInternalKey ikey_;
321   std::string saved_value_;
322   Slice pinned_value_;
323   // for prefix seek mode to support prev()
324   PinnableSlice blob_value_;
325   Statistics* statistics_;
326   uint64_t max_skip_;
327   uint64_t max_skippable_internal_keys_;
328   uint64_t num_internal_keys_skipped_;
329   const Slice* iterate_lower_bound_;
330   const Slice* iterate_upper_bound_;
331 
332   // The prefix of the seek key. It is only used when prefix_same_as_start_
333   // is true and prefix extractor is not null. In Next() or Prev(), current keys
334   // will be checked against this prefix, so that the iterator can be
335   // invalidated if the keys in this prefix has been exhausted. Set it using
336   // SetUserKey() and use it using GetUserKey().
337   IterKey prefix_;
338 
339   Status status_;
340   Direction direction_;
341   bool valid_;
342   bool current_entry_is_merged_;
343   // True if we know that the current entry's seqnum is 0.
344   // This information is used as that the next entry will be for another
345   // user key.
346   bool is_key_seqnum_zero_;
347   const bool prefix_same_as_start_;
348   // Means that we will pin all data blocks we read as long the Iterator
349   // is not deleted, will be true if ReadOptions::pin_data is true
350   const bool pin_thru_lifetime_;
351   // Expect the inner iterator to maintain a total order.
352   // prefix_extractor_ must be non-NULL if the value is false.
353   const bool expect_total_order_inner_iter_;
354   ReadTier read_tier_;
355   bool verify_checksums_;
356   // Whether the iterator is allowed to expose blob references. Set to true when
357   // the stacked BlobDB implementation is used, false otherwise.
358   bool expose_blob_index_;
359   bool is_blob_;
360   bool arena_mode_;
361   // List of operands for merge operator.
362   MergeContext merge_context_;
363   ReadRangeDelAggregator range_del_agg_;
364   LocalStatistics local_stats_;
365   PinnedIteratorsManager pinned_iters_mgr_;
366 #ifdef ROCKSDB_LITE
367   ROCKSDB_FIELD_UNUSED
368 #endif
369   DBImpl* db_impl_;
370 #ifdef ROCKSDB_LITE
371   ROCKSDB_FIELD_UNUSED
372 #endif
373   ColumnFamilyData* cfd_;
374   // for diff snapshots we want the lower bound on the seqnum;
375   // if this value > 0 iterator will return internal keys
376   SequenceNumber start_seqnum_;
377   const Slice* const timestamp_ub_;
378   const Slice* const timestamp_lb_;
379   const size_t timestamp_size_;
380   std::string saved_timestamp_;
381 };
382 
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified `sequence` number
// into appropriate user keys.
//
// `db_impl` and `cfd` default to nullptr and `expose_blob_index` defaults to
// false, so callers may omit all three.
// NOTE(review): ownership of the returned iterator and of `internal_iter` is
// not visible from this declaration -- confirm against the definition.
extern Iterator* NewDBIterator(
    Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
    const MutableCFOptions& mutable_cf_options,
    const Comparator* user_key_comparator, InternalIterator* internal_iter,
    const Version* version, const SequenceNumber& sequence,
    uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback,
    DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
    bool expose_blob_index = false);
394 
395 }  // namespace ROCKSDB_NAMESPACE
396