// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_based_table_reader.h"

#include <algorithm>
#include <array>
#include <limits>
#include <string>
#include <utility>
#include <vector>

#include "cache/cache_entry_roles.h"
#include "cache/sharded_cache.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "file/file_prefetch_buffer.h"
#include "file/file_util.h"
#include "file/random_access_file_reader.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "port/lang.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/statistics.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/trace_record.h"
#include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_iterator.h"
#include "table/block_based/block_like_traits.h"
#include "table/block_based/block_prefix_index.h"
#include "table/block_based/block_type.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/hash_index_reader.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/block_based/partitioned_index_reader.h"
#include "table/block_fetcher.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/persistent_cache_helper.h"
#include "table/persistent_cache_options.h"
#include "table/sst_file_writer_collectors.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

extern const uint64_t kBlockBasedTableMagicNumber;
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;

BlockBasedTable::~BlockBasedTable() {
  delete rep_;
}

std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);

namespace {
// Read the block identified by "handle" from "file".
// The only relevant option is options.verify_checksums for now.
// On failure return non-OK.
// On success fill *result and return OK - caller owns *result
// @param uncompression_dict Data for presetting the compression library's
//    dictionary.
template <typename TBlocklike>
Status ReadBlockFromFile(
    RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
    const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
    std::unique_ptr<TBlocklike>* result, const ImmutableOptions& ioptions,
    bool do_uncompress, bool maybe_compressed, BlockType block_type,
    const UncompressionDict& uncompression_dict,
    const PersistentCacheOptions& cache_options, size_t read_amp_bytes_per_bit,
    MemoryAllocator* memory_allocator, bool for_compaction, bool using_zstd,
    const FilterPolicy* filter_policy) {
  assert(result);

  BlockContents contents;
  BlockFetcher block_fetcher(
      file, prefetch_buffer, footer, options, handle, &contents, ioptions,
      do_uncompress, maybe_compressed, block_type, uncompression_dict,
      cache_options, memory_allocator, nullptr, for_compaction);
  Status s = block_fetcher.ReadBlockContents();
  if (s.ok()) {
    result->reset(BlocklikeTraits<TBlocklike>::Create(
        std::move(contents), read_amp_bytes_per_bit, ioptions.stats, using_zstd,
        filter_policy));
  }

  return s;
}

// Release the cached entry and decrement its ref count.
// Do not force erase.
void ReleaseCachedEntry(void* arg, void* h) {
  Cache* cache = reinterpret_cast<Cache*>(arg);
  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
  cache->Release(handle, false /* force_erase */);
}

// For a hash-based index, return true if prefix_extractor and
// prefix_extractor_block mismatch, false otherwise. This flag will be used
// as total_order_seek via NewIndexIterator.
bool PrefixExtractorChanged(const TableProperties* table_properties,
                            const SliceTransform* prefix_extractor) {
  // BlockBasedTableOptions::kHashSearch requires prefix_extractor to be set.
  // Turn off the hash index if prefix_extractor is not set; if
  // prefix_extractor is set but prefix_extractor_block is not set, also
  // disable the hash index.
  if (prefix_extractor == nullptr || table_properties == nullptr ||
      table_properties->prefix_extractor_name.empty()) {
    return true;
  }

  // prefix_extractor and prefix_extractor_block are both non-empty
  if (table_properties->prefix_extractor_name !=
      prefix_extractor->AsString()) {
    return true;
  } else {
    return false;
  }
}

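// Copy the given slice into a newly allocated heap buffer. Used when the
// source bytes live in memory the table reader does not own (e.g. a
// prefetch buffer or an mmapped file region) but the copy must outlive
// that memory, such as when the block is handed to the block cache.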
CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
  CacheAllocationPtr heap_buf;
  heap_buf = AllocateBlock(buf.size(), allocator);
  memcpy(heap_buf.get(), buf.data(), buf.size());
  return heap_buf;
}
}  // namespace

void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type,
                                            GetContext* get_context,
                                            size_t usage) const {
  Statistics* const statistics = rep_->ioptions.stats;

  PERF_COUNTER_ADD(block_cache_hit_count, 1);
  PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,
                            static_cast<uint32_t>(rep_->level));

  if (get_context) {
    ++get_context->get_context_stats_.num_cache_hit;
    get_context->get_context_stats_.num_cache_bytes_read += usage;
  } else {
    RecordTick(statistics, BLOCK_CACHE_HIT);
    RecordTick(statistics, BLOCK_CACHE_BYTES_READ, usage);
  }

  switch (block_type) {
    case BlockType::kFilter:
      PERF_COUNTER_ADD(block_cache_filter_hit_count, 1);

      if (get_context) {
        ++get_context->get_context_stats_.num_cache_filter_hit;
      } else {
        RecordTick(statistics, BLOCK_CACHE_FILTER_HIT);
      }
      break;

    case BlockType::kCompressionDictionary:
      // TODO: introduce perf counter for compression dictionary hit count
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_compression_dict_hit;
      } else {
        RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_HIT);
      }
      break;

    case BlockType::kIndex:
      PERF_COUNTER_ADD(block_cache_index_hit_count, 1);

      if (get_context) {
        ++get_context->get_context_stats_.num_cache_index_hit;
      } else {
        RecordTick(statistics, BLOCK_CACHE_INDEX_HIT);
      }
      break;

    default:
      // TODO: introduce dedicated tickers/statistics/counters
      // for range tombstones
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_data_hit;
      } else {
        RecordTick(statistics, BLOCK_CACHE_DATA_HIT);
      }
      break;
  }
}

void BlockBasedTable::UpdateCacheMissMetrics(BlockType block_type,
                                             GetContext* get_context) const {
  Statistics* const statistics = rep_->ioptions.stats;

  // TODO: introduce aggregate (not per-level) block cache miss count
  PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,
                            static_cast<uint32_t>(rep_->level));

  if (get_context) {
    ++get_context->get_context_stats_.num_cache_miss;
  } else {
    RecordTick(statistics, BLOCK_CACHE_MISS);
  }

  // TODO: introduce perf counters for misses per block type
  switch (block_type) {
    case BlockType::kFilter:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_filter_miss;
      } else {
        RecordTick(statistics, BLOCK_CACHE_FILTER_MISS);
      }
      break;

    case BlockType::kCompressionDictionary:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_compression_dict_miss;
      } else {
        RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_MISS);
      }
      break;

    case BlockType::kIndex:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_index_miss;
      } else {
        RecordTick(statistics, BLOCK_CACHE_INDEX_MISS);
      }
      break;

    default:
      // TODO: introduce dedicated tickers/statistics/counters
      // for range tombstones
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_data_miss;
      } else {
        RecordTick(statistics, BLOCK_CACHE_DATA_MISS);
      }
      break;
  }
}

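// Record metrics for a block cache insertion. `redundant` means an
// equivalent entry was already present when the insert happened (e.g. two
// threads raced to load and insert the same block), so the read and
// uncompression work for this copy was wasted; such inserts are counted
// under the *_ADD_REDUNDANT tickers.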
void BlockBasedTable::UpdateCacheInsertionMetrics(
    BlockType block_type, GetContext* get_context, size_t usage, bool redundant,
    Statistics* const statistics) {
  // TODO: introduce perf counters for block cache insertions
  if (get_context) {
    ++get_context->get_context_stats_.num_cache_add;
    if (redundant) {
      ++get_context->get_context_stats_.num_cache_add_redundant;
    }
    get_context->get_context_stats_.num_cache_bytes_write += usage;
  } else {
    RecordTick(statistics, BLOCK_CACHE_ADD);
    if (redundant) {
      RecordTick(statistics, BLOCK_CACHE_ADD_REDUNDANT);
    }
    RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
  }

  switch (block_type) {
    case BlockType::kFilter:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_filter_add;
        if (redundant) {
          ++get_context->get_context_stats_.num_cache_filter_add_redundant;
        }
        get_context->get_context_stats_.num_cache_filter_bytes_insert += usage;
      } else {
        RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
        if (redundant) {
          RecordTick(statistics, BLOCK_CACHE_FILTER_ADD_REDUNDANT);
        }
        RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
      }
      break;

    case BlockType::kCompressionDictionary:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_compression_dict_add;
        if (redundant) {
          ++get_context->get_context_stats_
                .num_cache_compression_dict_add_redundant;
        }
        get_context->get_context_stats_
            .num_cache_compression_dict_bytes_insert += usage;
      } else {
        RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
        if (redundant) {
          RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT);
        }
        RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
                   usage);
      }
      break;

    case BlockType::kIndex:
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_index_add;
        if (redundant) {
          ++get_context->get_context_stats_.num_cache_index_add_redundant;
        }
        get_context->get_context_stats_.num_cache_index_bytes_insert += usage;
      } else {
        RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
        if (redundant) {
          RecordTick(statistics, BLOCK_CACHE_INDEX_ADD_REDUNDANT);
        }
        RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usage);
      }
      break;

    default:
      // TODO: introduce dedicated tickers/statistics/counters
      // for range tombstones
      if (get_context) {
        ++get_context->get_context_stats_.num_cache_data_add;
        if (redundant) {
          ++get_context->get_context_stats_.num_cache_data_add_redundant;
        }
        get_context->get_context_stats_.num_cache_data_bytes_insert += usage;
      } else {
        RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
        if (redundant) {
          RecordTick(statistics, BLOCK_CACHE_DATA_ADD_REDUNDANT);
        }
        RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, usage);
      }
      break;
  }
}

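// Look up a block in the block cache and record hit/miss metrics. When the
// lowest used cache tier includes the non-volatile tier, the lookup goes
// through the CacheItemHelper/CreateCallback interface so that an entry
// residing only in a secondary cache can be reconstructed; otherwise a
// plain key lookup suffices.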
Cache::Handle* BlockBasedTable::GetEntryFromCache(
    const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
    BlockType block_type, const bool wait, GetContext* get_context,
    const Cache::CacheItemHelper* cache_helper,
    const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
  Cache::Handle* cache_handle = nullptr;
  if (cache_tier == CacheTier::kNonVolatileBlockTier) {
    cache_handle = block_cache->Lookup(key, cache_helper, create_cb, priority,
                                       wait, rep_->ioptions.statistics.get());
  } else {
    cache_handle = block_cache->Lookup(key, rep_->ioptions.statistics.get());
  }

  if (cache_handle != nullptr) {
    UpdateCacheHitMetrics(block_type, get_context,
                          block_cache->GetUsage(cache_handle));
  } else {
    UpdateCacheMissMetrics(block_type, get_context);
  }

  return cache_handle;
}

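// Counterpart of GetEntryFromCache: insert a block into the block cache,
// using the CacheItemHelper-based Insert when the non-volatile tier may be
// used (allowing the entry to later spill to a secondary cache), and the
// plain deleter-based Insert otherwise.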
template <typename TBlocklike>
Status BlockBasedTable::InsertEntryToCache(
    const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
    const Cache::CacheItemHelper* cache_helper,
    std::unique_ptr<TBlocklike>& block_holder, size_t charge,
    Cache::Handle** cache_handle, Cache::Priority priority) const {
  Status s = Status::OK();
  if (cache_tier == CacheTier::kNonVolatileBlockTier) {
    s = block_cache->Insert(key, block_holder.get(), cache_helper, charge,
                            cache_handle, priority);
  } else {
    s = block_cache->Insert(key, block_holder.get(), charge,
                            cache_helper->del_cb, cache_handle, priority);
  }
  return s;
}

// Helper function to set up the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep,
                                          const std::string& db_session_id,
                                          uint64_t file_num) {
  assert(kMaxCacheKeyPrefixSize >= 10);
  rep->cache_key_prefix_size = 0;
  rep->compressed_cache_key_prefix_size = 0;
  if (rep->table_options.block_cache != nullptr) {
    GenerateCachePrefix<Cache, FSRandomAccessFile>(
        rep->table_options.block_cache.get(), rep->file->file(),
        &rep->cache_key_prefix[0], &rep->cache_key_prefix_size, db_session_id,
        file_num);
  }
  if (rep->table_options.block_cache_compressed != nullptr) {
    GenerateCachePrefix<Cache, FSRandomAccessFile>(
        rep->table_options.block_cache_compressed.get(), rep->file->file(),
        &rep->compressed_cache_key_prefix[0],
        &rep->compressed_cache_key_prefix_size, db_session_id, file_num);
  }
  if (rep->table_options.persistent_cache != nullptr) {
    char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize];
    size_t persistent_cache_key_prefix_size = 0;

    GenerateCachePrefix<PersistentCache, FSRandomAccessFile>(
        rep->table_options.persistent_cache.get(), rep->file->file(),
        &persistent_cache_key_prefix[0], &persistent_cache_key_prefix_size,
        db_session_id, file_num);

    rep->persistent_cache_options =
        PersistentCacheOptions(rep->table_options.persistent_cache,
                               std::string(persistent_cache_key_prefix,
                                           persistent_cache_key_prefix_size),
                               rep->ioptions.stats);
  }
}

namespace {
// Return true if `user_prop_name` in table_properties has a `true` value,
// or if the property is absent (for backward compatibility).
bool IsFeatureSupported(const TableProperties& table_properties,
                        const std::string& user_prop_name, Logger* info_log) {
  auto& props = table_properties.user_collected_properties;
  auto pos = props.find(user_prop_name);
  // Older versions don't have this value set. Skip this check.
  if (pos != props.end()) {
    if (pos->second == kPropFalse) {
      return false;
    } else if (pos->second != kPropTrue) {
      ROCKS_LOG_WARN(info_log, "Property %s has invalid value %s",
                     user_prop_name.c_str(), pos->second.c_str());
    }
  }
  return true;
}

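// Extract the global sequence number that applies to all keys of this
// table, if any. Only external SST files (written by SstFileWriter) carry
// the version property: version 1 files do not support global seqnos at
// all, while version 2+ files may carry a global seqno property that, when
// nonzero, overrides the sequence number of every key in the file.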
// Caller has to ensure seqno is not nullptr.
Status GetGlobalSequenceNumber(const TableProperties& table_properties,
                               SequenceNumber largest_seqno,
                               SequenceNumber* seqno) {
  const auto& props = table_properties.user_collected_properties;
  const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
  const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);

  *seqno = kDisableGlobalSequenceNumber;
  if (version_pos == props.end()) {
    if (seqno_pos != props.end()) {
      std::array<char, 200> msg_buf;
      // This is not an external sst file, global_seqno is not supported.
      snprintf(
          msg_buf.data(), msg_buf.max_size(),
          "A non-external sst file has a global seqno property with value %s",
          seqno_pos->second.c_str());
      return Status::Corruption(msg_buf.data());
    }
    return Status::OK();
  }

  uint32_t version = DecodeFixed32(version_pos->second.c_str());
  if (version < 2) {
    if (seqno_pos != props.end() || version != 1) {
      std::array<char, 200> msg_buf;
      // This is a v1 external sst file, global_seqno is not supported.
      snprintf(msg_buf.data(), msg_buf.max_size(),
               "An external sst file with version %u has a global seqno "
               "property with value %s",
               version, seqno_pos->second.c_str());
      return Status::Corruption(msg_buf.data());
    }
    return Status::OK();
  }

  // Since we have a plan to deprecate global_seqno, we do not return failure
  // if seqno_pos == props.end(). We rely on version_pos to detect whether the
  // SST is external.
  SequenceNumber global_seqno(0);
  if (seqno_pos != props.end()) {
    global_seqno = DecodeFixed64(seqno_pos->second.c_str());
  }
  // SstTableReader opens the table reader with kMaxSequenceNumber as
  // largest_seqno to denote that it is unknown.
  if (largest_seqno < kMaxSequenceNumber) {
    if (global_seqno == 0) {
      global_seqno = largest_seqno;
    }
    if (global_seqno != largest_seqno) {
      std::array<char, 200> msg_buf;
      snprintf(
          msg_buf.data(), msg_buf.max_size(),
          "An external sst file with version %u has a global seqno property "
          "with value %s, while the largest seqno in the file is %llu",
          version, seqno_pos->second.c_str(),
          static_cast<unsigned long long>(largest_seqno));
      return Status::Corruption(msg_buf.data());
    }
  }
  *seqno = global_seqno;

  if (global_seqno > kMaxSequenceNumber) {
    std::array<char, 200> msg_buf;
    snprintf(msg_buf.data(), msg_buf.max_size(),
             "An external sst file with version %u has a global seqno "
             "property with value %llu, which is greater than "
             "kMaxSequenceNumber",
             version, static_cast<unsigned long long>(global_seqno));
    return Status::Corruption(msg_buf.data());
  }

  return Status::OK();
}
}  // namespace

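// A block cache key is the table's cache key prefix followed by the
// varint64-encoded offset of the block within the file; the prefix makes
// the key unique among all tables sharing the same cache.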
Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
                                   size_t cache_key_prefix_size,
                                   const BlockHandle& handle, char* cache_key) {
  assert(cache_key != nullptr);
  assert(cache_key_prefix_size != 0);
  assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
  memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);
  char* end =
      EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset());
  return Slice(cache_key, static_cast<size_t>(end - cache_key));
}

Status BlockBasedTable::Open(
    const ReadOptions& read_options, const ImmutableOptions& ioptions,
    const EnvOptions& env_options, const BlockBasedTableOptions& table_options,
    const InternalKeyComparator& internal_comparator,
    std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
    std::unique_ptr<TableReader>* table_reader,
    const SliceTransform* prefix_extractor,
    const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
    const int level, const bool immortal_table,
    const SequenceNumber largest_seqno, const bool force_direct_prefetch,
    TailPrefetchStats* tail_prefetch_stats,
    BlockCacheTracer* const block_cache_tracer,
    size_t max_file_size_for_l0_meta_pin, const std::string& cur_db_session_id,
    uint64_t cur_file_num) {
  table_reader->reset();

  Status s;
  Footer footer;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;

  // Only retain read_options.deadline and read_options.io_timeout.
  // In the future, we may retain more options. Specifically, we ignore
  // verify_checksums and default to checksum verification anyway when
  // creating the index and filter readers.
  ReadOptions ro;
  ro.deadline = read_options.deadline;
  ro.io_timeout = read_options.io_timeout;

  // prefetch both index and filters, down to all partitions
  const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
  const bool preload_all = !table_options.cache_index_and_filter_blocks;

  if (!ioptions.allow_mmap_reads) {
    s = PrefetchTail(ro, file.get(), file_size, force_direct_prefetch,
                     tail_prefetch_stats, prefetch_all, preload_all,
                     &prefetch_buffer);
    // Return error in prefetch path to users.
    if (!s.ok()) {
      return s;
    }
  } else {
    // Should not prefetch for mmap mode.
    prefetch_buffer.reset(new FilePrefetchBuffer(
        nullptr, 0, 0, false /* enable */, true /* track_min_offset */));
  }

  // Read in the following order:
  // 1. Footer
  // 2. [metaindex block]
  // 3. [meta block: properties]
  // 4. [meta block: range deletion tombstone]
  // 5. [meta block: compression dictionary]
  // 6. [meta block: index]
  // 7. [meta block: filter]
  IOOptions opts;
  s = file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
    s = ReadFooterFromFile(opts, file.get(), prefetch_buffer.get(), file_size,
                           &footer, kBlockBasedTableMagicNumber);
  }
  if (!s.ok()) {
    return s;
  }
  if (!BlockBasedTableSupportedVersion(footer.version())) {
    return Status::Corruption(
        "Unknown Footer version. Maybe this file was created with a newer "
        "version of RocksDB?");
  }

  // We've successfully read the footer. We are ready to serve requests.
  // Better not to mutate rep_ after creation, e.g. the
  // internal_prefix_transform raw pointer will be used to create
  // HashIndexReader, whose reset may access a dangling pointer.
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
                                      internal_comparator, skip_filters,
                                      file_size, level, immortal_table);
  rep->file = std::move(file);
  rep->footer = footer;
  rep->hash_index_allow_collision = table_options.hash_index_allow_collision;
  // We need to wrap data with internal_prefix_transform to make sure it can
  // handle prefixes correctly.
  if (prefix_extractor != nullptr) {
    rep->internal_prefix_transform.reset(
        new InternalKeySliceTransform(prefix_extractor));
  }

  // For fully portable/stable cache keys, we need to read the properties
  // block before setting up cache keys. TODO: consider setting up a bootstrap
  // cache key for PersistentCache to use for metaindex and properties blocks.
  rep->persistent_cache_options = PersistentCacheOptions();

  // Meta-blocks are not dictionary compressed. Explicitly set the dictionary
  // handle to null, otherwise it may be seen as uninitialized during the below
  // meta-block reads.
  rep->compression_dict_handle = BlockHandle::NullBlockHandle();

  // Read metaindex
  std::unique_ptr<BlockBasedTable> new_table(
      new BlockBasedTable(rep, block_cache_tracer));
  std::unique_ptr<Block> metaindex;
  std::unique_ptr<InternalIterator> metaindex_iter;
  s = new_table->ReadMetaIndexBlock(ro, prefetch_buffer.get(), &metaindex,
                                    &metaindex_iter);
  if (!s.ok()) {
    return s;
  }

  // Populates table_properties and some fields that depend on it,
  // such as index_type.
  s = new_table->ReadPropertiesBlock(ro, prefetch_buffer.get(),
                                     metaindex_iter.get(), largest_seqno);
  if (!s.ok()) {
    return s;
  }

  // With properties loaded, we can set up portable/stable cache keys if
  // the necessary info is available.
  std::string db_session_id;
  uint64_t file_num;
  if (rep->table_properties && !rep->table_properties->db_session_id.empty() &&
      rep->table_properties->orig_file_number > 0) {
    // We must have both properties to get a stable unique id because
    // CreateColumnFamilyWithImport or IngestExternalFiles can change the
    // file numbers on a file.
    db_session_id = rep->table_properties->db_session_id;
    file_num = rep->table_properties->orig_file_number;
  } else {
    // We have to use transient (but unique) cache keys based on current
    // identifiers.
    db_session_id = cur_db_session_id;
    file_num = cur_file_num;
  }
  SetupCacheKeyPrefix(rep, db_session_id, file_num);

  s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(),
                                   metaindex_iter.get(), internal_comparator,
                                   &lookup_context);
  if (!s.ok()) {
    return s;
  }
  s = new_table->PrefetchIndexAndFilterBlocks(
      ro, prefetch_buffer.get(), metaindex_iter.get(), new_table.get(),
      prefetch_all, table_options, level, file_size,
      max_file_size_for_l0_meta_pin, &lookup_context);

  if (s.ok()) {
    // Update tail prefetch stats
    assert(prefetch_buffer.get() != nullptr);
    if (tail_prefetch_stats != nullptr) {
      assert(prefetch_buffer->min_offset_read() < file_size);
      tail_prefetch_stats->RecordEffectiveSize(
          static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
    }

    *table_reader = std::move(new_table);
  }

  return s;
}

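// Prefetch the tail of the file (footer plus meta blocks) before opening
// the table. The prefetch size comes from historical stats when available;
// otherwise a fixed heuristic is used (larger when index/filter blocks
// will also be read). If the file system supports its own prefetching and
// direct prefetch is not forced, that is preferred over allocating a
// FilePrefetchBuffer.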
Status BlockBasedTable::PrefetchTail(
    const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size,
    bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
    const bool prefetch_all, const bool preload_all,
    std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) {
  size_t tail_prefetch_size = 0;
  if (tail_prefetch_stats != nullptr) {
    // Multiple threads may get a 0 (no history) when running in parallel,
    // but it will get cleared after the first of them finishes.
    tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
  }
  if (tail_prefetch_size == 0) {
    // Before reading the footer, read backwards from the end of the file to
    // prefetch data. Do more readahead if we're going to read the
    // index/filter.
    // TODO: This may incorrectly select small readahead in case partitioned
    // index/filter is enabled and top-level partition pinning is enabled.
    // That's because we need to issue readahead before we read the properties,
    // at which point we don't yet know the index type.
    tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
  }
  size_t prefetch_off;
  size_t prefetch_len;
  if (file_size < tail_prefetch_size) {
    prefetch_off = 0;
    prefetch_len = static_cast<size_t>(file_size);
  } else {
    prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
    prefetch_len = tail_prefetch_size;
  }
  TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
                           &tail_prefetch_size);

  // Try file system prefetch
  if (!file->use_direct_io() && !force_direct_prefetch) {
    if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) {
      prefetch_buffer->reset(
          new FilePrefetchBuffer(nullptr, 0, 0, false, true));
      return Status::OK();
    }
  }

  // Use `FilePrefetchBuffer`
  prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
  IOOptions opts;
  Status s = file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
    s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
  }
  return s;
}

Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno(
    const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
    const Slice& handle_value, TableProperties** table_properties) {
  assert(table_properties != nullptr);
  // If this is an external SST file ingested with write_global_seqno set to
  // true, then we expect a checksum mismatch because the checksum was written
  // by SstFileWriter, but its global seqno in the properties block may have
  // been changed during ingestion. In this case, we read the properties
  // block, copy it to a memory buffer, change the global seqno to its
  // original value, i.e. 0, and verify the checksum again.
  BlockHandle props_block_handle;
  CacheAllocationPtr tmp_buf;
  Status s = ReadProperties(ro, handle_value, rep_->file.get(), prefetch_buffer,
                            rep_->footer, rep_->ioptions, table_properties,
                            false /* verify_checksum */, &props_block_handle,
                            &tmp_buf, false /* compression_type_missing */,
                            nullptr /* memory_allocator */);
  if (s.ok() && tmp_buf) {
    const auto seqno_pos_iter =
        (*table_properties)
            ->properties_offsets.find(
                ExternalSstFilePropertyNames::kGlobalSeqno);
    size_t block_size = static_cast<size_t>(props_block_handle.size());
    if (seqno_pos_iter != (*table_properties)->properties_offsets.end()) {
      uint64_t global_seqno_offset = seqno_pos_iter->second;
      EncodeFixed64(
          tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0);
    }
    s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
        rep_->footer.checksum(), tmp_buf.get(), block_size,
        rep_->file->file_name(), props_block_handle.offset());
  }
  return s;
}

Status BlockBasedTable::ReadPropertiesBlock(
    const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
    InternalIterator* meta_iter, const SequenceNumber largest_seqno) {
  bool found_properties_block = true;
  Status s;
  s = SeekToPropertiesBlock(meta_iter, &found_properties_block);

  if (!s.ok()) {
    ROCKS_LOG_WARN(rep_->ioptions.logger,
                   "Error when seeking to properties block from file: %s",
                   s.ToString().c_str());
  } else if (found_properties_block) {
    s = meta_iter->status();
    TableProperties* table_properties = nullptr;
    if (s.ok()) {
      s = ReadProperties(
          ro, meta_iter->value(), rep_->file.get(), prefetch_buffer,
          rep_->footer, rep_->ioptions, &table_properties,
          true /* verify_checksum */, nullptr /* ret_block_handle */,
          nullptr /* ret_block_contents */,
          false /* compression_type_missing */, nullptr /* memory_allocator */);
    }
    IGNORE_STATUS_IF_ERROR(s);

    if (s.IsCorruption()) {
      s = TryReadPropertiesWithGlobalSeqno(
          ro, prefetch_buffer, meta_iter->value(), &table_properties);
      IGNORE_STATUS_IF_ERROR(s);
    }
    std::unique_ptr<TableProperties> props_guard;
    if (table_properties != nullptr) {
      props_guard.reset(table_properties);
    }

    if (!s.ok()) {
      ROCKS_LOG_WARN(rep_->ioptions.logger,
                     "Encountered error while reading data from properties "
                     "block %s",
                     s.ToString().c_str());
    } else {
      assert(table_properties != nullptr);
      rep_->table_properties.reset(props_guard.release());
      rep_->blocks_maybe_compressed =
          rep_->table_properties->compression_name !=
          CompressionTypeToString(kNoCompression);
      rep_->blocks_definitely_zstd_compressed =
          (rep_->table_properties->compression_name ==
               CompressionTypeToString(kZSTD) ||
           rep_->table_properties->compression_name ==
               CompressionTypeToString(kZSTDNotFinalCompression));
    }
  } else {
    ROCKS_LOG_ERROR(rep_->ioptions.logger,
                    "Cannot find Properties block from file.");
  }
#ifndef ROCKSDB_LITE
  if (rep_->table_properties) {
    //**TODO: If/When the DBOptions has a registry in it, the ConfigOptions
    // will need to use it
    ConfigOptions config_options;
    Status st = SliceTransform::CreateFromString(
        config_options, rep_->table_properties->prefix_extractor_name,
        &(rep_->table_prefix_extractor));
    if (!st.ok()) {
      //**TODO: Should this error be returned or swallowed?
      ROCKS_LOG_ERROR(rep_->ioptions.logger,
                      "Failed to create prefix extractor[%s]: %s",
                      rep_->table_properties->prefix_extractor_name.c_str(),
                      st.ToString().c_str());
    }
  }
#endif  // ROCKSDB_LITE

  // Read the table properties, if provided.
  if (rep_->table_properties) {
    rep_->whole_key_filtering &=
        IsFeatureSupported(*(rep_->table_properties),
                           BlockBasedTablePropertyNames::kWholeKeyFiltering,
                           rep_->ioptions.logger);
    rep_->prefix_filtering &= IsFeatureSupported(
        *(rep_->table_properties),
        BlockBasedTablePropertyNames::kPrefixFiltering, rep_->ioptions.logger);

    rep_->index_key_includes_seq =
        rep_->table_properties->index_key_is_user_key == 0;
    rep_->index_value_is_full =
        rep_->table_properties->index_value_is_delta_encoded == 0;

    // Update index_type with the true type.
    // If table properties don't contain index type, we assume that the table
    // is in very old format and has kBinarySearch index type.
    auto& props = rep_->table_properties->user_collected_properties;
    auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
    if (pos != props.end()) {
      rep_->index_type = static_cast<BlockBasedTableOptions::IndexType>(
          DecodeFixed32(pos->second.c_str()));
    }

    rep_->index_has_first_key =
        rep_->index_type == BlockBasedTableOptions::kBinarySearchWithFirstKey;

    s = GetGlobalSequenceNumber(*(rep_->table_properties), largest_seqno,
                                &(rep_->global_seqno));
    if (!s.ok()) {
      ROCKS_LOG_ERROR(rep_->ioptions.logger, "%s", s.ToString().c_str());
    }
  }
  return s;
}

Status BlockBasedTable::ReadRangeDelBlock(
    const ReadOptions& read_options, FilePrefetchBuffer* prefetch_buffer,
    InternalIterator* meta_iter,
    const InternalKeyComparator& internal_comparator,
    BlockCacheLookupContext* lookup_context) {
  Status s;
  bool found_range_del_block;
  BlockHandle range_del_handle;
  s = SeekToRangeDelBlock(meta_iter, &found_range_del_block, &range_del_handle);
  if (!s.ok()) {
    ROCKS_LOG_WARN(
        rep_->ioptions.logger,
        "Error when seeking to range delete tombstones block from file: %s",
        s.ToString().c_str());
  } else if (found_range_del_block && !range_del_handle.IsNull()) {
    std::unique_ptr<InternalIterator> iter(NewDataBlockIterator<DataBlockIter>(
        read_options, range_del_handle,
        /*input_iter=*/nullptr, BlockType::kRangeDeletion,
        /*get_context=*/nullptr, lookup_context, Status(), prefetch_buffer));
    assert(iter != nullptr);
    s = iter->status();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          rep_->ioptions.logger,
          "Encountered error while reading data from range del block %s",
          s.ToString().c_str());
      IGNORE_STATUS_IF_ERROR(s);
    } else {
      rep_->fragmented_range_dels =
          std::make_shared<FragmentedRangeTombstoneList>(std::move(iter),
                                                         internal_comparator);
    }
  }
  return s;
}

Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
    const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
    InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all,
    const BlockBasedTableOptions& table_options, const int level,
    size_t file_size, size_t max_file_size_for_l0_meta_pin,
    BlockCacheLookupContext* lookup_context) {
  Status s;

  // Find filter handle and filter type
  if (rep_->filter_policy) {
    for (auto filter_type :
         {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
          Rep::FilterType::kBlockFilter}) {
      std::string prefix;
      switch (filter_type) {
        case Rep::FilterType::kFullFilter:
          prefix = kFullFilterBlockPrefix;
          break;
        case Rep::FilterType::kPartitionedFilter:
          prefix = kPartitionedFilterBlockPrefix;
          break;
        case Rep::FilterType::kBlockFilter:
          prefix = kFilterBlockPrefix;
          break;
        default:
          assert(0);
      }
      std::string filter_block_key = prefix;
      filter_block_key.append(rep_->filter_policy->Name());
      if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
              .ok()) {
        rep_->filter_type = filter_type;
        break;
      }
    }
  }
  // Partition filters cannot be enabled without partition indexes
  assert(rep_->filter_type != Rep::FilterType::kPartitionedFilter ||
         rep_->index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);

  // Find compression dictionary handle
  bool found_compression_dict = false;
  s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict,
                                 &rep_->compression_dict_handle);
  if (!s.ok()) {
    return s;
  }

  BlockBasedTableOptions::IndexType index_type = rep_->index_type;

  const bool use_cache = table_options.cache_index_and_filter_blocks;

  const bool maybe_flushed =
      level == 0 && file_size <= max_file_size_for_l0_meta_pin;
  std::function<bool(PinningTier, PinningTier)> is_pinned =
      [maybe_flushed, &is_pinned](PinningTier pinning_tier,
                                  PinningTier fallback_pinning_tier) {
        // Fallback to fallback would lead to infinite recursion. Disallow it.
        assert(fallback_pinning_tier != PinningTier::kFallback);

        switch (pinning_tier) {
          case PinningTier::kFallback:
            return is_pinned(fallback_pinning_tier,
                             PinningTier::kNone /* fallback_pinning_tier */);
          case PinningTier::kNone:
            return false;
          case PinningTier::kFlushedAndSimilar:
            return maybe_flushed;
          case PinningTier::kAll:
            return true;
        };

        // In GCC, this is needed to suppress `control reaches end of non-void
        // function [-Werror=return-type]`.
        assert(false);
        return false;
      };
  const bool pin_top_level_index = is_pinned(
      table_options.metadata_cache_options.top_level_index_pinning,
      table_options.pin_top_level_index_and_filter ? PinningTier::kAll
                                                   : PinningTier::kNone);
  const bool pin_partition =
      is_pinned(table_options.metadata_cache_options.partition_pinning,
                table_options.pin_l0_filter_and_index_blocks_in_cache
                    ? PinningTier::kFlushedAndSimilar
                    : PinningTier::kNone);
  const bool pin_unpartitioned =
      is_pinned(table_options.metadata_cache_options.unpartitioned_pinning,
                table_options.pin_l0_filter_and_index_blocks_in_cache
                    ? PinningTier::kFlushedAndSimilar
                    : PinningTier::kNone);

  // pin the first level of index
  const bool pin_index =
      index_type == BlockBasedTableOptions::kTwoLevelIndexSearch
          ? pin_top_level_index
          : pin_unpartitioned;
  // prefetch the first level of index
  // WART: this might be redundant (unnecessary cache hit) if !pin_index,
  // depending on prepopulate_block_cache option
  const bool prefetch_index = prefetch_all || pin_index;

  std::unique_ptr<IndexReader> index_reader;
  s = new_table->CreateIndexReader(ro, prefetch_buffer, meta_iter, use_cache,
                                   prefetch_index, pin_index, lookup_context,
                                   &index_reader);
  if (!s.ok()) {
    return s;
  }

  rep_->index_reader = std::move(index_reader);

  // The partitions of a partitioned index are always stored in the cache.
  // They hence follow the configuration for pinning and prefetching
  // regardless of the value of cache_index_and_filter_blocks.
  if (prefetch_all || pin_partition) {
    s = rep_->index_reader->CacheDependencies(ro, pin_partition);
  }
  if (!s.ok()) {
    return s;
  }

  // pin the first level of filter
  const bool pin_filter =
      rep_->filter_type == Rep::FilterType::kPartitionedFilter
          ? pin_top_level_index
          : pin_unpartitioned;
  // prefetch the first level of filter
  // WART: this might be redundant (unnecessary cache hit) if !pin_filter,
  // depending on prepopulate_block_cache option
  const bool prefetch_filter = prefetch_all || pin_filter;

  if (rep_->filter_policy) {
    auto filter = new_table->CreateFilterBlockReader(
        ro, prefetch_buffer, use_cache, prefetch_filter, pin_filter,
        lookup_context);

    if (filter) {
      // Refer to the comment above about partitioned indexes always being
      // cached
      if (prefetch_all || pin_partition) {
        s = filter->CacheDependencies(ro, pin_partition);
        if (!s.ok()) {
          return s;
        }
      }
      rep_->filter = std::move(filter);
    }
  }

  if (!rep_->compression_dict_handle.IsNull()) {
    std::unique_ptr<UncompressionDictReader> uncompression_dict_reader;
    s = UncompressionDictReader::Create(
        this, ro, prefetch_buffer, use_cache, prefetch_all || pin_unpartitioned,
        pin_unpartitioned, lookup_context, &uncompression_dict_reader);
    if (!s.ok()) {
      return s;
    }

    rep_->uncompression_dict_reader = std::move(uncompression_dict_reader);
  }

  assert(s.ok());
  return s;
}

void BlockBasedTable::SetupForCompaction() {
  switch (rep_->ioptions.access_hint_on_compaction_start) {
    case Options::NONE:
      break;
    case Options::NORMAL:
      rep_->file->file()->Hint(FSRandomAccessFile::kNormal);
      break;
    case Options::SEQUENTIAL:
      rep_->file->file()->Hint(FSRandomAccessFile::kSequential);
      break;
    case Options::WILLNEED:
      rep_->file->file()->Hint(FSRandomAccessFile::kWillNeed);
      break;
    default:
      assert(false);
  }
}

std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
    const {
  return rep_->table_properties;
}

size_t BlockBasedTable::ApproximateMemoryUsage() const {
  size_t usage = 0;
  if (rep_->filter) {
    usage += rep_->filter->ApproximateMemoryUsage();
  }
  if (rep_->index_reader) {
    usage += rep_->index_reader->ApproximateMemoryUsage();
  }
  if (rep_->uncompression_dict_reader) {
    usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage();
  }
  return usage;
}

// Load the metaindex block from the file. On success, return the loaded
// metaindex block and its iterator.
Status BlockBasedTable::ReadMetaIndexBlock(
    const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
    std::unique_ptr<Block>* metaindex_block,
    std::unique_ptr<InternalIterator>* iter) {
  // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
  // it is an empty block.
  std::unique_ptr<Block> metaindex;
  Status s = ReadBlockFromFile(
      rep_->file.get(), prefetch_buffer, rep_->footer, ro,
      rep_->footer.metaindex_handle(), &metaindex, rep_->ioptions,
      true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
      UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
      0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep_->table_options),
      false /* for_compaction */, rep_->blocks_definitely_zstd_compressed,
      nullptr /* filter_policy */);

  if (!s.ok()) {
    ROCKS_LOG_ERROR(rep_->ioptions.logger,
                    "Encountered error while reading data from metaindex"
                    " block %s",
                    s.ToString().c_str());
    return s;
  }

  *metaindex_block = std::move(metaindex);
  // meta block uses bytewise comparator.
  iter->reset(metaindex_block->get()->NewDataIterator(
      BytewiseComparator(), kDisableGlobalSequenceNumber));
  return Status::OK();
}

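// Fetch a block from the cache hierarchy: first the uncompressed block
// cache, then, on a miss, the compressed block cache. A block found only
// in compressed form is uncompressed and, when read_options.fill_cache is
// set, promoted into the uncompressed cache before being returned.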
template <typename TBlocklike>
Status BlockBasedTable::GetDataBlockFromCache(
    const Slice& block_cache_key, const Slice& compressed_block_cache_key,
    Cache* block_cache, Cache* block_cache_compressed,
    const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
    const UncompressionDict& uncompression_dict, BlockType block_type,
    const bool wait, GetContext* get_context) const {
  const size_t read_amp_bytes_per_bit =
      block_type == BlockType::kData
          ? rep_->table_options.read_amp_bytes_per_bit
          : 0;
  assert(block);
  assert(block->IsEmpty());
  const Cache::Priority priority =
      rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
              (block_type == BlockType::kFilter ||
               block_type == BlockType::kCompressionDictionary ||
               block_type == BlockType::kIndex)
          ? Cache::Priority::HIGH
          : Cache::Priority::LOW;

  Status s;
  BlockContents* compressed_block = nullptr;
  Cache::Handle* block_cache_compressed_handle = nullptr;
  Statistics* statistics = rep_->ioptions.statistics.get();
  bool using_zstd = rep_->blocks_definitely_zstd_compressed;
  const FilterPolicy* filter_policy = rep_->filter_policy;
  Cache::CreateCallback create_cb = GetCreateCallback<TBlocklike>(
      read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);

  // Lookup uncompressed cache first
  if (block_cache != nullptr) {
    Cache::Handle* cache_handle = nullptr;
    cache_handle = GetEntryFromCache(
        rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
        block_type, wait, get_context,
        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
        priority);
    if (cache_handle != nullptr) {
      block->SetCachedValue(
          reinterpret_cast<TBlocklike*>(block_cache->Value(cache_handle)),
          block_cache, cache_handle);
      return s;
    }
  }

  // If not found, search from the compressed block cache.
  assert(block->IsEmpty());

  if (block_cache_compressed == nullptr) {
    return s;
  }

  assert(!compressed_block_cache_key.empty());
  BlockContents contents;
  if (rep_->ioptions.lowest_used_cache_tier ==
      CacheTier::kNonVolatileBlockTier) {
    Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
        read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
    block_cache_compressed_handle = block_cache_compressed->Lookup(
        compressed_block_cache_key,
        BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
        create_cb_special, priority, true);
  } else {
    block_cache_compressed_handle =
        block_cache_compressed->Lookup(compressed_block_cache_key, statistics);
  }

  // if we found in the compressed cache, then uncompress and insert into
  // uncompressed cache
  if (block_cache_compressed_handle == nullptr) {
    RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
    return s;
  }

  // found compressed block
  RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
  compressed_block = reinterpret_cast<BlockContents*>(
      block_cache_compressed->Value(block_cache_compressed_handle));
  CompressionType compression_type = compressed_block->get_compression_type();
  assert(compression_type != kNoCompression);

  // Retrieve the uncompressed contents into a new buffer
  UncompressionContext context(compression_type);
  UncompressionInfo info(context, uncompression_dict, compression_type);
  s = UncompressBlockContents(
      info, compressed_block->data.data(), compressed_block->data.size(),
      &contents, rep_->table_options.format_version, rep_->ioptions,
      GetMemoryAllocator(rep_->table_options));

  // Insert uncompressed block into block cache, the priority is based on the
  // data block type.
  if (s.ok()) {
    std::unique_ptr<TBlocklike> block_holder(
        BlocklikeTraits<TBlocklike>::Create(
            std::move(contents), read_amp_bytes_per_bit, statistics,
            rep_->blocks_definitely_zstd_compressed,
            rep_->table_options.filter_policy.get()));  // uncompressed block

    if (block_cache != nullptr && block_holder->own_bytes() &&
        read_options.fill_cache) {
      size_t charge = block_holder->ApproximateMemoryUsage();
      Cache::Handle* cache_handle = nullptr;
      s = InsertEntryToCache(
          rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
          BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
          block_holder, charge, &cache_handle, priority);
      if (s.ok()) {
        assert(cache_handle != nullptr);
        block->SetCachedValue(block_holder.release(), block_cache,
                              cache_handle);

        UpdateCacheInsertionMetrics(block_type, get_context, charge,
                                    s.IsOkOverwritten(), rep_->ioptions.stats);
      } else {
        RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
      }
    } else {
      block->SetOwnedValue(block_holder.release());
    }
  }

  // Release hold on compressed cache entry
  block_cache_compressed->Release(block_cache_compressed_handle);
  return s;
}

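// Insert a freshly read block into the cache(s): the raw (compressed)
// bytes go into the compressed block cache when one is configured, and the
// uncompressed form goes into the regular block cache, with the returned
// handle pinned in `cached_block`. Blocks that do not own their bytes are
// returned to the caller as owned values instead of being cached.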
template <typename TBlocklike>
Status BlockBasedTable::PutDataBlockToCache(
    const Slice& block_cache_key, const Slice& compressed_block_cache_key,
    Cache* block_cache, Cache* block_cache_compressed,
    CachableEntry<TBlocklike>* cached_block, BlockContents* raw_block_contents,
    CompressionType raw_block_comp_type,
    const UncompressionDict& uncompression_dict,
    MemoryAllocator* memory_allocator, BlockType block_type,
    GetContext* get_context) const {
  const ImmutableOptions& ioptions = rep_->ioptions;
  const uint32_t format_version = rep_->table_options.format_version;
  const size_t read_amp_bytes_per_bit =
      block_type == BlockType::kData
          ? rep_->table_options.read_amp_bytes_per_bit
          : 0;
  const Cache::Priority priority =
      rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
              (block_type == BlockType::kFilter ||
               block_type == BlockType::kCompressionDictionary ||
               block_type == BlockType::kIndex)
          ? Cache::Priority::HIGH
          : Cache::Priority::LOW;
  assert(cached_block);
  assert(cached_block->IsEmpty());

  Status s;
  Statistics* statistics = ioptions.stats;

  std::unique_ptr<TBlocklike> block_holder;
  if (raw_block_comp_type != kNoCompression) {
    // Retrieve the uncompressed contents into a new buffer
    BlockContents uncompressed_block_contents;
    UncompressionContext context(raw_block_comp_type);
    UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
    s = UncompressBlockContents(info, raw_block_contents->data.data(),
                                raw_block_contents->data.size(),
                                &uncompressed_block_contents, format_version,
                                ioptions, memory_allocator);
    if (!s.ok()) {
      return s;
    }

    block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
        std::move(uncompressed_block_contents), read_amp_bytes_per_bit,
        statistics, rep_->blocks_definitely_zstd_compressed,
        rep_->table_options.filter_policy.get()));
  } else {
    block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
        std::move(*raw_block_contents), read_amp_bytes_per_bit, statistics,
        rep_->blocks_definitely_zstd_compressed,
        rep_->table_options.filter_policy.get()));
  }

  // Insert compressed block into compressed block cache.
  // Release the hold on the compressed cache entry immediately.
  if (block_cache_compressed != nullptr &&
      raw_block_comp_type != kNoCompression && raw_block_contents != nullptr &&
      raw_block_contents->own_bytes()) {
#ifndef NDEBUG
    assert(raw_block_contents->is_raw_block);
#endif  // NDEBUG

    // We cannot directly put raw_block_contents because this could point to
    // an object on the stack.
    std::unique_ptr<BlockContents> block_cont_for_comp_cache(
        new BlockContents(std::move(*raw_block_contents)));
    s = InsertEntryToCache(
        rep_->ioptions.lowest_used_cache_tier, block_cache_compressed,
        compressed_block_cache_key,
        BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
        block_cont_for_comp_cache,
        block_cont_for_comp_cache->ApproximateMemoryUsage(), nullptr,
        Cache::Priority::LOW);

    BlockContents* block_cont_raw_ptr = block_cont_for_comp_cache.release();
    if (s.ok()) {
      // The cache now owns the block, so it must not be deleted here.
      RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
    } else {
      RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
      delete block_cont_raw_ptr;
    }
  }

  // insert into uncompressed block cache
  if (block_cache != nullptr && block_holder->own_bytes()) {
    size_t charge = block_holder->ApproximateMemoryUsage();
    Cache::Handle* cache_handle = nullptr;
    s = InsertEntryToCache(
        rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
        block_holder, charge, &cache_handle, priority);
    if (s.ok()) {
      assert(cache_handle != nullptr);
      cached_block->SetCachedValue(block_holder.release(), block_cache,
                                   cache_handle);

      UpdateCacheInsertionMetrics(block_type, get_context, charge,
                                  s.IsOkOverwritten(), rep_->ioptions.stats);
    } else {
      RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
    }
  } else {
    cached_block->SetOwnedValue(block_holder.release());
  }

  return s;
}

CreateFilterBlockReader(const ReadOptions & ro,FilePrefetchBuffer * prefetch_buffer,bool use_cache,bool prefetch,bool pin,BlockCacheLookupContext * lookup_context)1403 std::unique_ptr<FilterBlockReader> BlockBasedTable::CreateFilterBlockReader(
1404 const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, bool use_cache,
1405 bool prefetch, bool pin, BlockCacheLookupContext* lookup_context) {
1406 auto& rep = rep_;
1407 auto filter_type = rep->filter_type;
1408 if (filter_type == Rep::FilterType::kNoFilter) {
1409 return std::unique_ptr<FilterBlockReader>();
1410 }
1411
1412 assert(rep->filter_policy);
1413
1414 switch (filter_type) {
1415 case Rep::FilterType::kPartitionedFilter:
1416 return PartitionedFilterBlockReader::Create(
1417 this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context);
1418
1419 case Rep::FilterType::kBlockFilter:
1420 return BlockBasedFilterBlockReader::Create(
1421 this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context);
1422
1423 case Rep::FilterType::kFullFilter:
1424 return FullFilterBlockReader::Create(this, ro, prefetch_buffer, use_cache,
1425 prefetch, pin, lookup_context);
1426
1427 default:
1428 // filter_type is either kNoFilter (in which case we returned at the
1429 // first if above), or it must be covered by this switch block
1430 assert(false);
1431 return std::unique_ptr<FilterBlockReader>();
1432 }
1433 }
1434
1435 // disable_prefix_seek should be set to true when prefix_extractor found in SST
1436 // differs from the one in mutable_cf_options and index type is HashBasedIndex
NewIndexIterator(const ReadOptions & read_options,bool disable_prefix_seek,IndexBlockIter * input_iter,GetContext * get_context,BlockCacheLookupContext * lookup_context) const1437 InternalIteratorBase<IndexValue>* BlockBasedTable::NewIndexIterator(
1438 const ReadOptions& read_options, bool disable_prefix_seek,
1439 IndexBlockIter* input_iter, GetContext* get_context,
1440 BlockCacheLookupContext* lookup_context) const {
1441 assert(rep_ != nullptr);
1442 assert(rep_->index_reader != nullptr);
1443
1444 // We don't return pinned data from index blocks, so no need
1445 // to set `block_contents_pinned`.
1446 return rep_->index_reader->NewIterator(read_options, disable_prefix_seek,
1447 input_iter, get_context,
1448 lookup_context);
1449 }
1450
1451 template <>
InitBlockIterator(const Rep * rep,Block * block,BlockType block_type,DataBlockIter * input_iter,bool block_contents_pinned)1452 DataBlockIter* BlockBasedTable::InitBlockIterator<DataBlockIter>(
1453 const Rep* rep, Block* block, BlockType block_type,
1454 DataBlockIter* input_iter, bool block_contents_pinned) {
1455 return block->NewDataIterator(rep->internal_comparator.user_comparator(),
1456 rep->get_global_seqno(block_type), input_iter,
1457 rep->ioptions.stats, block_contents_pinned);
1458 }
1459
1460 template <>
InitBlockIterator(const Rep * rep,Block * block,BlockType block_type,IndexBlockIter * input_iter,bool block_contents_pinned)1461 IndexBlockIter* BlockBasedTable::InitBlockIterator<IndexBlockIter>(
1462 const Rep* rep, Block* block, BlockType block_type,
1463 IndexBlockIter* input_iter, bool block_contents_pinned) {
1464 return block->NewIndexIterator(
1465 rep->internal_comparator.user_comparator(),
1466 rep->get_global_seqno(block_type), input_iter, rep->ioptions.stats,
1467 /* total_order_seek */ true, rep->index_has_first_key,
1468 rep->index_key_includes_seq, rep->index_value_is_full,
1469 block_contents_pinned);
1470 }
1471
1472 // If contents is nullptr, this function looks up the block caches for the
1473 // data block referenced by handle, and reads the block from disk if necessary.
1474 // If contents is non-null, it skips the cache lookup and disk read, since
1475 // the caller has already read it. In both cases, if ro.fill_cache is true,
1476 // it inserts the block into the block cache.
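// Illustrative sketch (not compiled) of the two call patterns; names match
// the parameters below, and the real call sites are RetrieveBlock() and
// RetrieveMultipleBlocks():
//
//   CachableEntry<Block> entry;
//   // Cache lookup, then disk read on miss (contents == nullptr):
//   s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
//                                    uncompression_dict, /*wait=*/true,
//                                    &entry, BlockType::kData, get_context,
//                                    lookup_context, /*contents=*/nullptr);
//
//   // Caller already read the block; only insert it into the cache:
//   s = MaybeReadBlockAndLoadToCache(nullptr, ro, handle, uncompression_dict,
//                                    /*wait=*/true, &entry, BlockType::kData,
//                                    get_context, lookup_context,
//                                    &raw_block_contents);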
1477 template <typename TBlocklike>
MaybeReadBlockAndLoadToCache(FilePrefetchBuffer * prefetch_buffer,const ReadOptions & ro,const BlockHandle & handle,const UncompressionDict & uncompression_dict,const bool wait,CachableEntry<TBlocklike> * block_entry,BlockType block_type,GetContext * get_context,BlockCacheLookupContext * lookup_context,BlockContents * contents) const1478 Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
1479 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
1480 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
1481 const bool wait, CachableEntry<TBlocklike>* block_entry,
1482 BlockType block_type, GetContext* get_context,
1483 BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
1484 assert(block_entry != nullptr);
1485 const bool no_io = (ro.read_tier == kBlockCacheTier);
1486 Cache* block_cache = rep_->table_options.block_cache.get();
1487 Cache* block_cache_compressed =
1488 rep_->table_options.block_cache_compressed.get();
1489
1490 // First, try to get the block from the cache
1491 //
1492 // If either block cache is enabled, we'll try to read from it.
1493 Status s;
1494 char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
1495 char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
1496 Slice key /* key to the block cache */;
1497 Slice ckey /* key to the compressed block cache */;
1498 bool is_cache_hit = false;
1499 if (block_cache != nullptr || block_cache_compressed != nullptr) {
1500 // create key for block cache
1501 if (block_cache != nullptr) {
1502 key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
1503 handle, cache_key);
1504 }
1505
1506 if (block_cache_compressed != nullptr) {
1507 ckey = GetCacheKey(rep_->compressed_cache_key_prefix,
1508 rep_->compressed_cache_key_prefix_size, handle,
1509 compressed_cache_key);
1510 }
1511
1512 if (!contents) {
1513 s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
1514 ro, block_entry, uncompression_dict, block_type,
1515 wait, get_context);
1516 // Value could still be null at this point, so check the cache handle
1517 // and update the read pattern for prefetching
1518 if (block_entry->GetValue() || block_entry->GetCacheHandle()) {
1519 // TODO(haoyu): Differentiate cache hit on uncompressed block cache and
1520 // compressed block cache.
1521 is_cache_hit = true;
1522 if (prefetch_buffer) {
1523 // Update the block details so that PrefetchBuffer can use the read
1524 // pattern to determine if reads are sequential or not for
1525 // prefetching. It should also take into account blocks read from cache.
1526 prefetch_buffer->UpdateReadPattern(handle.offset(),
1527 block_size(handle));
1528 }
1529 }
1530 }
1531
1532 // Can't find the block from the cache. If I/O is allowed, read from the
1533 // file.
1534 if (block_entry->GetValue() == nullptr &&
1535 block_entry->GetCacheHandle() == nullptr && !no_io && ro.fill_cache) {
1536 Statistics* statistics = rep_->ioptions.stats;
1537 const bool maybe_compressed =
1538 block_type != BlockType::kFilter &&
1539 block_type != BlockType::kCompressionDictionary &&
1540 rep_->blocks_maybe_compressed;
1541 const bool do_uncompress = maybe_compressed && !block_cache_compressed;
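// If a compressed block cache is configured, keep the block compressed so
// the compressed bytes can be inserted into that cache;
// PutDataBlockToCache() will uncompress it for the uncompressed cache.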
1542 CompressionType raw_block_comp_type;
1543 BlockContents raw_block_contents;
1544 if (!contents) {
1545 StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
1546 BlockFetcher block_fetcher(
1547 rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
1548 &raw_block_contents, rep_->ioptions, do_uncompress,
1549 maybe_compressed, block_type, uncompression_dict,
1550 rep_->persistent_cache_options,
1551 GetMemoryAllocator(rep_->table_options),
1552 GetMemoryAllocatorForCompressedBlock(rep_->table_options));
1553 s = block_fetcher.ReadBlockContents();
1554 raw_block_comp_type = block_fetcher.get_compression_type();
1555 contents = &raw_block_contents;
1556 if (get_context) {
1557 switch (block_type) {
1558 case BlockType::kIndex:
1559 ++get_context->get_context_stats_.num_index_read;
1560 break;
1561 case BlockType::kFilter:
1562 ++get_context->get_context_stats_.num_filter_read;
1563 break;
1564 case BlockType::kData:
1565 ++get_context->get_context_stats_.num_data_read;
1566 break;
1567 default:
1568 break;
1569 }
1570 }
1571 } else {
1572 raw_block_comp_type = contents->get_compression_type();
1573 }
1574
1575 if (s.ok()) {
1576 // If filling cache is allowed and a cache is configured, try to put the
1577 // block to the cache.
1578 s = PutDataBlockToCache(
1579 key, ckey, block_cache, block_cache_compressed, block_entry,
1580 contents, raw_block_comp_type, uncompression_dict,
1581 GetMemoryAllocator(rep_->table_options), block_type, get_context);
1582 }
1583 }
1584 }
1585
1586 // Fill lookup_context.
1587 if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
1588 lookup_context) {
1589 size_t usage = 0;
1590 uint64_t nkeys = 0;
1591 if (block_entry->GetValue()) {
1592 // Approximate the number of keys in the block using restarts.
1593 nkeys =
1594 rep_->table_options.block_restart_interval *
1595 BlocklikeTraits<TBlocklike>::GetNumRestarts(*block_entry->GetValue());
1596 usage = block_entry->GetValue()->ApproximateMemoryUsage();
1597 }
1598 TraceType trace_block_type = TraceType::kTraceMax;
1599 switch (block_type) {
1600 case BlockType::kData:
1601 trace_block_type = TraceType::kBlockTraceDataBlock;
1602 break;
1603 case BlockType::kFilter:
1604 trace_block_type = TraceType::kBlockTraceFilterBlock;
1605 break;
1606 case BlockType::kCompressionDictionary:
1607 trace_block_type = TraceType::kBlockTraceUncompressionDictBlock;
1608 break;
1609 case BlockType::kRangeDeletion:
1610 trace_block_type = TraceType::kBlockTraceRangeDeletionBlock;
1611 break;
1612 case BlockType::kIndex:
1613 trace_block_type = TraceType::kBlockTraceIndexBlock;
1614 break;
1615 default:
1616 // This cannot happen.
1617 assert(false);
1618 break;
1619 }
1620 bool no_insert = no_io || !ro.fill_cache;
1621 if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
1622 trace_block_type, lookup_context->caller)) {
1623 // Defer logging the access to Get() and MultiGet() to trace additional
1624 // information, e.g., referenced_key_exist_in_block.
1625
1626 // Make a copy of the block key here since it will be logged later.
1627 lookup_context->FillLookupContext(
1628 is_cache_hit, no_insert, trace_block_type,
1629 /*block_size=*/usage, /*block_key=*/key.ToString(), nkeys);
1630 } else {
1631 // Avoid making copy of block_key and cf_name when constructing the access
1632 // record.
1633 BlockCacheTraceRecord access_record(
1634 rep_->ioptions.clock->NowMicros(),
1635 /*block_key=*/"", trace_block_type,
1636 /*block_size=*/usage, rep_->cf_id_for_tracing(),
1637 /*cf_name=*/"", rep_->level_for_tracing(),
1638 rep_->sst_number_for_tracing(), lookup_context->caller, is_cache_hit,
1639 no_insert, lookup_context->get_id,
1640 lookup_context->get_from_user_specified_snapshot,
1641 /*referenced_key=*/"");
1642 // TODO: Should handle this error?
1643 block_cache_tracer_
1644 ->WriteBlockAccess(access_record, key, rep_->cf_name_for_tracing(),
1645 lookup_context->referenced_key)
1646 .PermitUncheckedError();
1647 }
1648 }
1649
1650 assert(s.ok() || block_entry->GetValue() == nullptr);
1651 return s;
1652 }
1653
1654 // This function reads multiple data blocks from disk using Env::MultiRead()
1655 // and optionally inserts them into the block cache. It uses the scratch
1656 // buffer provided by the caller, which is contiguous. If scratch is nullptr,
1657 // it allocates a separate buffer for each block. Typically, if the blocks
1658 // need to be uncompressed and there is no compressed block cache, callers
1659 // can allocate a temporary scratch buffer in order to minimize memory
1660 // allocations.
1661 // If options.fill_cache is true, it inserts the blocks into cache. If it's
1662 // false and scratch is non-null and the blocks are uncompressed, it copies
1663 // the buffers to heap. In any case, the CachableEntry<Block> returned will
1664 // own the data bytes.
1665 // If compression is enabled and there is no compressed block cache,
1666 // adjacent blocks are read out in one IO (a combined read)
1667 // batch - A MultiGetRange with only those keys with unique data blocks not
1668 // found in cache
1669 // handles - A vector of block handles. Some of them may be null handles
1670 // scratch - An optional contiguous buffer to read compressed blocks into
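// Illustrative example of the request batching below (assuming non-direct
// IO with a caller-provided scratch buffer, and 4096-byte blocks including
// the trailer): for handles at offsets 0, 4096 and 20000, the first two
// blocks are adjacent and merged into one FSReadRequest, while the third
// gets its own:
//   read_reqs            = [{offset=0, len=8192}, {offset=20000, len=4096}]
//   req_idx_for_block    = [0, 0, 1]
//   req_offset_for_block = [0, 4096, 0]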
1671 void BlockBasedTable::RetrieveMultipleBlocks(
1672 const ReadOptions& options, const MultiGetRange* batch,
1673 const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
1674 autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
1675 autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
1676 char* scratch, const UncompressionDict& uncompression_dict) const {
1677 RandomAccessFileReader* file = rep_->file.get();
1678 const Footer& footer = rep_->footer;
1679 const ImmutableOptions& ioptions = rep_->ioptions;
1680 size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
1681 MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
1682
1683 if (ioptions.allow_mmap_reads) {
1684 size_t idx_in_batch = 0;
1685 for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1686 ++mget_iter, ++idx_in_batch) {
1687 BlockCacheLookupContext lookup_data_block_context(
1688 TableReaderCaller::kUserMultiGet);
1689 const BlockHandle& handle = (*handles)[idx_in_batch];
1690 if (handle.IsNull()) {
1691 continue;
1692 }
1693
1694 (*statuses)[idx_in_batch] =
1695 RetrieveBlock(nullptr, options, handle, uncompression_dict,
1696 &(*results)[idx_in_batch], BlockType::kData,
1697 mget_iter->get_context, &lookup_data_block_context,
1698 /* for_compaction */ false, /* use_cache */ true,
1699 /* wait_for_cache */ true);
1700 }
1701 return;
1702 }
1703
1704 // In direct IO mode, blocks share the direct io buffer.
1705 // Otherwise, blocks share the scratch buffer.
1706 const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
1707
1708 autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
1709 size_t buf_offset = 0;
1710 size_t idx_in_batch = 0;
1711
1712 uint64_t prev_offset = 0;
1713 size_t prev_len = 0;
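// For the i-th non-null block handle, req_idx_for_block[i] is the index in
// read_reqs of the FSReadRequest that covers that block, and
// req_offset_for_block[i] is the block's byte offset within that request
// (non-zero only for combined reads).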
1714 autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
1715 autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
1716 for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1717 ++mget_iter, ++idx_in_batch) {
1718 const BlockHandle& handle = (*handles)[idx_in_batch];
1719 if (handle.IsNull()) {
1720 continue;
1721 }
1722
1723 size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
1724
1725 // If the current block is adjacent to the previous one, compression is
1726 // enabled, and there is no compressed cache, we combine the two block
1727 // reads into one.
1728 // We don't combine block reads in direct IO mode, because the block
1729 // requests will be realigned and merged when necessary during the direct
1730 // IO read anyway.
1731 if (use_shared_buffer && !file->use_direct_io() &&
1732 prev_end == handle.offset()) {
1733 req_offset_for_block.emplace_back(prev_len);
1734 prev_len += block_size(handle);
1735 } else {
1736 // No combined read: either compression is disabled or the current block is
// not adjacent to the previous one:
1737 // Step 1, create a new request for previous blocks
1738 if (prev_len != 0) {
1739 FSReadRequest req;
1740 req.offset = prev_offset;
1741 req.len = prev_len;
1742 if (file->use_direct_io()) {
1743 req.scratch = nullptr;
1744 } else if (use_shared_buffer) {
1745 req.scratch = scratch + buf_offset;
1746 buf_offset += req.len;
1747 } else {
1748 req.scratch = new char[req.len];
1749 }
1750 read_reqs.emplace_back(req);
1751 }
1752
1753 // Step 2, remember the previous block info
1754 prev_offset = handle.offset();
1755 prev_len = block_size(handle);
1756 req_offset_for_block.emplace_back(0);
1757 }
1758 req_idx_for_block.emplace_back(read_reqs.size());
1759
1760 PERF_COUNTER_ADD(block_read_count, 1);
1761 PERF_COUNTER_ADD(block_read_byte, block_size(handle));
1762 }
1763 // Handle the last block and process the pending last request
1764 if (prev_len != 0) {
1765 FSReadRequest req;
1766 req.offset = prev_offset;
1767 req.len = prev_len;
1768 if (file->use_direct_io()) {
1769 req.scratch = nullptr;
1770 } else if (use_shared_buffer) {
1771 req.scratch = scratch + buf_offset;
1772 } else {
1773 req.scratch = new char[req.len];
1774 }
1775 read_reqs.emplace_back(req);
1776 }
1777
1778 AlignedBuf direct_io_buf;
1779 {
1780 IOOptions opts;
1781 IOStatus s = file->PrepareIOOptions(options, opts);
1782 if (s.ok()) {
1783 s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
1784 &direct_io_buf);
1785 }
1786 if (!s.ok()) {
1787 // Discard all the results in this batch if there is any timeout
1788 // or overall MultiRead error
1789 for (FSReadRequest& req : read_reqs) {
1790 req.status = s;
1791 }
1792 }
1793 }
1794
1795 idx_in_batch = 0;
1796 size_t valid_batch_idx = 0;
1797 for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1798 ++mget_iter, ++idx_in_batch) {
1799 const BlockHandle& handle = (*handles)[idx_in_batch];
1800
1801 if (handle.IsNull()) {
1802 continue;
1803 }
1804
1805 assert(valid_batch_idx < req_idx_for_block.size());
1806 assert(valid_batch_idx < req_offset_for_block.size());
1807 assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
1808 size_t& req_idx = req_idx_for_block[valid_batch_idx];
1809 size_t& req_offset = req_offset_for_block[valid_batch_idx];
1810 valid_batch_idx++;
1811 if (mget_iter->get_context) {
1812 ++(mget_iter->get_context->get_context_stats_.num_data_read);
1813 }
1814 FSReadRequest& req = read_reqs[req_idx];
1815 Status s = req.status;
1816 if (s.ok()) {
1817 if ((req.result.size() != req.len) ||
1818 (req_offset + block_size(handle) > req.result.size())) {
1819 s = Status::Corruption(
1820 "truncated block read from " + rep_->file->file_name() +
1821 " offset " + ToString(handle.offset()) + ", expected " +
1822 ToString(req.len) + " bytes, got " + ToString(req.result.size()));
1823 }
1824 }
1825
1826 BlockContents raw_block_contents;
1827 if (s.ok()) {
1828 if (!use_shared_buffer) {
1829 // We allocated a buffer for this block. Give ownership of it to
1830 // BlockContents so it can free the memory
1831 assert(req.result.data() == req.scratch);
1832 assert(req.result.size() == block_size(handle));
1833 assert(req_offset == 0);
1834 std::unique_ptr<char[]> raw_block(req.scratch);
1835 raw_block_contents = BlockContents(std::move(raw_block), handle.size());
1836 } else {
1837 // We used the scratch buffer or direct io buffer
1838 // which are shared by the blocks.
1839 // raw_block_contents does not take ownership of the data.
1840 raw_block_contents =
1841 BlockContents(Slice(req.result.data() + req_offset, handle.size()));
1842 }
1843 #ifndef NDEBUG
1844 raw_block_contents.is_raw_block = true;
1845 #endif
1846
1847 if (options.verify_checksums) {
1848 PERF_TIMER_GUARD(block_checksum_time);
1849 const char* data = req.result.data();
1850 // Since the scratch might be shared, the offset of the data block in
1851 // the buffer might not be 0. req.result.data() only points to the
1852 // beginning of each read request, so we need to add the block's offset
1853 // within the request. The checksum is stored in the block trailer,
1854 // beyond the payload size.
1855 s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
1856 footer.checksum(), data + req_offset, handle.size(),
1857 rep_->file->file_name(), handle.offset());
1858 TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
1859 }
1860 } else if (!use_shared_buffer) {
1861 // Free the allocated scratch buffer.
1862 delete[] req.scratch;
1863 }
1864
1865 if (s.ok()) {
1866 // When the blocks share the same underlying buffer (scratch or direct io
1867 // buffer), we may need to manually copy the block into the heap if the raw
1868 // block has to be inserted into a cache. That falls into the following
1869 // cases -
1870 // 1. The raw block is not compressed and needs to be inserted into the
1871 // uncompressed block cache if there is one
1872 // 2. The raw block is compressed and needs to be inserted into the
1873 // compressed block cache if there is one
1874 //
1875 // In all other cases, the raw block is either uncompressed into a heap
1876 // buffer or there is no cache at all.
1877 CompressionType compression_type =
1878 raw_block_contents.get_compression_type();
1879 if (use_shared_buffer && (compression_type == kNoCompression ||
1880 (compression_type != kNoCompression &&
1881 rep_->table_options.block_cache_compressed))) {
1882 Slice raw = Slice(req.result.data() + req_offset, block_size(handle));
1883 raw_block_contents = BlockContents(
1884 CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
1885 handle.size());
1886 #ifndef NDEBUG
1887 raw_block_contents.is_raw_block = true;
1888 #endif
1889 }
1890 }
1891
1892 if (s.ok()) {
1893 if (options.fill_cache) {
1894 BlockCacheLookupContext lookup_data_block_context(
1895 TableReaderCaller::kUserMultiGet);
1896 CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
1897 // MaybeReadBlockAndLoadToCache will insert into the block caches if
1898 // necessary. Since we're passing the raw block contents, it will
1899 // avoid looking up the block cache
1900 s = MaybeReadBlockAndLoadToCache(
1901 nullptr, options, handle, uncompression_dict, /*wait=*/true,
1902 block_entry, BlockType::kData, mget_iter->get_context,
1903 &lookup_data_block_context, &raw_block_contents);
1904
1905 // block_entry value could be null if no block cache is present, i.e.
1906 // BlockBasedTableOptions::no_block_cache is true and no compressed
1907 // block cache is configured. In that case, fall
1908 // through and set up the block explicitly
1909 if (block_entry->GetValue() != nullptr) {
1910 s.PermitUncheckedError();
1911 continue;
1912 }
1913 }
1914
1915 CompressionType compression_type =
1916 raw_block_contents.get_compression_type();
1917 BlockContents contents;
1918 if (compression_type != kNoCompression) {
1919 UncompressionContext context(compression_type);
1920 UncompressionInfo info(context, uncompression_dict, compression_type);
1921 s = UncompressBlockContents(info, req.result.data() + req_offset,
1922 handle.size(), &contents, footer.version(),
1923 rep_->ioptions, memory_allocator);
1924 } else {
1925 // There are two cases here:
1926 // 1) caller uses the shared buffer (scratch or direct io buffer);
1927 // 2) we use the request buffer.
1928 // If the scratch buffer or direct io buffer is used, we ensure that
1929 // all raw blocks are copied to the heap as single blocks. If the scratch
1930 // buffer is not used, we also have no combined read, so the raw
1931 // block can be used directly.
1932 contents = std::move(raw_block_contents);
1933 }
1934 if (s.ok()) {
1935 (*results)[idx_in_batch].SetOwnedValue(new Block(
1936 std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
1937 }
1938 }
1939 (*statuses)[idx_in_batch] = s;
1940 }
1941 }
1942
1943 template <typename TBlocklike>
1944 Status BlockBasedTable::RetrieveBlock(
1945 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
1946 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
1947 CachableEntry<TBlocklike>* block_entry, BlockType block_type,
1948 GetContext* get_context, BlockCacheLookupContext* lookup_context,
1949 bool for_compaction, bool use_cache, bool wait_for_cache) const {
1950 assert(block_entry);
1951 assert(block_entry->IsEmpty());
1952
1953 Status s;
1954 if (use_cache) {
1955 s = MaybeReadBlockAndLoadToCache(
1956 prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
1957 block_entry, block_type, get_context, lookup_context,
1958 /*contents=*/nullptr);
1959
1960 if (!s.ok()) {
1961 return s;
1962 }
1963
1964 if (block_entry->GetValue() != nullptr ||
1965 block_entry->GetCacheHandle() != nullptr) {
1966 assert(s.ok());
1967 return s;
1968 }
1969 }
1970
1971 assert(block_entry->IsEmpty());
1972
1973 const bool no_io = ro.read_tier == kBlockCacheTier;
1974 if (no_io) {
1975 return Status::Incomplete("no blocking io");
1976 }
1977
1978 const bool maybe_compressed =
1979 block_type != BlockType::kFilter &&
1980 block_type != BlockType::kCompressionDictionary &&
1981 rep_->blocks_maybe_compressed;
1982 const bool do_uncompress = maybe_compressed;
1983 std::unique_ptr<TBlocklike> block;
1984
1985 {
1986 StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats,
1987 READ_BLOCK_GET_MICROS);
1988 s = ReadBlockFromFile(
1989 rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
1990 rep_->ioptions, do_uncompress, maybe_compressed, block_type,
1991 uncompression_dict, rep_->persistent_cache_options,
1992 block_type == BlockType::kData
1993 ? rep_->table_options.read_amp_bytes_per_bit
1994 : 0,
1995 GetMemoryAllocator(rep_->table_options), for_compaction,
1996 rep_->blocks_definitely_zstd_compressed,
1997 rep_->table_options.filter_policy.get());
1998
1999 if (get_context) {
2000 switch (block_type) {
2001 case BlockType::kIndex:
2002 ++(get_context->get_context_stats_.num_index_read);
2003 break;
2004 case BlockType::kFilter:
2005 ++(get_context->get_context_stats_.num_filter_read);
2006 break;
2007 case BlockType::kData:
2008 ++(get_context->get_context_stats_.num_data_read);
2009 break;
2010 default:
2011 break;
2012 }
2013 }
2014 }
2015
2016 if (!s.ok()) {
2017 return s;
2018 }
2019
2020 block_entry->SetOwnedValue(block.release());
2021
2022 assert(s.ok());
2023 return s;
2024 }
2025
2026 // Explicitly instantiate templates for both "blocklike" types we use.
2027 // This makes it possible to keep the template definitions in the .cc file.
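// (Any new blocklike type passed to RetrieveBlock() would need a matching
// explicit instantiation below; otherwise the definition is never emitted
// and linking fails.)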
2028 template Status BlockBasedTable::RetrieveBlock<BlockContents>(
2029 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2030 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2031 CachableEntry<BlockContents>* block_entry, BlockType block_type,
2032 GetContext* get_context, BlockCacheLookupContext* lookup_context,
2033 bool for_compaction, bool use_cache, bool wait_for_cache) const;
2034
2035 template Status BlockBasedTable::RetrieveBlock<ParsedFullFilterBlock>(
2036 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2037 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2038 CachableEntry<ParsedFullFilterBlock>* block_entry, BlockType block_type,
2039 GetContext* get_context, BlockCacheLookupContext* lookup_context,
2040 bool for_compaction, bool use_cache, bool wait_for_cache) const;
2041
2042 template Status BlockBasedTable::RetrieveBlock<Block>(
2043 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2044 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2045 CachableEntry<Block>* block_entry, BlockType block_type,
2046 GetContext* get_context, BlockCacheLookupContext* lookup_context,
2047 bool for_compaction, bool use_cache, bool wait_for_cache) const;
2048
2049 template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
2050 FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2051 const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2052 CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
2053 GetContext* get_context, BlockCacheLookupContext* lookup_context,
2054 bool for_compaction, bool use_cache, bool wait_for_cache) const;
2055
2056 BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
2057 const BlockBasedTable* table,
2058 std::unordered_map<uint64_t, CachableEntry<Block>>* block_map)
2059 : table_(table), block_map_(block_map) {}
2060
2061 InternalIteratorBase<IndexValue>*
2062 BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
2063 const BlockHandle& handle) {
2064 // Return a block iterator on the index partition
2065 auto block = block_map_->find(handle.offset());
2066 // The partition might be missing from the map if the block cache did not
2067 // have space for it; in that case, fall through to the empty iterator below.
2068 if (block != block_map_->end()) {
2069 const Rep* rep = table_->get_rep();
2070 assert(rep);
2071
2072 Statistics* kNullStats = nullptr;
2073 // We don't return pinned data from index blocks, so no need
2074 // to set `block_contents_pinned`.
2075 return block->second.GetValue()->NewIndexIterator(
2076 rep->internal_comparator.user_comparator(),
2077 rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
2078 rep->index_has_first_key, rep->index_key_includes_seq,
2079 rep->index_value_is_full);
2080 }
2081 // Create an empty iterator
2082 // TODO(ajkr): this is not the right way to handle an unpinned partition.
2083 return new IndexBlockIter();
2084 }
2085
2086 // This will be broken if the user specifies an unusual implementation
2087 // of Options.comparator, or if the user specifies an unusual
2088 // definition of prefixes in BlockBasedTableOptions.filter_policy.
2089 // In particular, we require the following three properties:
2090 //
2091 // 1) key.starts_with(prefix(key))
2092 // 2) Compare(prefix(key), key) <= 0.
2093 // 3) If Compare(key1, key2) <= 0, then Compare(prefix(key1), prefix(key2)) <= 0
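// For example, with the bytewise comparator and a fixed-prefix extractor
// of length 3 (illustrative only):
//   prefix("foobar") == "foo"                  // satisfies (1)
//   Compare("foo", "foobar") <= 0              // satisfies (2)
//   Compare("foobar", "fopqr") <= 0 and
//   Compare("foo", "fop") <= 0                 // satisfies (3)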
2094 //
2095 // If read_options.read_tier == kBlockCacheTier, this method will do no I/O and
2096 // will return true if the filter block is not in memory and not found in block
2097 // cache.
2098 //
2099 // REQUIRES: this method shouldn't be called while the DB lock is held.
2100 bool BlockBasedTable::PrefixMayMatch(
2101 const Slice& internal_key, const ReadOptions& read_options,
2102 const SliceTransform* options_prefix_extractor,
2103 const bool need_upper_bound_check,
2104 BlockCacheLookupContext* lookup_context) const {
2105 if (!rep_->filter_policy) {
2106 return true;
2107 }
2108
2109 const SliceTransform* prefix_extractor;
2110
2111 if (rep_->table_prefix_extractor == nullptr) {
2112 if (need_upper_bound_check) {
2113 return true;
2114 }
2115 prefix_extractor = options_prefix_extractor;
2116 } else {
2117 prefix_extractor = rep_->table_prefix_extractor.get();
2118 }
2119 auto ts_sz = rep_->internal_comparator.user_comparator()->timestamp_size();
2120 auto user_key_without_ts =
2121 ExtractUserKeyAndStripTimestamp(internal_key, ts_sz);
2122 if (!prefix_extractor->InDomain(user_key_without_ts)) {
2123 return true;
2124 }
2125
2126 bool may_match = true;
2127
2128 // First, try checking with the full filter
2129 FilterBlockReader* const filter = rep_->filter.get();
2130 bool filter_checked = true;
2131 if (filter != nullptr) {
2132 const bool no_io = read_options.read_tier == kBlockCacheTier;
2133
2134 if (!filter->IsBlockBased()) {
2135 const Slice* const const_ikey_ptr = &internal_key;
2136 may_match = filter->RangeMayExist(
2137 read_options.iterate_upper_bound, user_key_without_ts,
2138 prefix_extractor, rep_->internal_comparator.user_comparator(),
2139 const_ikey_ptr, &filter_checked, need_upper_bound_check, no_io,
2140 lookup_context);
2141 } else {
2142 // if prefix_extractor changed for block based filter, skip filter
2143 if (need_upper_bound_check) {
2144 return true;
2145 }
2146 auto prefix = prefix_extractor->Transform(user_key_without_ts);
2147 InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
2148 auto internal_prefix = internal_key_prefix.Encode();
2149
2150 // To prevent any IO operation in this method, we set `read_tier` to make
2151 // sure the index and filter are only read when they have already been
2152 // loaded into memory.
2153 ReadOptions no_io_read_options;
2154 no_io_read_options.read_tier = kBlockCacheTier;
2155
2156 // Then, try to find it within each block
2157 // we already know prefix_extractor and prefix_extractor_name must match
2158 // because `CheckPrefixMayMatch` first checks `check_filter_ == true`
2159 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter(NewIndexIterator(
2160 no_io_read_options,
2161 /*need_upper_bound_check=*/false, /*input_iter=*/nullptr,
2162 /*get_context=*/nullptr, lookup_context));
2163 iiter->Seek(internal_prefix);
2164
2165 if (!iiter->Valid()) {
2166 // we're past end of file
2167 // if it's incomplete, it means that we avoided I/O
2168 // and we're not really sure that we're past the end
2169 // of the file
2170 may_match = iiter->status().IsIncomplete();
2171 } else if ((rep_->index_key_includes_seq ? ExtractUserKey(iiter->key())
2172 : iiter->key())
2173 .starts_with(ExtractUserKey(internal_prefix))) {
2174 // we need to check for this subtle case because our only
2175 // guarantee is that "the key is a string >= last key in that data
2176 // block" according to the doc/table_format.txt spec.
2177 //
2178 // Suppose iiter->key() starts with the desired prefix; it is not
2179 // necessarily the case that the corresponding data block will
2180 // contain the prefix, since iiter->key() need not be in the
2181 // block. However, the next data block may contain the prefix, so
2182 // we return true to play it safe.
2183 may_match = true;
2184 } else if (filter->IsBlockBased()) {
2185 // iiter->key() does NOT start with the desired prefix. Because
2186 // Seek() finds the first key that is >= the seek target, this
2187 // means that iiter->key() > prefix. Thus, any data blocks coming
2188 // after the data block corresponding to iiter->key() cannot
2189 // possibly contain the key. Thus, the corresponding data block
2190 // is the only one that could potentially contain the prefix.
2191 BlockHandle handle = iiter->value().handle;
2192 may_match = filter->PrefixMayMatch(
2193 prefix, prefix_extractor, handle.offset(), no_io,
2194 /*const_key_ptr=*/nullptr, /*get_context=*/nullptr, lookup_context);
2195 }
2196 }
2197 }
2198
2199 if (filter_checked) {
2200 Statistics* statistics = rep_->ioptions.stats;
2201 RecordTick(statistics, BLOOM_FILTER_PREFIX_CHECKED);
2202 if (!may_match) {
2203 RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
2204 }
2205 }
2206
2207 return may_match;
2208 }
2209
2210
2211 InternalIterator* BlockBasedTable::NewIterator(
2212 const ReadOptions& read_options, const SliceTransform* prefix_extractor,
2213 Arena* arena, bool skip_filters, TableReaderCaller caller,
2214 size_t compaction_readahead_size, bool allow_unprepared_value) {
2215 BlockCacheLookupContext lookup_context{caller};
2216 bool need_upper_bound_check =
2217 read_options.auto_prefix_mode ||
2218 PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
2219 std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(NewIndexIterator(
2220 read_options,
2221 need_upper_bound_check &&
2222 rep_->index_type == BlockBasedTableOptions::kHashSearch,
2223 /*input_iter=*/nullptr, /*get_context=*/nullptr, &lookup_context));
2224 if (arena == nullptr) {
2225 return new BlockBasedTableIterator(
2226 this, read_options, rep_->internal_comparator, std::move(index_iter),
2227 !skip_filters && !read_options.total_order_seek &&
2228 prefix_extractor != nullptr,
2229 need_upper_bound_check, prefix_extractor, caller,
2230 compaction_readahead_size, allow_unprepared_value);
2231 } else {
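// Allocate the iterator inline in the arena and construct it with
// placement new; the caller must invoke the destructor explicitly (not
// operator delete) before the arena's memory is released.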
2232 auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator));
2233 return new (mem) BlockBasedTableIterator(
2234 this, read_options, rep_->internal_comparator, std::move(index_iter),
2235 !skip_filters && !read_options.total_order_seek &&
2236 prefix_extractor != nullptr,
2237 need_upper_bound_check, prefix_extractor, caller,
2238 compaction_readahead_size, allow_unprepared_value);
2239 }
2240 }
2241
2242 FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
2243 const ReadOptions& read_options) {
2244 if (rep_->fragmented_range_dels == nullptr) {
2245 return nullptr;
2246 }
2247 SequenceNumber snapshot = kMaxSequenceNumber;
2248 if (read_options.snapshot != nullptr) {
2249 snapshot = read_options.snapshot->GetSequenceNumber();
2250 }
2251 return new FragmentedRangeTombstoneIterator(
2252 rep_->fragmented_range_dels, rep_->internal_comparator, snapshot);
2253 }
2254
2255 bool BlockBasedTable::FullFilterKeyMayMatch(
2256 const ReadOptions& read_options, FilterBlockReader* filter,
2257 const Slice& internal_key, const bool no_io,
2258 const SliceTransform* prefix_extractor, GetContext* get_context,
2259 BlockCacheLookupContext* lookup_context) const {
2260 if (filter == nullptr || filter->IsBlockBased()) {
2261 return true;
2262 }
2263 Slice user_key = ExtractUserKey(internal_key);
2264 const Slice* const const_ikey_ptr = &internal_key;
2265 bool may_match = true;
2266 size_t ts_sz = rep_->internal_comparator.user_comparator()->timestamp_size();
2267 Slice user_key_without_ts = StripTimestampFromUserKey(user_key, ts_sz);
2268 if (rep_->whole_key_filtering) {
2269 may_match =
2270 filter->KeyMayMatch(user_key_without_ts, prefix_extractor, kNotValid,
2271 no_io, const_ikey_ptr, get_context, lookup_context);
2272 } else if (!read_options.total_order_seek && prefix_extractor &&
2273 rep_->table_properties->prefix_extractor_name ==
2274 prefix_extractor->AsString() &&
2275 prefix_extractor->InDomain(user_key_without_ts) &&
2276 !filter->PrefixMayMatch(
2277 prefix_extractor->Transform(user_key_without_ts),
2278 prefix_extractor, kNotValid, no_io, const_ikey_ptr,
2279 get_context, lookup_context)) {
2280 may_match = false;
2281 }
2282 if (may_match) {
2283 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE);
2284 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level);
2285 }
2286 return may_match;
2287 }
2288
2289 void BlockBasedTable::FullFilterKeysMayMatch(
2290 const ReadOptions& read_options, FilterBlockReader* filter,
2291 MultiGetRange* range, const bool no_io,
2292 const SliceTransform* prefix_extractor,
2293 BlockCacheLookupContext* lookup_context) const {
2294 if (filter == nullptr || filter->IsBlockBased()) {
2295 return;
2296 }
2297 uint64_t before_keys = range->KeysLeft();
2298 assert(before_keys > 0); // Caller should ensure
2299 if (rep_->whole_key_filtering) {
2300 filter->KeysMayMatch(range, prefix_extractor, kNotValid, no_io,
2301 lookup_context);
2302 uint64_t after_keys = range->KeysLeft();
2303 if (after_keys) {
2304 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE, after_keys);
2305 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, after_keys,
2306 rep_->level);
2307 }
2308 uint64_t filtered_keys = before_keys - after_keys;
2309 if (filtered_keys) {
2310 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL, filtered_keys);
2311 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, filtered_keys,
2312 rep_->level);
2313 }
2314 } else if (!read_options.total_order_seek && prefix_extractor &&
2315 rep_->table_properties->prefix_extractor_name ==
2316 prefix_extractor->AsString()) {
2317 filter->PrefixesMayMatch(range, prefix_extractor, kNotValid, false,
2318 lookup_context);
2319 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_PREFIX_CHECKED, before_keys);
2320 uint64_t after_keys = range->KeysLeft();
2321 uint64_t filtered_keys = before_keys - after_keys;
2322 if (filtered_keys) {
2323 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_PREFIX_USEFUL,
2324 filtered_keys);
2325 }
2326 }
2327 }
2328
2329 Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
2330 GetContext* get_context,
2331 const SliceTransform* prefix_extractor,
2332 bool skip_filters) {
2333 assert(key.size() >= 8); // key must be internal key
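// (An internal key is the user key followed by an 8-byte footer that packs
// the sequence number and value type; see dbformat.h.)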
2334 assert(get_context != nullptr);
2335 Status s;
2336 const bool no_io = read_options.read_tier == kBlockCacheTier;
2337
2338 FilterBlockReader* const filter =
2339 !skip_filters ? rep_->filter.get() : nullptr;
2340
2341 // First check the full filter.
2342 // If the full filter is not useful, then go into each block.
2343 uint64_t tracing_get_id = get_context->get_tracing_get_id();
2344 BlockCacheLookupContext lookup_context{
2345 TableReaderCaller::kUserGet, tracing_get_id,
2346 /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
2347 if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2348 // Trace the key since it contains both user key and sequence number.
2349 lookup_context.referenced_key = key.ToString();
2350 lookup_context.get_from_user_specified_snapshot =
2351 read_options.snapshot != nullptr;
2352 }
2353 TEST_SYNC_POINT("BlockBasedTable::Get:BeforeFilterMatch");
2354 const bool may_match =
2355 FullFilterKeyMayMatch(read_options, filter, key, no_io, prefix_extractor,
2356 get_context, &lookup_context);
2357 TEST_SYNC_POINT("BlockBasedTable::Get:AfterFilterMatch");
2358 if (!may_match) {
2359 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
2360 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
2361 } else {
2362 IndexBlockIter iiter_on_stack;
2363 // if prefix_extractor found in block differs from options, disable
2364 // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
2365 bool need_upper_bound_check = false;
2366 if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
2367 need_upper_bound_check = PrefixExtractorChanged(
2368 rep_->table_properties.get(), prefix_extractor);
2369 }
2370 auto iiter =
2371 NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
2372 get_context, &lookup_context);
2373 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2374 if (iiter != &iiter_on_stack) {
2375 iiter_unique_ptr.reset(iiter);
2376 }
2377
2378 size_t ts_sz =
2379 rep_->internal_comparator.user_comparator()->timestamp_size();
2380 bool matched = false; // if such user key matched a key in SST
2381 bool done = false;
2382 for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
2383 IndexValue v = iiter->value();
2384
2385 bool not_exist_in_filter =
2386 filter != nullptr && filter->IsBlockBased() &&
2387 !filter->KeyMayMatch(ExtractUserKeyAndStripTimestamp(key, ts_sz),
2388 prefix_extractor, v.handle.offset(), no_io,
2389 /*const_ikey_ptr=*/nullptr, get_context,
2390 &lookup_context);
2391
2392 if (not_exist_in_filter) {
2393 // Not found
2394 // TODO: think about interaction with Merge. If a user key cannot
2395 // cross one data block, we should be fine.
2396 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
2397 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
2398 break;
2399 }
2400
2401 if (!v.first_internal_key.empty() && !skip_filters &&
2402 UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2403 .CompareWithoutTimestamp(
2404 ExtractUserKey(key),
2405 ExtractUserKey(v.first_internal_key)) < 0) {
2406 // The requested key falls between highest key in previous block and
2407 // lowest key in current block.
2408 break;
2409 }
2410
2411 BlockCacheLookupContext lookup_data_block_context{
2412 TableReaderCaller::kUserGet, tracing_get_id,
2413 /*get_from_user_specified_snapshot=*/read_options.snapshot !=
2414 nullptr};
2415 bool does_referenced_key_exist = false;
2416 DataBlockIter biter;
2417 uint64_t referenced_data_size = 0;
2418 NewDataBlockIterator<DataBlockIter>(
2419 read_options, v.handle, &biter, BlockType::kData, get_context,
2420 &lookup_data_block_context,
2421 /*s=*/Status(), /*prefetch_buffer*/ nullptr);
2422
2423 if (no_io && biter.status().IsIncomplete()) {
2424 // couldn't get block from block_cache
2425 // Update Saver.state to Found because we are only looking for
2426 // whether we can guarantee the key is not there when "no_io" is set
2427 get_context->MarkKeyMayExist();
2428 s = biter.status();
2429 break;
2430 }
2431 if (!biter.status().ok()) {
2432 s = biter.status();
2433 break;
2434 }
2435
2436 bool may_exist = biter.SeekForGet(key);
2437 // If user-specified timestamp is supported, we cannot end the search
2438 // just because hash index lookup indicates the key+ts does not exist.
2439 if (!may_exist && ts_sz == 0) {
2440 // HashSeek cannot find the key in this block and the iter is not at
2441 // the end of the block, i.e. the key cannot be in the following blocks
2442 // either. In this case, the seek_key cannot be found, so we break
2443 // from the top level for-loop.
2444 done = true;
2445 } else {
2446 // Call the *saver function on each entry/block until it returns false
2447 for (; biter.Valid(); biter.Next()) {
2448 ParsedInternalKey parsed_key;
2449 Status pik_status = ParseInternalKey(
2450 biter.key(), &parsed_key, false /* log_err_key */); // TODO
2451 if (!pik_status.ok()) {
2452 s = pik_status;
2453 }
2454
2455 if (!get_context->SaveValue(
2456 parsed_key, biter.value(), &matched,
2457 biter.IsValuePinned() ? &biter : nullptr)) {
2458 if (get_context->State() == GetContext::GetState::kFound) {
2459 does_referenced_key_exist = true;
2460 referenced_data_size = biter.key().size() + biter.value().size();
2461 }
2462 done = true;
2463 break;
2464 }
2465 }
2466 s = biter.status();
2467 }
2468 // Write the block cache access record.
2469 if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2470 // Avoid making copy of block_key, cf_name, and referenced_key when
2471 // constructing the access record.
2472 Slice referenced_key;
2473 if (does_referenced_key_exist) {
2474 referenced_key = biter.key();
2475 } else {
2476 referenced_key = key;
2477 }
2478 BlockCacheTraceRecord access_record(
2479 rep_->ioptions.clock->NowMicros(),
2480 /*block_key=*/"", lookup_data_block_context.block_type,
2481 lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
2482 /*cf_name=*/"", rep_->level_for_tracing(),
2483 rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
2484 lookup_data_block_context.is_cache_hit,
2485 lookup_data_block_context.no_insert,
2486 lookup_data_block_context.get_id,
2487 lookup_data_block_context.get_from_user_specified_snapshot,
2488 /*referenced_key=*/"", referenced_data_size,
2489 lookup_data_block_context.num_keys_in_block,
2490 does_referenced_key_exist);
2491 // TODO: Should handle status here?
2492 block_cache_tracer_
2493 ->WriteBlockAccess(access_record,
2494 lookup_data_block_context.block_key,
2495 rep_->cf_name_for_tracing(), referenced_key)
2496 .PermitUncheckedError();
2497 }
2498
2499 if (done) {
2500 // Avoid the extra Next which is expensive in two-level indexes
2501 break;
2502 }
2503 }
2504 if (matched && filter != nullptr && !filter->IsBlockBased()) {
2505 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
2506 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
2507 rep_->level);
2508 }
2509 if (s.ok() && !iiter->status().IsNotFound()) {
2510 s = iiter->status();
2511 }
2512 }
2513
2514 return s;
2515 }
2516
2517 using MultiGetRange = MultiGetContext::Range;
2518 void BlockBasedTable::MultiGet(const ReadOptions& read_options,
2519 const MultiGetRange* mget_range,
2520 const SliceTransform* prefix_extractor,
2521 bool skip_filters) {
2522 if (mget_range->empty()) {
2523 // Caller should ensure non-empty (performance bug)
2524 assert(false);
2525 return; // Nothing to do
2526 }
2527
2528 FilterBlockReader* const filter =
2529 !skip_filters ? rep_->filter.get() : nullptr;
2530 MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
2531 mget_range->end());
2532
2533 // First check the full filter.
2534 // If the full filter is not useful, then go into each block.
2535 const bool no_io = read_options.read_tier == kBlockCacheTier;
2536 uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2537 if (sst_file_range.begin()->get_context) {
2538 tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
2539 }
2540 BlockCacheLookupContext lookup_context{
2541 TableReaderCaller::kUserMultiGet, tracing_mget_id,
2542 /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
2543 FullFilterKeysMayMatch(read_options, filter, &sst_file_range, no_io,
2544 prefix_extractor, &lookup_context);
2545
2546 if (!sst_file_range.empty()) {
2547 IndexBlockIter iiter_on_stack;
2548 // if prefix_extractor found in block differs from options, disable
2549 // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
2550 bool need_upper_bound_check = false;
2551 if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
2552 need_upper_bound_check = PrefixExtractorChanged(
2553 rep_->table_properties.get(), prefix_extractor);
2554 }
2555 auto iiter =
2556 NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
2557 sst_file_range.begin()->get_context, &lookup_context);
2558 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2559 if (iiter != &iiter_on_stack) {
2560 iiter_unique_ptr.reset(iiter);
2561 }
2562
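// Tracks the offset of the data block that served the previous key so
// that consecutive keys falling into the same block can reuse it rather
// than looking it up again.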
2563 uint64_t offset = std::numeric_limits<uint64_t>::max();
2564 autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
2565 autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
2566 autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
2567 char stack_buf[kMultiGetReadStackBufSize];
2568 std::unique_ptr<char[]> block_buf;
2569 {
2570 MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
2571 sst_file_range.end());
2572 std::vector<Cache::Handle*> cache_handles;
2573 bool wait_for_cache_results = false;
2574
2575 CachableEntry<UncompressionDict> uncompression_dict;
2576 Status uncompression_dict_status;
2577 uncompression_dict_status.PermitUncheckedError();
2578 bool uncompression_dict_inited = false;
2579 size_t total_len = 0;
2580 ReadOptions ro = read_options;
2581 ro.read_tier = kBlockCacheTier;
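// The first pass below probes the block cache only (kBlockCacheTier means
// no IO); any block still missing afterwards is read from the file in one
// batch by RetrieveMultipleBlocks() further down.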
2582
2583 for (auto miter = data_block_range.begin();
2584 miter != data_block_range.end(); ++miter) {
2585 const Slice& key = miter->ikey;
2586 iiter->Seek(miter->ikey);
2587
2588 IndexValue v;
2589 if (iiter->Valid()) {
2590 v = iiter->value();
2591 }
2592 if (!iiter->Valid() ||
2593 (!v.first_internal_key.empty() && !skip_filters &&
2594 UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2595 .CompareWithoutTimestamp(
2596 ExtractUserKey(key),
2597 ExtractUserKey(v.first_internal_key)) < 0)) {
2598 // The requested key falls between highest key in previous block and
2599 // lowest key in current block.
2600 if (!iiter->status().IsNotFound()) {
2601 *(miter->s) = iiter->status();
2602 }
2603 data_block_range.SkipKey(miter);
2604 sst_file_range.SkipKey(miter);
2605 continue;
2606 }
2607
2608 if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
2609 uncompression_dict_status =
2610 rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
2611 nullptr /* prefetch_buffer */, no_io,
2612 sst_file_range.begin()->get_context, &lookup_context,
2613 &uncompression_dict);
2614 uncompression_dict_inited = true;
2615 }
2616
2617 if (!uncompression_dict_status.ok()) {
2618 assert(!uncompression_dict_status.IsNotFound());
2619 *(miter->s) = uncompression_dict_status;
2620 data_block_range.SkipKey(miter);
2621 sst_file_range.SkipKey(miter);
2622 continue;
2623 }
2624
2625 statuses.emplace_back();
2626 results.emplace_back();
2627 if (v.handle.offset() == offset) {
2628 // We're going to reuse the block for this key later on. No need to
2629 // look it up now. Place a null handle
2630 block_handles.emplace_back(BlockHandle::NullBlockHandle());
2631 continue;
2632 }
2633 // Lookup the cache for the given data block referenced by an index
2634 // iterator value (i.e BlockHandle). If it exists in the cache,
2635 // initialize block to the contents of the data block.
2636 offset = v.handle.offset();
2637 BlockHandle handle = v.handle;
2638 BlockCacheLookupContext lookup_data_block_context(
2639 TableReaderCaller::kUserMultiGet);
2640 const UncompressionDict& dict = uncompression_dict.GetValue()
2641 ? *uncompression_dict.GetValue()
2642 : UncompressionDict::GetEmptyDict();
2643 Status s = RetrieveBlock(
2644 nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
2645 miter->get_context, &lookup_data_block_context,
2646 /* for_compaction */ false, /* use_cache */ true,
2647 /* wait_for_cache */ false);
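// Incomplete here just means a cache miss under kBlockCacheTier; clear it
// so the block is read from the file below.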
2648 if (s.IsIncomplete()) {
2649 s = Status::OK();
2650 }
2651 if (s.ok() && !results.back().IsEmpty()) {
2652 // Since we have a valid handle, check the value. If it's nullptr,
2653 // it means the cache is waiting for the final result and we're
2654 // supposed to call WaitAll() to wait for the result.
2655 if (results.back().GetValue() != nullptr) {
2656 // Found it in the cache. Add NULL handle to indicate there is
2657 // nothing to read from disk.
2658 if (results.back().GetCacheHandle()) {
2659 results.back().UpdateCachedValue();
2660 }
2661 block_handles.emplace_back(BlockHandle::NullBlockHandle());
2662 } else {
2663 // We have to wait for the cache lookup to finish in the
2664 // background, and then we may have to read the block from disk
2665 // anyway
2666 assert(results.back().GetCacheHandle());
2667 wait_for_cache_results = true;
2668 block_handles.emplace_back(handle);
2669 cache_handles.emplace_back(results.back().GetCacheHandle());
2670 }
2671 } else {
2672 block_handles.emplace_back(handle);
2673 total_len += block_size(handle);
2674 }
2675 }
2676
2677 if (wait_for_cache_results) {
2678 Cache* block_cache = rep_->table_options.block_cache.get();
2679 block_cache->WaitAll(cache_handles);
2680 for (size_t i = 0; i < block_handles.size(); ++i) {
2681 // If this block was a success or failure or not needed because
2682 // the corresponding key is in the same block as a prior key, skip
2683 if (block_handles[i] == BlockHandle::NullBlockHandle() ||
2684 results[i].IsEmpty()) {
2685 continue;
2686 }
2687 results[i].UpdateCachedValue();
2688 void* val = results[i].GetValue();
2689 if (!val) {
2690 // The async cache lookup failed - could be due to an error
2691 // or a false positive. We need to read the data block from
2692 // the SST file
2693 results[i].Reset();
2694 total_len += block_size(block_handles[i]);
2695 } else {
2696 block_handles[i] = BlockHandle::NullBlockHandle();
2697 }
2698 }
2699 }
2700
2701 if (total_len) {
2702 char* scratch = nullptr;
2703 const UncompressionDict& dict = uncompression_dict.GetValue()
2704 ? *uncompression_dict.GetValue()
2705 : UncompressionDict::GetEmptyDict();
2706 assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
2707 assert(uncompression_dict_status.ok());
2708       // If using direct IO, then scratch is not used, so keep it nullptr.
2709       // If the blocks need to be uncompressed and we don't need the
2710       // compressed blocks, then we can use a contiguous block of
2711       // memory to read in all the blocks, as it is only temporary
2712       // storage:
2713       // 1. If blocks are compressed and a compressed block cache exists,
2714       //    allocate heap buffers
2715       // 2. If blocks are uncompressed, allocate heap buffers
2716       // 3. If blocks are compressed and there is no compressed block cache,
2717       //    use the stack buffer
2718 if (!rep_->file->use_direct_io() &&
2719 rep_->table_options.block_cache_compressed == nullptr &&
2720 rep_->blocks_maybe_compressed) {
2721 if (total_len <= kMultiGetReadStackBufSize) {
2722 scratch = stack_buf;
2723 } else {
2724 scratch = new char[total_len];
2725 block_buf.reset(scratch);
2726 }
2727 }
2728 RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
2729 &statuses, &results, scratch, dict);
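      // The SST read is counted once per MultiGet batch, charged to the
      // first key's GetContext, rather than once per block retrieved.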
2730 if (sst_file_range.begin()->get_context) {
2731 ++(sst_file_range.begin()
2732 ->get_context->get_context_stats_.num_sst_read);
2733 }
2734 }
2735 }
2736
2737 DataBlockIter first_biter;
2738 DataBlockIter next_biter;
2739 size_t idx_in_batch = 0;
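  // idx_in_batch indexes the block_handles/results/statuses vectors built
  // above; it advances once per key, on the key's first data block.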
2740 for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
2741 ++miter) {
2742 Status s;
2743 GetContext* get_context = miter->get_context;
2744 const Slice& key = miter->ikey;
2745     bool matched = false;  // whether the user key matched a key in the SST
2746 bool done = false;
2747 bool first_block = true;
2748 do {
2749 DataBlockIter* biter = nullptr;
2750 bool reusing_block = true;
2751 uint64_t referenced_data_size = 0;
2752 bool does_referenced_key_exist = false;
2753 BlockCacheLookupContext lookup_data_block_context(
2754 TableReaderCaller::kUserMultiGet, tracing_mget_id,
2755 /*get_from_user_specified_snapshot=*/read_options.snapshot !=
2756 nullptr);
2757 if (first_block) {
2758 if (!block_handles[idx_in_batch].IsNull() ||
2759 !results[idx_in_batch].IsEmpty()) {
2760 first_biter.Invalidate(Status::OK());
2761 NewDataBlockIterator<DataBlockIter>(
2762 read_options, results[idx_in_batch], &first_biter,
2763 statuses[idx_in_batch]);
2764 reusing_block = false;
2765 } else {
2766           // If the handle is null and the result is empty, then the status
2767           // was never set, so it still holds its initial value: ok().
2768 assert(statuses[idx_in_batch].ok());
2769 }
2770 biter = &first_biter;
2771 idx_in_batch++;
2772 } else {
2773 IndexValue v = iiter->value();
2774 if (!v.first_internal_key.empty() && !skip_filters &&
2775 UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2776 .CompareWithoutTimestamp(
2777 ExtractUserKey(key),
2778 ExtractUserKey(v.first_internal_key)) < 0) {
2779 // The requested key falls between highest key in previous block and
2780 // lowest key in current block.
2781 break;
2782 }
2783
2784 next_biter.Invalidate(Status::OK());
2785 NewDataBlockIterator<DataBlockIter>(
2786 read_options, iiter->value().handle, &next_biter,
2787 BlockType::kData, get_context, &lookup_data_block_context,
2788 Status(), nullptr);
2789 biter = &next_biter;
2790 reusing_block = false;
2791 }
2792
2793 if (read_options.read_tier == kBlockCacheTier &&
2794 biter->status().IsIncomplete()) {
2795         // Couldn't get the block from the block cache. Mark the key as
2796         // possibly existing: when "no_io" is set, all we can establish is
2797         // whether we can guarantee the key is not there.
2798 get_context->MarkKeyMayExist();
2799 break;
2800 }
2801 if (!biter->status().ok()) {
2802 s = biter->status();
2803 break;
2804 }
2805
2806 bool may_exist = biter->SeekForGet(key);
2807 if (!may_exist) {
2808         // HashSeek cannot find the key in this block, and the iter is not at
2809         // the end of the block, i.e. the key cannot be in the following
2810         // blocks either. In this case, the seek_key cannot be found, so we
2811         // break out of the top level for-loop.
2812 break;
2813 }
2814
2815 // Call the *saver function on each entry/block until it returns false
2816 for (; biter->Valid(); biter->Next()) {
2817 ParsedInternalKey parsed_key;
2818 Cleanable dummy;
2819 Cleanable* value_pinner = nullptr;
2820 Status pik_status = ParseInternalKey(
2821 biter->key(), &parsed_key, false /* log_err_key */); // TODO
2822 if (!pik_status.ok()) {
2823 s = pik_status;
2824 }
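        // If the value is pinned, make sure its backing block outlives this
        // iteration: for a reused cached block, take an extra cache ref and
        // register a cleanup to release it; otherwise the block iterator
        // itself keeps the value alive.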
2825 if (biter->IsValuePinned()) {
2826 if (reusing_block) {
2827 Cache* block_cache = rep_->table_options.block_cache.get();
2828 assert(biter->cache_handle() != nullptr);
2829 block_cache->Ref(biter->cache_handle());
2830 dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
2831 biter->cache_handle());
2832 value_pinner = &dummy;
2833 } else {
2834 value_pinner = biter;
2835 }
2836 }
2837 if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
2838 value_pinner)) {
2839 if (get_context->State() == GetContext::GetState::kFound) {
2840 does_referenced_key_exist = true;
2841 referenced_data_size =
2842 biter->key().size() + biter->value().size();
2843 }
2844 done = true;
2845 break;
2846 }
2847 s = biter->status();
2848 }
2849 // Write the block cache access.
2850 if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2851         // Avoid making copies of block_key, cf_name, and referenced_key when
2852         // constructing the access record.
2853 Slice referenced_key;
2854 if (does_referenced_key_exist) {
2855 referenced_key = biter->key();
2856 } else {
2857 referenced_key = key;
2858 }
2859 BlockCacheTraceRecord access_record(
2860 rep_->ioptions.clock->NowMicros(),
2861 /*block_key=*/"", lookup_data_block_context.block_type,
2862 lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
2863 /*cf_name=*/"", rep_->level_for_tracing(),
2864 rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
2865 lookup_data_block_context.is_cache_hit,
2866 lookup_data_block_context.no_insert,
2867 lookup_data_block_context.get_id,
2868 lookup_data_block_context.get_from_user_specified_snapshot,
2869 /*referenced_key=*/"", referenced_data_size,
2870 lookup_data_block_context.num_keys_in_block,
2871 does_referenced_key_exist);
2872 // TODO: Should handle status here?
2873 block_cache_tracer_
2874 ->WriteBlockAccess(access_record,
2875 lookup_data_block_context.block_key,
2876 rep_->cf_name_for_tracing(), referenced_key)
2877 .PermitUncheckedError();
2878 }
2879 s = biter->status();
2880 if (done) {
2881 // Avoid the extra Next which is expensive in two-level indexes
2882 break;
2883 }
2884 if (first_block) {
2885 iiter->Seek(key);
2886 }
2887 first_block = false;
2888 iiter->Next();
2889 } while (iiter->Valid());
2890
2891 if (matched && filter != nullptr && !filter->IsBlockBased()) {
2892 RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
2893 PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
2894 rep_->level);
2895 }
2896 if (s.ok() && !iiter->status().IsNotFound()) {
2897 s = iiter->status();
2898 }
2899 *(miter->s) = s;
2900 }
2901 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
2902   // Not all statuses are consumed above (e.g. for reused blocks), so mark them checked here.
2903 for (auto& st : statuses) {
2904 st.PermitUncheckedError();
2905 }
2906 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
2907 }
2908 }
2909
Prefetch(const Slice * const begin,const Slice * const end)2910 Status BlockBasedTable::Prefetch(const Slice* const begin,
2911 const Slice* const end) {
2912 auto& comparator = rep_->internal_comparator;
2913 UserComparatorWrapper user_comparator(comparator.user_comparator());
2914 // pre-condition
2915 if (begin && end && comparator.Compare(*begin, *end) > 0) {
2916 return Status::InvalidArgument(*begin, *end);
2917 }
2918 BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
2919 IndexBlockIter iiter_on_stack;
2920 auto iiter = NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
2921 &iiter_on_stack, /*get_context=*/nullptr,
2922 &lookup_context);
2923 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2924 if (iiter != &iiter_on_stack) {
2925 iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
2926 }
2927
2928 if (!iiter->status().ok()) {
2929 // error opening index iterator
2930 return iiter->status();
2931 }
2932
2933   // Indicates whether we are on the last page that needs to be pre-fetched.
2934 bool prefetching_boundary_page = false;
2935
2936 for (begin ? iiter->Seek(*begin) : iiter->SeekToFirst(); iiter->Valid();
2937 iiter->Next()) {
2938 BlockHandle block_handle = iiter->value().handle;
2939 const bool is_user_key = !rep_->index_key_includes_seq;
2940 if (end &&
2941 ((!is_user_key && comparator.Compare(iiter->key(), *end) >= 0) ||
2942 (is_user_key &&
2943 user_comparator.Compare(iiter->key(), ExtractUserKey(*end)) >= 0))) {
2944 if (prefetching_boundary_page) {
2945 break;
2946 }
2947
2948       // The index entry represents the last key in the data block.
2949       // We should load this page into memory as well, but none after it.
2950 prefetching_boundary_page = true;
2951 }
2952
2953 // Load the block specified by the block_handle into the block cache
2954 DataBlockIter biter;
2955
2956 NewDataBlockIterator<DataBlockIter>(
2957 ReadOptions(), block_handle, &biter, /*type=*/BlockType::kData,
2958 /*get_context=*/nullptr, &lookup_context, Status(),
2959 /*prefetch_buffer=*/nullptr);
2960
2961 if (!biter.status().ok()) {
2962 // there was an unexpected error while pre-fetching
2963 return biter.status();
2964 }
2965 }
2966
2967 return Status::OK();
2968 }
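// Illustrative usage (a sketch, not part of this file; the key bounds are
// hypothetical and must follow the table's comparator order):
//
//   Slice begin("k000");
//   Slice end("k999");
//   Status s = table->Prefetch(&begin, &end);  // warm the block cache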
2969
VerifyChecksum(const ReadOptions & read_options,TableReaderCaller caller)2970 Status BlockBasedTable::VerifyChecksum(const ReadOptions& read_options,
2971 TableReaderCaller caller) {
2972 Status s;
2973 // Check Meta blocks
2974 std::unique_ptr<Block> metaindex;
2975 std::unique_ptr<InternalIterator> metaindex_iter;
2976 ReadOptions ro;
2977 s = ReadMetaIndexBlock(ro, nullptr /* prefetch buffer */, &metaindex,
2978 &metaindex_iter);
2979 if (s.ok()) {
2980 s = VerifyChecksumInMetaBlocks(metaindex_iter.get());
2981 if (!s.ok()) {
2982 return s;
2983 }
2984 } else {
2985 return s;
2986 }
2987 // Check Data blocks
2988 IndexBlockIter iiter_on_stack;
2989 BlockCacheLookupContext context{caller};
2990 InternalIteratorBase<IndexValue>* iiter = NewIndexIterator(
2991 read_options, /*disable_prefix_seek=*/false, &iiter_on_stack,
2992 /*get_context=*/nullptr, &context);
2993 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2994 if (iiter != &iiter_on_stack) {
2995 iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
2996 }
2997 if (!iiter->status().ok()) {
2998 // error opening index iterator
2999 return iiter->status();
3000 }
3001 s = VerifyChecksumInBlocks(read_options, iiter);
3002 return s;
3003 }
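// Illustrative usage (a sketch, not part of this file): verifying the
// checksums of all meta and data blocks in an open table:
//
//   Status s = table->VerifyChecksum(ReadOptions(),
//                                    TableReaderCaller::kUserVerifyChecksum);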
3004
VerifyChecksumInBlocks(const ReadOptions & read_options,InternalIteratorBase<IndexValue> * index_iter)3005 Status BlockBasedTable::VerifyChecksumInBlocks(
3006 const ReadOptions& read_options,
3007 InternalIteratorBase<IndexValue>* index_iter) {
3008 Status s;
3009   // We are scanning the whole file, so there is no need to exponentially
3010   // increase the buffer size.
3011 size_t readahead_size = (read_options.readahead_size != 0)
3012 ? read_options.readahead_size
3013 : rep_->table_options.max_auto_readahead_size;
3014 // FilePrefetchBuffer doesn't work in mmap mode and readahead is not
3015 // needed there.
3016 FilePrefetchBuffer prefetch_buffer(
3017 rep_->file.get(), readahead_size /* readahead_size */,
3018 readahead_size /* max_readahead_size */,
3019 !rep_->ioptions.allow_mmap_reads /* enable */);
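  // Min and max readahead are set to the same value because this is a
  // strictly sequential scan, so no readahead ramp-up is needed.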
3020
3021 for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
3022 s = index_iter->status();
3023 if (!s.ok()) {
3024 break;
3025 }
3026 BlockHandle handle = index_iter->value().handle;
3027 BlockContents contents;
3028 BlockFetcher block_fetcher(
3029 rep_->file.get(), &prefetch_buffer, rep_->footer, ReadOptions(), handle,
3030 &contents, rep_->ioptions, false /* decompress */,
3031 false /*maybe_compressed*/, BlockType::kData,
3032 UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
3033 s = block_fetcher.ReadBlockContents();
3034 if (!s.ok()) {
3035 break;
3036 }
3037 }
3038 if (s.ok()) {
3039     // In the case of two-level indexes, we would have exited the above loop
3040     // when index_iter->Valid() returned false, but Valid() might have
3041     // returned false due to an IO error. So check the index_iter status.
3042 s = index_iter->status();
3043 }
3044 return s;
3045 }
3046
GetBlockTypeForMetaBlockByName(const Slice & meta_block_name)3047 BlockType BlockBasedTable::GetBlockTypeForMetaBlockByName(
3048 const Slice& meta_block_name) {
3049 if (meta_block_name.starts_with(kFilterBlockPrefix) ||
3050 meta_block_name.starts_with(kFullFilterBlockPrefix) ||
3051 meta_block_name.starts_with(kPartitionedFilterBlockPrefix)) {
3052 return BlockType::kFilter;
3053 }
3054
3055 if (meta_block_name == kPropertiesBlock) {
3056 return BlockType::kProperties;
3057 }
3058
3059 if (meta_block_name == kCompressionDictBlock) {
3060 return BlockType::kCompressionDictionary;
3061 }
3062
3063 if (meta_block_name == kRangeDelBlock) {
3064 return BlockType::kRangeDeletion;
3065 }
3066
3067 if (meta_block_name == kHashIndexPrefixesBlock) {
3068 return BlockType::kHashIndexPrefixes;
3069 }
3070
3071 if (meta_block_name == kHashIndexPrefixesMetadataBlock) {
3072 return BlockType::kHashIndexMetadata;
3073 }
3074
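  // Any other meta block name is unexpected: assert in debug builds and
  // report kInvalid in release builds.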
3075 assert(false);
3076 return BlockType::kInvalid;
3077 }
3078
VerifyChecksumInMetaBlocks(InternalIteratorBase<Slice> * index_iter)3079 Status BlockBasedTable::VerifyChecksumInMetaBlocks(
3080 InternalIteratorBase<Slice>* index_iter) {
3081 Status s;
3082 for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
3083 s = index_iter->status();
3084 if (!s.ok()) {
3085 break;
3086 }
3087 BlockHandle handle;
3088 Slice input = index_iter->value();
3089 s = handle.DecodeFrom(&input);
3090 BlockContents contents;
3091 const Slice meta_block_name = index_iter->key();
3092 BlockFetcher block_fetcher(
3093 rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
3094 ReadOptions(), handle, &contents, rep_->ioptions,
3095 false /* decompress */, false /*maybe_compressed*/,
3096 GetBlockTypeForMetaBlockByName(meta_block_name),
3097 UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
3098 s = block_fetcher.ReadBlockContents();
3099 if (s.IsCorruption() && meta_block_name == kPropertiesBlock) {
3100 TableProperties* table_properties;
3101 ReadOptions ro;
3102 s = TryReadPropertiesWithGlobalSeqno(ro, nullptr /* prefetch_buffer */,
3103 index_iter->value(),
3104 &table_properties);
3105 delete table_properties;
3106 }
3107 if (!s.ok()) {
3108 break;
3109 }
3110 }
3111 return s;
3112 }
3113
TEST_BlockInCache(const BlockHandle & handle) const3114 bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const {
3115 assert(rep_ != nullptr);
3116
3117 Cache* const cache = rep_->table_options.block_cache.get();
3118 if (cache == nullptr) {
3119 return false;
3120 }
3121
3122 char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
3123 Slice cache_key =
3124 GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
3125 cache_key_storage);
3126
3127 Cache::Handle* const cache_handle = cache->Lookup(cache_key);
3128 if (cache_handle == nullptr) {
3129 return false;
3130 }
3131
3132 cache->Release(cache_handle);
3133
3134 return true;
3135 }
3136
TEST_KeyInCache(const ReadOptions & options,const Slice & key)3137 bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
3138 const Slice& key) {
3139 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter(NewIndexIterator(
3140 options, /*need_upper_bound_check=*/false, /*input_iter=*/nullptr,
3141 /*get_context=*/nullptr, /*lookup_context=*/nullptr));
3142 iiter->Seek(key);
3143 assert(iiter->Valid());
3144
3145 return TEST_BlockInCache(iiter->value().handle);
3146 }
3147
3148 // REQUIRES: The following fields of rep_ should have already been populated:
3149 //  1. file
3150 //  2. index_handle
3151 //  3. options
3152 //  4. internal_comparator
3153 //  5. index_type
CreateIndexReader(const ReadOptions & ro,FilePrefetchBuffer * prefetch_buffer,InternalIterator * preloaded_meta_index_iter,bool use_cache,bool prefetch,bool pin,BlockCacheLookupContext * lookup_context,std::unique_ptr<IndexReader> * index_reader)3154 Status BlockBasedTable::CreateIndexReader(
3155 const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
3156 InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch,
3157 bool pin, BlockCacheLookupContext* lookup_context,
3158 std::unique_ptr<IndexReader>* index_reader) {
3159   // kHashSearch requires a non-empty prefix_extractor, but we bypass that
3160   // check here since we have no access to MutableCFOptions. Instead, a
3161   // need_upper_bound_check flag is set in BlockBasedTable::NewIndexIterator:
3162   // if prefix_extractor does not match prefix_extractor_name from the table
3163   // properties, Hash Index is turned off by setting total_order_seek to true.
3164
3165 switch (rep_->index_type) {
3166 case BlockBasedTableOptions::kTwoLevelIndexSearch: {
3167 return PartitionIndexReader::Create(this, ro, prefetch_buffer, use_cache,
3168 prefetch, pin, lookup_context,
3169 index_reader);
3170 }
3171 case BlockBasedTableOptions::kBinarySearch:
3172 FALLTHROUGH_INTENDED;
3173 case BlockBasedTableOptions::kBinarySearchWithFirstKey: {
3174 return BinarySearchIndexReader::Create(this, ro, prefetch_buffer,
3175 use_cache, prefetch, pin,
3176 lookup_context, index_reader);
3177 }
3178 case BlockBasedTableOptions::kHashSearch: {
3179 std::unique_ptr<Block> metaindex_guard;
3180 std::unique_ptr<InternalIterator> metaindex_iter_guard;
3181 auto meta_index_iter = preloaded_meta_index_iter;
3182 bool should_fallback = false;
3183 if (rep_->internal_prefix_transform.get() == nullptr) {
3184 ROCKS_LOG_WARN(rep_->ioptions.logger,
3185 "No prefix extractor passed in. Fall back to binary"
3186 " search index.");
3187 should_fallback = true;
3188 } else if (meta_index_iter == nullptr) {
3189 auto s = ReadMetaIndexBlock(ro, prefetch_buffer, &metaindex_guard,
3190 &metaindex_iter_guard);
3191 if (!s.ok()) {
3192           // We simply fall back to binary search in case there is any
3193           // problem with loading the prefix hash index.
3194 ROCKS_LOG_WARN(rep_->ioptions.logger,
3195 "Unable to read the metaindex block."
3196 " Fall back to binary search index.");
3197 should_fallback = true;
3198 }
3199 meta_index_iter = metaindex_iter_guard.get();
3200 }
3201
3202 if (should_fallback) {
3203 return BinarySearchIndexReader::Create(this, ro, prefetch_buffer,
3204 use_cache, prefetch, pin,
3205 lookup_context, index_reader);
3206 } else {
3207 return HashIndexReader::Create(this, ro, prefetch_buffer,
3208 meta_index_iter, use_cache, prefetch,
3209 pin, lookup_context, index_reader);
3210 }
3211 }
3212 default: {
3213 std::string error_message =
3214 "Unrecognized index type: " + ToString(rep_->index_type);
3215 return Status::InvalidArgument(error_message.c_str());
3216 }
3217 }
3218 }
3219
ApproximateDataOffsetOf(const InternalIteratorBase<IndexValue> & index_iter,uint64_t data_size) const3220 uint64_t BlockBasedTable::ApproximateDataOffsetOf(
3221 const InternalIteratorBase<IndexValue>& index_iter,
3222 uint64_t data_size) const {
3223 if (index_iter.Valid()) {
3224 BlockHandle handle = index_iter.value().handle;
3225 return handle.offset();
3226 } else {
3227 // The iterator is past the last key in the file.
3228 return data_size;
3229 }
3230 }
3231
GetApproximateDataSize()3232 uint64_t BlockBasedTable::GetApproximateDataSize() {
3233   // Should be in table properties unless the file is from a very old version.
3234 if (rep_->table_properties) {
3235 return rep_->table_properties->data_size;
3236 }
3237 // Fall back to rough estimate from footer
3238 return rep_->footer.metaindex_handle().offset();
3239 }
3240
ApproximateOffsetOf(const Slice & key,TableReaderCaller caller)3241 uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key,
3242 TableReaderCaller caller) {
3243 uint64_t data_size = GetApproximateDataSize();
3244 if (UNLIKELY(data_size == 0)) {
3245 // Hmm. Let's just split in half to avoid skewing one way or another,
3246 // since we don't know whether we're operating on lower bound or
3247 // upper bound.
3248 return rep_->file_size / 2;
3249 }
3250
3251 BlockCacheLookupContext context(caller);
3252 IndexBlockIter iiter_on_stack;
3253 ReadOptions ro;
3254 ro.total_order_seek = true;
3255 auto index_iter =
3256 NewIndexIterator(ro, /*disable_prefix_seek=*/true,
3257 /*input_iter=*/&iiter_on_stack, /*get_context=*/nullptr,
3258 /*lookup_context=*/&context);
3259 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
3260 if (index_iter != &iiter_on_stack) {
3261 iiter_unique_ptr.reset(index_iter);
3262 }
3263
3264 index_iter->Seek(key);
3265
3266 uint64_t offset = ApproximateDataOffsetOf(*index_iter, data_size);
3267   // Pro-rate file metadata (including filters) size-proportionally across
3268   // data blocks.
3269 double size_ratio =
3270 static_cast<double>(offset) / static_cast<double>(data_size);
3271 return static_cast<uint64_t>(size_ratio *
3272 static_cast<double>(rep_->file_size));
3273 }
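// Worked example: with file_size = 120 MB and data_size = 100 MB, a key whose
// index-derived offset is 50 MB yields (50 / 100) * 120 = 60 MB, spreading
// the 20 MB of metadata proportionally across the data blocks.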
3274
ApproximateSize(const Slice & start,const Slice & end,TableReaderCaller caller)3275 uint64_t BlockBasedTable::ApproximateSize(const Slice& start, const Slice& end,
3276 TableReaderCaller caller) {
3277 assert(rep_->internal_comparator.Compare(start, end) <= 0);
3278
3279 uint64_t data_size = GetApproximateDataSize();
3280 if (UNLIKELY(data_size == 0)) {
3281 // Hmm. Assume whole file is involved, since we have lower and upper
3282 // bound.
3283 return rep_->file_size;
3284 }
3285
3286 BlockCacheLookupContext context(caller);
3287 IndexBlockIter iiter_on_stack;
3288 ReadOptions ro;
3289 ro.total_order_seek = true;
3290 auto index_iter =
3291 NewIndexIterator(ro, /*disable_prefix_seek=*/true,
3292 /*input_iter=*/&iiter_on_stack, /*get_context=*/nullptr,
3293 /*lookup_context=*/&context);
3294 std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
3295 if (index_iter != &iiter_on_stack) {
3296 iiter_unique_ptr.reset(index_iter);
3297 }
3298
3299 index_iter->Seek(start);
3300 uint64_t start_offset = ApproximateDataOffsetOf(*index_iter, data_size);
3301 index_iter->Seek(end);
3302 uint64_t end_offset = ApproximateDataOffsetOf(*index_iter, data_size);
3303
3304 assert(end_offset >= start_offset);
3305   // Pro-rate file metadata (including filters) size-proportionally across
3306   // data blocks.
3307 double size_ratio = static_cast<double>(end_offset - start_offset) /
3308 static_cast<double>(data_size);
3309 return static_cast<uint64_t>(size_ratio *
3310 static_cast<double>(rep_->file_size));
3311 }
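// As in ApproximateOffsetOf(), the (end_offset - start_offset) data span is
// scaled by file_size / data_size so that metadata is apportioned
// proportionally to the requested range.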
3312
TEST_FilterBlockInCache() const3313 bool BlockBasedTable::TEST_FilterBlockInCache() const {
3314 assert(rep_ != nullptr);
3315 return TEST_BlockInCache(rep_->filter_handle);
3316 }
3317
TEST_IndexBlockInCache() const3318 bool BlockBasedTable::TEST_IndexBlockInCache() const {
3319 assert(rep_ != nullptr);
3320
3321 return TEST_BlockInCache(rep_->footer.index_handle());
3322 }
3323
GetKVPairsFromDataBlocks(std::vector<KVPairBlock> * kv_pair_blocks)3324 Status BlockBasedTable::GetKVPairsFromDataBlocks(
3325 std::vector<KVPairBlock>* kv_pair_blocks) {
3326 std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3327 NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3328 /*input_iter=*/nullptr, /*get_context=*/nullptr,
3329                        /*lookup_context=*/nullptr));
3330
3331 Status s = blockhandles_iter->status();
3332 if (!s.ok()) {
3333 // Cannot read Index Block
3334 return s;
3335 }
3336
3337 for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3338 blockhandles_iter->Next()) {
3339 s = blockhandles_iter->status();
3340
3341 if (!s.ok()) {
3342 break;
3343 }
3344
3345 std::unique_ptr<InternalIterator> datablock_iter;
3346 datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
3347 ReadOptions(), blockhandles_iter->value().handle,
3348 /*input_iter=*/nullptr, /*type=*/BlockType::kData,
3349 /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(),
3350 /*prefetch_buffer=*/nullptr));
3351 s = datablock_iter->status();
3352
3353 if (!s.ok()) {
3354 // Error reading the block - Skipped
3355 continue;
3356 }
3357
3358 KVPairBlock kv_pair_block;
3359 for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
3360 datablock_iter->Next()) {
3361 s = datablock_iter->status();
3362 if (!s.ok()) {
3363 // Error reading the block - Skipped
3364 break;
3365 }
3366 const Slice& key = datablock_iter->key();
3367 const Slice& value = datablock_iter->value();
3368 std::string key_copy = std::string(key.data(), key.size());
3369 std::string value_copy = std::string(value.data(), value.size());
3370
3371 kv_pair_block.push_back(
3372 std::make_pair(std::move(key_copy), std::move(value_copy)));
3373 }
3374 kv_pair_blocks->push_back(std::move(kv_pair_block));
3375 }
3376 return Status::OK();
3377 }
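// Illustrative usage (a sketch, not part of this file): materializing every
// key-value pair, one KVPairBlock per data block:
//
//   std::vector<KVPairBlock> kv_pair_blocks;
//   Status s = table->GetKVPairsFromDataBlocks(&kv_pair_blocks);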
3378
DumpTable(WritableFile * out_file)3379 Status BlockBasedTable::DumpTable(WritableFile* out_file) {
3380 WritableFileStringStreamAdapter out_file_wrapper(out_file);
3381 std::ostream out_stream(&out_file_wrapper);
3382 // Output Footer
3383 out_stream << "Footer Details:\n"
3384 "--------------------------------------\n";
3385 out_stream << " " << rep_->footer.ToString() << "\n";
3386
3387 // Output MetaIndex
3388 out_stream << "Metaindex Details:\n"
3389 "--------------------------------------\n";
3390 std::unique_ptr<Block> metaindex;
3391 std::unique_ptr<InternalIterator> metaindex_iter;
3392 ReadOptions ro;
3393 Status s = ReadMetaIndexBlock(ro, nullptr /* prefetch_buffer */, &metaindex,
3394 &metaindex_iter);
3395 if (s.ok()) {
3396 for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
3397 metaindex_iter->Next()) {
3398 s = metaindex_iter->status();
3399 if (!s.ok()) {
3400 return s;
3401 }
3402 if (metaindex_iter->key() == kPropertiesBlock) {
3403 out_stream << " Properties block handle: "
3404 << metaindex_iter->value().ToString(true) << "\n";
3405 } else if (metaindex_iter->key() == kCompressionDictBlock) {
3406 out_stream << " Compression dictionary block handle: "
3407 << metaindex_iter->value().ToString(true) << "\n";
3408 } else if (strstr(metaindex_iter->key().ToString().c_str(),
3409 "filter.rocksdb.") != nullptr) {
3410 out_stream << " Filter block handle: "
3411 << metaindex_iter->value().ToString(true) << "\n";
3412 } else if (metaindex_iter->key() == kRangeDelBlock) {
3413 out_stream << " Range deletion block handle: "
3414 << metaindex_iter->value().ToString(true) << "\n";
3415 }
3416 }
3417 out_stream << "\n";
3418 } else {
3419 return s;
3420 }
3421
3422 // Output TableProperties
3423 const ROCKSDB_NAMESPACE::TableProperties* table_properties;
3424 table_properties = rep_->table_properties.get();
3425
3426 if (table_properties != nullptr) {
3427 out_stream << "Table Properties:\n"
3428 "--------------------------------------\n";
3429 out_stream << " " << table_properties->ToString("\n ", ": ") << "\n";
3430 }
3431
3432 if (rep_->filter) {
3433 out_stream << "Filter Details:\n"
3434 "--------------------------------------\n";
3435 out_stream << " " << rep_->filter->ToString() << "\n";
3436 }
3437
3438 // Output Index block
3439 s = DumpIndexBlock(out_stream);
3440 if (!s.ok()) {
3441 return s;
3442 }
3443
3444 // Output compression dictionary
3445 if (rep_->uncompression_dict_reader) {
3446 CachableEntry<UncompressionDict> uncompression_dict;
3447 s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
3448 nullptr /* prefetch_buffer */, false /* no_io */,
3449 nullptr /* get_context */, nullptr /* lookup_context */,
3450 &uncompression_dict);
3451 if (!s.ok()) {
3452 return s;
3453 }
3454
3455 assert(uncompression_dict.GetValue());
3456
3457 const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
3458 out_stream << "Compression Dictionary:\n"
3459 "--------------------------------------\n";
3460 out_stream << " size (bytes): " << raw_dict.size() << "\n\n";
3461 out_stream << " HEX " << raw_dict.ToString(true) << "\n\n";
3462 }
3463
3464 // Output range deletions block
3465 auto* range_del_iter = NewRangeTombstoneIterator(ReadOptions());
3466 if (range_del_iter != nullptr) {
3467 range_del_iter->SeekToFirst();
3468 if (range_del_iter->Valid()) {
3469 out_stream << "Range deletions:\n"
3470 "--------------------------------------\n";
3471 for (; range_del_iter->Valid(); range_del_iter->Next()) {
3472 DumpKeyValue(range_del_iter->key(), range_del_iter->value(),
3473 out_stream);
3474 }
3475 out_stream << "\n";
3476 }
3477 delete range_del_iter;
3478 }
3479 // Output Data blocks
3480 s = DumpDataBlocks(out_stream);
3481
3482 if (!s.ok()) {
3483 return s;
3484 }
3485
3486 if (!out_stream.good()) {
3487 return Status::IOError("Failed to write to output file");
3488 }
3489 return Status::OK();
3490 }
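// Illustrative usage (a sketch, not part of this file): tools such as
// sst_dump use this entry point to produce a human-readable listing:
//
//   Status s = table->DumpTable(out_file);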
3491
DumpIndexBlock(std::ostream & out_stream)3492 Status BlockBasedTable::DumpIndexBlock(std::ostream& out_stream) {
3493 out_stream << "Index Details:\n"
3494 "--------------------------------------\n";
3495 std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3496 NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3497 /*input_iter=*/nullptr, /*get_context=*/nullptr,
3498                        /*lookup_context=*/nullptr));
3499 Status s = blockhandles_iter->status();
3500 if (!s.ok()) {
3501     out_stream << "Cannot read Index Block\n\n";
3502 return s;
3503 }
3504
3505 out_stream << " Block key hex dump: Data block handle\n";
3506 out_stream << " Block key ascii\n\n";
3507 for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3508 blockhandles_iter->Next()) {
3509 s = blockhandles_iter->status();
3510 if (!s.ok()) {
3511 break;
3512 }
3513 Slice key = blockhandles_iter->key();
3514 Slice user_key;
3515 InternalKey ikey;
3516 if (!rep_->index_key_includes_seq) {
3517 user_key = key;
3518 } else {
3519 ikey.DecodeFrom(key);
3520 user_key = ikey.user_key();
3521 }
3522
3523 out_stream << " HEX " << user_key.ToString(true) << ": "
3524 << blockhandles_iter->value().ToString(true,
3525 rep_->index_has_first_key)
3526 << "\n";
3527
3528 std::string str_key = user_key.ToString();
3529 std::string res_key("");
3530 char cspace = ' ';
3531 for (size_t i = 0; i < str_key.size(); i++) {
3532 res_key.append(&str_key[i], 1);
3533 res_key.append(1, cspace);
3534 }
3535 out_stream << " ASCII " << res_key << "\n";
3536 out_stream << " ------\n";
3537 }
3538 out_stream << "\n";
3539 return Status::OK();
3540 }
3541
DumpDataBlocks(std::ostream & out_stream)3542 Status BlockBasedTable::DumpDataBlocks(std::ostream& out_stream) {
3543 std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3544 NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3545 /*input_iter=*/nullptr, /*get_context=*/nullptr,
3546                        /*lookup_context=*/nullptr));
3547 Status s = blockhandles_iter->status();
3548 if (!s.ok()) {
3549     out_stream << "Cannot read Index Block\n\n";
3550 return s;
3551 }
3552
3553 uint64_t datablock_size_min = std::numeric_limits<uint64_t>::max();
3554 uint64_t datablock_size_max = 0;
3555 uint64_t datablock_size_sum = 0;
3556
3557 size_t block_id = 1;
3558 for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3559 block_id++, blockhandles_iter->Next()) {
3560 s = blockhandles_iter->status();
3561 if (!s.ok()) {
3562 break;
3563 }
3564
3565 BlockHandle bh = blockhandles_iter->value().handle;
3566 uint64_t datablock_size = bh.size();
3567 datablock_size_min = std::min(datablock_size_min, datablock_size);
3568 datablock_size_max = std::max(datablock_size_max, datablock_size);
3569 datablock_size_sum += datablock_size;
3570
3571 out_stream << "Data Block # " << block_id << " @ "
3572 << blockhandles_iter->value().handle.ToString(true) << "\n";
3573 out_stream << "--------------------------------------\n";
3574
3575 std::unique_ptr<InternalIterator> datablock_iter;
3576 datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
3577 ReadOptions(), blockhandles_iter->value().handle,
3578 /*input_iter=*/nullptr, /*type=*/BlockType::kData,
3579 /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(),
3580 /*prefetch_buffer=*/nullptr));
3581 s = datablock_iter->status();
3582
3583 if (!s.ok()) {
3584 out_stream << "Error reading the block - Skipped \n\n";
3585 continue;
3586 }
3587
3588 for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
3589 datablock_iter->Next()) {
3590 s = datablock_iter->status();
3591 if (!s.ok()) {
3592 out_stream << "Error reading the block - Skipped \n";
3593 break;
3594 }
3595 DumpKeyValue(datablock_iter->key(), datablock_iter->value(), out_stream);
3596 }
3597 out_stream << "\n";
3598 }
3599
3600 uint64_t num_datablocks = block_id - 1;
3601 if (num_datablocks) {
3602 double datablock_size_avg =
3603 static_cast<double>(datablock_size_sum) / num_datablocks;
3604 out_stream << "Data Block Summary:\n";
3605 out_stream << "--------------------------------------\n";
3606 out_stream << " # data blocks: " << num_datablocks << "\n";
3607 out_stream << " min data block size: " << datablock_size_min << "\n";
3608 out_stream << " max data block size: " << datablock_size_max << "\n";
3609 out_stream << " avg data block size: " << ToString(datablock_size_avg)
3610 << "\n";
3611 }
3612
3613 return Status::OK();
3614 }
3615
DumpKeyValue(const Slice & key,const Slice & value,std::ostream & out_stream)3616 void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
3617 std::ostream& out_stream) {
3618 InternalKey ikey;
3619 ikey.DecodeFrom(key);
3620
3621 out_stream << " HEX " << ikey.user_key().ToString(true) << ": "
3622 << value.ToString(true) << "\n";
3623
3624 std::string str_key = ikey.user_key().ToString();
3625 std::string str_value = value.ToString();
3626 std::string res_key(""), res_value("");
3627 char cspace = ' ';
3628 for (size_t i = 0; i < str_key.size(); i++) {
3629 if (str_key[i] == '\0') {
3630 res_key.append("\\0", 2);
3631 } else {
3632 res_key.append(&str_key[i], 1);
3633 }
3634 res_key.append(1, cspace);
3635 }
3636 for (size_t i = 0; i < str_value.size(); i++) {
3637 if (str_value[i] == '\0') {
3638 res_value.append("\\0", 2);
3639 } else {
3640 res_value.append(&str_value[i], 1);
3641 }
3642 res_value.append(1, cspace);
3643 }
3644
3645 out_stream << " ASCII " << res_key << ": " << res_value << "\n";
3646 out_stream << " ------\n";
3647 }
3648
3649 } // namespace ROCKSDB_NAMESPACE
3650