1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "table/block_based/partitioned_filter_block.h"
7
8 #include <utility>
9
10 #include "monitoring/perf_context_imp.h"
11 #include "port/malloc.h"
12 #include "port/port.h"
13 #include "rocksdb/filter_policy.h"
14 #include "table/block_based/block.h"
15 #include "table/block_based/block_based_table_reader.h"
16 #include "util/coding.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
PartitionedFilterBlockBuilder(const SliceTransform * _prefix_extractor,bool whole_key_filtering,FilterBitsBuilder * filter_bits_builder,int index_block_restart_interval,const bool use_value_delta_encoding,PartitionedIndexBuilder * const p_index_builder,const uint32_t partition_size)20 PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
21 const SliceTransform* _prefix_extractor, bool whole_key_filtering,
22 FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
23 const bool use_value_delta_encoding,
24 PartitionedIndexBuilder* const p_index_builder,
25 const uint32_t partition_size)
26 : FullFilterBlockBuilder(_prefix_extractor, whole_key_filtering,
27 filter_bits_builder),
28 index_on_filter_block_builder_(index_block_restart_interval,
29 true /*use_delta_encoding*/,
30 use_value_delta_encoding),
31 index_on_filter_block_builder_without_seq_(index_block_restart_interval,
32 true /*use_delta_encoding*/,
33 use_value_delta_encoding),
34 p_index_builder_(p_index_builder),
35 keys_added_to_partition_(0) {
36 keys_per_partition_ =
37 filter_bits_builder_->CalculateNumEntry(partition_size);
38 }
39
~PartitionedFilterBlockBuilder()40 PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {}
41
MaybeCutAFilterBlock(const Slice * next_key)42 void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
43 const Slice* next_key) {
44 // Use == to send the request only once
45 if (keys_added_to_partition_ == keys_per_partition_) {
46 // Currently only index builder is in charge of cutting a partition. We keep
47 // requesting until it is granted.
48 p_index_builder_->RequestPartitionCut();
49 }
50 if (!p_index_builder_->ShouldCutFilterBlock()) {
51 return;
52 }
53 filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
54
55 // Add the prefix of the next key before finishing the partition. This hack,
56 // fixes a bug with format_verison=3 where seeking for the prefix would lead
57 // us to the previous partition.
58 const bool add_prefix =
59 next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
60 if (add_prefix) {
61 FullFilterBlockBuilder::AddPrefix(*next_key);
62 }
63
64 Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
65 std::string& index_key = p_index_builder_->GetPartitionKey();
66 filters.push_back({index_key, filter});
67 keys_added_to_partition_ = 0;
68 Reset();
69 }
70
Add(const Slice & key)71 void PartitionedFilterBlockBuilder::Add(const Slice& key) {
72 MaybeCutAFilterBlock(&key);
73 FullFilterBlockBuilder::Add(key);
74 }
75
AddKey(const Slice & key)76 void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
77 FullFilterBlockBuilder::AddKey(key);
78 keys_added_to_partition_++;
79 }
80
Finish(const BlockHandle & last_partition_block_handle,Status * status)81 Slice PartitionedFilterBlockBuilder::Finish(
82 const BlockHandle& last_partition_block_handle, Status* status) {
83 if (finishing_filters == true) {
84 // Record the handle of the last written filter block in the index
85 FilterEntry& last_entry = filters.front();
86 std::string handle_encoding;
87 last_partition_block_handle.EncodeTo(&handle_encoding);
88 std::string handle_delta_encoding;
89 PutVarsignedint64(
90 &handle_delta_encoding,
91 last_partition_block_handle.size() - last_encoded_handle_.size());
92 last_encoded_handle_ = last_partition_block_handle;
93 const Slice handle_delta_encoding_slice(handle_delta_encoding);
94 index_on_filter_block_builder_.Add(last_entry.key, handle_encoding,
95 &handle_delta_encoding_slice);
96 if (!p_index_builder_->seperator_is_key_plus_seq()) {
97 index_on_filter_block_builder_without_seq_.Add(
98 ExtractUserKey(last_entry.key), handle_encoding,
99 &handle_delta_encoding_slice);
100 }
101 filters.pop_front();
102 } else {
103 MaybeCutAFilterBlock(nullptr);
104 }
105 // If there is no filter partition left, then return the index on filter
106 // partitions
107 if (UNLIKELY(filters.empty())) {
108 *status = Status::OK();
109 if (finishing_filters) {
110 if (p_index_builder_->seperator_is_key_plus_seq()) {
111 return index_on_filter_block_builder_.Finish();
112 } else {
113 return index_on_filter_block_builder_without_seq_.Finish();
114 }
115 } else {
116 // This is the rare case where no key was added to the filter
117 return Slice();
118 }
119 } else {
120 // Return the next filter partition in line and set Incomplete() status to
121 // indicate we expect more calls to Finish
122 *status = Status::Incomplete();
123 finishing_filters = true;
124 return filters.front().filter;
125 }
126 }
127
PartitionedFilterBlockReader(const BlockBasedTable * t,CachableEntry<Block> && filter_block)128 PartitionedFilterBlockReader::PartitionedFilterBlockReader(
129 const BlockBasedTable* t, CachableEntry<Block>&& filter_block)
130 : FilterBlockReaderCommon(t, std::move(filter_block)) {}
131
Create(const BlockBasedTable * table,FilePrefetchBuffer * prefetch_buffer,bool use_cache,bool prefetch,bool pin,BlockCacheLookupContext * lookup_context)132 std::unique_ptr<FilterBlockReader> PartitionedFilterBlockReader::Create(
133 const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
134 bool use_cache, bool prefetch, bool pin,
135 BlockCacheLookupContext* lookup_context) {
136 assert(table);
137 assert(table->get_rep());
138 assert(!pin || prefetch);
139
140 CachableEntry<Block> filter_block;
141 if (prefetch || !use_cache) {
142 const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(),
143 use_cache, nullptr /* get_context */,
144 lookup_context, &filter_block);
145 if (!s.ok()) {
146 return std::unique_ptr<FilterBlockReader>();
147 }
148
149 if (use_cache && !pin) {
150 filter_block.Reset();
151 }
152 }
153
154 return std::unique_ptr<FilterBlockReader>(
155 new PartitionedFilterBlockReader(table, std::move(filter_block)));
156 }
157
KeyMayMatch(const Slice & key,const SliceTransform * prefix_extractor,uint64_t block_offset,const bool no_io,const Slice * const const_ikey_ptr,GetContext * get_context,BlockCacheLookupContext * lookup_context)158 bool PartitionedFilterBlockReader::KeyMayMatch(
159 const Slice& key, const SliceTransform* prefix_extractor,
160 uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
161 GetContext* get_context, BlockCacheLookupContext* lookup_context) {
162 assert(const_ikey_ptr != nullptr);
163 assert(block_offset == kNotValid);
164 if (!whole_key_filtering()) {
165 return true;
166 }
167
168 return MayMatch(key, prefix_extractor, block_offset, no_io, const_ikey_ptr,
169 get_context, lookup_context,
170 &FullFilterBlockReader::KeyMayMatch);
171 }
172
PrefixMayMatch(const Slice & prefix,const SliceTransform * prefix_extractor,uint64_t block_offset,const bool no_io,const Slice * const const_ikey_ptr,GetContext * get_context,BlockCacheLookupContext * lookup_context)173 bool PartitionedFilterBlockReader::PrefixMayMatch(
174 const Slice& prefix, const SliceTransform* prefix_extractor,
175 uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
176 GetContext* get_context, BlockCacheLookupContext* lookup_context) {
177 #ifdef NDEBUG
178 (void)block_offset;
179 #endif
180 assert(const_ikey_ptr != nullptr);
181 assert(block_offset == kNotValid);
182 if (!table_prefix_extractor() && !prefix_extractor) {
183 return true;
184 }
185
186 return MayMatch(prefix, prefix_extractor, block_offset, no_io, const_ikey_ptr,
187 get_context, lookup_context,
188 &FullFilterBlockReader::PrefixMayMatch);
189 }
190
GetFilterPartitionHandle(const CachableEntry<Block> & filter_block,const Slice & entry) const191 BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
192 const CachableEntry<Block>& filter_block, const Slice& entry) const {
193 IndexBlockIter iter;
194 const InternalKeyComparator* const comparator = internal_comparator();
195 Statistics* kNullStats = nullptr;
196 filter_block.GetValue()->NewIndexIterator(
197 comparator, comparator->user_comparator(), &iter, kNullStats,
198 true /* total_order_seek */, false /* have_first_key */,
199 index_key_includes_seq(), index_value_is_full());
200 iter.Seek(entry);
201 if (UNLIKELY(!iter.Valid())) {
202 // entry is larger than all the keys. However its prefix might still be
203 // present in the last partition. If this is called by PrefixMayMatch this
204 // is necessary for correct behavior. Otherwise it is unnecessary but safe.
205 // Assuming this is an unlikely case for full key search, the performance
206 // overhead should be negligible.
207 iter.SeekToLast();
208 }
209 assert(iter.Valid());
210 BlockHandle fltr_blk_handle = iter.value().handle;
211 return fltr_blk_handle;
212 }
213
GetFilterPartitionBlock(FilePrefetchBuffer * prefetch_buffer,const BlockHandle & fltr_blk_handle,bool no_io,GetContext * get_context,BlockCacheLookupContext * lookup_context,CachableEntry<ParsedFullFilterBlock> * filter_block) const214 Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
215 FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle,
216 bool no_io, GetContext* get_context,
217 BlockCacheLookupContext* lookup_context,
218 CachableEntry<ParsedFullFilterBlock>* filter_block) const {
219 assert(table());
220 assert(filter_block);
221 assert(filter_block->IsEmpty());
222
223 if (!filter_map_.empty()) {
224 auto iter = filter_map_.find(fltr_blk_handle.offset());
225 // This is a possible scenario since block cache might not have had space
226 // for the partition
227 if (iter != filter_map_.end()) {
228 filter_block->SetUnownedValue(iter->second.GetValue());
229 return Status::OK();
230 }
231 }
232
233 ReadOptions read_options;
234 if (no_io) {
235 read_options.read_tier = kBlockCacheTier;
236 }
237
238 const Status s =
239 table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
240 UncompressionDict::GetEmptyDict(), filter_block,
241 BlockType::kFilter, get_context, lookup_context,
242 /* for_compaction */ false, /* use_cache */ true);
243
244 return s;
245 }
246
MayMatch(const Slice & slice,const SliceTransform * prefix_extractor,uint64_t block_offset,bool no_io,const Slice * const_ikey_ptr,GetContext * get_context,BlockCacheLookupContext * lookup_context,FilterFunction filter_function) const247 bool PartitionedFilterBlockReader::MayMatch(
248 const Slice& slice, const SliceTransform* prefix_extractor,
249 uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr,
250 GetContext* get_context, BlockCacheLookupContext* lookup_context,
251 FilterFunction filter_function) const {
252 CachableEntry<Block> filter_block;
253 Status s =
254 GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block);
255 if (UNLIKELY(!s.ok())) {
256 return true;
257 }
258
259 if (UNLIKELY(filter_block.GetValue()->size() == 0)) {
260 return true;
261 }
262
263 auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr);
264 if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
265 return false;
266 }
267
268 CachableEntry<ParsedFullFilterBlock> filter_partition_block;
269 s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle,
270 no_io, get_context, lookup_context,
271 &filter_partition_block);
272 if (UNLIKELY(!s.ok())) {
273 return true;
274 }
275
276 FullFilterBlockReader filter_partition(table(),
277 std::move(filter_partition_block));
278 return (filter_partition.*filter_function)(
279 slice, prefix_extractor, block_offset, no_io, const_ikey_ptr, get_context,
280 lookup_context);
281 }
282
ApproximateMemoryUsage() const283 size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
284 size_t usage = ApproximateFilterBlockMemoryUsage();
285 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
286 usage += malloc_usable_size(const_cast<PartitionedFilterBlockReader*>(this));
287 #else
288 usage += sizeof(*this);
289 #endif // ROCKSDB_MALLOC_USABLE_SIZE
290 return usage;
291 // TODO(myabandeh): better estimation for filter_map_ size
292 }
293
294 // TODO(myabandeh): merge this with the same function in IndexReader
CacheDependencies(bool pin)295 void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
296 assert(table());
297
298 const BlockBasedTable::Rep* const rep = table()->get_rep();
299 assert(rep);
300
301 BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
302
303 CachableEntry<Block> filter_block;
304
305 Status s = GetOrReadFilterBlock(false /* no_io */, nullptr /* get_context */,
306 &lookup_context, &filter_block);
307 if (!s.ok()) {
308 ROCKS_LOG_WARN(rep->ioptions.info_log,
309 "Error retrieving top-level filter block while trying to "
310 "cache filter partitions: %s",
311 s.ToString().c_str());
312 return;
313 }
314
315 // Before read partitions, prefetch them to avoid lots of IOs
316 assert(filter_block.GetValue());
317
318 IndexBlockIter biter;
319 const InternalKeyComparator* const comparator = internal_comparator();
320 Statistics* kNullStats = nullptr;
321 filter_block.GetValue()->NewIndexIterator(
322 comparator, comparator->user_comparator(), &biter, kNullStats,
323 true /* total_order_seek */, false /* have_first_key */,
324 index_key_includes_seq(), index_value_is_full());
325 // Index partitions are assumed to be consecuitive. Prefetch them all.
326 // Read the first block offset
327 biter.SeekToFirst();
328 BlockHandle handle = biter.value().handle;
329 uint64_t prefetch_off = handle.offset();
330
331 // Read the last block's offset
332 biter.SeekToLast();
333 handle = biter.value().handle;
334 uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
335 uint64_t prefetch_len = last_off - prefetch_off;
336 std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
337
338 prefetch_buffer.reset(new FilePrefetchBuffer());
339 s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
340 static_cast<size_t>(prefetch_len));
341
342 // After prefetch, read the partitions one by one
343 ReadOptions read_options;
344 for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
345 handle = biter.value().handle;
346
347 CachableEntry<ParsedFullFilterBlock> block;
348 // TODO: Support counter batch update for partitioned index and
349 // filter blocks
350 s = table()->MaybeReadBlockAndLoadToCache(
351 prefetch_buffer.get(), read_options, handle,
352 UncompressionDict::GetEmptyDict(), &block, BlockType::kFilter,
353 nullptr /* get_context */, &lookup_context, nullptr /* contents */);
354
355 assert(s.ok() || block.GetValue() == nullptr);
356 if (s.ok() && block.GetValue() != nullptr) {
357 if (block.IsCached()) {
358 if (pin) {
359 filter_map_[handle.offset()] = std::move(block);
360 }
361 }
362 }
363 }
364 }
365
internal_comparator() const366 const InternalKeyComparator* PartitionedFilterBlockReader::internal_comparator()
367 const {
368 assert(table());
369 assert(table()->get_rep());
370
371 return &table()->get_rep()->internal_comparator;
372 }
373
index_key_includes_seq() const374 bool PartitionedFilterBlockReader::index_key_includes_seq() const {
375 assert(table());
376 assert(table()->get_rep());
377
378 return table()->get_rep()->index_key_includes_seq;
379 }
380
index_value_is_full() const381 bool PartitionedFilterBlockReader::index_value_is_full() const {
382 assert(table());
383 assert(table()->get_rep());
384
385 return table()->get_rep()->index_value_is_full;
386 }
387
388 } // namespace ROCKSDB_NAMESPACE
389