1 //  Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 #ifndef ROCKSDB_LITE
8 
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/utilities/cache_dump_load.h"
#include "table/block_based/block.h"
#include "table/block_based/block_like_traits.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/parsed_full_filter_block.h"
#include "table/block_based/reader_common.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
// Size in bytes of the scratch buffer the default CacheDumpReader
// (FromFileCacheDumpReader) fills on each file read; longer records are read
// in several chunks of at most this size.
static constexpr unsigned int kDumpReaderBufferSize = 1024;  // 1 KiB
// Width in bytes of the fixed32 length prefix that precedes every record
// (metadata or data packet) in a cache dump file.
static constexpr unsigned int kSizePrefixLen = 4;
26 
// Tag identifying what a dump unit carries. The underlying type is
// unsigned char: CacheDumperHelper::EncodeDumpUnit writes it as a single byte
// and DecodeDumpUnit reads it back. The numeric values are therefore part of
// the on-disk dump format; do not renumber or reorder existing enumerators.
// NOTE(review): the block enumerators appear to mirror BlockType from
// table/block_based/block_type.h — confirm against the mapping in the .cc.
enum CacheDumpUnitType : unsigned char {
  kHeader = 1,  // header unit; its meta carries sequence number 0
  kFooter = 2,  // presumably the unit that terminates a dump — verify
  kData = 3,
  kFilter = 4,
  kProperties = 5,
  kCompressionDictionary = 6,
  kRangeDeletion = 7,
  kHashIndexPrefixes = 8,
  kHashIndexMetadata = 9,
  kMetaIndex = 10,
  kIndex = 11,
  kDeprecatedFilterBlock = 12,
  kFilterMetaBlock = 13,
  // Sentinel (one past the last real type). DumpUnit::reset() uses it to mean
  // "no type set".
  kBlockTypeMax,
};
43 
// Metadata describing one serialized dump unit. Its encoded form is always
// 16 bytes: fixed32 sequence_num, fixed32 dump_unit_checksum, then fixed64
// dump_unit_size.
struct DumpUnitMeta {
  // Monotonically increasing counter recording the order in which blocks are
  // written; the header unit uses 0.
  uint32_t sequence_num;
  // Crc32c checksum of the serialized dump unit this meta describes.
  uint32_t dump_unit_checksum;
  // Length in bytes of the serialized dump unit.
  uint64_t dump_unit_size;

