1 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 #ifndef ROCKSDB_LITE 8 9 #include <unordered_map> 10 11 #include "file/random_access_file_reader.h" 12 #include "file/writable_file_writer.h" 13 #include "rocksdb/utilities/cache_dump_load.h" 14 #include "table/block_based/block.h" 15 #include "table/block_based/block_like_traits.h" 16 #include "table/block_based/block_type.h" 17 #include "table/block_based/cachable_entry.h" 18 #include "table/block_based/parsed_full_filter_block.h" 19 #include "table/block_based/reader_common.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 23 // the read buffer size of for the default CacheDumpReader 24 const unsigned int kDumpReaderBufferSize = 1024; // 1KB 25 static const unsigned int kSizePrefixLen = 4; 26 27 enum CacheDumpUnitType : unsigned char { 28 kHeader = 1, 29 kFooter = 2, 30 kData = 3, 31 kFilter = 4, 32 kProperties = 5, 33 kCompressionDictionary = 6, 34 kRangeDeletion = 7, 35 kHashIndexPrefixes = 8, 36 kHashIndexMetadata = 9, 37 kMetaIndex = 10, 38 kIndex = 11, 39 kDeprecatedFilterBlock = 12, 40 kFilterMetaBlock = 13, 41 kBlockTypeMax, 42 }; 43 44 // The metadata of a dump unit. After it is serilized, its size is fixed 16 45 // bytes. 46 struct DumpUnitMeta { 47 // sequence number is a monotonically increasing number to indicate the order 48 // of the blocks being written. Header is 0. 49 uint32_t sequence_num; 50 // The Crc32c checksum of its dump unit. 51 uint32_t dump_unit_checksum; 52 // The dump unit size after the dump unit is serilized to a string. 53 uint64_t dump_unit_size; 54 resetDumpUnitMeta55 void reset() { 56 sequence_num = 0; 57 dump_unit_checksum = 0; 58 dump_unit_size = 0; 59 } 60 }; 61 62 // The data structure to hold a block and its information. 
63 struct DumpUnit { 64 // The timestamp when the block is identified, copied, and dumped from block 65 // cache 66 uint64_t timestamp; 67 // The type of the block 68 CacheDumpUnitType type; 69 // The key of this block when the block is referenced by this Cache 70 Slice key; 71 // The block size 72 size_t value_len; 73 // The Crc32c checksum of the block 74 uint32_t value_checksum; 75 // Pointer to the block. Note that, in the dump process, it points to a memory 76 // buffer copied from cache block. The buffer is freed when we process the 77 // next block. In the load process, we use an std::string to store the 78 // serilized dump_unit read from the reader. So it points to the memory 79 // address of the begin of the block in this string. 80 void* value; 81 DumpUnitDumpUnit82 DumpUnit() { reset(); } 83 resetDumpUnit84 void reset() { 85 timestamp = 0; 86 type = CacheDumpUnitType::kBlockTypeMax; 87 key.clear(); 88 value_len = 0; 89 value_checksum = 0; 90 value = nullptr; 91 } 92 }; 93 94 // The default implementation of the Cache Dumper 95 class CacheDumperImpl : public CacheDumper { 96 public: CacheDumperImpl(const CacheDumpOptions & dump_options,const std::shared_ptr<Cache> & cache,std::unique_ptr<CacheDumpWriter> && writer)97 CacheDumperImpl(const CacheDumpOptions& dump_options, 98 const std::shared_ptr<Cache>& cache, 99 std::unique_ptr<CacheDumpWriter>&& writer) 100 : options_(dump_options), cache_(cache), writer_(std::move(writer)) {} ~CacheDumperImpl()101 ~CacheDumperImpl() { writer_.reset(); } 102 Status SetDumpFilter(std::vector<DB*> db_list) override; 103 IOStatus DumpCacheEntriesToWriter() override; 104 105 private: 106 IOStatus WriteRawBlock(uint64_t timestamp, CacheDumpUnitType type, 107 const Slice& key, void* value, size_t len, 108 uint32_t checksum); 109 110 IOStatus WriteHeader(); 111 112 IOStatus WriteCacheBlock(const CacheDumpUnitType type, const Slice& key, 113 void* value, size_t len); 114 IOStatus WriteFooter(); 115 bool ShouldFilterOut(const 
Slice& key); 116 std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)> 117 DumpOneBlockCallBack(); 118 119 CacheDumpOptions options_; 120 std::shared_ptr<Cache> cache_; 121 std::unique_ptr<CacheDumpWriter> writer_; 122 std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_; 123 SystemClock* clock_; 124 uint32_t sequence_num_; 125 // The cache key prefix filter. Currently, we use db_session_id as the prefix, 126 // so using std::set to store the prefixes as filter is enough. Further 127 // improvement can be applied like BloomFilter or others to speedup the 128 // filtering. 129 std::set<std::string> prefix_filter_; 130 }; 131 132 // The default implementation of CacheDumpedLoader 133 class CacheDumpedLoaderImpl : public CacheDumpedLoader { 134 public: CacheDumpedLoaderImpl(const CacheDumpOptions & dump_options,const BlockBasedTableOptions & toptions,const std::shared_ptr<SecondaryCache> & secondary_cache,std::unique_ptr<CacheDumpReader> && reader)135 CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options, 136 const BlockBasedTableOptions& toptions, 137 const std::shared_ptr<SecondaryCache>& secondary_cache, 138 std::unique_ptr<CacheDumpReader>&& reader) 139 : options_(dump_options), 140 toptions_(toptions), 141 secondary_cache_(secondary_cache), 142 reader_(std::move(reader)) {} ~CacheDumpedLoaderImpl()143 ~CacheDumpedLoaderImpl() {} 144 IOStatus RestoreCacheEntriesToSecondaryCache() override; 145 146 private: 147 IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta); 148 IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit); 149 IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit); 150 IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit); 151 152 CacheDumpOptions options_; 153 const BlockBasedTableOptions& toptions_; 154 std::shared_ptr<SecondaryCache> secondary_cache_; 155 std::unique_ptr<CacheDumpReader> reader_; 156 std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_; 157 }; 

