1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <stdint.h>
12 #include <string>
13 #include "db/db_impl/db_impl.h"
14 #include "db/dbformat.h"
15 #include "db/range_del_aggregator.h"
16 #include "memory/arena.h"
17 #include "options/cf_options.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/iterator.h"
20 #include "table/iterator_wrapper.h"
21 #include "util/autovector.h"
22 
23 namespace rocksdb {
24 
// This file declares class DBIter and its factory function NewDBIterator().
// The wrapped form, class ArenaWrappedDBIter, is not declared in this file.
// Class DBIter, whose out-of-line members are implemented in db_iter.cc, is
// an iterator that converts internal keys (yielded by an InternalIterator)
// that were live at the specified sequence number into appropriate user
// keys.
// Each internal key consists of a user key, a sequence number, and a value
// type. DBIter deals with multiple key versions, tombstones, merge operands,
// etc, and exposes an Iterator.
// For example, DBIter may wrap the following InternalIterator:
//    user key: AAA  value: v3   seqno: 100    type: Put
//    user key: AAA  value: v2   seqno: 97     type: Put
//    user key: AAA  value: v1   seqno: 95     type: Put
//    user key: BBB  value: v1   seqno: 90     type: Put
//    user key: BBC  value: N/A  seqno: 98     type: Delete
//    user key: BBC  value: v1   seqno: 95     type: Put
// If the snapshot passed in is 102, then the DBIter is expected to
// expose the following iterator:
//    key: AAA  value: v3
//    key: BBB  value: v1
// If the snapshot passed in is 96, then it should expose:
//    key: AAA  value: v1
//    key: BBB  value: v1
//    key: BBC  value: v1
//
50 
51 // Memtables and sstables that make the DB representation contain
52 // (userkey,seq,type) => uservalue entries.  DBIter
53 // combines multiple entries for the same userkey found in the DB
54 // representation into a single entry while accounting for sequence
55 // numbers, deletion markers, overwrites, etc.
56 class DBIter final : public Iterator {
57  public:
58   // The following is grossly complicated. TODO: clean it up
59   // Which direction is the iterator currently moving?
60   // (1) When moving forward:
61   //   (1a) if current_entry_is_merged_ = false, the internal iterator is
62   //        positioned at the exact entry that yields this->key(), this->value()
63   //   (1b) if current_entry_is_merged_ = true, the internal iterator is
64   //        positioned immediately after the last entry that contributed to the
65   //        current this->value(). That entry may or may not have key equal to
66   //        this->key().
67   // (2) When moving backwards, the internal iterator is positioned
68   //     just before all entries whose user key == this->key().
69   enum Direction { kForward, kReverse };
70 
71   // LocalStatistics contain Statistics counters that will be aggregated per
72   // each iterator instance and then will be sent to the global statistics when
73   // the iterator is destroyed.
74   //
75   // The purpose of this approach is to avoid perf regression happening
76   // when multiple threads bump the atomic counters from a DBIter::Next().
77   struct LocalStatistics {
LocalStatisticsLocalStatistics78     explicit LocalStatistics() { ResetCounters(); }
79 
ResetCountersLocalStatistics80     void ResetCounters() {
81       next_count_ = 0;
82       next_found_count_ = 0;
83       prev_count_ = 0;
84       prev_found_count_ = 0;
85       bytes_read_ = 0;
86       skip_count_ = 0;
87     }
88 
BumpGlobalStatisticsLocalStatistics89     void BumpGlobalStatistics(Statistics* global_statistics) {
90       RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
91       RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
92       RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
93       RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
94       RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
95       RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
96       PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
97       ResetCounters();
98     }
99 
100     // Map to Tickers::NUMBER_DB_NEXT
101     uint64_t next_count_;
102     // Map to Tickers::NUMBER_DB_NEXT_FOUND
103     uint64_t next_found_count_;
104     // Map to Tickers::NUMBER_DB_PREV
105     uint64_t prev_count_;
106     // Map to Tickers::NUMBER_DB_PREV_FOUND
107     uint64_t prev_found_count_;
108     // Map to Tickers::ITER_BYTES_READ
109     uint64_t bytes_read_;
110     // Map to Tickers::NUMBER_ITER_SKIP
111     uint64_t skip_count_;
112   };
113 
114   DBIter(Env* _env, const ReadOptions& read_options,
115          const ImmutableCFOptions& cf_options,
116          const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
117          InternalIterator* iter, SequenceNumber s, bool arena_mode,
118          uint64_t max_sequential_skip_in_iterations,
119          ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
120          bool allow_blob);
121 
122   // No copying allowed
123   DBIter(const DBIter&) = delete;
124   void operator=(const DBIter&) = delete;
125 
~DBIter()126   ~DBIter() override {
127     // Release pinned data if any
128     if (pinned_iters_mgr_.PinningEnabled()) {
129       pinned_iters_mgr_.ReleasePinnedData();
130     }
131     RecordTick(statistics_, NO_ITERATOR_DELETED);
132     ResetInternalKeysSkippedCounter();
133     local_stats_.BumpGlobalStatistics(statistics_);
134     iter_.DeleteIter(arena_mode_);
135   }
SetIter(InternalIterator * iter)136   virtual void SetIter(InternalIterator* iter) {
137     assert(iter_.iter() == nullptr);
138     iter_.Set(iter);
139     iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
140   }
GetRangeDelAggregator()141   virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
142     return &range_del_agg_;
143   }
144 
Valid()145   bool Valid() const override { return valid_; }
key()146   Slice key() const override {
147     assert(valid_);
148     if (start_seqnum_ > 0) {
149       return saved_key_.GetInternalKey();
150     } else {
151       return saved_key_.GetUserKey();
152     }
153   }
value()154   Slice value() const override {
155     assert(valid_);
156     if (current_entry_is_merged_) {
157       // If pinned_value_ is set then the result of merge operator is one of
158       // the merge operands and we should return it.
159       return pinned_value_.data() ? pinned_value_ : saved_value_;
160     } else if (direction_ == kReverse) {
161       return pinned_value_;
162     } else {
163       return iter_.value();
164     }
165   }
status()166   Status status() const override {
167     if (status_.ok()) {
168       return iter_.status();
169     } else {
170       assert(!valid_);
171       return status_;
172     }
173   }
IsBlob()174   bool IsBlob() const {
175     assert(valid_ && (allow_blob_ || !is_blob_));
176     return is_blob_;
177   }
178 
179   Status GetProperty(std::string prop_name, std::string* prop) override;
180 
181   void Next() final override;
182   void Prev() final override;
183   void Seek(const Slice& target) final override;
184   void SeekForPrev(const Slice& target) final override;
185   void SeekToFirst() final override;
186   void SeekToLast() final override;
env()187   Env* env() { return env_; }
set_sequence(uint64_t s)188   void set_sequence(uint64_t s) {
189     sequence_ = s;
190     if (read_callback_) {
191       read_callback_->Refresh(s);
192     }
193   }
set_valid(bool v)194   void set_valid(bool v) { valid_ = v; }
195 
196  private:
197   // For all methods in this block:
198   // PRE: iter_->Valid() && status_.ok()
199   // Return false if there was an error, and status() is non-ok, valid_ = false;
200   // in this case callers would usually stop what they were doing and return.
201   bool ReverseToForward();
202   bool ReverseToBackward();
203   // Set saved_key_ to the seek key to target, with proper sequence number set.
204   // It might get adjusted if the seek key is smaller than iterator lower bound.
205   void SetSavedKeyToSeekTarget(const Slice& /*target*/);
206   // Set saved_key_ to the seek key to target, with proper sequence number set.
207   // It might get adjusted if the seek key is larger than iterator upper bound.
208   void SetSavedKeyToSeekForPrevTarget(const Slice& /*target*/);
209   bool FindValueForCurrentKey();
210   bool FindValueForCurrentKeyUsingSeek();
211   bool FindUserKeyBeforeSavedKey();
212   // If `skipping_saved_key` is true, the function will keep iterating until it
213   // finds a user key that is larger than `saved_key_`.
214   // If `prefix` is not null, the iterator needs to stop when all keys for the
215   // prefix are exhausted and the interator is set to invalid.
216   bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix);
217   // Internal implementation of FindNextUserEntry().
218   bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix);
219   bool ParseKey(ParsedInternalKey* key);
220   bool MergeValuesNewToOld();
221 
222   // If prefix is not null, we need to set the iterator to invalid if no more
223   // entry can be found within the prefix.
224   void PrevInternal(const Slice* /*prefix*/);
225   bool TooManyInternalKeysSkipped(bool increment = true);
226   bool IsVisible(SequenceNumber sequence);
227 
228   // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
229   // is called
TempPinData()230   void TempPinData() {
231     if (!pin_thru_lifetime_) {
232       pinned_iters_mgr_.StartPinning();
233     }
234   }
235 
236   // Release blocks pinned by TempPinData()
ReleaseTempPinnedData()237   void ReleaseTempPinnedData() {
238     if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
239       pinned_iters_mgr_.ReleasePinnedData();
240     }
241   }
242 
ClearSavedValue()243   inline void ClearSavedValue() {
244     if (saved_value_.capacity() > 1048576) {
245       std::string empty;
246       swap(empty, saved_value_);
247     } else {
248       saved_value_.clear();
249     }
250   }
251 
ResetInternalKeysSkippedCounter()252   inline void ResetInternalKeysSkippedCounter() {
253     local_stats_.skip_count_ += num_internal_keys_skipped_;
254     if (valid_) {
255       local_stats_.skip_count_--;
256     }
257     num_internal_keys_skipped_ = 0;
258   }
259 
260   const SliceTransform* prefix_extractor_;
261   Env* const env_;
262   Logger* logger_;
263   UserComparatorWrapper user_comparator_;
264   const MergeOperator* const merge_operator_;
265   IteratorWrapper iter_;
266   ReadCallback* read_callback_;
267   // Max visible sequence number. It is normally the snapshot seq unless we have
268   // uncommitted data in db as in WriteUnCommitted.
269   SequenceNumber sequence_;
270 
271   IterKey saved_key_;
272   // Reusable internal key data structure. This is only used inside one function
273   // and should not be used across functions. Reusing this object can reduce
274   // overhead of calling construction of the function if creating it each time.
275   ParsedInternalKey ikey_;
276   std::string saved_value_;
277   Slice pinned_value_;
278   // for prefix seek mode to support prev()
279   Statistics* statistics_;
280   uint64_t max_skip_;
281   uint64_t max_skippable_internal_keys_;
282   uint64_t num_internal_keys_skipped_;
283   const Slice* iterate_lower_bound_;
284   const Slice* iterate_upper_bound_;
285 
286   // The prefix of the seek key. It is only used when prefix_same_as_start_
287   // is true and prefix extractor is not null. In Next() or Prev(), current keys
288   // will be checked against this prefix, so that the iterator can be
289   // invalidated if the keys in this prefix has been exhausted. Set it using
290   // SetUserKey() and use it using GetUserKey().
291   IterKey prefix_;
292 
293   Status status_;
294   Direction direction_;
295   bool valid_;
296   bool current_entry_is_merged_;
297   // True if we know that the current entry's seqnum is 0.
298   // This information is used as that the next entry will be for another
299   // user key.
300   bool is_key_seqnum_zero_;
301   const bool prefix_same_as_start_;
302   // Means that we will pin all data blocks we read as long the Iterator
303   // is not deleted, will be true if ReadOptions::pin_data is true
304   const bool pin_thru_lifetime_;
305   const bool total_order_seek_;
306   bool allow_blob_;
307   bool is_blob_;
308   bool arena_mode_;
309   // List of operands for merge operator.
310   MergeContext merge_context_;
311   ReadRangeDelAggregator range_del_agg_;
312   LocalStatistics local_stats_;
313   PinnedIteratorsManager pinned_iters_mgr_;
314 #ifdef ROCKSDB_LITE
315   ROCKSDB_FIELD_UNUSED
316 #endif
317   DBImpl* db_impl_;
318 #ifdef ROCKSDB_LITE
319   ROCKSDB_FIELD_UNUSED
320 #endif
321   ColumnFamilyData* cfd_;
322   // for diff snapshots we want the lower bound on the seqnum;
323   // if this value > 0 iterator will return internal keys
324   SequenceNumber start_seqnum_;
325 };
326 // Return a new iterator that converts internal keys (yielded by
327 // "*internal_iter") that were live at the specified `sequence` number
328 // into appropriate user keys.
329 extern Iterator* NewDBIterator(
330     Env* env, const ReadOptions& read_options,
331     const ImmutableCFOptions& cf_options,
332     const MutableCFOptions& mutable_cf_options,
333     const Comparator* user_key_comparator, InternalIterator* internal_iter,
334     const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
335     ReadCallback* read_callback, DBImpl* db_impl = nullptr,
336     ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
337 
338 }  // namespace rocksdb
339