1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifndef ROCKSDB_LITE
11 #include "table/cuckoo/cuckoo_table_reader.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "memory/arena.h"
20 #include "options/cf_options.h"
21 #include "rocksdb/iterator.h"
22 #include "rocksdb/table.h"
23 #include "table/cuckoo/cuckoo_table_factory.h"
24 #include "table/get_context.h"
25 #include "table/internal_iterator.h"
26 #include "table/meta_blocks.h"
27 #include "util/coding.h"
28 
29 namespace ROCKSDB_NAMESPACE {
30 namespace {
31 const uint64_t CACHE_LINE_MASK = ~((uint64_t)CACHE_LINE_SIZE - 1);
32 const uint32_t kInvalidIndex = std::numeric_limits<uint32_t>::max();
33 }
34 
35 extern const uint64_t kCuckooTableMagicNumber;
36 
CuckooTableReader(const ImmutableOptions & ioptions,std::unique_ptr<RandomAccessFileReader> && file,uint64_t file_size,const Comparator * comparator,uint64_t (* get_slice_hash)(const Slice &,uint32_t,uint64_t))37 CuckooTableReader::CuckooTableReader(
38     const ImmutableOptions& ioptions,
39     std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
40     const Comparator* comparator,
41     uint64_t (*get_slice_hash)(const Slice&, uint32_t, uint64_t))
42     : file_(std::move(file)),
43       is_last_level_(false),
44       identity_as_first_hash_(false),
45       use_module_hash_(false),
46       num_hash_func_(0),
47       unused_key_(""),
48       key_length_(0),
49       user_key_length_(0),
50       value_length_(0),
51       bucket_length_(0),
52       cuckoo_block_size_(0),
53       cuckoo_block_bytes_minus_one_(0),
54       table_size_(0),
55       ucomp_(comparator),
56       get_slice_hash_(get_slice_hash) {
57   if (!ioptions.allow_mmap_reads) {
58     status_ = Status::InvalidArgument("File is not mmaped");
59     return;
60   }
61   TableProperties* props = nullptr;
62   status_ = ReadTableProperties(file_.get(), file_size, kCuckooTableMagicNumber,
63       ioptions, &props, true /* compression_type_missing */);
64   if (!status_.ok()) {
65     return;
66   }
67   table_props_.reset(props);
68   auto& user_props = props->user_collected_properties;
69   auto hash_funs = user_props.find(CuckooTablePropertyNames::kNumHashFunc);
70   if (hash_funs == user_props.end()) {
71     status_ = Status::Corruption("Number of hash functions not found");
72     return;
73   }
74   num_hash_func_ = *reinterpret_cast<const uint32_t*>(hash_funs->second.data());
75   auto unused_key = user_props.find(CuckooTablePropertyNames::kEmptyKey);
76   if (unused_key == user_props.end()) {
77     status_ = Status::Corruption("Empty bucket value not found");
78     return;
79   }
80   unused_key_ = unused_key->second;
81 
82   key_length_ = static_cast<uint32_t>(props->fixed_key_len);
83   auto user_key_len = user_props.find(CuckooTablePropertyNames::kUserKeyLength);
84   if (user_key_len == user_props.end()) {
85     status_ = Status::Corruption("User key length not found");
86     return;
87   }
88   user_key_length_ = *reinterpret_cast<const uint32_t*>(
89       user_key_len->second.data());
90 
91   auto value_length = user_props.find(CuckooTablePropertyNames::kValueLength);
92   if (value_length == user_props.end()) {
93     status_ = Status::Corruption("Value length not found");
94     return;
95   }
96   value_length_ = *reinterpret_cast<const uint32_t*>(
97       value_length->second.data());
98   bucket_length_ = key_length_ + value_length_;
99 
100   auto hash_table_size = user_props.find(
101       CuckooTablePropertyNames::kHashTableSize);
102   if (hash_table_size == user_props.end()) {
103     status_ = Status::Corruption("Hash table size not found");
104     return;
105   }
106   table_size_ = *reinterpret_cast<const uint64_t*>(
107       hash_table_size->second.data());
108 
109   auto is_last_level = user_props.find(CuckooTablePropertyNames::kIsLastLevel);
110   if (is_last_level == user_props.end()) {
111     status_ = Status::Corruption("Is last level not found");
112     return;
113   }
114   is_last_level_ = *reinterpret_cast<const bool*>(is_last_level->second.data());
115 
116   auto identity_as_first_hash = user_props.find(
117       CuckooTablePropertyNames::kIdentityAsFirstHash);
118   if (identity_as_first_hash == user_props.end()) {
119     status_ = Status::Corruption("identity as first hash not found");
120     return;
121   }
122   identity_as_first_hash_ = *reinterpret_cast<const bool*>(
123       identity_as_first_hash->second.data());
124 
125   auto use_module_hash = user_props.find(
126       CuckooTablePropertyNames::kUseModuleHash);
127   if (use_module_hash == user_props.end()) {
128     status_ = Status::Corruption("hash type is not found");
129     return;
130   }
131   use_module_hash_ = *reinterpret_cast<const bool*>(
132       use_module_hash->second.data());
133   auto cuckoo_block_size = user_props.find(
134       CuckooTablePropertyNames::kCuckooBlockSize);
135   if (cuckoo_block_size == user_props.end()) {
136     status_ = Status::Corruption("Cuckoo block size not found");
137     return;
138   }
139   cuckoo_block_size_ = *reinterpret_cast<const uint32_t*>(
140       cuckoo_block_size->second.data());
141   cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1;
142   status_ = file_->Read(IOOptions(), 0, static_cast<size_t>(file_size),
143                         &file_data_, nullptr, nullptr);
144 }
145 
Get(const ReadOptions &,const Slice & key,GetContext * get_context,const SliceTransform *,bool)146 Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
147                               const Slice& key, GetContext* get_context,
148                               const SliceTransform* /* prefix_extractor */,
149                               bool /*skip_filters*/) {
150   assert(key.size() == key_length_ + (is_last_level_ ? 8 : 0));
151   Slice user_key = ExtractUserKey(key);
152   for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_; ++hash_cnt) {
153     uint64_t offset = bucket_length_ * CuckooHash(
154         user_key, hash_cnt, use_module_hash_, table_size_,
155         identity_as_first_hash_, get_slice_hash_);
156     const char* bucket = &file_data_.data()[offset];
157     for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_;
158          ++block_idx, bucket += bucket_length_) {
159       if (ucomp_->Equal(Slice(unused_key_.data(), user_key.size()),
160                         Slice(bucket, user_key.size()))) {
161         return Status::OK();
162       }
163       // Here, we compare only the user key part as we support only one entry
164       // per user key and we don't support snapshot.
165       if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) {
166         Slice value(bucket + key_length_, value_length_);
167         if (is_last_level_) {
168           // Sequence number is not stored at the last level, so we will use
169           // kMaxSequenceNumber since it is unknown.  This could cause some
170           // transactions to fail to lock a key due to known sequence number.
171           // However, it is expected for anyone to use a CuckooTable in a
172           // TransactionDB.
173           get_context->SaveValue(value, kMaxSequenceNumber);
174         } else {
175           Slice full_key(bucket, key_length_);
176           ParsedInternalKey found_ikey;
177           Status s = ParseInternalKey(full_key, &found_ikey,
178                                       false /* log_err_key */);  // TODO
179           if (!s.ok()) return s;
180           bool dont_care __attribute__((__unused__));
181           get_context->SaveValue(found_ikey, value, &dont_care);
182         }
183         // We don't support merge operations. So, we return here.
184         return Status::OK();
185       }
186     }
187   }
188   return Status::OK();
189 }
190 
Prepare(const Slice & key)191 void CuckooTableReader::Prepare(const Slice& key) {
192   // Prefetch the first Cuckoo Block.
193   Slice user_key = ExtractUserKey(key);
194   uint64_t addr = reinterpret_cast<uint64_t>(file_data_.data()) +
195     bucket_length_ * CuckooHash(user_key, 0, use_module_hash_, table_size_,
196                                 identity_as_first_hash_, nullptr);
197   uint64_t end_addr = addr + cuckoo_block_bytes_minus_one_;
198   for (addr &= CACHE_LINE_MASK; addr < end_addr; addr += CACHE_LINE_SIZE) {
199     PREFETCH(reinterpret_cast<const char*>(addr), 0, 3);
200   }
201 }
202 
203 class CuckooTableIterator : public InternalIterator {
204  public:
205   explicit CuckooTableIterator(CuckooTableReader* reader);
206   // No copying allowed
207   CuckooTableIterator(const CuckooTableIterator&) = delete;
208   void operator=(const Iterator&) = delete;
~CuckooTableIterator()209   ~CuckooTableIterator() override {}
210   bool Valid() const override;
211   void SeekToFirst() override;
212   void SeekToLast() override;
213   void Seek(const Slice& target) override;
214   void SeekForPrev(const Slice& target) override;
215   void Next() override;
216   void Prev() override;
217   Slice key() const override;
218   Slice value() const override;
status() const219   Status status() const override { return Status::OK(); }
220   void InitIfNeeded();
221 
222  private:
223   struct BucketComparator {
BucketComparatorROCKSDB_NAMESPACE::CuckooTableIterator::BucketComparator224     BucketComparator(const Slice& file_data, const Comparator* ucomp,
225                      uint32_t bucket_len, uint32_t user_key_len,
226                      const Slice& target = Slice())
227       : file_data_(file_data),
228         ucomp_(ucomp),
229         bucket_len_(bucket_len),
230         user_key_len_(user_key_len),
231         target_(target) {}
operator ()ROCKSDB_NAMESPACE::CuckooTableIterator::BucketComparator232     bool operator()(const uint32_t first, const uint32_t second) const {
233       const char* first_bucket =
234         (first == kInvalidIndex) ? target_.data() :
235                                    &file_data_.data()[first * bucket_len_];
236       const char* second_bucket =
237         (second == kInvalidIndex) ? target_.data() :
238                                     &file_data_.data()[second * bucket_len_];
239       return ucomp_->Compare(Slice(first_bucket, user_key_len_),
240                              Slice(second_bucket, user_key_len_)) < 0;
241     }
242    private:
243     const Slice file_data_;
244     const Comparator* ucomp_;
245     const uint32_t bucket_len_;
246     const uint32_t user_key_len_;
247     const Slice target_;
248   };
249 
250   const BucketComparator bucket_comparator_;
251   void PrepareKVAtCurrIdx();
252   CuckooTableReader* reader_;
253   bool initialized_;
254   // Contains a map of keys to bucket_id sorted in key order.
255   std::vector<uint32_t> sorted_bucket_ids_;
256   // We assume that the number of items can be stored in uint32 (4 Billion).
257   uint32_t curr_key_idx_;
258   Slice curr_value_;
259   IterKey curr_key_;
260 };
261 
CuckooTableIterator(CuckooTableReader * reader)262 CuckooTableIterator::CuckooTableIterator(CuckooTableReader* reader)
263   : bucket_comparator_(reader->file_data_, reader->ucomp_,
264                        reader->bucket_length_, reader->user_key_length_),
265     reader_(reader),
266     initialized_(false),
267     curr_key_idx_(kInvalidIndex) {
268   sorted_bucket_ids_.clear();
269   curr_value_.clear();
270   curr_key_.Clear();
271 }
272 
InitIfNeeded()273 void CuckooTableIterator::InitIfNeeded() {
274   if (initialized_) {
275     return;
276   }
277   sorted_bucket_ids_.reserve(static_cast<size_t>(reader_->GetTableProperties()->num_entries));
278   uint64_t num_buckets = reader_->table_size_ + reader_->cuckoo_block_size_ - 1;
279   assert(num_buckets < kInvalidIndex);
280   const char* bucket = reader_->file_data_.data();
281   for (uint32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) {
282     if (Slice(bucket, reader_->key_length_) != Slice(reader_->unused_key_)) {
283       sorted_bucket_ids_.push_back(bucket_id);
284     }
285     bucket += reader_->bucket_length_;
286   }
287   assert(sorted_bucket_ids_.size() ==
288       reader_->GetTableProperties()->num_entries);
289   std::sort(sorted_bucket_ids_.begin(), sorted_bucket_ids_.end(),
290             bucket_comparator_);
291   curr_key_idx_ = kInvalidIndex;
292   initialized_ = true;
293 }
294 
SeekToFirst()295 void CuckooTableIterator::SeekToFirst() {
296   InitIfNeeded();
297   curr_key_idx_ = 0;
298   PrepareKVAtCurrIdx();
299 }
300 
SeekToLast()301 void CuckooTableIterator::SeekToLast() {
302   InitIfNeeded();
303   curr_key_idx_ = static_cast<uint32_t>(sorted_bucket_ids_.size()) - 1;
304   PrepareKVAtCurrIdx();
305 }
306 
Seek(const Slice & target)307 void CuckooTableIterator::Seek(const Slice& target) {
308   InitIfNeeded();
309   const BucketComparator seek_comparator(
310       reader_->file_data_, reader_->ucomp_,
311       reader_->bucket_length_, reader_->user_key_length_,
312       ExtractUserKey(target));
313   auto seek_it = std::lower_bound(sorted_bucket_ids_.begin(),
314       sorted_bucket_ids_.end(),
315       kInvalidIndex,
316       seek_comparator);
317   curr_key_idx_ =
318       static_cast<uint32_t>(std::distance(sorted_bucket_ids_.begin(), seek_it));
319   PrepareKVAtCurrIdx();
320 }
321 
SeekForPrev(const Slice &)322 void CuckooTableIterator::SeekForPrev(const Slice& /*target*/) {
323   // Not supported
324   assert(false);
325 }
326 
Valid() const327 bool CuckooTableIterator::Valid() const {
328   return curr_key_idx_ < sorted_bucket_ids_.size();
329 }
330 
PrepareKVAtCurrIdx()331 void CuckooTableIterator::PrepareKVAtCurrIdx() {
332   if (!Valid()) {
333     curr_value_.clear();
334     curr_key_.Clear();
335     return;
336   }
337   uint32_t id = sorted_bucket_ids_[curr_key_idx_];
338   const char* offset = reader_->file_data_.data() +
339                        id * reader_->bucket_length_;
340   if (reader_->is_last_level_) {
341     // Always return internal key.
342     curr_key_.SetInternalKey(Slice(offset, reader_->user_key_length_),
343                              0, kTypeValue);
344   } else {
345     curr_key_.SetInternalKey(Slice(offset, reader_->key_length_));
346   }
347   curr_value_ = Slice(offset + reader_->key_length_, reader_->value_length_);
348 }
349 
Next()350 void CuckooTableIterator::Next() {
351   if (!Valid()) {
352     curr_value_.clear();
353     curr_key_.Clear();
354     return;
355   }
356   ++curr_key_idx_;
357   PrepareKVAtCurrIdx();
358 }
359 
Prev()360 void CuckooTableIterator::Prev() {
361   if (curr_key_idx_ == 0) {
362     curr_key_idx_ = static_cast<uint32_t>(sorted_bucket_ids_.size());
363   }
364   if (!Valid()) {
365     curr_value_.clear();
366     curr_key_.Clear();
367     return;
368   }
369   --curr_key_idx_;
370   PrepareKVAtCurrIdx();
371 }
372 
key() const373 Slice CuckooTableIterator::key() const {
374   assert(Valid());
375   return curr_key_.GetInternalKey();
376 }
377 
value() const378 Slice CuckooTableIterator::value() const {
379   assert(Valid());
380   return curr_value_;
381 }
382 
NewIterator(const ReadOptions &,const SliceTransform *,Arena * arena,bool,TableReaderCaller,size_t,bool)383 InternalIterator* CuckooTableReader::NewIterator(
384     const ReadOptions& /*read_options*/,
385     const SliceTransform* /* prefix_extractor */, Arena* arena,
386     bool /*skip_filters*/, TableReaderCaller /*caller*/,
387     size_t /*compaction_readahead_size*/,
388     bool /* allow_unprepared_value */) {
389   if (!status().ok()) {
390     return NewErrorInternalIterator<Slice>(
391         Status::Corruption("CuckooTableReader status is not okay."), arena);
392   }
393   CuckooTableIterator* iter;
394   if (arena == nullptr) {
395     iter = new CuckooTableIterator(this);
396   } else {
397     auto iter_mem = arena->AllocateAligned(sizeof(CuckooTableIterator));
398     iter = new (iter_mem) CuckooTableIterator(this);
399   }
400   return iter;
401 }
402 
ApproximateMemoryUsage() const403 size_t CuckooTableReader::ApproximateMemoryUsage() const { return 0; }
404 
405 }  // namespace ROCKSDB_NAMESPACE
406 #endif
407