// The default implementation of CacheDumpWriter. We write the blocks to a file
// sequentially. Every record is stored as a fixed32 size prefix followed by
// the record bytes.
class ToFileCacheDumpWriter : public CacheDumpWriter {
 public:
  // Takes ownership of `file_writer`.
  explicit ToFileCacheDumpWriter(
      std::unique_ptr<WritableFileWriter>&& file_writer)
      : file_writer_(std::move(file_writer)) {}

  // Close() always reports OK here, so its status is explicitly waived.
  ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }

  // Write the serialized metadata to the file
  virtual IOStatus WriteMetadata(const Slice& metadata) override {
    return WriteSizePrefixed(metadata);
  }

  // Write the serialized data to the file
  virtual IOStatus WritePacket(const Slice& data) override {
    return WriteSizePrefixed(data);
  }

  // Reset the writer. The underlying file writer is destroyed; no further
  // writes may be issued afterwards.
  virtual IOStatus Close() override {
    file_writer_.reset();
    return IOStatus::OK();
  }

 private:
  // Appends `payload` to the file, preceded by its size encoded as fixed32.
  // Returns InvalidArgument when the payload size does not fit in 32 bits
  // (the prefix would otherwise be silently truncated and the dump could not
  // be read back), or the first non-OK status reported by the file writer.
  IOStatus WriteSizePrefixed(const Slice& payload) {
    assert(file_writer_ != nullptr);
    const uint32_t payload_len = static_cast<uint32_t>(payload.size());
    if (static_cast<size_t>(payload_len) != payload.size()) {
      return IOStatus::InvalidArgument(
          "Cache dump record is too large for a 32-bit size prefix");
    }
    std::string prefix;
    PutFixed32(&prefix, payload_len);
    IOStatus io_s = file_writer_->Append(Slice(prefix));
    if (!io_s.ok()) {
      return io_s;
    }
    io_s = file_writer_->Append(payload);
    return io_s;
  }

  std::unique_ptr<WritableFileWriter> file_writer_;
};

