// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#ifndef ROCKSDB_LITE

#include <cassert>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "file/random_access_file_reader.h"

#include "rocksdb/comparator.h"
#include "rocksdb/env.h"

#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
#include "utilities/persistent_cache/lrulist.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_util.h"

#include "port/port.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"

// The io code path of persistent cache uses pipelined architecture
//
// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
//
// This would enable the system to scale for GB/s of throughput which is
// expected with modern devices like NVM.
34 // 35 // The file level operations are encapsulated in the following abstractions 36 // 37 // BlockCacheFile 38 // ^ 39 // | 40 // | 41 // RandomAccessCacheFile (For reading) 42 // ^ 43 // | 44 // | 45 // WriteableCacheFile (For writing) 46 // 47 // Write IO code path : 48 // 49 namespace ROCKSDB_NAMESPACE { 50 51 class WriteableCacheFile; 52 struct BlockInfo; 53 54 // Represents a logical record on device 55 // 56 // (L)ogical (B)lock (Address = { cache-file-id, offset, size } 57 struct LogicalBlockAddress { LogicalBlockAddressLogicalBlockAddress58 LogicalBlockAddress() {} LogicalBlockAddressLogicalBlockAddress59 explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off, 60 const uint16_t size) 61 : cache_id_(cache_id), off_(off), size_(size) {} 62 63 uint32_t cache_id_ = 0; 64 uint32_t off_ = 0; 65 uint32_t size_ = 0; 66 }; 67 68 typedef LogicalBlockAddress LBA; 69 70 // class Writer 71 // 72 // Writer is the abstraction used for writing data to file. The component can be 73 // multithreaded. 
It is the last step of write pipeline 74 class Writer { 75 public: Writer(PersistentCacheTier * const cache)76 explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {} ~Writer()77 virtual ~Writer() {} 78 79 // write buffer to file at the given offset 80 virtual void Write(WritableFile* const file, CacheWriteBuffer* buf, 81 const uint64_t file_off, 82 const std::function<void()> callback) = 0; 83 // stop the writer 84 virtual void Stop() = 0; 85 86 PersistentCacheTier* const cache_; 87 }; 88 89 // class BlockCacheFile 90 // 91 // Generic interface to support building file specialized for read/writing 92 class BlockCacheFile : public LRUElement<BlockCacheFile> { 93 public: BlockCacheFile(const uint32_t cache_id)94 explicit BlockCacheFile(const uint32_t cache_id) 95 : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {} 96 BlockCacheFile(Env * const env,const std::string & dir,const uint32_t cache_id)97 explicit BlockCacheFile(Env* const env, const std::string& dir, 98 const uint32_t cache_id) 99 : LRUElement<BlockCacheFile>(), 100 env_(env), 101 dir_(dir), 102 cache_id_(cache_id) {} 103 ~BlockCacheFile()104 virtual ~BlockCacheFile() {} 105 106 // append key/value to file and return LBA locator to user Append(const Slice &,const Slice &,LBA * const)107 virtual bool Append(const Slice& /*key*/, const Slice& /*val*/, 108 LBA* const /*lba*/) { 109 assert(!"not implemented"); 110 return false; 111 } 112 113 // read from the record locator (LBA) and return key, value and status Read(const LBA &,Slice *,Slice *,char *)114 virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/, 115 char* /*scratch*/) { 116 assert(!"not implemented"); 117 return false; 118 } 119 120 // get file path Path()121 std::string Path() const { 122 return dir_ + "/" + std::to_string(cache_id_) + ".rc"; 123 } 124 // get cache ID cacheid()125 uint32_t cacheid() const { return cache_id_; } 126 // Add block information to file data 127 // Block information is the list of index 
reference for this file Add(BlockInfo * binfo)128 virtual void Add(BlockInfo* binfo) { 129 WriteLock _(&rwlock_); 130 block_infos_.push_back(binfo); 131 } 132 // get block information block_infos()133 std::list<BlockInfo*>& block_infos() { return block_infos_; } 134 // delete file and return the size of the file 135 virtual Status Delete(uint64_t* size); 136 137 protected: 138 port::RWMutex rwlock_; // synchronization mutex 139 Env* const env_ = nullptr; // Env for OS 140 const std::string dir_; // Directory name 141 const uint32_t cache_id_; // Cache id for the file 142 std::list<BlockInfo*> block_infos_; // List of index entries mapping to the 143 // file content 144 }; 145 146 // class RandomAccessFile 147 // 148 // Thread safe implementation for reading random data from file 149 class RandomAccessCacheFile : public BlockCacheFile { 150 public: RandomAccessCacheFile(Env * const env,const std::string & dir,const uint32_t cache_id,const std::shared_ptr<Logger> & log)151 explicit RandomAccessCacheFile(Env* const env, const std::string& dir, 152 const uint32_t cache_id, 153 const std::shared_ptr<Logger>& log) 154 : BlockCacheFile(env, dir, cache_id), log_(log) {} 155 ~RandomAccessCacheFile()156 virtual ~RandomAccessCacheFile() {} 157 158 // open file for reading 159 bool Open(const bool enable_direct_reads); 160 // read data from the disk 161 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override; 162 163 private: 164 std::unique_ptr<RandomAccessFileReader> freader_; 165 166 protected: 167 bool OpenImpl(const bool enable_direct_reads); 168 bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch); 169 170 std::shared_ptr<Logger> log_; // log file 171 }; 172 173 // class WriteableCacheFile 174 // 175 // All writes to the files are cached in buffers. The buffers are flushed to 176 // disk as they get filled up. 
When file size reaches a certain size, a new file 177 // will be created provided there is free space 178 class WriteableCacheFile : public RandomAccessCacheFile { 179 public: WriteableCacheFile(Env * const env,CacheWriteBufferAllocator * alloc,Writer * writer,const std::string & dir,const uint32_t cache_id,const uint32_t max_size,const std::shared_ptr<Logger> & log)180 explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc, 181 Writer* writer, const std::string& dir, 182 const uint32_t cache_id, const uint32_t max_size, 183 const std::shared_ptr<Logger>& log) 184 : RandomAccessCacheFile(env, dir, cache_id, log), 185 alloc_(alloc), 186 writer_(writer), 187 max_size_(max_size) {} 188 189 virtual ~WriteableCacheFile(); 190 191 // create file on disk 192 bool Create(const bool enable_direct_writes, const bool enable_direct_reads); 193 194 // read data from logical file Read(const LBA & lba,Slice * key,Slice * block,char * scratch)195 bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override { 196 ReadLock _(&rwlock_); 197 const bool closed = eof_ && bufs_.empty(); 198 if (closed) { 199 // the file is closed, read from disk 200 return RandomAccessCacheFile::Read(lba, key, block, scratch); 201 } 202 // file is still being written, read from buffers 203 return ReadBuffer(lba, key, block, scratch); 204 } 205 206 // append data to end of file 207 bool Append(const Slice&, const Slice&, LBA* const) override; 208 // End-of-file Eof()209 bool Eof() const { return eof_; } 210 211 private: 212 friend class ThreadedWriter; 213 214 static const size_t kFileAlignmentSize = 4 * 1024; // align file size 215 216 bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch); 217 bool ReadBuffer(const LBA& lba, char* data); 218 bool ExpandBuffer(const size_t size); 219 void DispatchBuffer(); 220 void BufferWriteDone(); 221 void CloseAndOpenForReading(); 222 void ClearBuffers(); 223 void Close(); 224 225 // File layout in memory 226 // 
227 // +------+------+------+------+------+------+ 228 // | b0 | b1 | b2 | b3 | b4 | b5 | 229 // +------+------+------+------+------+------+ 230 // ^ ^ 231 // | | 232 // buf_doff_ buf_woff_ 233 // (next buffer to (next buffer to fill) 234 // flush to disk) 235 // 236 // The buffers are flushed to disk serially for a given file 237 238 CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider 239 Writer* const writer_ = nullptr; // File writer thread 240 std::unique_ptr<WritableFile> file_; // RocksDB Env file abstraction 241 std::vector<CacheWriteBuffer*> bufs_; // Written buffers 242 uint32_t size_ = 0; // Size of the file 243 const uint32_t max_size_; // Max size of the file 244 bool eof_ = false; // End of file 245 uint32_t disk_woff_ = 0; // Offset to write on disk 246 size_t buf_woff_ = 0; // off into bufs_ to write 247 size_t buf_doff_ = 0; // off into bufs_ to dispatch 248 size_t pending_ios_ = 0; // Number of ios to disk in-progress 249 bool enable_direct_reads_ = false; // Should we enable direct reads 250 // when reading from disk 251 }; 252 253 // 254 // Abstraction to do writing to device. It is part of pipelined architecture. 
255 // 256 class ThreadedWriter : public Writer { 257 public: 258 // Representation of IO to device 259 struct IO { IOIO260 explicit IO(const bool signal) : signal_(signal) {} IOIO261 explicit IO(WritableFile* const file, CacheWriteBuffer* const buf, 262 const uint64_t file_off, const std::function<void()> callback) 263 : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {} 264 265 IO(const IO&) = default; 266 IO& operator=(const IO&) = default; SizeIO267 size_t Size() const { return sizeof(IO); } 268 269 WritableFile* file_ = nullptr; // File to write to 270 CacheWriteBuffer* buf_ = nullptr; // buffer to write 271 uint64_t file_off_ = 0; // file offset 272 bool signal_ = false; // signal to exit thread loop 273 std::function<void()> callback_; // Callback on completion 274 }; 275 276 explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, 277 const size_t io_size); ~ThreadedWriter()278 virtual ~ThreadedWriter() { assert(threads_.empty()); } 279 280 void Stop() override; 281 void Write(WritableFile* const file, CacheWriteBuffer* buf, 282 const uint64_t file_off, 283 const std::function<void()> callback) override; 284 285 private: 286 void ThreadMain(); 287 void DispatchIO(const IO& io); 288 289 const size_t io_size_ = 0; 290 BoundedQueue<IO> q_; 291 std::vector<port::Thread> threads_; 292 }; 293 294 } // namespace ROCKSDB_NAMESPACE 295 296 #endif 297