  // Zeroes every field (value-initialization of this aggregate).
  void reset() { *this = DumpUnitMeta(); }
};
61 
62 // The data structure to hold a block and its information.
63 struct DumpUnit {
64   // The timestamp when the block is identified, copied, and dumped from block
65   // cache
66   uint64_t timestamp;
67   // The type of the block
68   CacheDumpUnitType type;
69   // The key of this block when the block is referenced by this Cache
70   Slice key;
71   // The block size
72   size_t value_len;
73   // The Crc32c checksum of the block
74   uint32_t value_checksum;
75   // Pointer to the block. Note that, in the dump process, it points to a memory
76   // buffer copied from cache block. The buffer is freed when we process the
77   // next block. In the load process, we use an std::string to store the
78   // serilized dump_unit read from the reader. So it points to the memory
79   // address of the begin of the block in this string.
80   void* value;
81 
DumpUnitDumpUnit82   DumpUnit() { reset(); }
83 
resetDumpUnit84   void reset() {
85     timestamp = 0;
86     type = CacheDumpUnitType::kBlockTypeMax;
87     key.clear();
88     value_len = 0;
89     value_checksum = 0;
90     value = nullptr;
91   }
92 };
93 
94 // The default implementation of the Cache Dumper
95 class CacheDumperImpl : public CacheDumper {
96  public:
CacheDumperImpl(const CacheDumpOptions & dump_options,const std::shared_ptr<Cache> & cache,std::unique_ptr<CacheDumpWriter> && writer)97   CacheDumperImpl(const CacheDumpOptions& dump_options,
98                   const std::shared_ptr<Cache>& cache,
99                   std::unique_ptr<CacheDumpWriter>&& writer)
100       : options_(dump_options), cache_(cache), writer_(std::move(writer)) {}
~CacheDumperImpl()101   ~CacheDumperImpl() { writer_.reset(); }
102   Status SetDumpFilter(std::vector<DB*> db_list) override;
103   IOStatus DumpCacheEntriesToWriter() override;
104 
105  private:
106   IOStatus WriteRawBlock(uint64_t timestamp, CacheDumpUnitType type,
107                          const Slice& key, void* value, size_t len,
108                          uint32_t checksum);
109 
110   IOStatus WriteHeader();
111 
112   IOStatus WriteCacheBlock(const CacheDumpUnitType type, const Slice& key,
113                            void* value, size_t len);
114   IOStatus WriteFooter();
115   bool ShouldFilterOut(const Slice& key);
116   std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
117   DumpOneBlockCallBack();
118 
119   CacheDumpOptions options_;
120   std::shared_ptr<Cache> cache_;
121   std::unique_ptr<CacheDumpWriter> writer_;
122   std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_;
123   SystemClock* clock_;
124   uint32_t sequence_num_;
125   // The cache key prefix filter. Currently, we use db_session_id as the prefix,
126   // so using std::set to store the prefixes as filter is enough. Further
127   // improvement can be applied like BloomFilter or others to speedup the
128   // filtering.
129   std::set<std::string> prefix_filter_;
130 };
131 
132 // The default implementation of CacheDumpedLoader
133 class CacheDumpedLoaderImpl : public CacheDumpedLoader {
134  public:
CacheDumpedLoaderImpl(const CacheDumpOptions & dump_options,const BlockBasedTableOptions & toptions,const std::shared_ptr<SecondaryCache> & secondary_cache,std::unique_ptr<CacheDumpReader> && reader)135   CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
136                         const BlockBasedTableOptions& toptions,
137                         const std::shared_ptr<SecondaryCache>& secondary_cache,
138                         std::unique_ptr<CacheDumpReader>&& reader)
139       : options_(dump_options),
140         toptions_(toptions),
141         secondary_cache_(secondary_cache),
142         reader_(std::move(reader)) {}
~CacheDumpedLoaderImpl()143   ~CacheDumpedLoaderImpl() {}
144   IOStatus RestoreCacheEntriesToSecondaryCache() override;
145 
146  private:
147   IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
148   IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit);
149   IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit);
150   IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit);
151 
152   CacheDumpOptions options_;
153   const BlockBasedTableOptions& toptions_;
154   std::shared_ptr<SecondaryCache> secondary_cache_;
155   std::unique_ptr<CacheDumpReader> reader_;
156   std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_;
157 };
158 
159 // The default implementation of CacheDumpWriter. We write the blocks to a file
160 // sequentially.
161 class ToFileCacheDumpWriter : public CacheDumpWriter {
162  public:
ToFileCacheDumpWriter(std::unique_ptr<WritableFileWriter> && file_writer)163   explicit ToFileCacheDumpWriter(
164       std::unique_ptr<WritableFileWriter>&& file_writer)
165       : file_writer_(std::move(file_writer)) {}
166 
~ToFileCacheDumpWriter()167   ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }
168 
169   // Write the serilized metadata to the file
WriteMetadata(const Slice & metadata)170   virtual IOStatus WriteMetadata(const Slice& metadata) override {
171     assert(file_writer_ != nullptr);
172     std::string prefix;
173     PutFixed32(&prefix, static_cast<uint32_t>(metadata.size()));
174     IOStatus io_s = file_writer_->Append(Slice(prefix));
175     if (!io_s.ok()) {
176       return io_s;
177     }
178     io_s = file_writer_->Append(metadata);
179     return io_s;
180   }
181 
182   // Write the serilized data to the file
WritePacket(const Slice & data)183   virtual IOStatus WritePacket(const Slice& data) override {
184     assert(file_writer_ != nullptr);
185     std::string prefix;
186     PutFixed32(&prefix, static_cast<uint32_t>(data.size()));
187     IOStatus io_s = file_writer_->Append(Slice(prefix));
188     if (!io_s.ok()) {
189       return io_s;
190     }
191     io_s = file_writer_->Append(data);
192     return io_s;
193   }
194 
195   // Reset the writer
Close()196   virtual IOStatus Close() override {
197     file_writer_.reset();
198     return IOStatus::OK();
199   }
200 
201  private:
202   std::unique_ptr<WritableFileWriter> file_writer_;
203 };
204 
205 // The default implementation of CacheDumpReader. It is implemented based on
206 // RandomAccessFileReader. Note that, we keep an internal variable to remember
207 // the current offset.
208 class FromFileCacheDumpReader : public CacheDumpReader {
209  public:
FromFileCacheDumpReader(std::unique_ptr<RandomAccessFileReader> && reader)210   explicit FromFileCacheDumpReader(
211       std::unique_ptr<RandomAccessFileReader>&& reader)
212       : file_reader_(std::move(reader)),
213         offset_(0),
214         buffer_(new char[kDumpReaderBufferSize]) {}
215 
~FromFileCacheDumpReader()216   ~FromFileCacheDumpReader() { delete[] buffer_; }
217 
ReadMetadata(std::string * metadata)218   virtual IOStatus ReadMetadata(std::string* metadata) override {
219     uint32_t metadata_len = 0;
220     IOStatus io_s = ReadSizePrefix(&metadata_len);
221     if (!io_s.ok()) {
222       return io_s;
223     }
224     return Read(metadata_len, metadata);
225   }
226 
ReadPacket(std::string * data)227   virtual IOStatus ReadPacket(std::string* data) override {
228     uint32_t data_len = 0;
229     IOStatus io_s = ReadSizePrefix(&data_len);
230     if (!io_s.ok()) {
231       return io_s;
232     }
233     return Read(data_len, data);
234   }
235 
236  private:
ReadSizePrefix(uint32_t * len)237   IOStatus ReadSizePrefix(uint32_t* len) {
238     std::string prefix;
239     IOStatus io_s = Read(kSizePrefixLen, &prefix);
240     if (!io_s.ok()) {
241       return io_s;
242     }
243     Slice encoded_slice(prefix);
244     if (!GetFixed32(&encoded_slice, len)) {
245       return IOStatus::Corruption("Decode size prefix string failed");
246     }
247     return IOStatus::OK();
248   }
249 
Read(size_t len,std::string * data)250   IOStatus Read(size_t len, std::string* data) {
251     assert(file_reader_ != nullptr);
252     IOStatus io_s;
253 
254     unsigned int bytes_to_read = static_cast<unsigned int>(len);
255     unsigned int to_read = bytes_to_read > kDumpReaderBufferSize
256                                ? kDumpReaderBufferSize
257                                : bytes_to_read;
258 
259     while (to_read > 0) {
260       io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
261                                 buffer_, nullptr);
262       if (!io_s.ok()) {
263         return io_s;
264       }
265       if (result_.size() < to_read) {
266         return IOStatus::Corruption("Corrupted cache dump file.");
267       }
268       data->append(result_.data(), result_.size());
269 
270       offset_ += to_read;
271       bytes_to_read -= to_read;
272       to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize
273                                                       : bytes_to_read;
274     }
275     return io_s;
276   }
277   std::unique_ptr<RandomAccessFileReader> file_reader_;
278   Slice result_;
279   size_t offset_;
280   char* buffer_;
281 };
282 
283 // The cache dump and load helper class
284 class CacheDumperHelper {
285  public:
286   // serilize the dump_unit_meta to a string, it is fixed 16 bytes size.
EncodeDumpUnitMeta(const DumpUnitMeta & meta,std::string * data)287   static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) {
288     assert(data);
289     PutFixed32(data, static_cast<uint32_t>(meta.sequence_num));
290     PutFixed32(data, static_cast<uint32_t>(meta.dump_unit_checksum));
291     PutFixed64(data, meta.dump_unit_size);
292   }
293 
294   // Serilize the dump_unit to a string.
EncodeDumpUnit(const DumpUnit & dump_unit,std::string * data)295   static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) {
296     assert(data);
297     PutFixed64(data, dump_unit.timestamp);
298     data->push_back(dump_unit.type);
299     PutLengthPrefixedSlice(data, dump_unit.key);
300     PutFixed32(data, static_cast<uint32_t>(dump_unit.value_len));
301     PutFixed32(data, dump_unit.value_checksum);
302     PutLengthPrefixedSlice(data,
303                            Slice((char*)dump_unit.value, dump_unit.value_len));
304   }
305 
306   // Deserilize the dump_unit_meta from a string
DecodeDumpUnitMeta(const std::string & encoded_data,DumpUnitMeta * unit_meta)307   static Status DecodeDumpUnitMeta(const std::string& encoded_data,
308                                    DumpUnitMeta* unit_meta) {
309     assert(unit_meta != nullptr);
310     Slice encoded_slice = Slice(encoded_data);
311     if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) {
312       return Status::Incomplete("Decode dumped unit meta sequence_num failed");
313     }
314     if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) {
315       return Status::Incomplete(
316           "Decode dumped unit meta dump_unit_checksum failed");
317     }
318     if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) {
319       return Status::Incomplete(
320           "Decode dumped unit meta dump_unit_size failed");
321     }
322     return Status::OK();
323   }
324 
325   // Deserilize the dump_unit from a string.
DecodeDumpUnit(const std::string & encoded_data,DumpUnit * dump_unit)326   static Status DecodeDumpUnit(const std::string& encoded_data,
327                                DumpUnit* dump_unit) {
328     assert(dump_unit != nullptr);
329     Slice encoded_slice = Slice(encoded_data);
330 
331     // Decode timestamp
332     if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) {
333       return Status::Incomplete("Decode dumped unit string failed");
334     }
335     // Decode the block type
336     dump_unit->type = static_cast<CacheDumpUnitType>(encoded_slice[0]);
337     encoded_slice.remove_prefix(1);
338     // Decode the key
339     if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) {
340       return Status::Incomplete("Decode dumped unit string failed");
341     }
342     // Decode the value size
343     uint32_t value_len;
344     if (!GetFixed32(&encoded_slice, &value_len)) {
345       return Status::Incomplete("Decode dumped unit string failed");
346     }
347     dump_unit->value_len = static_cast<size_t>(value_len);
348     // Decode the value checksum
349     if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) {
350       return Status::Incomplete("Decode dumped unit string failed");
351     }
352     // Decode the block content and copy to the memory space whose pointer
353     // will be managed by the cache finally.
354     Slice block;
355     if (!GetLengthPrefixedSlice(&encoded_slice, &block)) {
356       return Status::Incomplete("Decode dumped unit string failed");
357     }
358     dump_unit->value = (void*)block.data();
359     assert(block.size() == dump_unit->value_len);
360     return Status::OK();
361   }
362 };
363 
364 }  // namespace ROCKSDB_NAMESPACE
365 #endif  // ROCKSDB_LITE
366