// The default implementation of CacheDumpReader. It is implemented based on
// RandomAccessFileReader. Note that, we keep an internal variable to remember
// the current offset.
208 class FromFileCacheDumpReader : public CacheDumpReader { 209 public: FromFileCacheDumpReader(std::unique_ptr<RandomAccessFileReader> && reader)210 explicit FromFileCacheDumpReader( 211 std::unique_ptr<RandomAccessFileReader>&& reader) 212 : file_reader_(std::move(reader)), 213 offset_(0), 214 buffer_(new char[kDumpReaderBufferSize]) {} 215 ~FromFileCacheDumpReader()216 ~FromFileCacheDumpReader() { delete[] buffer_; } 217 ReadMetadata(std::string * metadata)218 virtual IOStatus ReadMetadata(std::string* metadata) override { 219 uint32_t metadata_len = 0; 220 IOStatus io_s = ReadSizePrefix(&metadata_len); 221 if (!io_s.ok()) { 222 return io_s; 223 } 224 return Read(metadata_len, metadata); 225 } 226 ReadPacket(std::string * data)227 virtual IOStatus ReadPacket(std::string* data) override { 228 uint32_t data_len = 0; 229 IOStatus io_s = ReadSizePrefix(&data_len); 230 if (!io_s.ok()) { 231 return io_s; 232 } 233 return Read(data_len, data); 234 } 235 236 private: ReadSizePrefix(uint32_t * len)237 IOStatus ReadSizePrefix(uint32_t* len) { 238 std::string prefix; 239 IOStatus io_s = Read(kSizePrefixLen, &prefix); 240 if (!io_s.ok()) { 241 return io_s; 242 } 243 Slice encoded_slice(prefix); 244 if (!GetFixed32(&encoded_slice, len)) { 245 return IOStatus::Corruption("Decode size prefix string failed"); 246 } 247 return IOStatus::OK(); 248 } 249 Read(size_t len,std::string * data)250 IOStatus Read(size_t len, std::string* data) { 251 assert(file_reader_ != nullptr); 252 IOStatus io_s; 253 254 unsigned int bytes_to_read = static_cast<unsigned int>(len); 255 unsigned int to_read = bytes_to_read > kDumpReaderBufferSize 256 ? 
kDumpReaderBufferSize 257 : bytes_to_read; 258 259 while (to_read > 0) { 260 io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, 261 buffer_, nullptr); 262 if (!io_s.ok()) { 263 return io_s; 264 } 265 if (result_.size() < to_read) { 266 return IOStatus::Corruption("Corrupted cache dump file."); 267 } 268 data->append(result_.data(), result_.size()); 269 270 offset_ += to_read; 271 bytes_to_read -= to_read; 272 to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize 273 : bytes_to_read; 274 } 275 return io_s; 276 } 277 std::unique_ptr<RandomAccessFileReader> file_reader_; 278 Slice result_; 279 size_t offset_; 280 char* buffer_; 281 }; 282 283 // The cache dump and load helper class 284 class CacheDumperHelper { 285 public: 286 // serilize the dump_unit_meta to a string, it is fixed 16 bytes size. EncodeDumpUnitMeta(const DumpUnitMeta & meta,std::string * data)287 static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) { 288 assert(data); 289 PutFixed32(data, static_cast<uint32_t>(meta.sequence_num)); 290 PutFixed32(data, static_cast<uint32_t>(meta.dump_unit_checksum)); 291 PutFixed64(data, meta.dump_unit_size); 292 } 293 294 // Serilize the dump_unit to a string. 
EncodeDumpUnit(const DumpUnit & dump_unit,std::string * data)295 static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) { 296 assert(data); 297 PutFixed64(data, dump_unit.timestamp); 298 data->push_back(dump_unit.type); 299 PutLengthPrefixedSlice(data, dump_unit.key); 300 PutFixed32(data, static_cast<uint32_t>(dump_unit.value_len)); 301 PutFixed32(data, dump_unit.value_checksum); 302 PutLengthPrefixedSlice(data, 303 Slice((char*)dump_unit.value, dump_unit.value_len)); 304 } 305 306 // Deserilize the dump_unit_meta from a string DecodeDumpUnitMeta(const std::string & encoded_data,DumpUnitMeta * unit_meta)307 static Status DecodeDumpUnitMeta(const std::string& encoded_data, 308 DumpUnitMeta* unit_meta) { 309 assert(unit_meta != nullptr); 310 Slice encoded_slice = Slice(encoded_data); 311 if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) { 312 return Status::Incomplete("Decode dumped unit meta sequence_num failed"); 313 } 314 if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) { 315 return Status::Incomplete( 316 "Decode dumped unit meta dump_unit_checksum failed"); 317 } 318 if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) { 319 return Status::Incomplete( 320 "Decode dumped unit meta dump_unit_size failed"); 321 } 322 return Status::OK(); 323 } 324 325 // Deserilize the dump_unit from a string. 
DecodeDumpUnit(const std::string & encoded_data,DumpUnit * dump_unit)326 static Status DecodeDumpUnit(const std::string& encoded_data, 327 DumpUnit* dump_unit) { 328 assert(dump_unit != nullptr); 329 Slice encoded_slice = Slice(encoded_data); 330 331 // Decode timestamp 332 if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) { 333 return Status::Incomplete("Decode dumped unit string failed"); 334 } 335 // Decode the block type 336 dump_unit->type = static_cast<CacheDumpUnitType>(encoded_slice[0]); 337 encoded_slice.remove_prefix(1); 338 // Decode the key 339 if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) { 340 return Status::Incomplete("Decode dumped unit string failed"); 341 } 342 // Decode the value size 343 uint32_t value_len; 344 if (!GetFixed32(&encoded_slice, &value_len)) { 345 return Status::Incomplete("Decode dumped unit string failed"); 346 } 347 dump_unit->value_len = static_cast<size_t>(value_len); 348 // Decode the value checksum 349 if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) { 350 return Status::Incomplete("Decode dumped unit string failed"); 351 } 352 // Decode the block content and copy to the memory space whose pointer 353 // will be managed by the cache finally. 354 Slice block; 355 if (!GetLengthPrefixedSlice(&encoded_slice, &block)) { 356 return Status::Incomplete("Decode dumped unit string failed"); 357 } 358 dump_unit->value = (void*)block.data(); 359 assert(block.size() == dump_unit->value_len); 360 return Status::OK(); 361 } 362 }; 363 364 } // namespace ROCKSDB_NAMESPACE 365 #endif // ROCKSDB_LITE 366