//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/table_cache.h"

#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/table_reader.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

namespace {

template <class T>
static void DeleteEntry(const Slice& /*key*/, void* value) {
  T* typed_value = reinterpret_cast<T*>(value);
  delete typed_value;
}

static void UnrefEntry(void* arg1, void* arg2) {
  Cache* cache = reinterpret_cast<Cache*>(arg1);
  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
  cache->Release(h);
}

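// Entries in the table cache are keyed by the raw bytes of the SST file
// number, produced by GetSliceForFileNumber() below.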
static Slice GetSliceForFileNumber(const uint64_t* file_number) {
  return Slice(reinterpret_cast<const char*>(file_number),
               sizeof(*file_number));
}

#ifndef ROCKSDB_LITE

void AppendVarint64(IterKey* key, uint64_t v) {
  char buf[10];
  auto ptr = EncodeVarint64(buf, v);
  key->TrimAppend(key->Size(), buf, ptr - buf);
}

#endif  // ROCKSDB_LITE

}  // namespace

TableCache::TableCache(const ImmutableCFOptions& ioptions,
                       const FileOptions& file_options, Cache* const cache,
                       BlockCacheTracer* const block_cache_tracer)
    : ioptions_(ioptions),
      file_options_(file_options),
      cache_(cache),
      immortal_tables_(false),
      block_cache_tracer_(block_cache_tracer) {
  if (ioptions_.row_cache) {
    // If the same cache is shared by multiple instances, we need to
    // disambiguate its entries.
    PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
  }
}

TableCache::~TableCache() {
}

TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
  return reinterpret_cast<TableReader*>(cache_->Value(handle));
}

void TableCache::ReleaseHandle(Cache::Handle* handle) {
  cache_->Release(handle);
}

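// Opens the SST file referenced by `fd` (retrying with the LevelDB-style
// file name if the first open reports path-not-found), wraps it in a
// RandomAccessFileReader, and asks the table factory to build a TableReader.
// On success, ownership of the reader is transferred to `table_reader`.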
Status TableCache::GetTableReader(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
    std::unique_ptr<TableReader>* table_reader,
    const SliceTransform* prefix_extractor, bool skip_filters, int level,
    bool prefetch_index_and_filter_in_cache) {
  std::string fname =
      TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
  std::unique_ptr<FSRandomAccessFile> file;
  Status s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
                                               nullptr);
  RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  if (s.IsPathNotFound()) {
    fname = Rocks2LevelTableFileName(fname);
    s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, nullptr);
    RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  }

  if (s.ok()) {
    if (!sequential_mode && ioptions_.advise_random_on_open) {
      file->Hint(FSRandomAccessFile::kRandom);
    }
    StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
    std::unique_ptr<RandomAccessFileReader> file_reader(
        new RandomAccessFileReader(
            std::move(file), fname, ioptions_.env,
            record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
            file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
    s = ioptions_.table_factory->NewTableReader(
        TableReaderOptions(ioptions_, prefix_extractor, file_options,
                           internal_comparator, skip_filters, immortal_tables_,
                           level, fd.largest_seqno, block_cache_tracer_),
        std::move(file_reader), fd.GetFileSize(), table_reader,
        prefetch_index_and_filter_in_cache);
    TEST_SYNC_POINT("TableCache::GetTableReader:0");
  }
  return s;
}

void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
  ReleaseHandle(handle);
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  cache_->Erase(key);
}

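// Looks up the TableReader for `fd` in the table cache. On a miss the file is
// opened with GetTableReader() and the new reader is inserted into the cache
// with a charge of 1, unless `no_io` is set, in which case Status::Incomplete
// is returned. On success, `*handle` must be released by the caller.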
Status TableCache::FindTable(const FileOptions& file_options,
                             const InternalKeyComparator& internal_comparator,
                             const FileDescriptor& fd, Cache::Handle** handle,
                             const SliceTransform* prefix_extractor,
                             const bool no_io, bool record_read_stats,
                             HistogramImpl* file_read_hist, bool skip_filters,
                             int level,
                             bool prefetch_index_and_filter_in_cache) {
  PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
  Status s;
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  *handle = cache_->Lookup(key);
  TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
                           const_cast<bool*>(&no_io));

  if (*handle == nullptr) {
    if (no_io) {  // Don't do IO and return a not-found status
      return Status::Incomplete("Table not found in table_cache, no_io is set");
    }
    std::unique_ptr<TableReader> table_reader;
    s = GetTableReader(file_options, internal_comparator, fd,
                       false /* sequential mode */, record_read_stats,
                       file_read_hist, &table_reader, prefix_extractor,
                       skip_filters, level, prefetch_index_and_filter_in_cache);
    if (!s.ok()) {
      assert(table_reader == nullptr);
      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
                         handle);
      if (s.ok()) {
        // Release ownership of table reader.
        table_reader.release();
      }
    }
  }
  return s;
}

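// Creates an iterator over the table identified by `file_meta`. The table
// reader is taken from file_meta.fd.table_reader when pre-loaded, otherwise
// it is fetched via FindTable(); in that case a cleanup is registered on the
// iterator so the cache handle is released when the iterator is destroyed.
// If `range_del_agg` is supplied and range deletions are not ignored, the
// file's range tombstones are added to the aggregator as well.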
InternalIterator* TableCache::NewIterator(
    const ReadOptions& options, const FileOptions& file_options,
    const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
    RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
    TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
    TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
    const InternalKey* smallest_compaction_key,
    const InternalKey* largest_compaction_key) {
  PERF_TIMER_GUARD(new_table_iterator_nanos);

  Status s;
  TableReader* table_reader = nullptr;
  Cache::Handle* handle = nullptr;
  if (table_reader_ptr != nullptr) {
    *table_reader_ptr = nullptr;
  }
  bool for_compaction = caller == TableReaderCaller::kCompaction;
  auto& fd = file_meta.fd;
  table_reader = fd.table_reader;
  if (table_reader == nullptr) {
    s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor,
                  options.read_tier == kBlockCacheTier /* no_io */,
                  !for_compaction /* record_read_stats */, file_read_hist,
                  skip_filters, level);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(handle);
    }
  }
  InternalIterator* result = nullptr;
  if (s.ok()) {
    if (options.table_filter &&
        !options.table_filter(*table_reader->GetTableProperties())) {
      result = NewEmptyInternalIterator<Slice>(arena);
    } else {
      result = table_reader->NewIterator(options, prefix_extractor, arena,
                                         skip_filters, caller,
                                         file_options.compaction_readahead_size);
    }
    if (handle != nullptr) {
      result->RegisterCleanup(&UnrefEntry, cache_, handle);
      handle = nullptr;  // prevent from releasing below
    }

    if (for_compaction) {
      table_reader->SetupForCompaction();
    }
    if (table_reader_ptr != nullptr) {
      *table_reader_ptr = table_reader;
    }
  }
  if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
    if (range_del_agg->AddFile(fd.GetNumber())) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          static_cast<FragmentedRangeTombstoneIterator*>(
              table_reader->NewRangeTombstoneIterator(options)));
      if (range_del_iter != nullptr) {
        s = range_del_iter->status();
      }
      if (s.ok()) {
        const InternalKey* smallest = &file_meta.smallest;
        const InternalKey* largest = &file_meta.largest;
        if (smallest_compaction_key != nullptr) {
          smallest = smallest_compaction_key;
        }
        if (largest_compaction_key != nullptr) {
          largest = largest_compaction_key;
        }
        range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
                                     largest);
      }
    }
  }

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  if (!s.ok()) {
    assert(result == nullptr);
    result = NewErrorInternalIterator<Slice>(s, arena);
  }
  return result;
}

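// Returns, via `out_iter`, a fragmented range tombstone iterator for the
// file, loading the table reader through the cache if it is not pre-loaded.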
Status TableCache::GetRangeTombstoneIterator(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta,
    std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter) {
  const FileDescriptor& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (t == nullptr) {
    s = FindTable(file_options_, internal_comparator, fd, &handle);
    if (s.ok()) {
      t = GetTableReaderFromHandle(handle);
    }
  }
  if (s.ok()) {
    out_iter->reset(t->NewRangeTombstoneIterator(options));
    assert(out_iter);
  }
  return s;
}

#ifndef ROCKSDB_LITE
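// Builds the per-file prefix of a row cache key. The full key, completed in
// GetFromRowCache(), has the layout:
//   row_cache_id | file number (varint64) | seq_no (varint64) | user key
// where seq_no is 0 unless a snapshot read needs to be distinguished (see
// the comment in the function body).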
void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
                                         const FileDescriptor& fd,
                                         const Slice& internal_key,
                                         GetContext* get_context,
                                         IterKey& row_cache_key) {
  uint64_t fd_number = fd.GetNumber();
  // We use the user key as the cache key instead of the internal key,
  // otherwise the whole cache would be invalidated every time the
  // sequence number increases. However, to support caching snapshot
  // reads, we append the sequence number (incremented by 1 to
  // distinguish from 0) only in that case.
  // If the snapshot is larger than the largest seqno in the file,
  // all data should be exposed to the snapshot, so we treat it
  // the same as there being no snapshot. The exception is that if
  // a seq-checking callback is registered, some internal keys
  // may still be filtered out.
  uint64_t seq_no = 0;
  // Maybe we can include the whole file if snapshot == fd.largest_seqno.
  if (options.snapshot != nullptr &&
      (get_context->has_callback() ||
       static_cast_with_check<const SnapshotImpl, const Snapshot>(
           options.snapshot)
               ->GetSequenceNumber() <= fd.largest_seqno)) {
    // We should consider using options.snapshot->GetSequenceNumber()
    // instead of GetInternalKeySeqno(k), which would make the code
    // easier to understand.
    seq_no = 1 + GetInternalKeySeqno(internal_key);
  }

  // Compute row cache key.
  row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
                           row_cache_id_.size());
  AppendVarint64(&row_cache_key, fd_number);
  AppendVarint64(&row_cache_key, seq_no);
}

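// Appends `user_key` to the prefix built by CreateRowCacheKeyPrefix() and
// probes the row cache. On a hit, the cached replay log is fed back into
// `get_context`, and the cache entry stays pinned until the result slice is
// reset. Returns true on a row cache hit.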
bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                                 size_t prefix_size, GetContext* get_context) {
  bool found = false;

  row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
  if (auto row_handle =
          ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
    // Cleanable routine to release the cache entry
    Cleanable value_pinner;
    auto release_cache_entry_func = [](void* cache_to_clean,
                                       void* cache_handle) {
      ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
    };
    auto found_row_cache_entry =
        static_cast<const std::string*>(ioptions_.row_cache->Value(row_handle));
    // If we reach here, the value is located in the cache.
    // found_row_cache_entry points to the value in the cache,
    // and value_pinner holds the cleanup procedure for the cached entry.
    // After replayGetContextLog() returns, get_context.pinnable_slice_
    // will point to the cache entry buffer (or a copy based on it) and
    // the cleanup routine under value_pinner will be delegated to
    // get_context.pinnable_slice_. The cache entry is released when
    // get_context.pinnable_slice_ is reset.
    value_pinner.RegisterCleanup(release_cache_entry_func,
                                 ioptions_.row_cache.get(), row_handle);
    replayGetContextLog(*found_row_cache_entry, user_key, get_context,
                        &value_pinner);
    RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
    found = true;
  } else {
    RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
  }
  return found;
}
#endif  // ROCKSDB_LITE

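// Point lookup for a single key. The fast path answers from the row cache
// when it is enabled; otherwise the table reader is obtained (pre-loaded or
// via FindTable), covering range tombstones are consulted, and the lookup is
// delegated to TableReader::Get(). If the lookup found something, its replay
// log is inserted into the row cache afterwards.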
Status TableCache::Get(const ReadOptions& options,
                       const InternalKeyComparator& internal_comparator,
                       const FileMetaData& file_meta, const Slice& k,
                       GetContext* get_context,
                       const SliceTransform* prefix_extractor,
                       HistogramImpl* file_read_hist, bool skip_filters,
                       int level) {
  auto& fd = file_meta.fd;
  std::string* row_cache_entry = nullptr;
  bool done = false;
#ifndef ROCKSDB_LITE
  IterKey row_cache_key;
  std::string row_cache_entry_buffer;

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
    auto user_key = ExtractUserKey(k);
    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
                           get_context);
    if (!done) {
      row_cache_entry = &row_cache_entry_buffer;
    }
  }
#endif  // ROCKSDB_LITE
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (!done && s.ok()) {
    if (t == nullptr) {
      s = FindTable(
          file_options_, internal_comparator, fd, &handle, prefix_extractor,
          options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
      }
    }
    SequenceNumber* max_covering_tombstone_seq =
        get_context->max_covering_tombstone_seq();
    if (s.ok() && max_covering_tombstone_seq != nullptr &&
        !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        *max_covering_tombstone_seq = std::max(
            *max_covering_tombstone_seq,
            range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
      }
    }
    if (s.ok()) {
      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
      s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
      get_context->SetReplayLog(nullptr);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      // Couldn't find Table in cache but treat as kFound if no_io set
      get_context->MarkKeyMayExist();
      s = Status::OK();
      done = true;
    }
  }

#ifndef ROCKSDB_LITE
  // Put the replay log in row cache only if something was found.
  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
    size_t charge =
        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    void* row_ptr = new std::string(std::move(*row_cache_entry));
    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                &DeleteEntry<std::string>);
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

// Batched version of TableCache::Get.
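// For each key in the batch, the row cache is probed first (when enabled);
// misses get a replay log registered on their GetContext. The remaining keys
// are served by a single TableReader::MultiGet() call, after each key's
// maximum covering range tombstone sequence number has been updated, and row
// cache entries are inserted for the keys that produced a replay log.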
Status TableCache::MultiGet(const ReadOptions& options,
                            const InternalKeyComparator& internal_comparator,
                            const FileMetaData& file_meta,
                            const MultiGetContext::Range* mget_range,
                            const SliceTransform* prefix_extractor,
                            HistogramImpl* file_read_hist, bool skip_filters,
                            int level) {
  auto& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  MultiGetRange table_range(*mget_range, mget_range->begin(),
                            mget_range->end());
#ifndef ROCKSDB_LITE
  autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
  IterKey row_cache_key;
  size_t row_cache_key_prefix_size = 0;
  KeyContext& first_key = *table_range.begin();
  bool lookup_row_cache =
      ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (lookup_row_cache) {
    GetContext* first_context = first_key.get_context;
    CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
                            row_cache_key);
    row_cache_key_prefix_size = row_cache_key.Size();

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      const Slice& user_key = miter->ukey;
      GetContext* get_context = miter->get_context;

      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
                          get_context)) {
        table_range.SkipKey(miter);
      } else {
        row_cache_entries.emplace_back();
        get_context->SetReplayLog(&(row_cache_entries.back()));
      }
    }
  }
#endif  // ROCKSDB_LITE

  // Check that table_range is not empty. It's possible all keys may have been
  // found in the row cache and thus the range may now be empty.
  if (s.ok() && !table_range.empty()) {
    if (t == nullptr) {
      s = FindTable(
          file_options_, internal_comparator, fd, &handle, prefix_extractor,
          options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
        assert(t);
      }
    }
    if (s.ok() && !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        for (auto iter = table_range.begin(); iter != table_range.end();
             ++iter) {
          SequenceNumber* max_covering_tombstone_seq =
              iter->get_context->max_covering_tombstone_seq();
          *max_covering_tombstone_seq =
              std::max(*max_covering_tombstone_seq,
                       range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
        }
      }
    }
    if (s.ok()) {
      t->MultiGet(options, &table_range, prefix_extractor, skip_filters);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
        Status* status = iter->s;
        if (status->IsIncomplete()) {
          // Couldn't find Table in cache but treat as kFound if no_io set
          iter->get_context->MarkKeyMayExist();
          s = Status::OK();
        }
      }
    }
  }

#ifndef ROCKSDB_LITE
  if (lookup_row_cache) {
    size_t row_idx = 0;

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      std::string& row_cache_entry = row_cache_entries[row_idx++];
      const Slice& user_key = miter->ukey;
      GetContext* get_context = miter->get_context;

      get_context->SetReplayLog(nullptr);
      // Compute row cache key.
      row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
                               user_key.size());
      // Put the replay log in row cache only if something was found.
      if (s.ok() && !row_cache_entry.empty()) {
        size_t charge =
            row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
        void* row_ptr = new std::string(std::move(row_cache_entry));
        ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                    &DeleteEntry<std::string>);
      }
    }
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

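// GetTableProperties() and GetMemoryUsageByTableReader() below use the
// pre-loaded table reader when available and otherwise fetch one through the
// cache, releasing the handle before returning.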
Status TableCache::GetTableProperties(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    std::shared_ptr<const TableProperties>* properties,
    const SliceTransform* prefix_extractor, bool no_io) {
  Status s;
  auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
  if (table_reader) {
    *properties = table_reader->GetTableProperties();

    return s;
  }

  Cache::Handle* table_handle = nullptr;
  s = FindTable(file_options, internal_comparator, fd, &table_handle,
                prefix_extractor, no_io);
  if (!s.ok()) {
    return s;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  *properties = table->GetTableProperties();
  ReleaseHandle(table_handle);
  return s;
}

size_t TableCache::GetMemoryUsageByTableReader(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    const SliceTransform* prefix_extractor) {
  Status s;
  auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
  if (table_reader) {
    return table_reader->ApproximateMemoryUsage();
  }

  Cache::Handle* table_handle = nullptr;
  s = FindTable(file_options, internal_comparator, fd, &table_handle,
                prefix_extractor, true /* no_io */);
  if (!s.ok()) {
    return 0;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  auto ret = table->ApproximateMemoryUsage();
  ReleaseHandle(table_handle);
  return ret;
}

void TableCache::Evict(Cache* cache, uint64_t file_number) {
  cache->Erase(GetSliceForFileNumber(&file_number));
}

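// ApproximateOffsetOf() and ApproximateSize() below estimate, respectively,
// the offset of `key` within the file and the amount of file data spanned by
// [start, end), loading the table reader through the cache if necessary.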
uint64_t TableCache::ApproximateOffsetOf(
    const Slice& key, const FileDescriptor& fd, TableReaderCaller caller,
    const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
                         prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateOffsetOf(key, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}

uint64_t TableCache::ApproximateSize(
    const Slice& start, const Slice& end, const FileDescriptor& fd,
    TableReaderCaller caller, const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
                         prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateSize(start, end, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}
}  // namespace ROCKSDB_NAMESPACE