1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "table/block_based/block_based_table_reader.h"
10 
11 #include <algorithm>
12 #include <array>
13 #include <limits>
14 #include <string>
15 #include <utility>
16 #include <vector>
17 
18 #include "cache/cache_entry_roles.h"
19 #include "cache/sharded_cache.h"
20 #include "db/dbformat.h"
21 #include "db/pinned_iterators_manager.h"
22 #include "file/file_prefetch_buffer.h"
23 #include "file/file_util.h"
24 #include "file/random_access_file_reader.h"
25 #include "logging/logging.h"
26 #include "monitoring/perf_context_imp.h"
27 #include "port/lang.h"
28 #include "rocksdb/cache.h"
29 #include "rocksdb/comparator.h"
30 #include "rocksdb/convenience.h"
31 #include "rocksdb/env.h"
32 #include "rocksdb/file_system.h"
33 #include "rocksdb/filter_policy.h"
34 #include "rocksdb/iterator.h"
35 #include "rocksdb/options.h"
36 #include "rocksdb/snapshot.h"
37 #include "rocksdb/statistics.h"
38 #include "rocksdb/system_clock.h"
39 #include "rocksdb/table.h"
40 #include "rocksdb/table_properties.h"
41 #include "rocksdb/trace_record.h"
42 #include "table/block_based/binary_search_index_reader.h"
43 #include "table/block_based/block.h"
44 #include "table/block_based/block_based_filter_block.h"
45 #include "table/block_based/block_based_table_factory.h"
46 #include "table/block_based/block_based_table_iterator.h"
47 #include "table/block_based/block_like_traits.h"
48 #include "table/block_based/block_prefix_index.h"
49 #include "table/block_based/block_type.h"
50 #include "table/block_based/filter_block.h"
51 #include "table/block_based/full_filter_block.h"
52 #include "table/block_based/hash_index_reader.h"
53 #include "table/block_based/partitioned_filter_block.h"
54 #include "table/block_based/partitioned_index_reader.h"
55 #include "table/block_fetcher.h"
56 #include "table/format.h"
57 #include "table/get_context.h"
58 #include "table/internal_iterator.h"
59 #include "table/meta_blocks.h"
60 #include "table/multiget_context.h"
61 #include "table/persistent_cache_helper.h"
62 #include "table/persistent_cache_options.h"
63 #include "table/sst_file_writer_collectors.h"
64 #include "table/two_level_iterator.h"
65 #include "test_util/sync_point.h"
66 #include "util/coding.h"
67 #include "util/crc32c.h"
68 #include "util/stop_watch.h"
69 #include "util/string_util.h"
70 
71 namespace ROCKSDB_NAMESPACE {
72 
73 extern const uint64_t kBlockBasedTableMagicNumber;
74 extern const std::string kHashIndexPrefixesBlock;
75 extern const std::string kHashIndexPrefixesMetadataBlock;
76 
77 BlockBasedTable::~BlockBasedTable() {
78   delete rep_;
79 }
80 
81 std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
82 
83 namespace {
84 // Read the block identified by "handle" from "file".
85 // The only relevant option is options.verify_checksums for now.
86 // On failure return non-OK.
87 // On success fill *result and return OK - caller owns *result
88 // @param uncompression_dict Data for presetting the compression library's
89 //    dictionary.
90 template <typename TBlocklike>
91 Status ReadBlockFromFile(
92     RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
93     const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
94     std::unique_ptr<TBlocklike>* result, const ImmutableOptions& ioptions,
95     bool do_uncompress, bool maybe_compressed, BlockType block_type,
96     const UncompressionDict& uncompression_dict,
97     const PersistentCacheOptions& cache_options, size_t read_amp_bytes_per_bit,
98     MemoryAllocator* memory_allocator, bool for_compaction, bool using_zstd,
99     const FilterPolicy* filter_policy) {
100   assert(result);
101 
102   BlockContents contents;
103   BlockFetcher block_fetcher(
104       file, prefetch_buffer, footer, options, handle, &contents, ioptions,
105       do_uncompress, maybe_compressed, block_type, uncompression_dict,
106       cache_options, memory_allocator, nullptr, for_compaction);
107   Status s = block_fetcher.ReadBlockContents();
108   if (s.ok()) {
109     result->reset(BlocklikeTraits<TBlocklike>::Create(
110         std::move(contents), read_amp_bytes_per_bit, ioptions.stats, using_zstd,
111         filter_policy));
112   }
113 
114   return s;
115 }
116 
117 // Release the cached entry and decrement its ref count.
118 // Do not force erase
119 void ReleaseCachedEntry(void* arg, void* h) {
120   Cache* cache = reinterpret_cast<Cache*>(arg);
121   Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
122   cache->Release(handle, false /* force_erase */);
123 }
124 
125 // For a hash-based index, return true if prefix_extractor and
126 // prefix_extractor_block do not match, false otherwise. This flag will be
127 // used as total_order_seek via NewIndexIterator.
128 bool PrefixExtractorChanged(const TableProperties* table_properties,
129                             const SliceTransform* prefix_extractor) {
130   // BlockBasedTableOptions::kHashSearch requires prefix_extractor to be set.
131   // Turn off hash index if prefix_extractor is not set; if prefix_extractor
132   // is set but prefix_extractor_block is not set, also disable hash index.
133   if (prefix_extractor == nullptr || table_properties == nullptr ||
134       table_properties->prefix_extractor_name.empty()) {
135     return true;
136   }
137 
138   // prefix_extractor and prefix_extractor_block are both non-empty
139   if (table_properties->prefix_extractor_name != prefix_extractor->AsString()) {
140     return true;
141   } else {
142     return false;
143   }
144 }
145 
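// Copies `buf` into a newly allocated heap buffer obtained from `allocator`.
// (Presumably used so that block bytes backed by a transient source, e.g. a
// prefetch buffer or mmapped region, can be handed to the block cache, which
// must own its memory.)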
146 CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
147   CacheAllocationPtr heap_buf;
148   heap_buf = AllocateBlock(buf.size(), allocator);
149   memcpy(heap_buf.get(), buf.data(), buf.size());
150   return heap_buf;
151 }
152 }  // namespace
153 
154 void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type,
155                                             GetContext* get_context,
156                                             size_t usage) const {
157   Statistics* const statistics = rep_->ioptions.stats;
158 
159   PERF_COUNTER_ADD(block_cache_hit_count, 1);
160   PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,
161                             static_cast<uint32_t>(rep_->level));
162 
163   if (get_context) {
164     ++get_context->get_context_stats_.num_cache_hit;
165     get_context->get_context_stats_.num_cache_bytes_read += usage;
166   } else {
167     RecordTick(statistics, BLOCK_CACHE_HIT);
168     RecordTick(statistics, BLOCK_CACHE_BYTES_READ, usage);
169   }
170 
171   switch (block_type) {
172     case BlockType::kFilter:
173       PERF_COUNTER_ADD(block_cache_filter_hit_count, 1);
174 
175       if (get_context) {
176         ++get_context->get_context_stats_.num_cache_filter_hit;
177       } else {
178         RecordTick(statistics, BLOCK_CACHE_FILTER_HIT);
179       }
180       break;
181 
182     case BlockType::kCompressionDictionary:
183       // TODO: introduce perf counter for compression dictionary hit count
184       if (get_context) {
185         ++get_context->get_context_stats_.num_cache_compression_dict_hit;
186       } else {
187         RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_HIT);
188       }
189       break;
190 
191     case BlockType::kIndex:
192       PERF_COUNTER_ADD(block_cache_index_hit_count, 1);
193 
194       if (get_context) {
195         ++get_context->get_context_stats_.num_cache_index_hit;
196       } else {
197         RecordTick(statistics, BLOCK_CACHE_INDEX_HIT);
198       }
199       break;
200 
201     default:
202       // TODO: introduce dedicated tickers/statistics/counters
203       // for range tombstones
204       if (get_context) {
205         ++get_context->get_context_stats_.num_cache_data_hit;
206       } else {
207         RecordTick(statistics, BLOCK_CACHE_DATA_HIT);
208       }
209       break;
210   }
211 }
212 
213 void BlockBasedTable::UpdateCacheMissMetrics(BlockType block_type,
214                                              GetContext* get_context) const {
215   Statistics* const statistics = rep_->ioptions.stats;
216 
217   // TODO: introduce aggregate (not per-level) block cache miss count
218   PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,
219                             static_cast<uint32_t>(rep_->level));
220 
221   if (get_context) {
222     ++get_context->get_context_stats_.num_cache_miss;
223   } else {
224     RecordTick(statistics, BLOCK_CACHE_MISS);
225   }
226 
227   // TODO: introduce perf counters for misses per block type
228   switch (block_type) {
229     case BlockType::kFilter:
230       if (get_context) {
231         ++get_context->get_context_stats_.num_cache_filter_miss;
232       } else {
233         RecordTick(statistics, BLOCK_CACHE_FILTER_MISS);
234       }
235       break;
236 
237     case BlockType::kCompressionDictionary:
238       if (get_context) {
239         ++get_context->get_context_stats_.num_cache_compression_dict_miss;
240       } else {
241         RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_MISS);
242       }
243       break;
244 
245     case BlockType::kIndex:
246       if (get_context) {
247         ++get_context->get_context_stats_.num_cache_index_miss;
248       } else {
249         RecordTick(statistics, BLOCK_CACHE_INDEX_MISS);
250       }
251       break;
252 
253     default:
254       // TODO: introduce dedicated tickers/statistics/counters
255       // for range tombstones
256       if (get_context) {
257         ++get_context->get_context_stats_.num_cache_data_miss;
258       } else {
259         RecordTick(statistics, BLOCK_CACHE_DATA_MISS);
260       }
261       break;
262   }
263 }
264 
265 void BlockBasedTable::UpdateCacheInsertionMetrics(
266     BlockType block_type, GetContext* get_context, size_t usage, bool redundant,
267     Statistics* const statistics) {
268   // TODO: introduce perf counters for block cache insertions
269   if (get_context) {
270     ++get_context->get_context_stats_.num_cache_add;
271     if (redundant) {
272       ++get_context->get_context_stats_.num_cache_add_redundant;
273     }
274     get_context->get_context_stats_.num_cache_bytes_write += usage;
275   } else {
276     RecordTick(statistics, BLOCK_CACHE_ADD);
277     if (redundant) {
278       RecordTick(statistics, BLOCK_CACHE_ADD_REDUNDANT);
279     }
280     RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
281   }
282 
283   switch (block_type) {
284     case BlockType::kFilter:
285       if (get_context) {
286         ++get_context->get_context_stats_.num_cache_filter_add;
287         if (redundant) {
288           ++get_context->get_context_stats_.num_cache_filter_add_redundant;
289         }
290         get_context->get_context_stats_.num_cache_filter_bytes_insert += usage;
291       } else {
292         RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
293         if (redundant) {
294           RecordTick(statistics, BLOCK_CACHE_FILTER_ADD_REDUNDANT);
295         }
296         RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
297       }
298       break;
299 
300     case BlockType::kCompressionDictionary:
301       if (get_context) {
302         ++get_context->get_context_stats_.num_cache_compression_dict_add;
303         if (redundant) {
304           ++get_context->get_context_stats_
305                 .num_cache_compression_dict_add_redundant;
306         }
307         get_context->get_context_stats_
308             .num_cache_compression_dict_bytes_insert += usage;
309       } else {
310         RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
311         if (redundant) {
312           RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT);
313         }
314         RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
315                    usage);
316       }
317       break;
318 
319     case BlockType::kIndex:
320       if (get_context) {
321         ++get_context->get_context_stats_.num_cache_index_add;
322         if (redundant) {
323           ++get_context->get_context_stats_.num_cache_index_add_redundant;
324         }
325         get_context->get_context_stats_.num_cache_index_bytes_insert += usage;
326       } else {
327         RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
328         if (redundant) {
329           RecordTick(statistics, BLOCK_CACHE_INDEX_ADD_REDUNDANT);
330         }
331         RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usage);
332       }
333       break;
334 
335     default:
336       // TODO: introduce dedicated tickers/statistics/counters
337       // for range tombstones
338       if (get_context) {
339         ++get_context->get_context_stats_.num_cache_data_add;
340         if (redundant) {
341           ++get_context->get_context_stats_.num_cache_data_add_redundant;
342         }
343         get_context->get_context_stats_.num_cache_data_bytes_insert += usage;
344       } else {
345         RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
346         if (redundant) {
347           RecordTick(statistics, BLOCK_CACHE_DATA_ADD_REDUNDANT);
348         }
349         RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, usage);
350       }
351       break;
352   }
353 }
354 
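// Looks up a block in the block cache. When the lowest used cache tier is the
// non-volatile block tier, the helper/create-callback Lookup overload is used
// (so an entry can be created from a lower cache tier when needed); otherwise
// a plain Lookup is issued. Hit/miss metrics are updated either way.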
355 Cache::Handle* BlockBasedTable::GetEntryFromCache(
356     const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
357     BlockType block_type, const bool wait, GetContext* get_context,
358     const Cache::CacheItemHelper* cache_helper,
359     const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
360   Cache::Handle* cache_handle = nullptr;
361   if (cache_tier == CacheTier::kNonVolatileBlockTier) {
362     cache_handle = block_cache->Lookup(key, cache_helper, create_cb, priority,
363                                        wait, rep_->ioptions.statistics.get());
364   } else {
365     cache_handle = block_cache->Lookup(key, rep_->ioptions.statistics.get());
366   }
367 
368   if (cache_handle != nullptr) {
369     UpdateCacheHitMetrics(block_type, get_context,
370                           block_cache->GetUsage(cache_handle));
371   } else {
372     UpdateCacheMissMetrics(block_type, get_context);
373   }
374 
375   return cache_handle;
376 }
377 
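// Inserts a block into the block cache. The non-volatile tier path uses the
// CacheItemHelper-based Insert (carrying the helper's callbacks); the default
// path uses the basic Insert overload with the helper's deleter only.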
378 template <typename TBlocklike>
379 Status BlockBasedTable::InsertEntryToCache(
380     const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
381     const Cache::CacheItemHelper* cache_helper,
382     std::unique_ptr<TBlocklike>& block_holder, size_t charge,
383     Cache::Handle** cache_handle, Cache::Priority priority) const {
384   Status s = Status::OK();
385   if (cache_tier == CacheTier::kNonVolatileBlockTier) {
386     s = block_cache->Insert(key, block_holder.get(), cache_helper, charge,
387                             cache_handle, priority);
388   } else {
389     s = block_cache->Insert(key, block_holder.get(), charge,
390                             cache_helper->del_cb, cache_handle, priority);
391   }
392   return s;
393 }
394 
395 // Helper function to set up the cache key prefixes for the table.
396 void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep,
397                                           const std::string& db_session_id,
398                                           uint64_t file_num) {
399   assert(kMaxCacheKeyPrefixSize >= 10);
400   rep->cache_key_prefix_size = 0;
401   rep->compressed_cache_key_prefix_size = 0;
402   if (rep->table_options.block_cache != nullptr) {
403     GenerateCachePrefix<Cache, FSRandomAccessFile>(
404         rep->table_options.block_cache.get(), rep->file->file(),
405         &rep->cache_key_prefix[0], &rep->cache_key_prefix_size, db_session_id,
406         file_num);
407   }
408   if (rep->table_options.block_cache_compressed != nullptr) {
409     GenerateCachePrefix<Cache, FSRandomAccessFile>(
410         rep->table_options.block_cache_compressed.get(), rep->file->file(),
411         &rep->compressed_cache_key_prefix[0],
412         &rep->compressed_cache_key_prefix_size, db_session_id, file_num);
413   }
414   if (rep->table_options.persistent_cache != nullptr) {
415     char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize];
416     size_t persistent_cache_key_prefix_size = 0;
417 
418     GenerateCachePrefix<PersistentCache, FSRandomAccessFile>(
419         rep->table_options.persistent_cache.get(), rep->file->file(),
420         &persistent_cache_key_prefix[0], &persistent_cache_key_prefix_size,
421         db_session_id, file_num);
422 
423     rep->persistent_cache_options =
424         PersistentCacheOptions(rep->table_options.persistent_cache,
425                                std::string(persistent_cache_key_prefix,
426                                            persistent_cache_key_prefix_size),
427                                rep->ioptions.stats);
428   }
429 }
430 
431 namespace {
432 // Return true if table_properties has `user_prop_name` with a `true` value,
433 // or doesn't contain this property at all (for backward compatibility).
434 bool IsFeatureSupported(const TableProperties& table_properties,
435                         const std::string& user_prop_name, Logger* info_log) {
436   auto& props = table_properties.user_collected_properties;
437   auto pos = props.find(user_prop_name);
438   // Older versions don't have this value set. Skip this check.
439   if (pos != props.end()) {
440     if (pos->second == kPropFalse) {
441       return false;
442     } else if (pos->second != kPropTrue) {
443       ROCKS_LOG_WARN(info_log, "Property %s has invalid value %s",
444                      user_prop_name.c_str(), pos->second.c_str());
445     }
446   }
447   return true;
448 }
449 
450 // Caller has to ensure seqno is not nullptr.
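// In outline, the rules applied below are:
//  - No version property: not an external SST; a global seqno property is
//    treated as corruption, otherwise *seqno stays kDisableGlobalSequenceNumber.
//  - Version 1: global seqno is unsupported; its presence is corruption.
//  - Version >= 2: use the global seqno property if present (0 means "use
//    largest_seqno"); when largest_seqno is known it must agree, and the value
//    must not exceed kMaxSequenceNumber.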
451 Status GetGlobalSequenceNumber(const TableProperties& table_properties,
452                                SequenceNumber largest_seqno,
453                                SequenceNumber* seqno) {
454   const auto& props = table_properties.user_collected_properties;
455   const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
456   const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
457 
458   *seqno = kDisableGlobalSequenceNumber;
459   if (version_pos == props.end()) {
460     if (seqno_pos != props.end()) {
461       std::array<char, 200> msg_buf;
462       // This is not an external sst file, global_seqno is not supported.
463       snprintf(
464           msg_buf.data(), msg_buf.max_size(),
465           "A non-external sst file has a global seqno property with value %s",
466           seqno_pos->second.c_str());
467       return Status::Corruption(msg_buf.data());
468     }
469     return Status::OK();
470   }
471 
472   uint32_t version = DecodeFixed32(version_pos->second.c_str());
473   if (version < 2) {
474     if (seqno_pos != props.end() || version != 1) {
475       std::array<char, 200> msg_buf;
476       // This is a v1 external sst file, global_seqno is not supported.
477       snprintf(msg_buf.data(), msg_buf.max_size(),
478                "An external sst file with version %u has a global seqno "
479                "property with value %s",
480                version, seqno_pos->second.c_str());
481       return Status::Corruption(msg_buf.data());
482     }
483     return Status::OK();
484   }
485 
486   // Since we have a plan to deprecate global_seqno, we do not return failure
487   // if seqno_pos == props.end(). We rely on version_pos to detect whether the
488   // SST is external.
489   SequenceNumber global_seqno(0);
490   if (seqno_pos != props.end()) {
491     global_seqno = DecodeFixed64(seqno_pos->second.c_str());
492   }
493   // SstTableReader opens the table reader with kMaxSequenceNumber as
494   // largest_seqno to denote that it is unknown.
495   if (largest_seqno < kMaxSequenceNumber) {
496     if (global_seqno == 0) {
497       global_seqno = largest_seqno;
498     }
499     if (global_seqno != largest_seqno) {
500       std::array<char, 200> msg_buf;
501       snprintf(
502           msg_buf.data(), msg_buf.max_size(),
503           "An external sst file with version %u has a global seqno property "
504           "with value %s, while the largest seqno in the file is %llu",
505           version, seqno_pos->second.c_str(),
506           static_cast<unsigned long long>(largest_seqno));
507       return Status::Corruption(msg_buf.data());
508     }
509   }
510   *seqno = global_seqno;
511 
512   if (global_seqno > kMaxSequenceNumber) {
513     std::array<char, 200> msg_buf;
514     snprintf(msg_buf.data(), msg_buf.max_size(),
515              "An external sst file with version %u has a global seqno property "
516              "with value %llu, which is greater than kMaxSequenceNumber",
517              version, static_cast<unsigned long long>(global_seqno));
518     return Status::Corruption(msg_buf.data());
519   }
520 
521   return Status::OK();
522 }
523 }  // namespace
524 
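// The cache key produced below is the per-file prefix followed by the block
// handle's offset encoded as a varint64. A minimal usage sketch, mirroring how
// callers in this file build a key:
//
//   char buf[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
//   Slice key = BlockBasedTable::GetCacheKey(rep->cache_key_prefix,
//                                            rep->cache_key_prefix_size,
//                                            handle, buf);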
525 Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
526                                    size_t cache_key_prefix_size,
527                                    const BlockHandle& handle, char* cache_key) {
528   assert(cache_key != nullptr);
529   assert(cache_key_prefix_size != 0);
530   assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
531   memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);
532   char* end =
533       EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset());
534   return Slice(cache_key, static_cast<size_t>(end - cache_key));
535 }
536 
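// Opens a block-based table: prefetches the file tail (unless mmap reads are
// enabled), reads and validates the footer, then reads the metaindex,
// properties, range-deletion and compression-dictionary meta blocks, sets up
// cache key prefixes, and prefetches/pins index and filter blocks as
// configured before handing back the TableReader.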
537 Status BlockBasedTable::Open(
538     const ReadOptions& read_options, const ImmutableOptions& ioptions,
539     const EnvOptions& env_options, const BlockBasedTableOptions& table_options,
540     const InternalKeyComparator& internal_comparator,
541     std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
542     std::unique_ptr<TableReader>* table_reader,
543     const SliceTransform* prefix_extractor,
544     const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
545     const int level, const bool immortal_table,
546     const SequenceNumber largest_seqno, const bool force_direct_prefetch,
547     TailPrefetchStats* tail_prefetch_stats,
548     BlockCacheTracer* const block_cache_tracer,
549     size_t max_file_size_for_l0_meta_pin, const std::string& cur_db_session_id,
550     uint64_t cur_file_num) {
551   table_reader->reset();
552 
553   Status s;
554   Footer footer;
555   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
556 
557   // Only retain read_options.deadline and read_options.io_timeout.
558   // In the future, we may retain more
559   // options. Specifically, we ignore verify_checksums and default to
560   // checksum verification anyway when creating the index and filter
561   // readers.
562   ReadOptions ro;
563   ro.deadline = read_options.deadline;
564   ro.io_timeout = read_options.io_timeout;
565 
566   // prefetch both index and filters, down to all partitions
567   const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
568   const bool preload_all = !table_options.cache_index_and_filter_blocks;
569 
570   if (!ioptions.allow_mmap_reads) {
571     s = PrefetchTail(ro, file.get(), file_size, force_direct_prefetch,
572                      tail_prefetch_stats, prefetch_all, preload_all,
573                      &prefetch_buffer);
574     // Return error in prefetch path to users.
575     if (!s.ok()) {
576       return s;
577     }
578   } else {
579     // Should not prefetch for mmap mode.
580     prefetch_buffer.reset(new FilePrefetchBuffer(
581         nullptr, 0, 0, false /* enable */, true /* track_min_offset */));
582   }
583 
584   // Read in the following order:
585   //    1. Footer
586   //    2. [metaindex block]
587   //    3. [meta block: properties]
588   //    4. [meta block: range deletion tombstone]
589   //    5. [meta block: compression dictionary]
590   //    6. [meta block: index]
591   //    7. [meta block: filter]
592   IOOptions opts;
593   s = file->PrepareIOOptions(ro, opts);
594   if (s.ok()) {
595     s = ReadFooterFromFile(opts, file.get(), prefetch_buffer.get(), file_size,
596                            &footer, kBlockBasedTableMagicNumber);
597   }
598   if (!s.ok()) {
599     return s;
600   }
601   if (!BlockBasedTableSupportedVersion(footer.version())) {
602     return Status::Corruption(
603         "Unknown Footer version. Maybe this file was created with newer "
604         "version of RocksDB?");
605   }
606 
607   // We've successfully read the footer. We are ready to serve requests.
608   // Better not to mutate rep_ after creation; e.g., the
609   // internal_prefix_transform raw pointer will be used to create
610   // HashIndexReader, whose reset may access a dangling pointer.
611   BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
612   Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
613                                       internal_comparator, skip_filters,
614                                       file_size, level, immortal_table);
615   rep->file = std::move(file);
616   rep->footer = footer;
617   rep->hash_index_allow_collision = table_options.hash_index_allow_collision;
618   // We need to wrap data with internal_prefix_transform to make sure it can
619   // handle prefix correctly.
620   if (prefix_extractor != nullptr) {
621     rep->internal_prefix_transform.reset(
622         new InternalKeySliceTransform(prefix_extractor));
623   }
624 
625   // For fully portable/stable cache keys, we need to read the properties
626   // block before setting up cache keys. TODO: consider setting up a bootstrap
627   // cache key for PersistentCache to use for metaindex and properties blocks.
628   rep->persistent_cache_options = PersistentCacheOptions();
629 
630   // Meta-blocks are not dictionary compressed. Explicitly set the dictionary
631   // handle to null, otherwise it may be seen as uninitialized during the below
632   // meta-block reads.
633   rep->compression_dict_handle = BlockHandle::NullBlockHandle();
634 
635   // Read metaindex
636   std::unique_ptr<BlockBasedTable> new_table(
637       new BlockBasedTable(rep, block_cache_tracer));
638   std::unique_ptr<Block> metaindex;
639   std::unique_ptr<InternalIterator> metaindex_iter;
640   s = new_table->ReadMetaIndexBlock(ro, prefetch_buffer.get(), &metaindex,
641                                     &metaindex_iter);
642   if (!s.ok()) {
643     return s;
644   }
645 
646   // Populates table_properties and some fields that depend on it,
647   // such as index_type.
648   s = new_table->ReadPropertiesBlock(ro, prefetch_buffer.get(),
649                                      metaindex_iter.get(), largest_seqno);
650   if (!s.ok()) {
651     return s;
652   }
653 
654   // With properties loaded, we can set up portable/stable cache keys if
655   // necessary info is available
656   std::string db_session_id;
657   uint64_t file_num;
658   if (rep->table_properties && !rep->table_properties->db_session_id.empty() &&
659       rep->table_properties->orig_file_number > 0) {
660     // We must have both properties to get a stable unique id because
661     // CreateColumnFamilyWithImport or IngestExternalFiles can change the
662     // file numbers on a file.
663     db_session_id = rep->table_properties->db_session_id;
664     file_num = rep->table_properties->orig_file_number;
665   } else {
666     // We have to use transient (but unique) cache keys based on current
667     // identifiers.
668     db_session_id = cur_db_session_id;
669     file_num = cur_file_num;
670   }
671   SetupCacheKeyPrefix(rep, db_session_id, file_num);
672 
673   s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(),
674                                    metaindex_iter.get(), internal_comparator,
675                                    &lookup_context);
676   if (!s.ok()) {
677     return s;
678   }
679   s = new_table->PrefetchIndexAndFilterBlocks(
680       ro, prefetch_buffer.get(), metaindex_iter.get(), new_table.get(),
681       prefetch_all, table_options, level, file_size,
682       max_file_size_for_l0_meta_pin, &lookup_context);
683 
684   if (s.ok()) {
685     // Update tail prefetch stats
686     assert(prefetch_buffer.get() != nullptr);
687     if (tail_prefetch_stats != nullptr) {
688       assert(prefetch_buffer->min_offset_read() < file_size);
689       tail_prefetch_stats->RecordEffectiveSize(
690           static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
691     }
692 
693     *table_reader = std::move(new_table);
694   }
695 
696   return s;
697 }
698 
699 Status BlockBasedTable::PrefetchTail(
700     const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size,
701     bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
702     const bool prefetch_all, const bool preload_all,
703     std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer) {
704   size_t tail_prefetch_size = 0;
705   if (tail_prefetch_stats != nullptr) {
706     // Multiple threads may get a 0 (no history) when running in parallel,
707     // but it will get cleared after the first of them finishes.
708     tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize();
709   }
710   if (tail_prefetch_size == 0) {
711     // Before read footer, readahead backwards to prefetch data. Do more
712     // readahead if we're going to read index/filter.
713     // TODO: This may incorrectly select small readahead in case partitioned
714     // index/filter is enabled and top-level partition pinning is enabled.
715     // That's because we need to issue readahead before we read the properties,
716     // at which point we don't yet know the index type.
717     tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024;
718   }
719   size_t prefetch_off;
720   size_t prefetch_len;
721   if (file_size < tail_prefetch_size) {
722     prefetch_off = 0;
723     prefetch_len = static_cast<size_t>(file_size);
724   } else {
725     prefetch_off = static_cast<size_t>(file_size - tail_prefetch_size);
726     prefetch_len = tail_prefetch_size;
727   }
728   TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
729                            &tail_prefetch_size);
730 
731   // Try file system prefetch
732   if (!file->use_direct_io() && !force_direct_prefetch) {
733     if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) {
734       prefetch_buffer->reset(
735           new FilePrefetchBuffer(nullptr, 0, 0, false, true));
736       return Status::OK();
737     }
738   }
739 
740   // Use `FilePrefetchBuffer`
741   prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
742   IOOptions opts;
743   Status s = file->PrepareIOOptions(ro, opts);
744   if (s.ok()) {
745     s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
746   }
747   return s;
748 }
749 
750 Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno(
751     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
752     const Slice& handle_value, TableProperties** table_properties) {
753   assert(table_properties != nullptr);
754   // If this is an external SST file ingested with write_global_seqno set to
755   // true, then we expect the checksum mismatch because checksum was written
756   // by SstFileWriter, but its global seqno in the properties block may have
757   // been changed during ingestion. In this case, we read the properties
758   // block, copy it to a memory buffer, change the global seqno to its
759   // original value, i.e. 0, and verify the checksum again.
760   BlockHandle props_block_handle;
761   CacheAllocationPtr tmp_buf;
762   Status s = ReadProperties(ro, handle_value, rep_->file.get(), prefetch_buffer,
763                             rep_->footer, rep_->ioptions, table_properties,
764                             false /* verify_checksum */, &props_block_handle,
765                             &tmp_buf, false /* compression_type_missing */,
766                             nullptr /* memory_allocator */);
767   if (s.ok() && tmp_buf) {
768     const auto seqno_pos_iter =
769         (*table_properties)
770             ->properties_offsets.find(
771                 ExternalSstFilePropertyNames::kGlobalSeqno);
772     size_t block_size = static_cast<size_t>(props_block_handle.size());
773     if (seqno_pos_iter != (*table_properties)->properties_offsets.end()) {
774       uint64_t global_seqno_offset = seqno_pos_iter->second;
775       EncodeFixed64(
776           tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0);
777     }
778     s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
779         rep_->footer.checksum(), tmp_buf.get(), block_size,
780         rep_->file->file_name(), props_block_handle.offset());
781   }
782   return s;
783 }
784 
785 Status BlockBasedTable::ReadPropertiesBlock(
786     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
787     InternalIterator* meta_iter, const SequenceNumber largest_seqno) {
788   bool found_properties_block = true;
789   Status s;
790   s = SeekToPropertiesBlock(meta_iter, &found_properties_block);
791 
792   if (!s.ok()) {
793     ROCKS_LOG_WARN(rep_->ioptions.logger,
794                    "Error when seeking to properties block from file: %s",
795                    s.ToString().c_str());
796   } else if (found_properties_block) {
797     s = meta_iter->status();
798     TableProperties* table_properties = nullptr;
799     if (s.ok()) {
800       s = ReadProperties(
801           ro, meta_iter->value(), rep_->file.get(), prefetch_buffer,
802           rep_->footer, rep_->ioptions, &table_properties,
803           true /* verify_checksum */, nullptr /* ret_block_handle */,
804           nullptr /* ret_block_contents */,
805           false /* compression_type_missing */, nullptr /* memory_allocator */);
806     }
807     IGNORE_STATUS_IF_ERROR(s);
808 
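    // A checksum mismatch here can be expected for externally ingested files
    // whose global seqno was rewritten in place after the checksum was
    // computed; retry with the seqno field zeroed out (see
    // TryReadPropertiesWithGlobalSeqno above).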
809     if (s.IsCorruption()) {
810       s = TryReadPropertiesWithGlobalSeqno(
811           ro, prefetch_buffer, meta_iter->value(), &table_properties);
812       IGNORE_STATUS_IF_ERROR(s);
813     }
814     std::unique_ptr<TableProperties> props_guard;
815     if (table_properties != nullptr) {
816       props_guard.reset(table_properties);
817     }
818 
819     if (!s.ok()) {
820       ROCKS_LOG_WARN(rep_->ioptions.logger,
821                      "Encountered error while reading data from properties "
822                      "block %s",
823                      s.ToString().c_str());
824     } else {
825       assert(table_properties != nullptr);
826       rep_->table_properties.reset(props_guard.release());
827       rep_->blocks_maybe_compressed =
828           rep_->table_properties->compression_name !=
829           CompressionTypeToString(kNoCompression);
830       rep_->blocks_definitely_zstd_compressed =
831           (rep_->table_properties->compression_name ==
832                CompressionTypeToString(kZSTD) ||
833            rep_->table_properties->compression_name ==
834                CompressionTypeToString(kZSTDNotFinalCompression));
835     }
836   } else {
837     ROCKS_LOG_ERROR(rep_->ioptions.logger,
838                     "Cannot find Properties block from file.");
839   }
840 #ifndef ROCKSDB_LITE
841   if (rep_->table_properties) {
842     //**TODO: If/When the DBOptions has a registry in it, the ConfigOptions
843     // will need to use it
844     ConfigOptions config_options;
845     Status st = SliceTransform::CreateFromString(
846         config_options, rep_->table_properties->prefix_extractor_name,
847         &(rep_->table_prefix_extractor));
848     if (!st.ok()) {
849       //**TODO: Should this error be returned or swallowed?
850       ROCKS_LOG_ERROR(rep_->ioptions.logger,
851                       "Failed to create prefix extractor[%s]: %s",
852                       rep_->table_properties->prefix_extractor_name.c_str(),
853                       st.ToString().c_str());
854     }
855   }
856 #endif  // ROCKSDB_LITE
857 
858   // Read the table properties, if provided.
859   if (rep_->table_properties) {
860     rep_->whole_key_filtering &=
861         IsFeatureSupported(*(rep_->table_properties),
862                            BlockBasedTablePropertyNames::kWholeKeyFiltering,
863                            rep_->ioptions.logger);
864     rep_->prefix_filtering &= IsFeatureSupported(
865         *(rep_->table_properties),
866         BlockBasedTablePropertyNames::kPrefixFiltering, rep_->ioptions.logger);
867 
868     rep_->index_key_includes_seq =
869         rep_->table_properties->index_key_is_user_key == 0;
870     rep_->index_value_is_full =
871         rep_->table_properties->index_value_is_delta_encoded == 0;
872 
873     // Update index_type with the true type.
874     // If table properties don't contain index type, we assume that the table
875     // is in very old format and has kBinarySearch index type.
876     auto& props = rep_->table_properties->user_collected_properties;
877     auto pos = props.find(BlockBasedTablePropertyNames::kIndexType);
878     if (pos != props.end()) {
879       rep_->index_type = static_cast<BlockBasedTableOptions::IndexType>(
880           DecodeFixed32(pos->second.c_str()));
881     }
882 
883     rep_->index_has_first_key =
884         rep_->index_type == BlockBasedTableOptions::kBinarySearchWithFirstKey;
885 
886     s = GetGlobalSequenceNumber(*(rep_->table_properties), largest_seqno,
887                                 &(rep_->global_seqno));
888     if (!s.ok()) {
889       ROCKS_LOG_ERROR(rep_->ioptions.logger, "%s", s.ToString().c_str());
890     }
891   }
892   return s;
893 }
894 
895 Status BlockBasedTable::ReadRangeDelBlock(
896     const ReadOptions& read_options, FilePrefetchBuffer* prefetch_buffer,
897     InternalIterator* meta_iter,
898     const InternalKeyComparator& internal_comparator,
899     BlockCacheLookupContext* lookup_context) {
900   Status s;
901   bool found_range_del_block;
902   BlockHandle range_del_handle;
903   s = SeekToRangeDelBlock(meta_iter, &found_range_del_block, &range_del_handle);
904   if (!s.ok()) {
905     ROCKS_LOG_WARN(
906         rep_->ioptions.logger,
907         "Error when seeking to range delete tombstones block from file: %s",
908         s.ToString().c_str());
909   } else if (found_range_del_block && !range_del_handle.IsNull()) {
910     std::unique_ptr<InternalIterator> iter(NewDataBlockIterator<DataBlockIter>(
911         read_options, range_del_handle,
912         /*input_iter=*/nullptr, BlockType::kRangeDeletion,
913         /*get_context=*/nullptr, lookup_context, Status(), prefetch_buffer));
914     assert(iter != nullptr);
915     s = iter->status();
916     if (!s.ok()) {
917       ROCKS_LOG_WARN(
918           rep_->ioptions.logger,
919           "Encountered error while reading data from range del block %s",
920           s.ToString().c_str());
921       IGNORE_STATUS_IF_ERROR(s);
922     } else {
923       rep_->fragmented_range_dels =
924           std::make_shared<FragmentedRangeTombstoneList>(std::move(iter),
925                                                          internal_comparator);
926     }
927   }
928   return s;
929 }
930 
931 Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
932     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
933     InternalIterator* meta_iter, BlockBasedTable* new_table, bool prefetch_all,
934     const BlockBasedTableOptions& table_options, const int level,
935     size_t file_size, size_t max_file_size_for_l0_meta_pin,
936     BlockCacheLookupContext* lookup_context) {
937   Status s;
938 
939   // Find filter handle and filter type
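  // The filter meta block is looked up by "<type prefix><filter policy name>",
  // e.g. a full filter built with the built-in bloom policy would be stored
  // under something like "fullfilter.rocksdb.BuiltinBloomFilter" (the exact
  // suffix comes from FilterPolicy::Name()).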
940   if (rep_->filter_policy) {
941     for (auto filter_type :
942          {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter,
943           Rep::FilterType::kBlockFilter}) {
944       std::string prefix;
945       switch (filter_type) {
946         case Rep::FilterType::kFullFilter:
947           prefix = kFullFilterBlockPrefix;
948           break;
949         case Rep::FilterType::kPartitionedFilter:
950           prefix = kPartitionedFilterBlockPrefix;
951           break;
952         case Rep::FilterType::kBlockFilter:
953           prefix = kFilterBlockPrefix;
954           break;
955         default:
956           assert(0);
957       }
958       std::string filter_block_key = prefix;
959       filter_block_key.append(rep_->filter_policy->Name());
960       if (FindMetaBlock(meta_iter, filter_block_key, &rep_->filter_handle)
961               .ok()) {
962         rep_->filter_type = filter_type;
963         break;
964       }
965     }
966   }
967   // Partition filters cannot be enabled without partition indexes
968   assert(rep_->filter_type != Rep::FilterType::kPartitionedFilter ||
969          rep_->index_type == BlockBasedTableOptions::kTwoLevelIndexSearch);
970 
971   // Find compression dictionary handle
972   bool found_compression_dict = false;
973   s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict,
974                                  &rep_->compression_dict_handle);
975   if (!s.ok()) {
976     return s;
977   }
978 
979   BlockBasedTableOptions::IndexType index_type = rep_->index_type;
980 
981   const bool use_cache = table_options.cache_index_and_filter_blocks;
982 
983   const bool maybe_flushed =
984       level == 0 && file_size <= max_file_size_for_l0_meta_pin;
985   std::function<bool(PinningTier, PinningTier)> is_pinned =
986       [maybe_flushed, &is_pinned](PinningTier pinning_tier,
987                                   PinningTier fallback_pinning_tier) {
988         // Fallback to fallback would lead to infinite recursion. Disallow it.
989         assert(fallback_pinning_tier != PinningTier::kFallback);
990 
991         switch (pinning_tier) {
992           case PinningTier::kFallback:
993             return is_pinned(fallback_pinning_tier,
994                              PinningTier::kNone /* fallback_pinning_tier */);
995           case PinningTier::kNone:
996             return false;
997           case PinningTier::kFlushedAndSimilar:
998             return maybe_flushed;
999           case PinningTier::kAll:
1000             return true;
1001         };
1002 
1003         // In GCC, this is needed to suppress `control reaches end of non-void
1004         // function [-Werror=return-type]`.
1005         assert(false);
1006         return false;
1007       };
1008   const bool pin_top_level_index = is_pinned(
1009       table_options.metadata_cache_options.top_level_index_pinning,
1010       table_options.pin_top_level_index_and_filter ? PinningTier::kAll
1011                                                    : PinningTier::kNone);
1012   const bool pin_partition =
1013       is_pinned(table_options.metadata_cache_options.partition_pinning,
1014                 table_options.pin_l0_filter_and_index_blocks_in_cache
1015                     ? PinningTier::kFlushedAndSimilar
1016                     : PinningTier::kNone);
1017   const bool pin_unpartitioned =
1018       is_pinned(table_options.metadata_cache_options.unpartitioned_pinning,
1019                 table_options.pin_l0_filter_and_index_blocks_in_cache
1020                     ? PinningTier::kFlushedAndSimilar
1021                     : PinningTier::kNone);
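  // For example, with the default (kFallback) metadata_cache_options and
  // pin_l0_filter_and_index_blocks_in_cache == true, unpartitioned metadata
  // blocks resolve to kFlushedAndSimilar and are pinned only for small L0
  // files (maybe_flushed == true), while top-level index/filter partitions
  // follow pin_top_level_index_and_filter.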
1022 
1023   // pin the first level of index
1024   const bool pin_index =
1025       index_type == BlockBasedTableOptions::kTwoLevelIndexSearch
1026           ? pin_top_level_index
1027           : pin_unpartitioned;
1028   // prefetch the first level of index
1029   // WART: this might be redundant (unnecessary cache hit) if !pin_index,
1030   // depending on prepopulate_block_cache option
1031   const bool prefetch_index = prefetch_all || pin_index;
1032 
1033   std::unique_ptr<IndexReader> index_reader;
1034   s = new_table->CreateIndexReader(ro, prefetch_buffer, meta_iter, use_cache,
1035                                    prefetch_index, pin_index, lookup_context,
1036                                    &index_reader);
1037   if (!s.ok()) {
1038     return s;
1039   }
1040 
1041   rep_->index_reader = std::move(index_reader);
1042 
1043   // The partitions of a partitioned index are always stored in the block
1044   // cache. They hence follow the pin and prefetch configuration regardless
1045   // of the value of cache_index_and_filter_blocks.
1046   if (prefetch_all || pin_partition) {
1047     s = rep_->index_reader->CacheDependencies(ro, pin_partition);
1048   }
1049   if (!s.ok()) {
1050     return s;
1051   }
1052 
1053   // pin the first level of filter
1054   const bool pin_filter =
1055       rep_->filter_type == Rep::FilterType::kPartitionedFilter
1056           ? pin_top_level_index
1057           : pin_unpartitioned;
1058   // prefetch the first level of filter
1059   // WART: this might be redundant (unnecessary cache hit) if !pin_filter,
1060   // depending on prepopulate_block_cache option
1061   const bool prefetch_filter = prefetch_all || pin_filter;
1062 
1063   if (rep_->filter_policy) {
1064     auto filter = new_table->CreateFilterBlockReader(
1065         ro, prefetch_buffer, use_cache, prefetch_filter, pin_filter,
1066         lookup_context);
1067 
1068     if (filter) {
1069       // Refer to the comment above about partitioned indexes always being cached.
1070       if (prefetch_all || pin_partition) {
1071         s = filter->CacheDependencies(ro, pin_partition);
1072         if (!s.ok()) {
1073           return s;
1074         }
1075       }
1076       rep_->filter = std::move(filter);
1077     }
1078   }
1079 
1080   if (!rep_->compression_dict_handle.IsNull()) {
1081     std::unique_ptr<UncompressionDictReader> uncompression_dict_reader;
1082     s = UncompressionDictReader::Create(
1083         this, ro, prefetch_buffer, use_cache, prefetch_all || pin_unpartitioned,
1084         pin_unpartitioned, lookup_context, &uncompression_dict_reader);
1085     if (!s.ok()) {
1086       return s;
1087     }
1088 
1089     rep_->uncompression_dict_reader = std::move(uncompression_dict_reader);
1090   }
1091 
1092   assert(s.ok());
1093   return s;
1094 }
1095 
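// Translates the configured access_hint_on_compaction_start into an access
// pattern hint on the underlying file, ahead of compaction reads.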
1096 void BlockBasedTable::SetupForCompaction() {
1097   switch (rep_->ioptions.access_hint_on_compaction_start) {
1098     case Options::NONE:
1099       break;
1100     case Options::NORMAL:
1101       rep_->file->file()->Hint(FSRandomAccessFile::kNormal);
1102       break;
1103     case Options::SEQUENTIAL:
1104       rep_->file->file()->Hint(FSRandomAccessFile::kSequential);
1105       break;
1106     case Options::WILLNEED:
1107       rep_->file->file()->Hint(FSRandomAccessFile::kWillNeed);
1108       break;
1109     default:
1110       assert(false);
1111   }
1112 }
1113 
1114 std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
1115     const {
1116   return rep_->table_properties;
1117 }
1118 
1119 size_t BlockBasedTable::ApproximateMemoryUsage() const {
1120   size_t usage = 0;
1121   if (rep_->filter) {
1122     usage += rep_->filter->ApproximateMemoryUsage();
1123   }
1124   if (rep_->index_reader) {
1125     usage += rep_->index_reader->ApproximateMemoryUsage();
1126   }
1127   if (rep_->uncompression_dict_reader) {
1128     usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage();
1129   }
1130   return usage;
1131 }
1132 
1133 // Load the metaindex block from the file. On success, return the loaded
1134 // metaindex block and its iterator.
1136 Status BlockBasedTable::ReadMetaIndexBlock(
1137     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
1138     std::unique_ptr<Block>* metaindex_block,
1139     std::unique_ptr<InternalIterator>* iter) {
1140   // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
1141   // it is an empty block.
1142   std::unique_ptr<Block> metaindex;
1143   Status s = ReadBlockFromFile(
1144       rep_->file.get(), prefetch_buffer, rep_->footer, ro,
1145       rep_->footer.metaindex_handle(), &metaindex, rep_->ioptions,
1146       true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
1147       UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
1148       0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep_->table_options),
1149       false /* for_compaction */, rep_->blocks_definitely_zstd_compressed,
1150       nullptr /* filter_policy */);
1151 
1152   if (!s.ok()) {
1153     ROCKS_LOG_ERROR(rep_->ioptions.logger,
1154                     "Encountered error while reading data from metaindex"
1155                     " block %s",
1156                     s.ToString().c_str());
1157     return s;
1158   }
1159 
1160   *metaindex_block = std::move(metaindex);
1161   // meta block uses bytewise comparator.
1162   iter->reset(metaindex_block->get()->NewDataIterator(
1163       BytewiseComparator(), kDisableGlobalSequenceNumber));
1164   return Status::OK();
1165 }
1166 
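// Looks a block up in the uncompressed block cache first and then, on a miss,
// in the compressed block cache. A compressed hit is uncompressed here and,
// when read_options.fill_cache is set, re-inserted into the uncompressed
// cache before being returned.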
1167 template <typename TBlocklike>
1168 Status BlockBasedTable::GetDataBlockFromCache(
1169     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
1170     Cache* block_cache, Cache* block_cache_compressed,
1171     const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
1172     const UncompressionDict& uncompression_dict, BlockType block_type,
1173     const bool wait, GetContext* get_context) const {
1174   const size_t read_amp_bytes_per_bit =
1175       block_type == BlockType::kData
1176           ? rep_->table_options.read_amp_bytes_per_bit
1177           : 0;
1178   assert(block);
1179   assert(block->IsEmpty());
1180   const Cache::Priority priority =
1181       rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
1182               (block_type == BlockType::kFilter ||
1183                block_type == BlockType::kCompressionDictionary ||
1184                block_type == BlockType::kIndex)
1185           ? Cache::Priority::HIGH
1186           : Cache::Priority::LOW;
1187 
1188   Status s;
1189   BlockContents* compressed_block = nullptr;
1190   Cache::Handle* block_cache_compressed_handle = nullptr;
1191   Statistics* statistics = rep_->ioptions.statistics.get();
1192   bool using_zstd = rep_->blocks_definitely_zstd_compressed;
1193   const FilterPolicy* filter_policy = rep_->filter_policy;
1194   Cache::CreateCallback create_cb = GetCreateCallback<TBlocklike>(
1195       read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
1196 
1197   // Lookup uncompressed cache first
1198   if (block_cache != nullptr) {
1199     Cache::Handle* cache_handle = nullptr;
1200     cache_handle = GetEntryFromCache(
1201         rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
1202         block_type, wait, get_context,
1203         BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
1204         priority);
1205     if (cache_handle != nullptr) {
1206       block->SetCachedValue(
1207           reinterpret_cast<TBlocklike*>(block_cache->Value(cache_handle)),
1208           block_cache, cache_handle);
1209       return s;
1210     }
1211   }
1212 
1213   // If not found, search from the compressed block cache.
1214   assert(block->IsEmpty());
1215 
1216   if (block_cache_compressed == nullptr) {
1217     return s;
1218   }
1219 
1220   assert(!compressed_block_cache_key.empty());
1221   BlockContents contents;
1222   if (rep_->ioptions.lowest_used_cache_tier ==
1223       CacheTier::kNonVolatileBlockTier) {
1224     Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
1225         read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
1226     block_cache_compressed_handle = block_cache_compressed->Lookup(
1227         compressed_block_cache_key,
1228         BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
1229         create_cb_special, priority, true);
1230   } else {
1231     block_cache_compressed_handle =
1232         block_cache_compressed->Lookup(compressed_block_cache_key, statistics);
1233   }
1234 
1235   // If we found it in the compressed cache, uncompress it and insert the
1236   // result into the uncompressed cache.
1237   if (block_cache_compressed_handle == nullptr) {
1238     RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
1239     return s;
1240   }
1241 
1242   // found compressed block
1243   RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
1244   compressed_block = reinterpret_cast<BlockContents*>(
1245       block_cache_compressed->Value(block_cache_compressed_handle));
1246   CompressionType compression_type = compressed_block->get_compression_type();
1247   assert(compression_type != kNoCompression);
1248 
1249   // Retrieve the uncompressed contents into a new buffer
1250   UncompressionContext context(compression_type);
1251   UncompressionInfo info(context, uncompression_dict, compression_type);
1252   s = UncompressBlockContents(
1253       info, compressed_block->data.data(), compressed_block->data.size(),
1254       &contents, rep_->table_options.format_version, rep_->ioptions,
1255       GetMemoryAllocator(rep_->table_options));
1256 
1257   // Insert the uncompressed block into the block cache; the priority is based
1258   // on the block type.
1259   if (s.ok()) {
1260     std::unique_ptr<TBlocklike> block_holder(
1261         BlocklikeTraits<TBlocklike>::Create(
1262             std::move(contents), read_amp_bytes_per_bit, statistics,
1263             rep_->blocks_definitely_zstd_compressed,
1264             rep_->table_options.filter_policy.get()));  // uncompressed block
1265 
1266     if (block_cache != nullptr && block_holder->own_bytes() &&
1267         read_options.fill_cache) {
1268       size_t charge = block_holder->ApproximateMemoryUsage();
1269       Cache::Handle* cache_handle = nullptr;
1270       s = InsertEntryToCache(
1271           rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
1272           BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
1273           block_holder, charge, &cache_handle, priority);
1274       if (s.ok()) {
1275         assert(cache_handle != nullptr);
1276         block->SetCachedValue(block_holder.release(), block_cache,
1277                               cache_handle);
1278 
1279         UpdateCacheInsertionMetrics(block_type, get_context, charge,
1280                                     s.IsOkOverwritten(), rep_->ioptions.stats);
1281       } else {
1282         RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
1283       }
1284     } else {
1285       block->SetOwnedValue(block_holder.release());
1286     }
1287   }
1288 
1289   // Release hold on compressed cache entry
1290   block_cache_compressed->Release(block_cache_compressed_handle);
1291   return s;
1292 }
1293 
1294 template <typename TBlocklike>
1295 Status BlockBasedTable::PutDataBlockToCache(
1296     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
1297     Cache* block_cache, Cache* block_cache_compressed,
1298     CachableEntry<TBlocklike>* cached_block, BlockContents* raw_block_contents,
1299     CompressionType raw_block_comp_type,
1300     const UncompressionDict& uncompression_dict,
1301     MemoryAllocator* memory_allocator, BlockType block_type,
1302     GetContext* get_context) const {
1303   const ImmutableOptions& ioptions = rep_->ioptions;
1304   const uint32_t format_version = rep_->table_options.format_version;
1305   const size_t read_amp_bytes_per_bit =
1306       block_type == BlockType::kData
1307           ? rep_->table_options.read_amp_bytes_per_bit
1308           : 0;
1309   const Cache::Priority priority =
1310       rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
1311               (block_type == BlockType::kFilter ||
1312                block_type == BlockType::kCompressionDictionary ||
1313                block_type == BlockType::kIndex)
1314           ? Cache::Priority::HIGH
1315           : Cache::Priority::LOW;
1316   assert(cached_block);
1317   assert(cached_block->IsEmpty());
1318 
1319   Status s;
1320   Statistics* statistics = ioptions.stats;
1321 
1322   std::unique_ptr<TBlocklike> block_holder;
1323   if (raw_block_comp_type != kNoCompression) {
1324     // Retrieve the uncompressed contents into a new buffer
1325     BlockContents uncompressed_block_contents;
1326     UncompressionContext context(raw_block_comp_type);
1327     UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
1328     s = UncompressBlockContents(info, raw_block_contents->data.data(),
1329                                 raw_block_contents->data.size(),
1330                                 &uncompressed_block_contents, format_version,
1331                                 ioptions, memory_allocator);
1332     if (!s.ok()) {
1333       return s;
1334     }
1335 
1336     block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
1337         std::move(uncompressed_block_contents), read_amp_bytes_per_bit,
1338         statistics, rep_->blocks_definitely_zstd_compressed,
1339         rep_->table_options.filter_policy.get()));
1340   } else {
1341     block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
1342         std::move(*raw_block_contents), read_amp_bytes_per_bit, statistics,
1343         rep_->blocks_definitely_zstd_compressed,
1344         rep_->table_options.filter_policy.get()));
1345   }
1346 
1347   // Insert compressed block into compressed block cache.
1348   // Release the hold on the compressed cache entry immediately.
1349   if (block_cache_compressed != nullptr &&
1350       raw_block_comp_type != kNoCompression && raw_block_contents != nullptr &&
1351       raw_block_contents->own_bytes()) {
1352 #ifndef NDEBUG
1353     assert(raw_block_contents->is_raw_block);
1354 #endif  // NDEBUG
1355 
1356     // We cannot directly put raw_block_contents because this could point to
1357     // an object on the stack.
1358     std::unique_ptr<BlockContents> block_cont_for_comp_cache(
1359         new BlockContents(std::move(*raw_block_contents)));
1360     s = InsertEntryToCache(
1361         rep_->ioptions.lowest_used_cache_tier, block_cache_compressed,
1362         compressed_block_cache_key,
1363         BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
1364         block_cont_for_comp_cache,
1365         block_cont_for_comp_cache->ApproximateMemoryUsage(), nullptr,
1366         Cache::Priority::LOW);
1367 
1368     BlockContents* block_cont_raw_ptr = block_cont_for_comp_cache.release();
1369     if (s.ok()) {
1370       // Prevent the following code from deleting this cached block.
1371       RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
1372     } else {
1373       RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
1374       delete block_cont_raw_ptr;
1375     }
1376   }
1377 
1378   // insert into uncompressed block cache
1379   if (block_cache != nullptr && block_holder->own_bytes()) {
1380     size_t charge = block_holder->ApproximateMemoryUsage();
1381     Cache::Handle* cache_handle = nullptr;
1382     s = InsertEntryToCache(
1383         rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
1384         BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
1385         block_holder, charge, &cache_handle, priority);
1386     if (s.ok()) {
1387       assert(cache_handle != nullptr);
1388       cached_block->SetCachedValue(block_holder.release(), block_cache,
1389                                    cache_handle);
1390 
1391       UpdateCacheInsertionMetrics(block_type, get_context, charge,
1392                                   s.IsOkOverwritten(), rep_->ioptions.stats);
1393     } else {
1394       RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
1395     }
1396   } else {
1397     cached_block->SetOwnedValue(block_holder.release());
1398   }
1399 
1400   return s;
1401 }
1402 
1403 std::unique_ptr<FilterBlockReader> BlockBasedTable::CreateFilterBlockReader(
1404     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, bool use_cache,
1405     bool prefetch, bool pin, BlockCacheLookupContext* lookup_context) {
1406   auto& rep = rep_;
1407   auto filter_type = rep->filter_type;
1408   if (filter_type == Rep::FilterType::kNoFilter) {
1409     return std::unique_ptr<FilterBlockReader>();
1410   }
1411 
1412   assert(rep->filter_policy);
1413 
1414   switch (filter_type) {
1415     case Rep::FilterType::kPartitionedFilter:
1416       return PartitionedFilterBlockReader::Create(
1417           this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context);
1418 
1419     case Rep::FilterType::kBlockFilter:
1420       return BlockBasedFilterBlockReader::Create(
1421           this, ro, prefetch_buffer, use_cache, prefetch, pin, lookup_context);
1422 
1423     case Rep::FilterType::kFullFilter:
1424       return FullFilterBlockReader::Create(this, ro, prefetch_buffer, use_cache,
1425                                            prefetch, pin, lookup_context);
1426 
1427     default:
1428       // filter_type is either kNoFilter (exited the function at the first if),
1429       // or it must be covered in this switch block
1430       assert(false);
1431       return std::unique_ptr<FilterBlockReader>();
1432   }
1433 }
1434 
1435 // disable_prefix_seek should be set to true when prefix_extractor found in SST
1436 // differs from the one in mutable_cf_options and index type is HashBasedIndex
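// A minimal call sketch (not part of the original source; the variable
// mutable_cf_prefix_extractor is illustrative) of how a caller might derive
// disable_prefix_seek from the prefix-extractor check used elsewhere in this
// file:
//
//   bool changed = PrefixExtractorChanged(rep_->table_properties.get(),
//                                         mutable_cf_prefix_extractor);
//   bool disable_prefix_seek =
//       changed && rep_->index_type == BlockBasedTableOptions::kHashSearch;
//   std::unique_ptr<InternalIteratorBase<IndexValue>> iter(NewIndexIterator(
//       read_options, disable_prefix_seek, /*input_iter=*/nullptr,
//       /*get_context=*/nullptr, /*lookup_context=*/nullptr));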
1437 InternalIteratorBase<IndexValue>* BlockBasedTable::NewIndexIterator(
1438     const ReadOptions& read_options, bool disable_prefix_seek,
1439     IndexBlockIter* input_iter, GetContext* get_context,
1440     BlockCacheLookupContext* lookup_context) const {
1441   assert(rep_ != nullptr);
1442   assert(rep_->index_reader != nullptr);
1443 
1444   // We don't return pinned data from index blocks, so no need
1445   // to set `block_contents_pinned`.
1446   return rep_->index_reader->NewIterator(read_options, disable_prefix_seek,
1447                                          input_iter, get_context,
1448                                          lookup_context);
1449 }
1450 
1451 template <>
1452 DataBlockIter* BlockBasedTable::InitBlockIterator<DataBlockIter>(
1453     const Rep* rep, Block* block, BlockType block_type,
1454     DataBlockIter* input_iter, bool block_contents_pinned) {
1455   return block->NewDataIterator(rep->internal_comparator.user_comparator(),
1456                                 rep->get_global_seqno(block_type), input_iter,
1457                                 rep->ioptions.stats, block_contents_pinned);
1458 }
1459 
1460 template <>
1461 IndexBlockIter* BlockBasedTable::InitBlockIterator<IndexBlockIter>(
1462     const Rep* rep, Block* block, BlockType block_type,
1463     IndexBlockIter* input_iter, bool block_contents_pinned) {
1464   return block->NewIndexIterator(
1465       rep->internal_comparator.user_comparator(),
1466       rep->get_global_seqno(block_type), input_iter, rep->ioptions.stats,
1467       /* total_order_seek */ true, rep->index_has_first_key,
1468       rep->index_key_includes_seq, rep->index_value_is_full,
1469       block_contents_pinned);
1470 }
1471 
1472 // If contents is nullptr, this function looks up the block caches for the
1473 // data block referenced by handle, and reads the block from disk if necessary.
1474 // If contents is non-null, it skips the cache lookup and disk read, since
1475 // the caller has already read it. In both cases, if ro.fill_cache is true,
1476 // it inserts the block into the block cache.
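// The two call modes, roughly as exercised by RetrieveBlock and
// RetrieveMultipleBlocks later in this file (a sketch; the local names used
// here are illustrative only):
//
//   // 1) No contents provided: look up the caches, read from disk if needed.
//   s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
//                                    uncompression_dict, wait, &block_entry,
//                                    BlockType::kData, get_context,
//                                    lookup_context, /*contents=*/nullptr);
//
//   // 2) Contents already read by the caller: skip lookup and disk read, and
//   //    optionally insert the block into the cache(s).
//   s = MaybeReadBlockAndLoadToCache(/*prefetch_buffer=*/nullptr, ro, handle,
//                                    uncompression_dict, /*wait=*/true,
//                                    &block_entry, BlockType::kData,
//                                    get_context, lookup_context,
//                                    &raw_block_contents);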
1477 template <typename TBlocklike>
1478 Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
1479     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
1480     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
1481     const bool wait, CachableEntry<TBlocklike>* block_entry,
1482     BlockType block_type, GetContext* get_context,
1483     BlockCacheLookupContext* lookup_context, BlockContents* contents) const {
1484   assert(block_entry != nullptr);
1485   const bool no_io = (ro.read_tier == kBlockCacheTier);
1486   Cache* block_cache = rep_->table_options.block_cache.get();
1487   Cache* block_cache_compressed =
1488       rep_->table_options.block_cache_compressed.get();
1489 
1490   // First, try to get the block from the cache
1491   //
1492   // If either block cache is enabled, we'll try to read from it.
1493   Status s;
1494   char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
1495   char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
1496   Slice key /* key to the block cache */;
1497   Slice ckey /* key to the compressed block cache */;
1498   bool is_cache_hit = false;
1499   if (block_cache != nullptr || block_cache_compressed != nullptr) {
1500     // create key for block cache
1501     if (block_cache != nullptr) {
1502       key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
1503                         handle, cache_key);
1504     }
1505 
1506     if (block_cache_compressed != nullptr) {
1507       ckey = GetCacheKey(rep_->compressed_cache_key_prefix,
1508                          rep_->compressed_cache_key_prefix_size, handle,
1509                          compressed_cache_key);
1510     }
1511 
1512     if (!contents) {
1513       s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
1514                                 ro, block_entry, uncompression_dict, block_type,
1515                                 wait, get_context);
1516       // Value could still be null at this point, so check the cache handle
1517       // and update the read pattern for prefetching
1518       if (block_entry->GetValue() || block_entry->GetCacheHandle()) {
1519         // TODO(haoyu): Differentiate cache hit on uncompressed block cache and
1520         // compressed block cache.
1521         is_cache_hit = true;
1522         if (prefetch_buffer) {
1523           // Update the block details so that PrefetchBuffer can use the read
1524           // pattern to determine if reads are sequential or not for
1525           // prefetching. It should also take into account blocks read from cache.
1526           prefetch_buffer->UpdateReadPattern(handle.offset(),
1527                                              block_size(handle));
1528         }
1529       }
1530     }
1531 
1532     // Can't find the block in the cache. If I/O is allowed, read from the
1533     // file.
1534     if (block_entry->GetValue() == nullptr &&
1535         block_entry->GetCacheHandle() == nullptr && !no_io && ro.fill_cache) {
1536       Statistics* statistics = rep_->ioptions.stats;
1537       const bool maybe_compressed =
1538           block_type != BlockType::kFilter &&
1539           block_type != BlockType::kCompressionDictionary &&
1540           rep_->blocks_maybe_compressed;
1541       const bool do_uncompress = maybe_compressed && !block_cache_compressed;
1542       CompressionType raw_block_comp_type;
1543       BlockContents raw_block_contents;
1544       if (!contents) {
1545         StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
1546         BlockFetcher block_fetcher(
1547             rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
1548             &raw_block_contents, rep_->ioptions, do_uncompress,
1549             maybe_compressed, block_type, uncompression_dict,
1550             rep_->persistent_cache_options,
1551             GetMemoryAllocator(rep_->table_options),
1552             GetMemoryAllocatorForCompressedBlock(rep_->table_options));
1553         s = block_fetcher.ReadBlockContents();
1554         raw_block_comp_type = block_fetcher.get_compression_type();
1555         contents = &raw_block_contents;
1556         if (get_context) {
1557           switch (block_type) {
1558             case BlockType::kIndex:
1559               ++get_context->get_context_stats_.num_index_read;
1560               break;
1561             case BlockType::kFilter:
1562               ++get_context->get_context_stats_.num_filter_read;
1563               break;
1564             case BlockType::kData:
1565               ++get_context->get_context_stats_.num_data_read;
1566               break;
1567             default:
1568               break;
1569           }
1570         }
1571       } else {
1572         raw_block_comp_type = contents->get_compression_type();
1573       }
1574 
1575       if (s.ok()) {
1576         // If filling cache is allowed and a cache is configured, try to put the
1577         // block to the cache.
1578         s = PutDataBlockToCache(
1579             key, ckey, block_cache, block_cache_compressed, block_entry,
1580             contents, raw_block_comp_type, uncompression_dict,
1581             GetMemoryAllocator(rep_->table_options), block_type, get_context);
1582       }
1583     }
1584   }
1585 
1586   // Fill lookup_context.
1587   if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
1588       lookup_context) {
1589     size_t usage = 0;
1590     uint64_t nkeys = 0;
1591     if (block_entry->GetValue()) {
1592       // Approximate the number of keys in the block using restarts.
1593       nkeys =
1594           rep_->table_options.block_restart_interval *
1595           BlocklikeTraits<TBlocklike>::GetNumRestarts(*block_entry->GetValue());
1596       usage = block_entry->GetValue()->ApproximateMemoryUsage();
1597     }
1598     TraceType trace_block_type = TraceType::kTraceMax;
1599     switch (block_type) {
1600       case BlockType::kData:
1601         trace_block_type = TraceType::kBlockTraceDataBlock;
1602         break;
1603       case BlockType::kFilter:
1604         trace_block_type = TraceType::kBlockTraceFilterBlock;
1605         break;
1606       case BlockType::kCompressionDictionary:
1607         trace_block_type = TraceType::kBlockTraceUncompressionDictBlock;
1608         break;
1609       case BlockType::kRangeDeletion:
1610         trace_block_type = TraceType::kBlockTraceRangeDeletionBlock;
1611         break;
1612       case BlockType::kIndex:
1613         trace_block_type = TraceType::kBlockTraceIndexBlock;
1614         break;
1615       default:
1616         // This cannot happen.
1617         assert(false);
1618         break;
1619     }
1620     bool no_insert = no_io || !ro.fill_cache;
1621     if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
1622             trace_block_type, lookup_context->caller)) {
1623       // Defer logging the access to Get() and MultiGet() to trace additional
1624       // information, e.g., referenced_key_exist_in_block.
1625 
1626       // Make a copy of the block key here since it will be logged later.
1627       lookup_context->FillLookupContext(
1628           is_cache_hit, no_insert, trace_block_type,
1629           /*block_size=*/usage, /*block_key=*/key.ToString(), nkeys);
1630     } else {
1631       // Avoid making copies of block_key and cf_name when constructing the access
1632       // record.
1633       BlockCacheTraceRecord access_record(
1634           rep_->ioptions.clock->NowMicros(),
1635           /*block_key=*/"", trace_block_type,
1636           /*block_size=*/usage, rep_->cf_id_for_tracing(),
1637           /*cf_name=*/"", rep_->level_for_tracing(),
1638           rep_->sst_number_for_tracing(), lookup_context->caller, is_cache_hit,
1639           no_insert, lookup_context->get_id,
1640           lookup_context->get_from_user_specified_snapshot,
1641           /*referenced_key=*/"");
1642       // TODO: Should handle this error?
1643       block_cache_tracer_
1644           ->WriteBlockAccess(access_record, key, rep_->cf_name_for_tracing(),
1645                              lookup_context->referenced_key)
1646           .PermitUncheckedError();
1647     }
1648   }
1649 
1650   assert(s.ok() || block_entry->GetValue() == nullptr);
1651   return s;
1652 }
1653 
1654 // This function reads multiple data blocks from disk using Env::MultiRead()
1655 // and optionally inserts them into the block cache. It uses the scratch
1656 // buffer provided by the caller, which is contiguous. If scratch is a nullptr
1657 // it allocates a separate buffer for each block. Typically, if the blocks
1658 // need to be uncompressed and there is no compressed block cache, callers
1659 // can allocate a temporary scratch buffer in order to minimize memory
1660 // allocations.
1661 // If options.fill_cache is true, it inserts the blocks into cache. If it's
1662 // false and scratch is non-null and the blocks are uncompressed, it copies
1663 // the buffers to heap. In any case, the CachableEntry<Block> returned will
1664 // own the data bytes.
1665 // If compression is enabled and there is no compressed block cache,
1666 // the adjacent blocks are read out in one IO (combined read).
1667 // batch - A MultiGetRange with only those keys with unique data blocks not
1668 //         found in cache
1669 // handles - A vector of block handles. Some of them may be NULL handles
1670 // scratch - An optional contiguous buffer to read compressed blocks into
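// A loose usage sketch of the caller-side shape (e.g. from MultiGet); the
// range name and scratch handling below are illustrative assumptions:
//
//   autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
//   autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
//   autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
//   // One entry per key in the batch; keys whose block was already found in
//   // cache get a NULL handle and are skipped below.
//   RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
//                          &statuses, &results, scratch, uncompression_dict);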
1671 void BlockBasedTable::RetrieveMultipleBlocks(
1672     const ReadOptions& options, const MultiGetRange* batch,
1673     const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
1674     autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
1675     autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
1676     char* scratch, const UncompressionDict& uncompression_dict) const {
1677   RandomAccessFileReader* file = rep_->file.get();
1678   const Footer& footer = rep_->footer;
1679   const ImmutableOptions& ioptions = rep_->ioptions;
1680   size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
1681   MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
1682 
1683   if (ioptions.allow_mmap_reads) {
1684     size_t idx_in_batch = 0;
1685     for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1686          ++mget_iter, ++idx_in_batch) {
1687       BlockCacheLookupContext lookup_data_block_context(
1688           TableReaderCaller::kUserMultiGet);
1689       const BlockHandle& handle = (*handles)[idx_in_batch];
1690       if (handle.IsNull()) {
1691         continue;
1692       }
1693 
1694       (*statuses)[idx_in_batch] =
1695           RetrieveBlock(nullptr, options, handle, uncompression_dict,
1696                         &(*results)[idx_in_batch], BlockType::kData,
1697                         mget_iter->get_context, &lookup_data_block_context,
1698                         /* for_compaction */ false, /* use_cache */ true,
1699                         /* wait_for_cache */ true);
1700     }
1701     return;
1702   }
1703 
1704   // In direct IO mode, blocks share the direct io buffer.
1705   // Otherwise, blocks share the scratch buffer.
1706   const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
1707 
1708   autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
1709   size_t buf_offset = 0;
1710   size_t idx_in_batch = 0;
1711 
1712   uint64_t prev_offset = 0;
1713   size_t prev_len = 0;
1714   autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
1715   autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
1716   for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1717        ++mget_iter, ++idx_in_batch) {
1718     const BlockHandle& handle = (*handles)[idx_in_batch];
1719     if (handle.IsNull()) {
1720       continue;
1721     }
1722 
1723     size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
1724 
1725     // If the current block is adjacent to the previous one and, at the same
1726     // time, compression is enabled and there is no compressed cache, we
1727     // combine the two block reads into one.
1728     // We don't combine block reads here in direct IO mode, because when doing
1729     // direct IO read, the block requests will be realigned and merged when
1730     // necessary.
1731     if (use_shared_buffer && !file->use_direct_io() &&
1732         prev_end == handle.offset()) {
1733       req_offset_for_block.emplace_back(prev_len);
1734       prev_len += block_size(handle);
1735     } else {
1736       // No compression, or the current block and previous one are not adjacent:
1737       // Step 1, create a new request for previous blocks
1738       if (prev_len != 0) {
1739         FSReadRequest req;
1740         req.offset = prev_offset;
1741         req.len = prev_len;
1742         if (file->use_direct_io()) {
1743           req.scratch = nullptr;
1744         } else if (use_shared_buffer) {
1745           req.scratch = scratch + buf_offset;
1746           buf_offset += req.len;
1747         } else {
1748           req.scratch = new char[req.len];
1749         }
1750         read_reqs.emplace_back(req);
1751       }
1752 
1753       // Step 2, remember the previous block info
1754       prev_offset = handle.offset();
1755       prev_len = block_size(handle);
1756       req_offset_for_block.emplace_back(0);
1757     }
1758     req_idx_for_block.emplace_back(read_reqs.size());
1759 
1760     PERF_COUNTER_ADD(block_read_count, 1);
1761     PERF_COUNTER_ADD(block_read_byte, block_size(handle));
1762   }
1763   // Handle the last block and process the pending last request
1764   if (prev_len != 0) {
1765     FSReadRequest req;
1766     req.offset = prev_offset;
1767     req.len = prev_len;
1768     if (file->use_direct_io()) {
1769       req.scratch = nullptr;
1770     } else if (use_shared_buffer) {
1771       req.scratch = scratch + buf_offset;
1772     } else {
1773       req.scratch = new char[req.len];
1774     }
1775     read_reqs.emplace_back(req);
1776   }
1777 
1778   AlignedBuf direct_io_buf;
1779   {
1780     IOOptions opts;
1781     IOStatus s = file->PrepareIOOptions(options, opts);
1782     if (s.ok()) {
1783       s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
1784                           &direct_io_buf);
1785     }
1786     if (!s.ok()) {
1787       // Discard all the results in this batch if there is any timeout
1788       // or overall MultiRead error
1789       for (FSReadRequest& req : read_reqs) {
1790         req.status = s;
1791       }
1792     }
1793   }
1794 
1795   idx_in_batch = 0;
1796   size_t valid_batch_idx = 0;
1797   for (auto mget_iter = batch->begin(); mget_iter != batch->end();
1798        ++mget_iter, ++idx_in_batch) {
1799     const BlockHandle& handle = (*handles)[idx_in_batch];
1800 
1801     if (handle.IsNull()) {
1802       continue;
1803     }
1804 
1805     assert(valid_batch_idx < req_idx_for_block.size());
1806     assert(valid_batch_idx < req_offset_for_block.size());
1807     assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
1808     size_t& req_idx = req_idx_for_block[valid_batch_idx];
1809     size_t& req_offset = req_offset_for_block[valid_batch_idx];
1810     valid_batch_idx++;
1811     if (mget_iter->get_context) {
1812       ++(mget_iter->get_context->get_context_stats_.num_data_read);
1813     }
1814     FSReadRequest& req = read_reqs[req_idx];
1815     Status s = req.status;
1816     if (s.ok()) {
1817       if ((req.result.size() != req.len) ||
1818           (req_offset + block_size(handle) > req.result.size())) {
1819         s = Status::Corruption(
1820             "truncated block read from " + rep_->file->file_name() +
1821             " offset " + ToString(handle.offset()) + ", expected " +
1822             ToString(req.len) + " bytes, got " + ToString(req.result.size()));
1823       }
1824     }
1825 
1826     BlockContents raw_block_contents;
1827     if (s.ok()) {
1828       if (!use_shared_buffer) {
1829         // We allocated a buffer for this block. Give ownership of it to
1830         // BlockContents so it can free the memory
1831         assert(req.result.data() == req.scratch);
1832         assert(req.result.size() == block_size(handle));
1833         assert(req_offset == 0);
1834         std::unique_ptr<char[]> raw_block(req.scratch);
1835         raw_block_contents = BlockContents(std::move(raw_block), handle.size());
1836       } else {
1837         // We used the scratch buffer or direct io buffer
1838         // which are shared by the blocks.
1839         // raw_block_contents does not own the buffer.
1840         raw_block_contents =
1841             BlockContents(Slice(req.result.data() + req_offset, handle.size()));
1842       }
1843 #ifndef NDEBUG
1844       raw_block_contents.is_raw_block = true;
1845 #endif
1846 
1847       if (options.verify_checksums) {
1848         PERF_TIMER_GUARD(block_checksum_time);
1849         const char* data = req.result.data();
1850         // Since the scratch might be shared, the offset of the data block in
1851         // the buffer might not be 0. req.result.data() only points to the
1852         // start address of each read request, so we need to add the offset
1853         // in each read request. Checksum is stored in the block trailer,
1854         // beyond the payload size.
1855         s = ROCKSDB_NAMESPACE::VerifyBlockChecksum(
1856             footer.checksum(), data + req_offset, handle.size(),
1857             rep_->file->file_name(), handle.offset());
1858         TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
1859       }
1860     } else if (!use_shared_buffer) {
1861       // Free the allocated scratch buffer.
1862       delete[] req.scratch;
1863     }
1864 
1865     if (s.ok()) {
1866       // When the blocks share the same underlying buffer (scratch or direct io
1867       // buffer), we may need to manually copy the block onto the heap if the raw
1868       // block has to be inserted into a cache. That falls into the following
1869       // cases -
1870       // 1. Raw block is not compressed, it needs to be inserted into the
1871       //    uncompressed block cache if there is one
1872       // 2. If the raw block is compressed, it needs to be inserted into the
1873       //    compressed block cache if there is one
1874       //
1875       // In all other cases, the raw block is either uncompressed into a heap
1876       // buffer or there is no cache at all.
1877       CompressionType compression_type =
1878           raw_block_contents.get_compression_type();
1879       if (use_shared_buffer && (compression_type == kNoCompression ||
1880                                 (compression_type != kNoCompression &&
1881                                  rep_->table_options.block_cache_compressed))) {
1882         Slice raw = Slice(req.result.data() + req_offset, block_size(handle));
1883         raw_block_contents = BlockContents(
1884             CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
1885             handle.size());
1886 #ifndef NDEBUG
1887         raw_block_contents.is_raw_block = true;
1888 #endif
1889       }
1890     }
1891 
1892     if (s.ok()) {
1893       if (options.fill_cache) {
1894         BlockCacheLookupContext lookup_data_block_context(
1895             TableReaderCaller::kUserMultiGet);
1896         CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
1897         // MaybeReadBlockAndLoadToCache will insert into the block caches if
1898         // necessary. Since we're passing the raw block contents, it will
1899         // avoid looking up the block cache
1900         s = MaybeReadBlockAndLoadToCache(
1901             nullptr, options, handle, uncompression_dict, /*wait=*/true,
1902             block_entry, BlockType::kData, mget_iter->get_context,
1903             &lookup_data_block_context, &raw_block_contents);
1904 
1905         // block_entry value could be null if no block cache is present, i.e.
1906         // BlockBasedTableOptions::no_block_cache is true and no compressed
1907         // block cache is configured. In that case, fall
1908         // through and set up the block explicitly
1909         if (block_entry->GetValue() != nullptr) {
1910           s.PermitUncheckedError();
1911           continue;
1912         }
1913       }
1914 
1915       CompressionType compression_type =
1916           raw_block_contents.get_compression_type();
1917       BlockContents contents;
1918       if (compression_type != kNoCompression) {
1919         UncompressionContext context(compression_type);
1920         UncompressionInfo info(context, uncompression_dict, compression_type);
1921         s = UncompressBlockContents(info, req.result.data() + req_offset,
1922                                     handle.size(), &contents, footer.version(),
1923                                     rep_->ioptions, memory_allocator);
1924       } else {
1925         // There are two cases here:
1926         // 1) caller uses the shared buffer (scratch or direct io buffer);
1927         // 2) we use the request buffer.
1928         // If scratch buffer or direct io buffer is used, we ensure that
1929         // all raw blocks are copied to the heap as single blocks. If scratch
1930         // buffer is not used, we also have no combined read, so the raw
1931         // block can be used directly.
1932         contents = std::move(raw_block_contents);
1933       }
1934       if (s.ok()) {
1935         (*results)[idx_in_batch].SetOwnedValue(new Block(
1936             std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
1937       }
1938     }
1939     (*statuses)[idx_in_batch] = s;
1940   }
1941 }
1942 
1943 template <typename TBlocklike>
1944 Status BlockBasedTable::RetrieveBlock(
1945     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
1946     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
1947     CachableEntry<TBlocklike>* block_entry, BlockType block_type,
1948     GetContext* get_context, BlockCacheLookupContext* lookup_context,
1949     bool for_compaction, bool use_cache, bool wait_for_cache) const {
1950   assert(block_entry);
1951   assert(block_entry->IsEmpty());
1952 
1953   Status s;
1954   if (use_cache) {
1955     s = MaybeReadBlockAndLoadToCache(
1956         prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
1957         block_entry, block_type, get_context, lookup_context,
1958         /*contents=*/nullptr);
1959 
1960     if (!s.ok()) {
1961       return s;
1962     }
1963 
1964     if (block_entry->GetValue() != nullptr ||
1965         block_entry->GetCacheHandle() != nullptr) {
1966       assert(s.ok());
1967       return s;
1968     }
1969   }
1970 
1971   assert(block_entry->IsEmpty());
1972 
1973   const bool no_io = ro.read_tier == kBlockCacheTier;
1974   if (no_io) {
1975     return Status::Incomplete("no blocking io");
1976   }
1977 
1978   const bool maybe_compressed =
1979       block_type != BlockType::kFilter &&
1980       block_type != BlockType::kCompressionDictionary &&
1981       rep_->blocks_maybe_compressed;
1982   const bool do_uncompress = maybe_compressed;
1983   std::unique_ptr<TBlocklike> block;
1984 
1985   {
1986     StopWatch sw(rep_->ioptions.clock, rep_->ioptions.stats,
1987                  READ_BLOCK_GET_MICROS);
1988     s = ReadBlockFromFile(
1989         rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
1990         rep_->ioptions, do_uncompress, maybe_compressed, block_type,
1991         uncompression_dict, rep_->persistent_cache_options,
1992         block_type == BlockType::kData
1993             ? rep_->table_options.read_amp_bytes_per_bit
1994             : 0,
1995         GetMemoryAllocator(rep_->table_options), for_compaction,
1996         rep_->blocks_definitely_zstd_compressed,
1997         rep_->table_options.filter_policy.get());
1998 
1999     if (get_context) {
2000       switch (block_type) {
2001         case BlockType::kIndex:
2002           ++(get_context->get_context_stats_.num_index_read);
2003           break;
2004         case BlockType::kFilter:
2005           ++(get_context->get_context_stats_.num_filter_read);
2006           break;
2007         case BlockType::kData:
2008           ++(get_context->get_context_stats_.num_data_read);
2009           break;
2010         default:
2011           break;
2012       }
2013     }
2014   }
2015 
2016   if (!s.ok()) {
2017     return s;
2018   }
2019 
2020   block_entry->SetOwnedValue(block.release());
2021 
2022   assert(s.ok());
2023   return s;
2024 }
2025 
2026 // Explicitly instantiate templates for both "blocklike" types we use.
2027 // This makes it possible to keep the template definitions in the .cc file.
2028 template Status BlockBasedTable::RetrieveBlock<BlockContents>(
2029     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2030     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2031     CachableEntry<BlockContents>* block_entry, BlockType block_type,
2032     GetContext* get_context, BlockCacheLookupContext* lookup_context,
2033     bool for_compaction, bool use_cache, bool wait_for_cache) const;
2034 
2035 template Status BlockBasedTable::RetrieveBlock<ParsedFullFilterBlock>(
2036     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2037     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2038     CachableEntry<ParsedFullFilterBlock>* block_entry, BlockType block_type,
2039     GetContext* get_context, BlockCacheLookupContext* lookup_context,
2040     bool for_compaction, bool use_cache, bool wait_for_cache) const;
2041 
2042 template Status BlockBasedTable::RetrieveBlock<Block>(
2043     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2044     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2045     CachableEntry<Block>* block_entry, BlockType block_type,
2046     GetContext* get_context, BlockCacheLookupContext* lookup_context,
2047     bool for_compaction, bool use_cache, bool wait_for_cache) const;
2048 
2049 template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
2050     FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
2051     const BlockHandle& handle, const UncompressionDict& uncompression_dict,
2052     CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
2053     GetContext* get_context, BlockCacheLookupContext* lookup_context,
2054     bool for_compaction, bool use_cache, bool wait_for_cache) const;
2055 
2056 BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
2057     const BlockBasedTable* table,
2058     std::unordered_map<uint64_t, CachableEntry<Block>>* block_map)
2059     : table_(table), block_map_(block_map) {}
2060 
2061 InternalIteratorBase<IndexValue>*
2062 BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
2063     const BlockHandle& handle) {
2064   // Return a block iterator on the index partition
2065   auto block = block_map_->find(handle.offset());
2066   // This is a possible scenario since block cache might not have had space
2067   // for the partition
2068   if (block != block_map_->end()) {
2069     const Rep* rep = table_->get_rep();
2070     assert(rep);
2071 
2072     Statistics* kNullStats = nullptr;
2073     // We don't return pinned data from index blocks, so no need
2074     // to set `block_contents_pinned`.
2075     return block->second.GetValue()->NewIndexIterator(
2076         rep->internal_comparator.user_comparator(),
2077         rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
2078         rep->index_has_first_key, rep->index_key_includes_seq,
2079         rep->index_value_is_full);
2080   }
2081   // Create an empty iterator
2082   // TODO(ajkr): this is not the right way to handle an unpinned partition.
2083   return new IndexBlockIter();
2084 }
2085 
2086 // This will be broken if the user specifies an unusual implementation
2087 // of Options.comparator, or if the user specifies an unusual
2088 // definition of prefixes in BlockBasedTableOptions.filter_policy.
2089 // In particular, we require the following three properties:
2090 //
2091 // 1) key.starts_with(prefix(key))
2092 // 2) Compare(prefix(key), key) <= 0.
2093 // 3) If Compare(key1, key2) <= 0, then Compare(prefix(key1), prefix(key2)) <= 0
2094 //
2095 // If read_options.read_tier == kBlockCacheTier, this method will do no I/O and
2096 // will return true if the filter block is not in memory and not found in block
2097 // cache.
2098 //
2099 // REQUIRES: this method shouldn't be called while the DB lock is held.
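// For instance, a fixed-length prefix extractor together with the default
// BytewiseComparator satisfies all three properties. A configuration sketch
// (illustrative only, using public RocksDB APIs):
//
//   BlockBasedTableOptions table_options;
//   table_options.filter_policy.reset(NewBloomFilterPolicy(10));
//   Options options;
//   options.prefix_extractor.reset(NewFixedPrefixTransform(4));
//   options.table_factory.reset(NewBlockBasedTableFactory(table_options));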
2100 bool BlockBasedTable::PrefixMayMatch(
2101     const Slice& internal_key, const ReadOptions& read_options,
2102     const SliceTransform* options_prefix_extractor,
2103     const bool need_upper_bound_check,
2104     BlockCacheLookupContext* lookup_context) const {
2105   if (!rep_->filter_policy) {
2106     return true;
2107   }
2108 
2109   const SliceTransform* prefix_extractor;
2110 
2111   if (rep_->table_prefix_extractor == nullptr) {
2112     if (need_upper_bound_check) {
2113       return true;
2114     }
2115     prefix_extractor = options_prefix_extractor;
2116   } else {
2117     prefix_extractor = rep_->table_prefix_extractor.get();
2118   }
2119   auto ts_sz = rep_->internal_comparator.user_comparator()->timestamp_size();
2120   auto user_key_without_ts =
2121       ExtractUserKeyAndStripTimestamp(internal_key, ts_sz);
2122   if (!prefix_extractor->InDomain(user_key_without_ts)) {
2123     return true;
2124   }
2125 
2126   bool may_match = true;
2127 
2128   // First, try checking with the full filter
2129   FilterBlockReader* const filter = rep_->filter.get();
2130   bool filter_checked = true;
2131   if (filter != nullptr) {
2132     const bool no_io = read_options.read_tier == kBlockCacheTier;
2133 
2134     if (!filter->IsBlockBased()) {
2135       const Slice* const const_ikey_ptr = &internal_key;
2136       may_match = filter->RangeMayExist(
2137           read_options.iterate_upper_bound, user_key_without_ts,
2138           prefix_extractor, rep_->internal_comparator.user_comparator(),
2139           const_ikey_ptr, &filter_checked, need_upper_bound_check, no_io,
2140           lookup_context);
2141     } else {
2142       // if prefix_extractor changed for block based filter, skip filter
2143       if (need_upper_bound_check) {
2144         return true;
2145       }
2146       auto prefix = prefix_extractor->Transform(user_key_without_ts);
2147       InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue);
2148       auto internal_prefix = internal_key_prefix.Encode();
2149 
2150       // To prevent any I/O operation in this method, we set `read_tier` to make
2151       // sure we always read index or filter only when they have already been
2152       // loaded into memory.
2153       ReadOptions no_io_read_options;
2154       no_io_read_options.read_tier = kBlockCacheTier;
2155 
2156       // Then, try to find it within each block
2157       // we already know prefix_extractor and prefix_extractor_name must match
2158       // because `CheckPrefixMayMatch` first checks `check_filter_ == true`
2159       std::unique_ptr<InternalIteratorBase<IndexValue>> iiter(NewIndexIterator(
2160           no_io_read_options,
2161           /*need_upper_bound_check=*/false, /*input_iter=*/nullptr,
2162           /*get_context=*/nullptr, lookup_context));
2163       iiter->Seek(internal_prefix);
2164 
2165       if (!iiter->Valid()) {
2166         // we're past end of file
2167         // if it's incomplete, it means that we avoided I/O
2168         // and we're not really sure that we're past the end
2169         // of the file
2170         may_match = iiter->status().IsIncomplete();
2171       } else if ((rep_->index_key_includes_seq ? ExtractUserKey(iiter->key())
2172                                                : iiter->key())
2173                      .starts_with(ExtractUserKey(internal_prefix))) {
2174         // we need to check for this subtle case because our only
2175         // guarantee is that "the key is a string >= last key in that data
2176         // block" according to the doc/table_format.txt spec.
2177         //
2178         // Suppose iiter->key() starts with the desired prefix; it is not
2179         // necessarily the case that the corresponding data block will
2180         // contain the prefix, since iiter->key() need not be in the
2181         // block.  However, the next data block may contain the prefix, so
2182         // we return true to play it safe.
2183         may_match = true;
2184       } else if (filter->IsBlockBased()) {
2185         // iiter->key() does NOT start with the desired prefix.  Because
2186         // Seek() finds the first key that is >= the seek target, this
2187         // means that iiter->key() > prefix.  Thus, any data blocks coming
2188         // after the data block corresponding to iiter->key() cannot
2189         // possibly contain the key.  Thus, the corresponding data block
2190         // is the only one that could potentially contain the prefix.
2191         BlockHandle handle = iiter->value().handle;
2192         may_match = filter->PrefixMayMatch(
2193             prefix, prefix_extractor, handle.offset(), no_io,
2194             /*const_key_ptr=*/nullptr, /*get_context=*/nullptr, lookup_context);
2195       }
2196     }
2197   }
2198 
2199   if (filter_checked) {
2200     Statistics* statistics = rep_->ioptions.stats;
2201     RecordTick(statistics, BLOOM_FILTER_PREFIX_CHECKED);
2202     if (!may_match) {
2203       RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
2204     }
2205   }
2206 
2207   return may_match;
2208 }
2209 
2210 
2211 InternalIterator* BlockBasedTable::NewIterator(
2212     const ReadOptions& read_options, const SliceTransform* prefix_extractor,
2213     Arena* arena, bool skip_filters, TableReaderCaller caller,
2214     size_t compaction_readahead_size, bool allow_unprepared_value) {
2215   BlockCacheLookupContext lookup_context{caller};
2216   bool need_upper_bound_check =
2217       read_options.auto_prefix_mode ||
2218       PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
2219   std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(NewIndexIterator(
2220       read_options,
2221       need_upper_bound_check &&
2222           rep_->index_type == BlockBasedTableOptions::kHashSearch,
2223       /*input_iter=*/nullptr, /*get_context=*/nullptr, &lookup_context));
2224   if (arena == nullptr) {
2225     return new BlockBasedTableIterator(
2226         this, read_options, rep_->internal_comparator, std::move(index_iter),
2227         !skip_filters && !read_options.total_order_seek &&
2228             prefix_extractor != nullptr,
2229         need_upper_bound_check, prefix_extractor, caller,
2230         compaction_readahead_size, allow_unprepared_value);
2231   } else {
2232     auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator));
2233     return new (mem) BlockBasedTableIterator(
2234         this, read_options, rep_->internal_comparator, std::move(index_iter),
2235         !skip_filters && !read_options.total_order_seek &&
2236             prefix_extractor != nullptr,
2237         need_upper_bound_check, prefix_extractor, caller,
2238         compaction_readahead_size, allow_unprepared_value);
2239   }
2240 }
2241 
2242 FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
2243     const ReadOptions& read_options) {
2244   if (rep_->fragmented_range_dels == nullptr) {
2245     return nullptr;
2246   }
2247   SequenceNumber snapshot = kMaxSequenceNumber;
2248   if (read_options.snapshot != nullptr) {
2249     snapshot = read_options.snapshot->GetSequenceNumber();
2250   }
2251   return new FragmentedRangeTombstoneIterator(
2252       rep_->fragmented_range_dels, rep_->internal_comparator, snapshot);
2253 }
2254 
2255 bool BlockBasedTable::FullFilterKeyMayMatch(
2256     const ReadOptions& read_options, FilterBlockReader* filter,
2257     const Slice& internal_key, const bool no_io,
2258     const SliceTransform* prefix_extractor, GetContext* get_context,
2259     BlockCacheLookupContext* lookup_context) const {
2260   if (filter == nullptr || filter->IsBlockBased()) {
2261     return true;
2262   }
2263   Slice user_key = ExtractUserKey(internal_key);
2264   const Slice* const const_ikey_ptr = &internal_key;
2265   bool may_match = true;
2266   size_t ts_sz = rep_->internal_comparator.user_comparator()->timestamp_size();
2267   Slice user_key_without_ts = StripTimestampFromUserKey(user_key, ts_sz);
2268   if (rep_->whole_key_filtering) {
2269     may_match =
2270         filter->KeyMayMatch(user_key_without_ts, prefix_extractor, kNotValid,
2271                             no_io, const_ikey_ptr, get_context, lookup_context);
2272   } else if (!read_options.total_order_seek && prefix_extractor &&
2273              rep_->table_properties->prefix_extractor_name ==
2274                  prefix_extractor->AsString() &&
2275              prefix_extractor->InDomain(user_key_without_ts) &&
2276              !filter->PrefixMayMatch(
2277                  prefix_extractor->Transform(user_key_without_ts),
2278                  prefix_extractor, kNotValid, no_io, const_ikey_ptr,
2279                  get_context, lookup_context)) {
2280     may_match = false;
2281   }
2282   if (may_match) {
2283     RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE);
2284     PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level);
2285   }
2286   return may_match;
2287 }
2288 
2289 void BlockBasedTable::FullFilterKeysMayMatch(
2290     const ReadOptions& read_options, FilterBlockReader* filter,
2291     MultiGetRange* range, const bool no_io,
2292     const SliceTransform* prefix_extractor,
2293     BlockCacheLookupContext* lookup_context) const {
2294   if (filter == nullptr || filter->IsBlockBased()) {
2295     return;
2296   }
2297   uint64_t before_keys = range->KeysLeft();
2298   assert(before_keys > 0);  // Caller should ensure
2299   if (rep_->whole_key_filtering) {
2300     filter->KeysMayMatch(range, prefix_extractor, kNotValid, no_io,
2301                          lookup_context);
2302     uint64_t after_keys = range->KeysLeft();
2303     if (after_keys) {
2304       RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE, after_keys);
2305       PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, after_keys,
2306                                 rep_->level);
2307     }
2308     uint64_t filtered_keys = before_keys - after_keys;
2309     if (filtered_keys) {
2310       RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL, filtered_keys);
2311       PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, filtered_keys,
2312                                 rep_->level);
2313     }
2314   } else if (!read_options.total_order_seek && prefix_extractor &&
2315              rep_->table_properties->prefix_extractor_name ==
2316                  prefix_extractor->AsString()) {
2317     filter->PrefixesMayMatch(range, prefix_extractor, kNotValid, false,
2318                              lookup_context);
2319     RecordTick(rep_->ioptions.stats, BLOOM_FILTER_PREFIX_CHECKED, before_keys);
2320     uint64_t after_keys = range->KeysLeft();
2321     uint64_t filtered_keys = before_keys - after_keys;
2322     if (filtered_keys) {
2323       RecordTick(rep_->ioptions.stats, BLOOM_FILTER_PREFIX_USEFUL,
2324                  filtered_keys);
2325     }
2326   }
2327 }
2328 
2329 Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
2330                             GetContext* get_context,
2331                             const SliceTransform* prefix_extractor,
2332                             bool skip_filters) {
2333   assert(key.size() >= 8);  // key must be internal key
2334   assert(get_context != nullptr);
2335   Status s;
2336   const bool no_io = read_options.read_tier == kBlockCacheTier;
2337 
2338   FilterBlockReader* const filter =
2339       !skip_filters ? rep_->filter.get() : nullptr;
2340 
2341   // First check the full filter
2342   // If the full filter is not useful, then go into each block
2343   uint64_t tracing_get_id = get_context->get_tracing_get_id();
2344   BlockCacheLookupContext lookup_context{
2345       TableReaderCaller::kUserGet, tracing_get_id,
2346       /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
2347   if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2348     // Trace the key since it contains both user key and sequence number.
2349     lookup_context.referenced_key = key.ToString();
2350     lookup_context.get_from_user_specified_snapshot =
2351         read_options.snapshot != nullptr;
2352   }
2353   TEST_SYNC_POINT("BlockBasedTable::Get:BeforeFilterMatch");
2354   const bool may_match =
2355       FullFilterKeyMayMatch(read_options, filter, key, no_io, prefix_extractor,
2356                             get_context, &lookup_context);
2357   TEST_SYNC_POINT("BlockBasedTable::Get:AfterFilterMatch");
2358   if (!may_match) {
2359     RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
2360     PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
2361   } else {
2362     IndexBlockIter iiter_on_stack;
2363     // if prefix_extractor found in block differs from options, disable
2364     // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
2365     bool need_upper_bound_check = false;
2366     if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
2367       need_upper_bound_check = PrefixExtractorChanged(
2368           rep_->table_properties.get(), prefix_extractor);
2369     }
2370     auto iiter =
2371         NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
2372                          get_context, &lookup_context);
2373     std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2374     if (iiter != &iiter_on_stack) {
2375       iiter_unique_ptr.reset(iiter);
2376     }
2377 
2378     size_t ts_sz =
2379         rep_->internal_comparator.user_comparator()->timestamp_size();
2380     bool matched = false;  // if such user key matched a key in SST
2381     bool done = false;
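    // Walk the index entries whose data blocks may contain the key. For each
    // candidate block, seek within the block and feed matching entries to the
    // GetContext until it reports that no further entries are needed.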
2382     for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
2383       IndexValue v = iiter->value();
2384 
2385       bool not_exist_in_filter =
2386           filter != nullptr && filter->IsBlockBased() == true &&
2387           !filter->KeyMayMatch(ExtractUserKeyAndStripTimestamp(key, ts_sz),
2388                                prefix_extractor, v.handle.offset(), no_io,
2389                                /*const_ikey_ptr=*/nullptr, get_context,
2390                                &lookup_context);
2391 
2392       if (not_exist_in_filter) {
2393         // Not found
2394         // TODO: think about interaction with Merge. If a user key cannot
2395         // cross one data block, we should be fine.
2396         RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
2397         PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
2398         break;
2399       }
2400 
2401       if (!v.first_internal_key.empty() && !skip_filters &&
2402           UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2403                   .CompareWithoutTimestamp(
2404                       ExtractUserKey(key),
2405                       ExtractUserKey(v.first_internal_key)) < 0) {
2406         // The requested key falls between highest key in previous block and
2407         // lowest key in current block.
2408         break;
2409       }
2410 
2411       BlockCacheLookupContext lookup_data_block_context{
2412           TableReaderCaller::kUserGet, tracing_get_id,
2413           /*get_from_user_specified_snapshot=*/read_options.snapshot !=
2414               nullptr};
2415       bool does_referenced_key_exist = false;
2416       DataBlockIter biter;
2417       uint64_t referenced_data_size = 0;
2418       NewDataBlockIterator<DataBlockIter>(
2419           read_options, v.handle, &biter, BlockType::kData, get_context,
2420           &lookup_data_block_context,
2421           /*s=*/Status(), /*prefetch_buffer*/ nullptr);
2422 
2423       if (no_io && biter.status().IsIncomplete()) {
2424         // couldn't get block from block_cache
2425         // Update Saver.state to Found because we are only looking for
2426         // whether we can guarantee the key is not there when "no_io" is set
2427         get_context->MarkKeyMayExist();
2428         s = biter.status();
2429         break;
2430       }
2431       if (!biter.status().ok()) {
2432         s = biter.status();
2433         break;
2434       }
2435 
2436       bool may_exist = biter.SeekForGet(key);
2437       // If user-specified timestamp is supported, we cannot end the search
2438       // just because hash index lookup indicates the key+ts does not exist.
2439       if (!may_exist && ts_sz == 0) {
2440         // HashSeek cannot find the key in this block, and the iter is not at
2441         // the end of the block, i.e. the key cannot be in the following blocks
2442         // either. In this case, the seek_key cannot be found, so we break
2443         // from the top-level for-loop.
2444         done = true;
2445       } else {
2446         // Call the *saver function on each entry/block until it returns false
2447         for (; biter.Valid(); biter.Next()) {
2448           ParsedInternalKey parsed_key;
2449           Status pik_status = ParseInternalKey(
2450               biter.key(), &parsed_key, false /* log_err_key */);  // TODO
2451           if (!pik_status.ok()) {
2452             s = pik_status;
2453           }
2454 
2455           if (!get_context->SaveValue(
2456                   parsed_key, biter.value(), &matched,
2457                   biter.IsValuePinned() ? &biter : nullptr)) {
2458             if (get_context->State() == GetContext::GetState::kFound) {
2459               does_referenced_key_exist = true;
2460               referenced_data_size = biter.key().size() + biter.value().size();
2461             }
2462             done = true;
2463             break;
2464           }
2465         }
2466         s = biter.status();
2467       }
2468       // Write the block cache access record.
2469       if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2470         // Avoid making copy of block_key, cf_name, and referenced_key when
2471         // constructing the access record.
2472         Slice referenced_key;
2473         if (does_referenced_key_exist) {
2474           referenced_key = biter.key();
2475         } else {
2476           referenced_key = key;
2477         }
2478         BlockCacheTraceRecord access_record(
2479             rep_->ioptions.clock->NowMicros(),
2480             /*block_key=*/"", lookup_data_block_context.block_type,
2481             lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
2482             /*cf_name=*/"", rep_->level_for_tracing(),
2483             rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
2484             lookup_data_block_context.is_cache_hit,
2485             lookup_data_block_context.no_insert,
2486             lookup_data_block_context.get_id,
2487             lookup_data_block_context.get_from_user_specified_snapshot,
2488             /*referenced_key=*/"", referenced_data_size,
2489             lookup_data_block_context.num_keys_in_block,
2490             does_referenced_key_exist);
2491         // TODO: Should handle status here?
2492         block_cache_tracer_
2493             ->WriteBlockAccess(access_record,
2494                                lookup_data_block_context.block_key,
2495                                rep_->cf_name_for_tracing(), referenced_key)
2496             .PermitUncheckedError();
2497       }
2498 
2499       if (done) {
2500         // Avoid the extra Next which is expensive in two-level indexes
2501         break;
2502       }
2503     }
2504     if (matched && filter != nullptr && !filter->IsBlockBased()) {
2505       RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
2506       PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
2507                                 rep_->level);
2508     }
2509     if (s.ok() && !iiter->status().IsNotFound()) {
2510       s = iiter->status();
2511     }
2512   }
2513 
2514   return s;
2515 }
2516 
2517 using MultiGetRange = MultiGetContext::Range;
2518 void BlockBasedTable::MultiGet(const ReadOptions& read_options,
2519                                const MultiGetRange* mget_range,
2520                                const SliceTransform* prefix_extractor,
2521                                bool skip_filters) {
2522   if (mget_range->empty()) {
2523     // Caller should ensure non-empty (performance bug)
2524     assert(false);
2525     return;  // Nothing to do
2526   }
2527 
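  // MultiGet proceeds in two passes over the batch: first, probe the filter and
  // the block cache for every key and coalesce the cache misses into a single
  // multi-block read; second, iterate the retrieved data blocks per key and
  // feed matching entries to each key's GetContext.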
2528   FilterBlockReader* const filter =
2529       !skip_filters ? rep_->filter.get() : nullptr;
2530   MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
2531                                mget_range->end());
2532 
2533   // First check the full filter.
2534   // If the full filter is not useful, then go into each block.
2535   const bool no_io = read_options.read_tier == kBlockCacheTier;
2536   uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2537   if (sst_file_range.begin()->get_context) {
2538     tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
2539   }
2540   BlockCacheLookupContext lookup_context{
2541       TableReaderCaller::kUserMultiGet, tracing_mget_id,
2542       /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
2543   FullFilterKeysMayMatch(read_options, filter, &sst_file_range, no_io,
2544                          prefix_extractor, &lookup_context);
2545 
2546   if (!sst_file_range.empty()) {
2547     IndexBlockIter iiter_on_stack;
2548     // If the prefix_extractor in the table properties differs from the one in
2549     // options, disable BlockPrefixIndex. Only check when index_type is kHashSearch.
2550     bool need_upper_bound_check = false;
2551     if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
2552       need_upper_bound_check = PrefixExtractorChanged(
2553           rep_->table_properties.get(), prefix_extractor);
2554     }
2555     auto iiter =
2556         NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack,
2557                          sst_file_range.begin()->get_context, &lookup_context);
2558     std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2559     if (iiter != &iiter_on_stack) {
2560       iiter_unique_ptr.reset(iiter);
2561     }
2562 
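    // Per-key bookkeeping for the batched lookup: the block handle still to be
    // read from the file (or a null handle if the block is already available),
    // the cached or read block itself, and the per-block status.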
2563     uint64_t offset = std::numeric_limits<uint64_t>::max();
2564     autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
2565     autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
2566     autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
2567     char stack_buf[kMultiGetReadStackBufSize];
2568     std::unique_ptr<char[]> block_buf;
2569     {
2570       MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
2571                                      sst_file_range.end());
2572       std::vector<Cache::Handle*> cache_handles;
2573       bool wait_for_cache_results = false;
2574 
2575       CachableEntry<UncompressionDict> uncompression_dict;
2576       Status uncompression_dict_status;
2577       uncompression_dict_status.PermitUncheckedError();
2578       bool uncompression_dict_inited = false;
2579       size_t total_len = 0;
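      // This pass only probes the block cache: with read_tier set to
      // kBlockCacheTier, RetrieveBlock() returns Incomplete on a cache miss
      // instead of reading the file, and the misses are read in one batch below.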
2580       ReadOptions ro = read_options;
2581       ro.read_tier = kBlockCacheTier;
2582 
2583       for (auto miter = data_block_range.begin();
2584            miter != data_block_range.end(); ++miter) {
2585         const Slice& key = miter->ikey;
2586         iiter->Seek(miter->ikey);
2587 
2588         IndexValue v;
2589         if (iiter->Valid()) {
2590           v = iiter->value();
2591         }
2592         if (!iiter->Valid() ||
2593             (!v.first_internal_key.empty() && !skip_filters &&
2594              UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2595                      .CompareWithoutTimestamp(
2596                          ExtractUserKey(key),
2597                          ExtractUserKey(v.first_internal_key)) < 0)) {
2598           // The requested key falls between highest key in previous block and
2599           // lowest key in current block.
2600           if (!iiter->status().IsNotFound()) {
2601             *(miter->s) = iiter->status();
2602           }
2603           data_block_range.SkipKey(miter);
2604           sst_file_range.SkipKey(miter);
2605           continue;
2606         }
2607 
2608         if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
2609           uncompression_dict_status =
2610               rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
2611                   nullptr /* prefetch_buffer */, no_io,
2612                   sst_file_range.begin()->get_context, &lookup_context,
2613                   &uncompression_dict);
2614           uncompression_dict_inited = true;
2615         }
2616 
2617         if (!uncompression_dict_status.ok()) {
2618           assert(!uncompression_dict_status.IsNotFound());
2619           *(miter->s) = uncompression_dict_status;
2620           data_block_range.SkipKey(miter);
2621           sst_file_range.SkipKey(miter);
2622           continue;
2623         }
2624 
2625         statuses.emplace_back();
2626         results.emplace_back();
2627         if (v.handle.offset() == offset) {
2628           // We're going to reuse the block for this key later on. No need to
2629           // look it up now. Place a null handle
2630           block_handles.emplace_back(BlockHandle::NullBlockHandle());
2631           continue;
2632         }
2633         // Look up the cache for the given data block referenced by an index
2634         // iterator value (i.e. BlockHandle). If it exists in the cache,
2635         // initialize block to the contents of the data block.
2636         offset = v.handle.offset();
2637         BlockHandle handle = v.handle;
2638         BlockCacheLookupContext lookup_data_block_context(
2639             TableReaderCaller::kUserMultiGet);
2640         const UncompressionDict& dict = uncompression_dict.GetValue()
2641                                             ? *uncompression_dict.GetValue()
2642                                             : UncompressionDict::GetEmptyDict();
2643         Status s = RetrieveBlock(
2644             nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
2645             miter->get_context, &lookup_data_block_context,
2646             /* for_compaction */ false, /* use_cache */ true,
2647             /* wait_for_cache */ false);
2648         if (s.IsIncomplete()) {
2649           s = Status::OK();
2650         }
2651         if (s.ok() && !results.back().IsEmpty()) {
2652           // Since we have a valid handle, check the value. If it's nullptr,
2653           // it means the cache is waiting for the final result and we're
2654           // supposed to call WaitAll() to wait for the result.
2655           if (results.back().GetValue() != nullptr) {
2656             // Found it in the cache. Add NULL handle to indicate there is
2657             // nothing to read from disk.
2658             if (results.back().GetCacheHandle()) {
2659               results.back().UpdateCachedValue();
2660             }
2661             block_handles.emplace_back(BlockHandle::NullBlockHandle());
2662           } else {
2663             // We have to wait for the cache lookup to finish in the
2664             // background, and then we may have to read the block from disk
2665             // anyway
2666             assert(results.back().GetCacheHandle());
2667             wait_for_cache_results = true;
2668             block_handles.emplace_back(handle);
2669             cache_handles.emplace_back(results.back().GetCacheHandle());
2670           }
2671         } else {
2672           block_handles.emplace_back(handle);
2673           total_len += block_size(handle);
2674         }
2675       }
2676 
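      // Resolve any cache lookups that are still pending (e.g. an asynchronous
      // or secondary cache lookup): blocks whose lookup ultimately failed are
      // added to the batched file read below, the rest are served from the
      // now-populated cache entry.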
2677       if (wait_for_cache_results) {
2678         Cache* block_cache = rep_->table_options.block_cache.get();
2679         block_cache->WaitAll(cache_handles);
2680         for (size_t i = 0; i < block_handles.size(); ++i) {
2681           // If this block was a success or failure or not needed because
2682           // the corresponding key is in the same block as a prior key, skip
2683           if (block_handles[i] == BlockHandle::NullBlockHandle() ||
2684               results[i].IsEmpty()) {
2685             continue;
2686           }
2687           results[i].UpdateCachedValue();
2688           void* val = results[i].GetValue();
2689           if (!val) {
2690             // The async cache lookup failed - could be due to an error
2691             // or a false positive. We need to read the data block from
2692             // the SST file
2693             results[i].Reset();
2694             total_len += block_size(block_handles[i]);
2695           } else {
2696             block_handles[i] = BlockHandle::NullBlockHandle();
2697           }
2698         }
2699       }
2700 
2701       if (total_len) {
2702         char* scratch = nullptr;
2703         const UncompressionDict& dict = uncompression_dict.GetValue()
2704                                             ? *uncompression_dict.GetValue()
2705                                             : UncompressionDict::GetEmptyDict();
2706         assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
2707         assert(uncompression_dict_status.ok());
2708         // If using direct IO, then scratch is not used, so keep it nullptr.
2709         // If the blocks need to be uncompressed and we don't need the
2710         // compressed blocks, then we can use a contiguous block of
2711         // memory to read in all the blocks, as it will only be temporary
2712         // storage.
2713         // 1. If blocks are compressed and compressed block cache is there,
2714         //    alloc heap bufs
2715         // 2. If blocks are uncompressed, alloc heap bufs
2716         // 3. If blocks are compressed and no compressed block cache, use
2717         //    stack buf
2718         if (!rep_->file->use_direct_io() &&
2719             rep_->table_options.block_cache_compressed == nullptr &&
2720             rep_->blocks_maybe_compressed) {
2721           if (total_len <= kMultiGetReadStackBufSize) {
2722             scratch = stack_buf;
2723           } else {
2724             scratch = new char[total_len];
2725             block_buf.reset(scratch);
2726           }
2727         }
2728         RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
2729                                &statuses, &results, scratch, dict);
2730         if (sst_file_range.begin()->get_context) {
2731           ++(sst_file_range.begin()
2732                  ->get_context->get_context_stats_.num_sst_read);
2733         }
2734       }
2735     }
2736 
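    // Second pass: for each remaining key, iterate the data block(s) that may
    // contain it, starting from the block looked up above and advancing via the
    // index iterator if the key's entries continue into later blocks.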
2737     DataBlockIter first_biter;
2738     DataBlockIter next_biter;
2739     size_t idx_in_batch = 0;
2740     for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
2741          ++miter) {
2742       Status s;
2743       GetContext* get_context = miter->get_context;
2744       const Slice& key = miter->ikey;
2745       bool matched = false;  // if such user key matched a key in SST
2746       bool done = false;
2747       bool first_block = true;
2748       do {
2749         DataBlockIter* biter = nullptr;
2750         bool reusing_block = true;
2751         uint64_t referenced_data_size = 0;
2752         bool does_referenced_key_exist = false;
2753         BlockCacheLookupContext lookup_data_block_context(
2754             TableReaderCaller::kUserMultiGet, tracing_mget_id,
2755             /*get_from_user_specified_snapshot=*/read_options.snapshot !=
2756                 nullptr);
2757         if (first_block) {
2758           if (!block_handles[idx_in_batch].IsNull() ||
2759               !results[idx_in_batch].IsEmpty()) {
2760             first_biter.Invalidate(Status::OK());
2761             NewDataBlockIterator<DataBlockIter>(
2762                 read_options, results[idx_in_batch], &first_biter,
2763                 statuses[idx_in_batch]);
2764             reusing_block = false;
2765           } else {
2766             // If the handle is null and the result is empty, then the status
2767             // was never set, so it should still be the initial value: ok().
2768             assert(statuses[idx_in_batch].ok());
2769           }
2770           biter = &first_biter;
2771           idx_in_batch++;
2772         } else {
2773           IndexValue v = iiter->value();
2774           if (!v.first_internal_key.empty() && !skip_filters &&
2775               UserComparatorWrapper(rep_->internal_comparator.user_comparator())
2776                       .CompareWithoutTimestamp(
2777                           ExtractUserKey(key),
2778                           ExtractUserKey(v.first_internal_key)) < 0) {
2779             // The requested key falls between highest key in previous block and
2780             // lowest key in current block.
2781             break;
2782           }
2783 
2784           next_biter.Invalidate(Status::OK());
2785           NewDataBlockIterator<DataBlockIter>(
2786               read_options, iiter->value().handle, &next_biter,
2787               BlockType::kData, get_context, &lookup_data_block_context,
2788               Status(), nullptr);
2789           biter = &next_biter;
2790           reusing_block = false;
2791         }
2792 
2793         if (read_options.read_tier == kBlockCacheTier &&
2794             biter->status().IsIncomplete()) {
2795           // couldn't get block from block_cache
2796           // Update Saver.state to Found because we are only looking for
2797           // whether we can guarantee the key is not there when "no_io" is set
2798           get_context->MarkKeyMayExist();
2799           break;
2800         }
2801         if (!biter->status().ok()) {
2802           s = biter->status();
2803           break;
2804         }
2805 
2806         bool may_exist = biter->SeekForGet(key);
2807         if (!may_exist) {
2808           // HashSeek cannot find the key in this block, and the iter is not at
2809           // the end of the block, i.e. the key cannot be in the following blocks
2810           // either. In this case, the seek_key cannot be found, so we break
2811           // from the top-level for-loop.
2812           break;
2813         }
2814 
2815         // Call the *saver function on each entry/block until it returns false
2816         for (; biter->Valid(); biter->Next()) {
2817           ParsedInternalKey parsed_key;
2818           Cleanable dummy;
2819           Cleanable* value_pinner = nullptr;
2820           Status pik_status = ParseInternalKey(
2821               biter->key(), &parsed_key, false /* log_err_key */);  // TODO
2822           if (!pik_status.ok()) {
2823             s = pik_status;
2824           }
2825           if (biter->IsValuePinned()) {
2826             if (reusing_block) {
2827               Cache* block_cache = rep_->table_options.block_cache.get();
2828               assert(biter->cache_handle() != nullptr);
2829               block_cache->Ref(biter->cache_handle());
2830               dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
2831                                     biter->cache_handle());
2832               value_pinner = &dummy;
2833             } else {
2834               value_pinner = biter;
2835             }
2836           }
2837           if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
2838                                       value_pinner)) {
2839             if (get_context->State() == GetContext::GetState::kFound) {
2840               does_referenced_key_exist = true;
2841               referenced_data_size =
2842                   biter->key().size() + biter->value().size();
2843             }
2844             done = true;
2845             break;
2846           }
2847           s = biter->status();
2848         }
2849         // Write the block cache access.
2850         if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
2851           // Avoid making copy of block_key, cf_name, and referenced_key when
2852           // constructing the access record.
2853           Slice referenced_key;
2854           if (does_referenced_key_exist) {
2855             referenced_key = biter->key();
2856           } else {
2857             referenced_key = key;
2858           }
2859           BlockCacheTraceRecord access_record(
2860               rep_->ioptions.clock->NowMicros(),
2861               /*block_key=*/"", lookup_data_block_context.block_type,
2862               lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
2863               /*cf_name=*/"", rep_->level_for_tracing(),
2864               rep_->sst_number_for_tracing(), lookup_data_block_context.caller,
2865               lookup_data_block_context.is_cache_hit,
2866               lookup_data_block_context.no_insert,
2867               lookup_data_block_context.get_id,
2868               lookup_data_block_context.get_from_user_specified_snapshot,
2869               /*referenced_key=*/"", referenced_data_size,
2870               lookup_data_block_context.num_keys_in_block,
2871               does_referenced_key_exist);
2872           // TODO: Should handle status here?
2873           block_cache_tracer_
2874               ->WriteBlockAccess(access_record,
2875                                  lookup_data_block_context.block_key,
2876                                  rep_->cf_name_for_tracing(), referenced_key)
2877               .PermitUncheckedError();
2878         }
2879         s = biter->status();
2880         if (done) {
2881           // Avoid the extra Next which is expensive in two-level indexes
2882           break;
2883         }
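        // The index iterator was last positioned during the batched lookup pass,
        // so re-seek it to this key before advancing to the next data block.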
2884         if (first_block) {
2885           iiter->Seek(key);
2886         }
2887         first_block = false;
2888         iiter->Next();
2889       } while (iiter->Valid());
2890 
2891       if (matched && filter != nullptr && !filter->IsBlockBased()) {
2892         RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
2893         PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
2894                                   rep_->level);
2895       }
2896       if (s.ok() && !iiter->status().IsNotFound()) {
2897         s = iiter->status();
2898       }
2899       *(miter->s) = s;
2900     }
2901 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
2902     // Not sure why we need to do it. Should investigate more.
2903     for (auto& st : statuses) {
2904       st.PermitUncheckedError();
2905     }
2906 #endif  // ROCKSDB_ASSERT_STATUS_CHECKED
2907   }
2908 }
2909 
2910 Status BlockBasedTable::Prefetch(const Slice* const begin,
2911                                  const Slice* const end) {
2912   auto& comparator = rep_->internal_comparator;
2913   UserComparatorWrapper user_comparator(comparator.user_comparator());
2914   // pre-condition
2915   if (begin && end && comparator.Compare(*begin, *end) > 0) {
2916     return Status::InvalidArgument(*begin, *end);
2917   }
2918   BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
2919   IndexBlockIter iiter_on_stack;
2920   auto iiter = NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
2921                                 &iiter_on_stack, /*get_context=*/nullptr,
2922                                 &lookup_context);
2923   std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2924   if (iiter != &iiter_on_stack) {
2925     iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
2926   }
2927 
2928   if (!iiter->status().ok()) {
2929     // error opening index iterator
2930     return iiter->status();
2931   }
2932 
2933   // Indicates whether we are on the last page that needs to be pre-fetched.
2934   bool prefetching_boundary_page = false;
2935 
2936   for (begin ? iiter->Seek(*begin) : iiter->SeekToFirst(); iiter->Valid();
2937        iiter->Next()) {
2938     BlockHandle block_handle = iiter->value().handle;
2939     const bool is_user_key = !rep_->index_key_includes_seq;
2940     if (end &&
2941         ((!is_user_key && comparator.Compare(iiter->key(), *end) >= 0) ||
2942          (is_user_key &&
2943           user_comparator.Compare(iiter->key(), ExtractUserKey(*end)) >= 0))) {
2944       if (prefetching_boundary_page) {
2945         break;
2946       }
2947 
2948       // The index entry represents the last key in the data block.
2949       // We should load this page into memory as well, but no more
2950       prefetching_boundary_page = true;
2951     }
2952 
2953     // Load the block specified by the block_handle into the block cache
2954     DataBlockIter biter;
2955 
2956     NewDataBlockIterator<DataBlockIter>(
2957         ReadOptions(), block_handle, &biter, /*type=*/BlockType::kData,
2958         /*get_context=*/nullptr, &lookup_context, Status(),
2959         /*prefetch_buffer=*/nullptr);
2960 
2961     if (!biter.status().ok()) {
2962       // there was an unexpected error while pre-fetching
2963       return biter.status();
2964     }
2965   }
2966 
2967   return Status::OK();
2968 }
2969 
2970 Status BlockBasedTable::VerifyChecksum(const ReadOptions& read_options,
2971                                        TableReaderCaller caller) {
2972   Status s;
2973   // Check Meta blocks
2974   std::unique_ptr<Block> metaindex;
2975   std::unique_ptr<InternalIterator> metaindex_iter;
2976   ReadOptions ro;
2977   s = ReadMetaIndexBlock(ro, nullptr /* prefetch buffer */, &metaindex,
2978                          &metaindex_iter);
2979   if (s.ok()) {
2980     s = VerifyChecksumInMetaBlocks(metaindex_iter.get());
2981     if (!s.ok()) {
2982       return s;
2983     }
2984   } else {
2985     return s;
2986   }
2987   // Check Data blocks
2988   IndexBlockIter iiter_on_stack;
2989   BlockCacheLookupContext context{caller};
2990   InternalIteratorBase<IndexValue>* iiter = NewIndexIterator(
2991       read_options, /*disable_prefix_seek=*/false, &iiter_on_stack,
2992       /*get_context=*/nullptr, &context);
2993   std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
2994   if (iiter != &iiter_on_stack) {
2995     iiter_unique_ptr = std::unique_ptr<InternalIteratorBase<IndexValue>>(iiter);
2996   }
2997   if (!iiter->status().ok()) {
2998     // error opening index iterator
2999     return iiter->status();
3000   }
3001   s = VerifyChecksumInBlocks(read_options, iiter);
3002   return s;
3003 }
3004 
3005 Status BlockBasedTable::VerifyChecksumInBlocks(
3006     const ReadOptions& read_options,
3007     InternalIteratorBase<IndexValue>* index_iter) {
3008   Status s;
3009   // We are scanning the whole file, so there is no need to exponentially
3010   // increase the buffer size.
3011   size_t readahead_size = (read_options.readahead_size != 0)
3012                               ? read_options.readahead_size
3013                               : rep_->table_options.max_auto_readahead_size;
3014   // FilePrefetchBuffer doesn't work in mmap mode and readahead is not
3015   // needed there.
3016   FilePrefetchBuffer prefetch_buffer(
3017       rep_->file.get(), readahead_size /* readahead_size */,
3018       readahead_size /* max_readahead_size */,
3019       !rep_->ioptions.allow_mmap_reads /* enable */);
3020 
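  // Walk every index entry and read the referenced data block without
  // decompressing it; the block checksum is verified as part of
  // BlockFetcher::ReadBlockContents().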
3021   for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
3022     s = index_iter->status();
3023     if (!s.ok()) {
3024       break;
3025     }
3026     BlockHandle handle = index_iter->value().handle;
3027     BlockContents contents;
3028     BlockFetcher block_fetcher(
3029         rep_->file.get(), &prefetch_buffer, rep_->footer, ReadOptions(), handle,
3030         &contents, rep_->ioptions, false /* decompress */,
3031         false /*maybe_compressed*/, BlockType::kData,
3032         UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
3033     s = block_fetcher.ReadBlockContents();
3034     if (!s.ok()) {
3035       break;
3036     }
3037   }
3038   if (s.ok()) {
3039     // In the case of two-level indexes, we would have exited the above loop
3040     // by checking index_iter->Valid(), but Valid() might have returned false
3041     // due to an IO error. So check the index_iter status.
3042     s = index_iter->status();
3043   }
3044   return s;
3045 }
3046 
3047 BlockType BlockBasedTable::GetBlockTypeForMetaBlockByName(
3048     const Slice& meta_block_name) {
3049   if (meta_block_name.starts_with(kFilterBlockPrefix) ||
3050       meta_block_name.starts_with(kFullFilterBlockPrefix) ||
3051       meta_block_name.starts_with(kPartitionedFilterBlockPrefix)) {
3052     return BlockType::kFilter;
3053   }
3054 
3055   if (meta_block_name == kPropertiesBlock) {
3056     return BlockType::kProperties;
3057   }
3058 
3059   if (meta_block_name == kCompressionDictBlock) {
3060     return BlockType::kCompressionDictionary;
3061   }
3062 
3063   if (meta_block_name == kRangeDelBlock) {
3064     return BlockType::kRangeDeletion;
3065   }
3066 
3067   if (meta_block_name == kHashIndexPrefixesBlock) {
3068     return BlockType::kHashIndexPrefixes;
3069   }
3070 
3071   if (meta_block_name == kHashIndexPrefixesMetadataBlock) {
3072     return BlockType::kHashIndexMetadata;
3073   }
3074 
3075   assert(false);
3076   return BlockType::kInvalid;
3077 }
3078 
3079 Status BlockBasedTable::VerifyChecksumInMetaBlocks(
3080     InternalIteratorBase<Slice>* index_iter) {
3081   Status s;
3082   for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
3083     s = index_iter->status();
3084     if (!s.ok()) {
3085       break;
3086     }
3087     BlockHandle handle;
3088     Slice input = index_iter->value();
3089     s = handle.DecodeFrom(&input);
3090     BlockContents contents;
3091     const Slice meta_block_name = index_iter->key();
3092     BlockFetcher block_fetcher(
3093         rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
3094         ReadOptions(), handle, &contents, rep_->ioptions,
3095         false /* decompress */, false /*maybe_compressed*/,
3096         GetBlockTypeForMetaBlockByName(meta_block_name),
3097         UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
3098     s = block_fetcher.ReadBlockContents();
3099     if (s.IsCorruption() && meta_block_name == kPropertiesBlock) {
3100       TableProperties* table_properties;
3101       ReadOptions ro;
3102       s = TryReadPropertiesWithGlobalSeqno(ro, nullptr /* prefetch_buffer */,
3103                                            index_iter->value(),
3104                                            &table_properties);
3105       delete table_properties;
3106     }
3107     if (!s.ok()) {
3108       break;
3109     }
3110   }
3111   return s;
3112 }
3113 
3114 bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const {
3115   assert(rep_ != nullptr);
3116 
3117   Cache* const cache = rep_->table_options.block_cache.get();
3118   if (cache == nullptr) {
3119     return false;
3120   }
3121 
3122   char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
3123   Slice cache_key =
3124       GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
3125                   cache_key_storage);
3126 
3127   Cache::Handle* const cache_handle = cache->Lookup(cache_key);
3128   if (cache_handle == nullptr) {
3129     return false;
3130   }
3131 
3132   cache->Release(cache_handle);
3133 
3134   return true;
3135 }
3136 
3137 bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
3138                                       const Slice& key) {
3139   std::unique_ptr<InternalIteratorBase<IndexValue>> iiter(NewIndexIterator(
3140       options, /*need_upper_bound_check=*/false, /*input_iter=*/nullptr,
3141       /*get_context=*/nullptr, /*lookup_context=*/nullptr));
3142   iiter->Seek(key);
3143   assert(iiter->Valid());
3144 
3145   return TEST_BlockInCache(iiter->value().handle);
3146 }
3147 
3148 // REQUIRES: The following fields of rep_ should have already been populated:
3149 //  1. file
3150 //  2. index_handle,
3151 //  3. options
3152 //  4. internal_comparator
3153 //  5. index_type
3154 Status BlockBasedTable::CreateIndexReader(
3155     const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
3156     InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch,
3157     bool pin, BlockCacheLookupContext* lookup_context,
3158     std::unique_ptr<IndexReader>* index_reader) {
3159   // kHashSearch requires a non-empty prefix_extractor, but we bypass checking
3160   // prefix_extractor here since we have no access to MutableCFOptions.
3161   // The need_upper_bound_check flag is added in BlockBasedTable::NewIndexIterator.
3162   // If prefix_extractor does not match prefix_extractor_name from the table
3163   // properties, Hash Index is turned off by setting total_order_seek to true.
3164 
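  // Dispatch on the index type recorded for this table: partitioned (two-level)
  // index, binary search (with or without first key), or hash index. The hash
  // case falls back to binary search if the prefix metadata cannot be loaded.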
3165   switch (rep_->index_type) {
3166     case BlockBasedTableOptions::kTwoLevelIndexSearch: {
3167       return PartitionIndexReader::Create(this, ro, prefetch_buffer, use_cache,
3168                                           prefetch, pin, lookup_context,
3169                                           index_reader);
3170     }
3171     case BlockBasedTableOptions::kBinarySearch:
3172       FALLTHROUGH_INTENDED;
3173     case BlockBasedTableOptions::kBinarySearchWithFirstKey: {
3174       return BinarySearchIndexReader::Create(this, ro, prefetch_buffer,
3175                                              use_cache, prefetch, pin,
3176                                              lookup_context, index_reader);
3177     }
3178     case BlockBasedTableOptions::kHashSearch: {
3179       std::unique_ptr<Block> metaindex_guard;
3180       std::unique_ptr<InternalIterator> metaindex_iter_guard;
3181       auto meta_index_iter = preloaded_meta_index_iter;
3182       bool should_fallback = false;
3183       if (rep_->internal_prefix_transform.get() == nullptr) {
3184         ROCKS_LOG_WARN(rep_->ioptions.logger,
3185                        "No prefix extractor passed in. Fall back to binary"
3186                        " search index.");
3187         should_fallback = true;
3188       } else if (meta_index_iter == nullptr) {
3189         auto s = ReadMetaIndexBlock(ro, prefetch_buffer, &metaindex_guard,
3190                                     &metaindex_iter_guard);
3191         if (!s.ok()) {
3192           // we simply fall back to binary search in case there is any
3193           // problem with prefix hash index loading.
3194           ROCKS_LOG_WARN(rep_->ioptions.logger,
3195                          "Unable to read the metaindex block."
3196                          " Fall back to binary search index.");
3197           should_fallback = true;
3198         }
3199         meta_index_iter = metaindex_iter_guard.get();
3200       }
3201 
3202       if (should_fallback) {
3203         return BinarySearchIndexReader::Create(this, ro, prefetch_buffer,
3204                                                use_cache, prefetch, pin,
3205                                                lookup_context, index_reader);
3206       } else {
3207         return HashIndexReader::Create(this, ro, prefetch_buffer,
3208                                        meta_index_iter, use_cache, prefetch,
3209                                        pin, lookup_context, index_reader);
3210       }
3211     }
3212     default: {
3213       std::string error_message =
3214           "Unrecognized index type: " + ToString(rep_->index_type);
3215       return Status::InvalidArgument(error_message.c_str());
3216     }
3217   }
3218 }
3219 
3220 uint64_t BlockBasedTable::ApproximateDataOffsetOf(
3221     const InternalIteratorBase<IndexValue>& index_iter,
3222     uint64_t data_size) const {
3223   if (index_iter.Valid()) {
3224     BlockHandle handle = index_iter.value().handle;
3225     return handle.offset();
3226   } else {
3227     // The iterator is past the last key in the file.
3228     return data_size;
3229   }
3230 }
3231 
3232 uint64_t BlockBasedTable::GetApproximateDataSize() {
3233   // Should be in table properties unless super old version
3234   if (rep_->table_properties) {
3235     return rep_->table_properties->data_size;
3236   }
3237   // Fall back to rough estimate from footer
3238   return rep_->footer.metaindex_handle().offset();
3239 }
3240 
3241 uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key,
3242                                               TableReaderCaller caller) {
3243   uint64_t data_size = GetApproximateDataSize();
3244   if (UNLIKELY(data_size == 0)) {
3245     // Hmm. Let's just split in half to avoid skewing one way or another,
3246     // since we don't know whether we're operating on lower bound or
3247     // upper bound.
3248     return rep_->file_size / 2;
3249   }
3250 
3251   BlockCacheLookupContext context(caller);
3252   IndexBlockIter iiter_on_stack;
3253   ReadOptions ro;
3254   ro.total_order_seek = true;
3255   auto index_iter =
3256       NewIndexIterator(ro, /*disable_prefix_seek=*/true,
3257                        /*input_iter=*/&iiter_on_stack, /*get_context=*/nullptr,
3258                        /*lookup_context=*/&context);
3259   std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
3260   if (index_iter != &iiter_on_stack) {
3261     iiter_unique_ptr.reset(index_iter);
3262   }
3263 
3264   index_iter->Seek(key);
3265 
3266   uint64_t offset = ApproximateDataOffsetOf(*index_iter, data_size);
3267   // Pro-rate file metadata (incl filters) size-proportionally across data
3268   // blocks.
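  // For example, if the seek lands at offset 40 MB within 50 MB of data blocks
  // and the whole file (data plus metadata) is 60 MB, the returned estimate is
  // (40 / 50) * 60 MB = 48 MB.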
3269   double size_ratio =
3270       static_cast<double>(offset) / static_cast<double>(data_size);
3271   return static_cast<uint64_t>(size_ratio *
3272                                static_cast<double>(rep_->file_size));
3273 }
3274 
3275 uint64_t BlockBasedTable::ApproximateSize(const Slice& start, const Slice& end,
3276                                           TableReaderCaller caller) {
3277   assert(rep_->internal_comparator.Compare(start, end) <= 0);
3278 
3279   uint64_t data_size = GetApproximateDataSize();
3280   if (UNLIKELY(data_size == 0)) {
3281     // Hmm. Assume whole file is involved, since we have lower and upper
3282     // bound.
3283     return rep_->file_size;
3284   }
3285 
3286   BlockCacheLookupContext context(caller);
3287   IndexBlockIter iiter_on_stack;
3288   ReadOptions ro;
3289   ro.total_order_seek = true;
3290   auto index_iter =
3291       NewIndexIterator(ro, /*disable_prefix_seek=*/true,
3292                        /*input_iter=*/&iiter_on_stack, /*get_context=*/nullptr,
3293                        /*lookup_context=*/&context);
3294   std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
3295   if (index_iter != &iiter_on_stack) {
3296     iiter_unique_ptr.reset(index_iter);
3297   }
3298 
3299   index_iter->Seek(start);
3300   uint64_t start_offset = ApproximateDataOffsetOf(*index_iter, data_size);
3301   index_iter->Seek(end);
3302   uint64_t end_offset = ApproximateDataOffsetOf(*index_iter, data_size);
3303 
3304   assert(end_offset >= start_offset);
3305   // Pro-rate file metadata (incl filters) size-proportionally across data
3306   // blocks.
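  // For example, if start and end map to offsets 10 MB and 30 MB within 50 MB
  // of data blocks and the whole file is 60 MB, the returned estimate is
  // ((30 - 10) / 50) * 60 MB = 24 MB.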
3307   double size_ratio = static_cast<double>(end_offset - start_offset) /
3308                       static_cast<double>(data_size);
3309   return static_cast<uint64_t>(size_ratio *
3310                                static_cast<double>(rep_->file_size));
3311 }
3312 
3313 bool BlockBasedTable::TEST_FilterBlockInCache() const {
3314   assert(rep_ != nullptr);
3315   return TEST_BlockInCache(rep_->filter_handle);
3316 }
3317 
3318 bool BlockBasedTable::TEST_IndexBlockInCache() const {
3319   assert(rep_ != nullptr);
3320 
3321   return TEST_BlockInCache(rep_->footer.index_handle());
3322 }
3323 
3324 Status BlockBasedTable::GetKVPairsFromDataBlocks(
3325     std::vector<KVPairBlock>* kv_pair_blocks) {
3326   std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3327       NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3328                        /*input_iter=*/nullptr, /*get_context=*/nullptr,
3329                        /*lookup_context=*/nullptr));
3330 
3331   Status s = blockhandles_iter->status();
3332   if (!s.ok()) {
3333     // Cannot read Index Block
3334     return s;
3335   }
3336 
3337   for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3338        blockhandles_iter->Next()) {
3339     s = blockhandles_iter->status();
3340 
3341     if (!s.ok()) {
3342       break;
3343     }
3344 
3345     std::unique_ptr<InternalIterator> datablock_iter;
3346     datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
3347         ReadOptions(), blockhandles_iter->value().handle,
3348         /*input_iter=*/nullptr, /*type=*/BlockType::kData,
3349         /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(),
3350         /*prefetch_buffer=*/nullptr));
3351     s = datablock_iter->status();
3352 
3353     if (!s.ok()) {
3354       // Error reading the block - Skipped
3355       continue;
3356     }
3357 
3358     KVPairBlock kv_pair_block;
3359     for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
3360          datablock_iter->Next()) {
3361       s = datablock_iter->status();
3362       if (!s.ok()) {
3363         // Error reading the block - Skipped
3364         break;
3365       }
3366       const Slice& key = datablock_iter->key();
3367       const Slice& value = datablock_iter->value();
3368       std::string key_copy = std::string(key.data(), key.size());
3369       std::string value_copy = std::string(value.data(), value.size());
3370 
3371       kv_pair_block.push_back(
3372           std::make_pair(std::move(key_copy), std::move(value_copy)));
3373     }
3374     kv_pair_blocks->push_back(std::move(kv_pair_block));
3375   }
3376   return Status::OK();
3377 }
3378 
3379 Status BlockBasedTable::DumpTable(WritableFile* out_file) {
3380   WritableFileStringStreamAdapter out_file_wrapper(out_file);
3381   std::ostream out_stream(&out_file_wrapper);
3382   // Output Footer
3383   out_stream << "Footer Details:\n"
3384                 "--------------------------------------\n";
3385   out_stream << "  " << rep_->footer.ToString() << "\n";
3386 
3387   // Output MetaIndex
3388   out_stream << "Metaindex Details:\n"
3389                 "--------------------------------------\n";
3390   std::unique_ptr<Block> metaindex;
3391   std::unique_ptr<InternalIterator> metaindex_iter;
3392   ReadOptions ro;
3393   Status s = ReadMetaIndexBlock(ro, nullptr /* prefetch_buffer */, &metaindex,
3394                                 &metaindex_iter);
3395   if (s.ok()) {
3396     for (metaindex_iter->SeekToFirst(); metaindex_iter->Valid();
3397          metaindex_iter->Next()) {
3398       s = metaindex_iter->status();
3399       if (!s.ok()) {
3400         return s;
3401       }
3402       if (metaindex_iter->key() == kPropertiesBlock) {
3403         out_stream << "  Properties block handle: "
3404                    << metaindex_iter->value().ToString(true) << "\n";
3405       } else if (metaindex_iter->key() == kCompressionDictBlock) {
3406         out_stream << "  Compression dictionary block handle: "
3407                    << metaindex_iter->value().ToString(true) << "\n";
3408       } else if (strstr(metaindex_iter->key().ToString().c_str(),
3409                         "filter.rocksdb.") != nullptr) {
3410         out_stream << "  Filter block handle: "
3411                    << metaindex_iter->value().ToString(true) << "\n";
3412       } else if (metaindex_iter->key() == kRangeDelBlock) {
3413         out_stream << "  Range deletion block handle: "
3414                    << metaindex_iter->value().ToString(true) << "\n";
3415       }
3416     }
3417     out_stream << "\n";
3418   } else {
3419     return s;
3420   }
3421 
3422   // Output TableProperties
3423   const ROCKSDB_NAMESPACE::TableProperties* table_properties;
3424   table_properties = rep_->table_properties.get();
3425 
3426   if (table_properties != nullptr) {
3427     out_stream << "Table Properties:\n"
3428                   "--------------------------------------\n";
3429     out_stream << "  " << table_properties->ToString("\n  ", ": ") << "\n";
3430   }
3431 
3432   if (rep_->filter) {
3433     out_stream << "Filter Details:\n"
3434                   "--------------------------------------\n";
3435     out_stream << "  " << rep_->filter->ToString() << "\n";
3436   }
3437 
3438   // Output Index block
3439   s = DumpIndexBlock(out_stream);
3440   if (!s.ok()) {
3441     return s;
3442   }
3443 
3444   // Output compression dictionary
3445   if (rep_->uncompression_dict_reader) {
3446     CachableEntry<UncompressionDict> uncompression_dict;
3447     s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
3448         nullptr /* prefetch_buffer */, false /* no_io */,
3449         nullptr /* get_context */, nullptr /* lookup_context */,
3450         &uncompression_dict);
3451     if (!s.ok()) {
3452       return s;
3453     }
3454 
3455     assert(uncompression_dict.GetValue());
3456 
3457     const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
3458     out_stream << "Compression Dictionary:\n"
3459                   "--------------------------------------\n";
3460     out_stream << "  size (bytes): " << raw_dict.size() << "\n\n";
3461     out_stream << "  HEX    " << raw_dict.ToString(true) << "\n\n";
3462   }
3463 
3464   // Output range deletions block
3465   auto* range_del_iter = NewRangeTombstoneIterator(ReadOptions());
3466   if (range_del_iter != nullptr) {
3467     range_del_iter->SeekToFirst();
3468     if (range_del_iter->Valid()) {
3469       out_stream << "Range deletions:\n"
3470                     "--------------------------------------\n";
3471       for (; range_del_iter->Valid(); range_del_iter->Next()) {
3472         DumpKeyValue(range_del_iter->key(), range_del_iter->value(),
3473                      out_stream);
3474       }
3475       out_stream << "\n";
3476     }
3477     delete range_del_iter;
3478   }
3479   // Output Data blocks
3480   s = DumpDataBlocks(out_stream);
3481 
3482   if (!s.ok()) {
3483     return s;
3484   }
3485 
3486   if (!out_stream.good()) {
3487     return Status::IOError("Failed to write to output file");
3488   }
3489   return Status::OK();
3490 }
3491 
3492 Status BlockBasedTable::DumpIndexBlock(std::ostream& out_stream) {
3493   out_stream << "Index Details:\n"
3494                 "--------------------------------------\n";
3495   std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3496       NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3497                        /*input_iter=*/nullptr, /*get_context=*/nullptr,
3498                        /*lookup_context=*/nullptr));
3499   Status s = blockhandles_iter->status();
3500   if (!s.ok()) {
3501     out_stream << "Can not read Index Block \n\n";
3502     return s;
3503   }
3504 
3505   out_stream << "  Block key hex dump: Data block handle\n";
3506   out_stream << "  Block key ascii\n\n";
3507   for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3508        blockhandles_iter->Next()) {
3509     s = blockhandles_iter->status();
3510     if (!s.ok()) {
3511       break;
3512     }
3513     Slice key = blockhandles_iter->key();
3514     Slice user_key;
3515     InternalKey ikey;
3516     if (!rep_->index_key_includes_seq) {
3517       user_key = key;
3518     } else {
3519       ikey.DecodeFrom(key);
3520       user_key = ikey.user_key();
3521     }
3522 
3523     out_stream << "  HEX    " << user_key.ToString(true) << ": "
3524                << blockhandles_iter->value().ToString(true,
3525                                                       rep_->index_has_first_key)
3526                << "\n";
3527 
3528     std::string str_key = user_key.ToString();
3529     std::string res_key("");
3530     char cspace = ' ';
3531     for (size_t i = 0; i < str_key.size(); i++) {
3532       res_key.append(&str_key[i], 1);
3533       res_key.append(1, cspace);
3534     }
3535     out_stream << "  ASCII  " << res_key << "\n";
3536     out_stream << "  ------\n";
3537   }
3538   out_stream << "\n";
3539   return Status::OK();
3540 }
3541 
3542 Status BlockBasedTable::DumpDataBlocks(std::ostream& out_stream) {
3543   std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
3544       NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
3545                        /*input_iter=*/nullptr, /*get_context=*/nullptr,
3546                        /*lookup_context=*/nullptr));
3547   Status s = blockhandles_iter->status();
3548   if (!s.ok()) {
3549     out_stream << "Can not read Index Block \n\n";
3550     return s;
3551   }
3552 
3553   uint64_t datablock_size_min = std::numeric_limits<uint64_t>::max();
3554   uint64_t datablock_size_max = 0;
3555   uint64_t datablock_size_sum = 0;
3556 
3557   size_t block_id = 1;
3558   for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
3559        block_id++, blockhandles_iter->Next()) {
3560     s = blockhandles_iter->status();
3561     if (!s.ok()) {
3562       break;
3563     }
3564 
3565     BlockHandle bh = blockhandles_iter->value().handle;
3566     uint64_t datablock_size = bh.size();
3567     datablock_size_min = std::min(datablock_size_min, datablock_size);
3568     datablock_size_max = std::max(datablock_size_max, datablock_size);
3569     datablock_size_sum += datablock_size;
3570 
3571     out_stream << "Data Block # " << block_id << " @ "
3572                << blockhandles_iter->value().handle.ToString(true) << "\n";
3573     out_stream << "--------------------------------------\n";
3574 
3575     std::unique_ptr<InternalIterator> datablock_iter;
3576     datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
3577         ReadOptions(), blockhandles_iter->value().handle,
3578         /*input_iter=*/nullptr, /*type=*/BlockType::kData,
3579         /*get_context=*/nullptr, /*lookup_context=*/nullptr, Status(),
3580         /*prefetch_buffer=*/nullptr));
3581     s = datablock_iter->status();
3582 
3583     if (!s.ok()) {
3584       out_stream << "Error reading the block - Skipped \n\n";
3585       continue;
3586     }
3587 
3588     for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
3589          datablock_iter->Next()) {
3590       s = datablock_iter->status();
3591       if (!s.ok()) {
3592         out_stream << "Error reading the block - Skipped \n";
3593         break;
3594       }
3595       DumpKeyValue(datablock_iter->key(), datablock_iter->value(), out_stream);
3596     }
3597     out_stream << "\n";
3598   }
3599 
3600   uint64_t num_datablocks = block_id - 1;
3601   if (num_datablocks) {
3602     double datablock_size_avg =
3603         static_cast<double>(datablock_size_sum) / num_datablocks;
3604     out_stream << "Data Block Summary:\n";
3605     out_stream << "--------------------------------------\n";
3606     out_stream << "  # data blocks: " << num_datablocks << "\n";
3607     out_stream << "  min data block size: " << datablock_size_min << "\n";
3608     out_stream << "  max data block size: " << datablock_size_max << "\n";
3609     out_stream << "  avg data block size: " << ToString(datablock_size_avg)
3610                << "\n";
3611   }
3612 
3613   return Status::OK();
3614 }
3615 
3616 void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
3617                                    std::ostream& out_stream) {
3618   InternalKey ikey;
3619   ikey.DecodeFrom(key);
3620 
3621   out_stream << "  HEX    " << ikey.user_key().ToString(true) << ": "
3622              << value.ToString(true) << "\n";
3623 
3624   std::string str_key = ikey.user_key().ToString();
3625   std::string str_value = value.ToString();
3626   std::string res_key(""), res_value("");
3627   char cspace = ' ';
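  // Render the key and value one character per column, escaping embedded NUL
  // bytes as "\0" so they remain visible in the ASCII dump.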
3628   for (size_t i = 0; i < str_key.size(); i++) {
3629     if (str_key[i] == '\0') {
3630       res_key.append("\\0", 2);
3631     } else {
3632       res_key.append(&str_key[i], 1);
3633     }
3634     res_key.append(1, cspace);
3635   }
3636   for (size_t i = 0; i < str_value.size(); i++) {
3637     if (str_value[i] == '\0') {
3638       res_value.append("\\0", 2);
3639     } else {
3640       res_value.append(&str_value[i], 1);
3641     }
3642     res_value.append(1, cspace);
3643   }
3644 
3645   out_stream << "  ASCII  " << res_key << ": " << res_value << "\n";
3646   out_stream << "  ------\n";
3647 }
3648 
3649 }  // namespace ROCKSDB_NAMESPACE
3650