1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
#include "table/block_based/block_prefix_index.h"

#include <memory>
#include <vector>

#include "memory/arena.h"
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "util/coding.h"
#include "util/hash.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 
Hash(const Slice & s)19 inline uint32_t Hash(const Slice& s) {
20   return ROCKSDB_NAMESPACE::Hash(s.data(), s.size(), 0);
21 }
22 
PrefixToBucket(const Slice & prefix,uint32_t num_buckets)23 inline uint32_t PrefixToBucket(const Slice& prefix, uint32_t num_buckets) {
24   return Hash(prefix) % num_buckets;
25 }
26 
27 // The prefix block index is simply a bucket array, with each entry pointing to
28 // the blocks that span the prefixes hashed to this bucket.
29 //
// To reduce memory footprint, if there is only one block per bucket, the entry
// stores the block id directly. If there is more than one block per bucket,
// because of a hash collision or a single prefix spanning multiple blocks,
// the entry points to an array of block ids. The block array is an array of
// uint32_t's. The first uint32_t indicates the total number of blocks, followed
// by the block ids.
36 //
37 // To differentiate the two cases, the high order bit of the entry indicates
38 // whether it is a 'pointer' into a separate block array.
39 // 0x7FFFFFFF is reserved for empty bucket.
40 
// Bucket value reserved to mean "no block is associated with this bucket".
constexpr uint32_t kNoneBlock = 0x7FFFFFFF;
// High-order bit of a bucket value. When it is set, the remaining bits are an
// offset into the block array instead of a block id.
constexpr uint32_t kBlockArrayMask = 0x80000000;
43 
IsNone(uint32_t block_id)44 inline bool IsNone(uint32_t block_id) { return block_id == kNoneBlock; }
45 
IsBlockId(uint32_t block_id)46 inline bool IsBlockId(uint32_t block_id) {
47   return (block_id & kBlockArrayMask) == 0;
48 }
49 
DecodeIndex(uint32_t block_id)50 inline uint32_t DecodeIndex(uint32_t block_id) {
51   uint32_t index = block_id ^ kBlockArrayMask;
52   assert(index < kBlockArrayMask);
53   return index;
54 }
55 
EncodeIndex(uint32_t index)56 inline uint32_t EncodeIndex(uint32_t index) {
57   assert(index < kBlockArrayMask);
58   return index | kBlockArrayMask;
59 }
60 
// Temporary storage for prefix information during index building. Each record
// describes the contiguous run of blocks covered by one prefix and can be
// chained into a per-bucket singly linked list.
struct PrefixRecord {
  // The prefix this record was created for.
  Slice prefix;
  // Id of the first block in the span.
  uint32_t start_block;
  // Id of the last block in the span (inclusive).
  uint32_t end_block;
  // Number of blocks in [start_block, end_block].
  uint32_t num_blocks;
  // Next record in the same bucket's list; linked in Builder::Finish().
  PrefixRecord* next;
};
69 
70 class BlockPrefixIndex::Builder {
71  public:
Builder(const SliceTransform * internal_prefix_extractor)72   explicit Builder(const SliceTransform* internal_prefix_extractor)
73       : internal_prefix_extractor_(internal_prefix_extractor) {}
74 
Add(const Slice & key_prefix,uint32_t start_block,uint32_t num_blocks)75   void Add(const Slice& key_prefix, uint32_t start_block, uint32_t num_blocks) {
76     PrefixRecord* record = reinterpret_cast<PrefixRecord*>(
77         arena_.AllocateAligned(sizeof(PrefixRecord)));
78     record->prefix = key_prefix;
79     record->start_block = start_block;
80     record->end_block = start_block + num_blocks - 1;
81     record->num_blocks = num_blocks;
82     prefixes_.push_back(record);
83   }
84 
Finish()85   BlockPrefixIndex* Finish() {
86     // For now, use roughly 1:1 prefix to bucket ratio.
87     uint32_t num_buckets = static_cast<uint32_t>(prefixes_.size()) + 1;
88 
89     // Collect prefix records that hash to the same bucket, into a single
90     // linklist.
91     std::vector<PrefixRecord*> prefixes_per_bucket(num_buckets, nullptr);
92     std::vector<uint32_t> num_blocks_per_bucket(num_buckets, 0);
93     for (PrefixRecord* current : prefixes_) {
94       uint32_t bucket = PrefixToBucket(current->prefix, num_buckets);
95       // merge the prefix block span if the first block of this prefix is
96       // connected to the last block of the previous prefix.
97       PrefixRecord* prev = prefixes_per_bucket[bucket];
98       if (prev) {
99         assert(current->start_block >= prev->end_block);
100         auto distance = current->start_block - prev->end_block;
101         if (distance <= 1) {
102           prev->end_block = current->end_block;
103           prev->num_blocks = prev->end_block - prev->start_block + 1;
104           num_blocks_per_bucket[bucket] += (current->num_blocks + distance - 1);
105           continue;
106         }
107       }
108       current->next = prev;
109       prefixes_per_bucket[bucket] = current;
110       num_blocks_per_bucket[bucket] += current->num_blocks;
111     }
112 
113     // Calculate the block array buffer size
114     uint32_t total_block_array_entries = 0;
115     for (uint32_t i = 0; i < num_buckets; i++) {
116       uint32_t num_blocks = num_blocks_per_bucket[i];
117       if (num_blocks > 1) {
118         total_block_array_entries += (num_blocks + 1);
119       }
120     }
121 
122     // Populate the final prefix block index
123     uint32_t* block_array_buffer = new uint32_t[total_block_array_entries];
124     uint32_t* buckets = new uint32_t[num_buckets];
125     uint32_t offset = 0;
126     for (uint32_t i = 0; i < num_buckets; i++) {
127       uint32_t num_blocks = num_blocks_per_bucket[i];
128       if (num_blocks == 0) {
129         assert(prefixes_per_bucket[i] == nullptr);
130         buckets[i] = kNoneBlock;
131       } else if (num_blocks == 1) {
132         assert(prefixes_per_bucket[i] != nullptr);
133         assert(prefixes_per_bucket[i]->next == nullptr);
134         buckets[i] = prefixes_per_bucket[i]->start_block;
135       } else {
136         assert(total_block_array_entries > 0);
137         assert(prefixes_per_bucket[i] != nullptr);
138         buckets[i] = EncodeIndex(offset);
139         block_array_buffer[offset] = num_blocks;
140         uint32_t* last_block = &block_array_buffer[offset + num_blocks];
141         auto current = prefixes_per_bucket[i];
142         // populate block ids from largest to smallest
143         while (current != nullptr) {
144           for (uint32_t iter = 0; iter < current->num_blocks; iter++) {
145             *last_block = current->end_block - iter;
146             last_block--;
147           }
148           current = current->next;
149         }
150         assert(last_block == &block_array_buffer[offset]);
151         offset += (num_blocks + 1);
152       }
153     }
154 
155     assert(offset == total_block_array_entries);
156 
157     return new BlockPrefixIndex(internal_prefix_extractor_, num_buckets,
158                                 buckets, total_block_array_entries,
159                                 block_array_buffer);
160   }
161 
162  private:
163   const SliceTransform* internal_prefix_extractor_;
164 
165   std::vector<PrefixRecord*> prefixes_;
166   Arena arena_;
167 };
168 
Create(const SliceTransform * internal_prefix_extractor,const Slice & prefixes,const Slice & prefix_meta,BlockPrefixIndex ** prefix_index)169 Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor,
170                                 const Slice& prefixes, const Slice& prefix_meta,
171                                 BlockPrefixIndex** prefix_index) {
172   uint64_t pos = 0;
173   auto meta_pos = prefix_meta;
174   Status s;
175   Builder builder(internal_prefix_extractor);
176 
177   while (!meta_pos.empty()) {
178     uint32_t prefix_size = 0;
179     uint32_t entry_index = 0;
180     uint32_t num_blocks = 0;
181     if (!GetVarint32(&meta_pos, &prefix_size) ||
182         !GetVarint32(&meta_pos, &entry_index) ||
183         !GetVarint32(&meta_pos, &num_blocks)) {
184       s = Status::Corruption(
185           "Corrupted prefix meta block: unable to read from it.");
186       break;
187     }
188     if (pos + prefix_size > prefixes.size()) {
189       s = Status::Corruption(
190           "Corrupted prefix meta block: size inconsistency.");
191       break;
192     }
193     Slice prefix(prefixes.data() + pos, prefix_size);
194     builder.Add(prefix, entry_index, num_blocks);
195 
196     pos += prefix_size;
197   }
198 
199   if (s.ok() && pos != prefixes.size()) {
200     s = Status::Corruption("Corrupted prefix meta block");
201   }
202 
203   if (s.ok()) {
204     *prefix_index = builder.Finish();
205   }
206 
207   return s;
208 }
209 
GetBlocks(const Slice & key,uint32_t ** blocks)210 uint32_t BlockPrefixIndex::GetBlocks(const Slice& key, uint32_t** blocks) {
211   Slice prefix = internal_prefix_extractor_->Transform(key);
212 
213   uint32_t bucket = PrefixToBucket(prefix, num_buckets_);
214   uint32_t block_id = buckets_[bucket];
215 
216   if (IsNone(block_id)) {
217     return 0;
218   } else if (IsBlockId(block_id)) {
219     *blocks = &buckets_[bucket];
220     return 1;
221   } else {
222     uint32_t index = DecodeIndex(block_id);
223     assert(index < num_block_array_buffer_entries_);
224     *blocks = &block_array_buffer_[index + 1];
225     uint32_t num_blocks = block_array_buffer_[index];
226     assert(num_blocks > 1);
227     assert(index + num_blocks < num_block_array_buffer_entries_);
228     return num_blocks;
229   }
230 }
231 
232 }  // namespace ROCKSDB_NAMESPACE
233