1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <stdint.h>
12 #include <string>
13 #include "db/db_impl/db_impl.h"
14 #include "db/dbformat.h"
15 #include "db/range_del_aggregator.h"
16 #include "memory/arena.h"
17 #include "options/cf_options.h"
18 #include "rocksdb/db.h"
19 #include "rocksdb/iterator.h"
20 #include "table/iterator_wrapper.h"
21 #include "util/autovector.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
25 // This file declares the factory functions of DBIter, in its original form
26 // or a wrapped form with class ArenaWrappedDBIter, which is defined here.
27 // Class DBIter, which is declared and implemented inside db_iter.cc, is
28 // an iterator that converts internal keys (yielded by an InternalIterator)
29 // that were live at the specified sequence number into appropriate user
30 // keys.
31 // Each internal key consists of a user key, a sequence number, and a value
32 // type. DBIter deals with multiple key versions, tombstones, merge operands,
33 // etc, and exposes an Iterator.
34 // For example, DBIter may wrap following InternalIterator:
35 //    user key: AAA  value: v3   seqno: 100    type: Put
36 //    user key: AAA  value: v2   seqno: 97     type: Put
37 //    user key: AAA  value: v1   seqno: 95     type: Put
38 //    user key: BBB  value: v1   seqno: 90     type: Put
39 //    user key: BBC  value: N/A  seqno: 98     type: Delete
40 //    user key: BBC  value: v1   seqno: 95     type: Put
41 // If the snapshot passed in is 102, then the DBIter is expected to
42 // expose the following iterator:
43 //    key: AAA  value: v3
44 //    key: BBB  value: v1
45 // If the snapshot passed in is 96, then it should expose:
46 //    key: AAA  value: v1
47 //    key: BBB  value: v1
48 //    key: BBC  value: v1
49 //
50 
// Memtables and sstables that make up the DB representation contain
// (userkey,seq,type) => uservalue entries.  DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter final : public Iterator {
 public:
  // The following is grossly complicated. TODO: clean it up
  // Which direction is the iterator currently moving?
  // (1) When moving forward:
  //   (1a) if current_entry_is_merged_ = false, the internal iterator is
  //        positioned at the exact entry that yields this->key(), this->value()
  //   (1b) if current_entry_is_merged_ = true, the internal iterator is
  //        positioned immediately after the last entry that contributed to the
  //        current this->value(). That entry may or may not have key equal to
  //        this->key().
  // (2) When moving backwards, the internal iterator is positioned
  //     just before all entries whose user key == this->key().
  enum Direction { kForward, kReverse };

  // LocalStatistics contain Statistics counters that will be aggregated per
  // each iterator instance and then will be sent to the global statistics when
  // the iterator is destroyed.
  //
  // The purpose of this approach is to avoid perf regression happening
  // when multiple threads bump the atomic counters from a DBIter::Next().
  struct LocalStatistics {
    // Counters start zeroed; they are also re-zeroed after each flush to the
    // global statistics (see BumpGlobalStatistics()).
    explicit LocalStatistics() { ResetCounters(); }

    // Zero all local counters.
    void ResetCounters() {
      next_count_ = 0;
      next_found_count_ = 0;
      prev_count_ = 0;
      prev_found_count_ = 0;
      bytes_read_ = 0;
      skip_count_ = 0;
    }

    // Flush the accumulated local counts into `global_statistics` (and the
    // per-thread perf context for bytes read), then reset the local counters
    // so the same object can keep accumulating.
    void BumpGlobalStatistics(Statistics* global_statistics) {
      RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
      RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
      RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
      RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
      RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
      PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
      ResetCounters();
    }

    // Map to Tickers::NUMBER_DB_NEXT
    uint64_t next_count_;
    // Map to Tickers::NUMBER_DB_NEXT_FOUND
    uint64_t next_found_count_;
    // Map to Tickers::NUMBER_DB_PREV
    uint64_t prev_count_;
    // Map to Tickers::NUMBER_DB_PREV_FOUND
    uint64_t prev_found_count_;
    // Map to Tickers::ITER_BYTES_READ
    uint64_t bytes_read_;
    // Map to Tickers::NUMBER_ITER_SKIP
    uint64_t skip_count_;
  };

  // Construct a DBIter over `iter`, exposing only entries visible at
  // sequence number `s` (possibly further restricted by `read_callback`).
  // If `arena_mode` is true, `iter` was arena-allocated and will be
  // destroyed in place rather than deleted (see the destructor).
  // Defined in db_iter.cc.
  DBIter(Env* _env, const ReadOptions& read_options,
         const ImmutableCFOptions& cf_options,
         const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
         InternalIterator* iter, SequenceNumber s, bool arena_mode,
         uint64_t max_sequential_skip_in_iterations,
         ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
         bool allow_blob);

  // No copying allowed
  DBIter(const DBIter&) = delete;
  void operator=(const DBIter&) = delete;

  ~DBIter() override {
    // Release pinned data if any
    if (pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
    RecordTick(statistics_, NO_ITERATOR_DELETED);
    ResetInternalKeysSkippedCounter();
    // Flush per-iterator counters into the global statistics exactly once.
    local_stats_.BumpGlobalStatistics(statistics_);
    // DeleteIter() respects arena_mode_: destroy-in-place vs. delete.
    iter_.DeleteIter(arena_mode_);
  }
  // Install the inner iterator. Must only be called while no inner iterator
  // is set (asserted); hooks up the pinned-iterators manager as well.
  void SetIter(InternalIterator* iter) {
    assert(iter_.iter() == nullptr);
    iter_.Set(iter);
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  // Accessor for the range-deletion aggregator owned by this iterator.
  ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; }

  bool Valid() const override { return valid_; }
  Slice key() const override {
    assert(valid_);
    // In diff-snapshot mode (start_seqnum_ > 0) internal keys are exposed,
    // since callers need the sequence number/type of each entry.
    if (start_seqnum_ > 0) {
      return saved_key_.GetInternalKey();
    } else {
      return saved_key_.GetUserKey();
    }
  }
  Slice value() const override {
    assert(valid_);
    if (current_entry_is_merged_) {
      // If pinned_value_ is set then the result of merge operator is one of
      // the merge operands and we should return it.
      return pinned_value_.data() ? pinned_value_ : saved_value_;
    } else if (direction_ == kReverse) {
      // Backward iteration saves the value separately because the inner
      // iterator has already moved past the entry.
      return pinned_value_;
    } else {
      return iter_.value();
    }
  }
  Status status() const override {
    if (status_.ok()) {
      // Our own status is clean; defer to the inner iterator's status.
      return iter_.status();
    } else {
      assert(!valid_);
      return status_;
    }
  }
  // True if the current value is a blob index rather than the value itself.
  // Only meaningful while Valid(); requires allow_blob_ if it returns true.
  bool IsBlob() const {
    assert(valid_ && (allow_blob_ || !is_blob_));
    return is_blob_;
  }

  Status GetProperty(std::string prop_name, std::string* prop) override;

  void Next() final override;
  void Prev() final override;
  void Seek(const Slice& target) final override;
  void SeekForPrev(const Slice& target) final override;
  void SeekToFirst() final override;
  void SeekToLast() final override;
  Env* env() const { return env_; }
  // Update the visibility sequence number (e.g. on iterator refresh) and
  // propagate it to the read callback, if any.
  void set_sequence(uint64_t s) {
    sequence_ = s;
    if (read_callback_) {
      read_callback_->Refresh(s);
    }
  }
  void set_valid(bool v) { valid_ = v; }

 private:
  // For all methods in this block:
  // PRE: iter_->Valid() && status_.ok()
  // Return false if there was an error, and status() is non-ok, valid_ = false;
  // in this case callers would usually stop what they were doing and return.
  bool ReverseToForward();
  bool ReverseToBackward();
  // Set saved_key_ to the seek key to target, with proper sequence number set.
  // It might get adjusted if the seek key is smaller than iterator lower bound.
  void SetSavedKeyToSeekTarget(const Slice& target);
  // Set saved_key_ to the seek key to target, with proper sequence number set.
  // It might get adjusted if the seek key is larger than iterator upper bound.
  void SetSavedKeyToSeekForPrevTarget(const Slice& target);
  bool FindValueForCurrentKey();
  bool FindValueForCurrentKeyUsingSeek();
  bool FindUserKeyBeforeSavedKey();
  // If `skipping_saved_key` is true, the function will keep iterating until it
  // finds a user key that is larger than `saved_key_`.
  // If `prefix` is not null, the iterator needs to stop when all keys for the
  // prefix are exhausted and the iterator is set to invalid.
  bool FindNextUserEntry(bool skipping_saved_key, const Slice* prefix);
  // Internal implementation of FindNextUserEntry().
  bool FindNextUserEntryInternal(bool skipping_saved_key, const Slice* prefix);
  bool ParseKey(ParsedInternalKey* key);
  bool MergeValuesNewToOld();

  // If prefix is not null, we need to set the iterator to invalid if no more
  // entry can be found within the prefix.
  void PrevInternal(const Slice* prefix);
  bool TooManyInternalKeysSkipped(bool increment = true);
  bool IsVisible(SequenceNumber sequence);

  // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
  // is called
  void TempPinData() {
    if (!pin_thru_lifetime_) {
      pinned_iters_mgr_.StartPinning();
    }
  }

  // Release blocks pinned by TempPinData()
  void ReleaseTempPinnedData() {
    if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
      pinned_iters_mgr_.ReleasePinnedData();
    }
  }

  inline void ClearSavedValue() {
    // If the buffer has grown beyond 1MB, give the memory back instead of
    // keeping the large capacity alive for the iterator's lifetime.
    if (saved_value_.capacity() > 1048576) {
      std::string empty;
      swap(empty, saved_value_);
    } else {
      saved_value_.clear();
    }
  }

  inline void ResetInternalKeysSkippedCounter() {
    local_stats_.skip_count_ += num_internal_keys_skipped_;
    // The entry we landed on is not a "skip"; compensate for it.
    if (valid_) {
      local_stats_.skip_count_--;
    }
    num_internal_keys_skipped_ = 0;
  }

  // Whether the inner iterator yields keys in total order. When this returns
  // false, a prefix extractor must be configured (asserted).
  bool expect_total_order_inner_iter() {
    assert(expect_total_order_inner_iter_ || prefix_extractor_ != nullptr);
    return expect_total_order_inner_iter_;
  }

  const SliceTransform* prefix_extractor_;
  Env* const env_;
  Logger* logger_;
  UserComparatorWrapper user_comparator_;
  const MergeOperator* const merge_operator_;
  IteratorWrapper iter_;
  ReadCallback* read_callback_;
  // Max visible sequence number. It is normally the snapshot seq unless we have
  // uncommitted data in db as in WriteUnCommitted.
  SequenceNumber sequence_;

  // The user key (or internal key, in diff-snapshot mode) currently exposed
  // via key().
  IterKey saved_key_;
  // Reusable internal key data structure. This is only used inside one function
  // and should not be used across functions. Reusing this object can reduce
  // overhead of calling construction of the function if creating it each time.
  ParsedInternalKey ikey_;
  // Holds the merged value when current_entry_is_merged_ and the result is
  // not one of the pinned operands.
  std::string saved_value_;
  Slice pinned_value_;
  // for prefix seek mode to support prev()
  Statistics* statistics_;
  uint64_t max_skip_;
  uint64_t max_skippable_internal_keys_;
  uint64_t num_internal_keys_skipped_;
  const Slice* iterate_lower_bound_;
  const Slice* iterate_upper_bound_;

  // The prefix of the seek key. It is only used when prefix_same_as_start_
  // is true and prefix extractor is not null. In Next() or Prev(), current keys
  // will be checked against this prefix, so that the iterator can be
  // invalidated if the keys in this prefix has been exhausted. Set it using
  // SetUserKey() and use it using GetUserKey().
  IterKey prefix_;

  Status status_;
  Direction direction_;
  bool valid_;
  bool current_entry_is_merged_;
  // True if we know that the current entry's seqnum is 0.
  // This information is used as that the next entry will be for another
  // user key.
  bool is_key_seqnum_zero_;
  const bool prefix_same_as_start_;
  // Means that we will pin all data blocks we read as long the Iterator
  // is not deleted, will be true if ReadOptions::pin_data is true
  const bool pin_thru_lifetime_;
  // Expect the inner iterator to maintain a total order.
  // prefix_extractor_ must be non-NULL if the value is false.
  const bool expect_total_order_inner_iter_;
  bool allow_blob_;
  bool is_blob_;
  // True if the inner iterator is arena-allocated; controls destruction
  // in the destructor (destroy-in-place vs. delete).
  bool arena_mode_;
  // List of operands for merge operator.
  MergeContext merge_context_;
  ReadRangeDelAggregator range_del_agg_;
  LocalStatistics local_stats_;
  PinnedIteratorsManager pinned_iters_mgr_;
#ifdef ROCKSDB_LITE
  ROCKSDB_FIELD_UNUSED
#endif
  DBImpl* db_impl_;
#ifdef ROCKSDB_LITE
  ROCKSDB_FIELD_UNUSED
#endif
  ColumnFamilyData* cfd_;
  // for diff snapshots we want the lower bound on the seqnum;
  // if this value > 0 iterator will return internal keys
  SequenceNumber start_seqnum_;
};
331 
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified `sequence` number
// into appropriate user keys.
// `read_callback`, when non-null, can further restrict which sequence
// numbers are visible (see DBIter::IsVisible / set_sequence).
// `db_impl` and `cfd` are optional and only needed for features that
// reach back into the DB (e.g. GetProperty); defined in db_iter.cc.
extern Iterator* NewDBIterator(
    Env* env, const ReadOptions& read_options,
    const ImmutableCFOptions& cf_options,
    const MutableCFOptions& mutable_cf_options,
    const Comparator* user_key_comparator, InternalIterator* internal_iter,
    const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
    ReadCallback* read_callback, DBImpl* db_impl = nullptr,
    ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
343 
344 }  // namespace ROCKSDB_NAMESPACE
345