1 //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
7 #ifndef ROCKSDB_LITE
8 
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
13 
14 #include "file/random_access_file_reader.h"
15 
16 #include "rocksdb/comparator.h"
17 #include "rocksdb/env.h"
18 
19 #include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
20 #include "utilities/persistent_cache/lrulist.h"
21 #include "utilities/persistent_cache/persistent_cache_tier.h"
22 #include "utilities/persistent_cache/persistent_cache_util.h"
23 
24 #include "port/port.h"
25 #include "util/crc32c.h"
26 #include "util/mutexlock.h"
27 
28 // The io code path of persistent cache uses pipelined architecture
29 //
30 // client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
31 //
// This design is intended to let the system scale to the GB/s of throughput
// that is expected from modern devices such as NVM.
34 //
35 // The file level operations are encapsulated in the following abstractions
36 //
37 // BlockCacheFile
38 //       ^
39 //       |
40 //       |
41 // RandomAccessCacheFile (For reading)
42 //       ^
43 //       |
44 //       |
45 // WriteableCacheFile (For writing)
46 //
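// Write IO code path:
//
//   WriteableCacheFile::Append -> in-memory CacheWriteBuffers
//     -> filled buffers are dispatched to a Writer (e.g. ThreadedWriter)
//     -> ThreadedWriter queues the IO; its worker threads write it to disk
//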
49 namespace ROCKSDB_NAMESPACE {
50 
51 class WriteableCacheFile;
52 struct BlockInfo;
53 
// Represents a logical record on the device.
//
// (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
struct LogicalBlockAddress {
  LogicalBlockAddress() = default;

  // @param cache_id  id of the cache file that holds the record
  // @param off       offset of the record within that file
  // @param size      size of the record
  //
  // `size` is taken as uint32_t to match the width of `size_`. A narrower
  // parameter type (previously uint16_t) silently truncated any size above
  // 65535 before it reached the member.
  explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
                               const uint32_t size)
      : cache_id_(cache_id), off_(off), size_(size) {}

  uint32_t cache_id_ = 0;  // id of the cache file holding the record
  uint32_t off_ = 0;       // offset of the record within the cache file
  uint32_t size_ = 0;      // size of the record
};

// Short alias used throughout the persistent cache code.
using LBA = LogicalBlockAddress;
69 
70 // class Writer
71 //
72 // Writer is the abstraction used for writing data to file. The component can be
73 // multithreaded. It is the last step of write pipeline
74 class Writer {
75  public:
Writer(PersistentCacheTier * const cache)76   explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
~Writer()77   virtual ~Writer() {}
78 
79   // write buffer to file at the given offset
80   virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
81                      const uint64_t file_off,
82                      const std::function<void()> callback) = 0;
83   // stop the writer
84   virtual void Stop() = 0;
85 
86   PersistentCacheTier* const cache_;
87 };
88 
89 // class BlockCacheFile
90 //
91 // Generic interface to support building file specialized for read/writing
92 class BlockCacheFile : public LRUElement<BlockCacheFile> {
93  public:
BlockCacheFile(const uint32_t cache_id)94   explicit BlockCacheFile(const uint32_t cache_id)
95       : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
96 
BlockCacheFile(Env * const env,const std::string & dir,const uint32_t cache_id)97   explicit BlockCacheFile(Env* const env, const std::string& dir,
98                           const uint32_t cache_id)
99       : LRUElement<BlockCacheFile>(),
100         env_(env),
101         dir_(dir),
102         cache_id_(cache_id) {}
103 
~BlockCacheFile()104   virtual ~BlockCacheFile() {}
105 
106   // append key/value to file and return LBA locator to user
Append(const Slice &,const Slice &,LBA * const)107   virtual bool Append(const Slice& /*key*/, const Slice& /*val*/,
108                       LBA* const /*lba*/) {
109     assert(!"not implemented");
110     return false;
111   }
112 
113   // read from the record locator (LBA) and return key, value and status
Read(const LBA &,Slice *,Slice *,char *)114   virtual bool Read(const LBA& /*lba*/, Slice* /*key*/, Slice* /*block*/,
115                     char* /*scratch*/) {
116     assert(!"not implemented");
117     return false;
118   }
119 
120   // get file path
Path()121   std::string Path() const {
122     return dir_ + "/" + std::to_string(cache_id_) + ".rc";
123   }
124   // get cache ID
cacheid()125   uint32_t cacheid() const { return cache_id_; }
126   // Add block information to file data
127   // Block information is the list of index reference for this file
Add(BlockInfo * binfo)128   virtual void Add(BlockInfo* binfo) {
129     WriteLock _(&rwlock_);
130     block_infos_.push_back(binfo);
131   }
132   // get block information
block_infos()133   std::list<BlockInfo*>& block_infos() { return block_infos_; }
134   // delete file and return the size of the file
135   virtual Status Delete(uint64_t* size);
136 
137  protected:
138   port::RWMutex rwlock_;               // synchronization mutex
139   Env* const env_ = nullptr;           // Env for OS
140   const std::string dir_;              // Directory name
141   const uint32_t cache_id_;            // Cache id for the file
142   std::list<BlockInfo*> block_infos_;  // List of index entries mapping to the
143                                        // file content
144 };
145 
146 // class RandomAccessFile
147 //
148 // Thread safe implementation for reading random data from file
149 class RandomAccessCacheFile : public BlockCacheFile {
150  public:
RandomAccessCacheFile(Env * const env,const std::string & dir,const uint32_t cache_id,const std::shared_ptr<Logger> & log)151   explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
152                                  const uint32_t cache_id,
153                                  const std::shared_ptr<Logger>& log)
154       : BlockCacheFile(env, dir, cache_id), log_(log) {}
155 
~RandomAccessCacheFile()156   virtual ~RandomAccessCacheFile() {}
157 
158   // open file for reading
159   bool Open(const bool enable_direct_reads);
160   // read data from the disk
161   bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
162 
163  private:
164   std::unique_ptr<RandomAccessFileReader> freader_;
165 
166  protected:
167   bool OpenImpl(const bool enable_direct_reads);
168   bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
169 
170   std::shared_ptr<Logger> log_;  // log file
171 };
172 
173 // class WriteableCacheFile
174 //
175 // All writes to the files are cached in buffers. The buffers are flushed to
176 // disk as they get filled up. When file size reaches a certain size, a new file
177 // will be created provided there is free space
178 class WriteableCacheFile : public RandomAccessCacheFile {
179  public:
WriteableCacheFile(Env * const env,CacheWriteBufferAllocator * alloc,Writer * writer,const std::string & dir,const uint32_t cache_id,const uint32_t max_size,const std::shared_ptr<Logger> & log)180   explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc,
181                               Writer* writer, const std::string& dir,
182                               const uint32_t cache_id, const uint32_t max_size,
183                               const std::shared_ptr<Logger>& log)
184       : RandomAccessCacheFile(env, dir, cache_id, log),
185         alloc_(alloc),
186         writer_(writer),
187         max_size_(max_size) {}
188 
189   virtual ~WriteableCacheFile();
190 
191   // create file on disk
192   bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
193 
194   // read data from logical file
Read(const LBA & lba,Slice * key,Slice * block,char * scratch)195   bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
196     ReadLock _(&rwlock_);
197     const bool closed = eof_ && bufs_.empty();
198     if (closed) {
199       // the file is closed, read from disk
200       return RandomAccessCacheFile::Read(lba, key, block, scratch);
201     }
202     // file is still being written, read from buffers
203     return ReadBuffer(lba, key, block, scratch);
204   }
205 
206   // append data to end of file
207   bool Append(const Slice&, const Slice&, LBA* const) override;
208   // End-of-file
Eof()209   bool Eof() const { return eof_; }
210 
211  private:
212   friend class ThreadedWriter;
213 
214   static const size_t kFileAlignmentSize = 4 * 1024;  // align file size
215 
216   bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
217   bool ReadBuffer(const LBA& lba, char* data);
218   bool ExpandBuffer(const size_t size);
219   void DispatchBuffer();
220   void BufferWriteDone();
221   void CloseAndOpenForReading();
222   void ClearBuffers();
223   void Close();
224 
225   // File layout in memory
226   //
227   // +------+------+------+------+------+------+
228   // | b0   | b1   | b2   | b3   | b4   | b5   |
229   // +------+------+------+------+------+------+
230   //        ^                           ^
231   //        |                           |
232   //      buf_doff_                   buf_woff_
233   //   (next buffer to           (next buffer to fill)
234   //   flush to disk)
235   //
236   //  The buffers are flushed to disk serially for a given file
237 
238   CacheWriteBufferAllocator* const alloc_ = nullptr;  // Buffer provider
239   Writer* const writer_ = nullptr;                    // File writer thread
240   std::unique_ptr<WritableFile> file_;   // RocksDB Env file abstraction
241   std::vector<CacheWriteBuffer*> bufs_;  // Written buffers
242   uint32_t size_ = 0;                    // Size of the file
243   const uint32_t max_size_;              // Max size of the file
244   bool eof_ = false;                     // End of file
245   uint32_t disk_woff_ = 0;               // Offset to write on disk
246   size_t buf_woff_ = 0;                  // off into bufs_ to write
247   size_t buf_doff_ = 0;                  // off into bufs_ to dispatch
248   size_t pending_ios_ = 0;               // Number of ios to disk in-progress
249   bool enable_direct_reads_ = false;     // Should we enable direct reads
250                                          // when reading from disk
251 };
252 
253 //
254 // Abstraction to do writing to device. It is part of pipelined architecture.
255 //
256 class ThreadedWriter : public Writer {
257  public:
258   // Representation of IO to device
259   struct IO {
IOIO260     explicit IO(const bool signal) : signal_(signal) {}
IOIO261     explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
262                 const uint64_t file_off, const std::function<void()> callback)
263         : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
264 
265     IO(const IO&) = default;
266     IO& operator=(const IO&) = default;
SizeIO267     size_t Size() const { return sizeof(IO); }
268 
269     WritableFile* file_ = nullptr;     // File to write to
270     CacheWriteBuffer* buf_ = nullptr;  // buffer to write
271     uint64_t file_off_ = 0;            // file offset
272     bool signal_ = false;              // signal to exit thread loop
273     std::function<void()> callback_;   // Callback on completion
274   };
275 
276   explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
277                           const size_t io_size);
~ThreadedWriter()278   virtual ~ThreadedWriter() { assert(threads_.empty()); }
279 
280   void Stop() override;
281   void Write(WritableFile* const file, CacheWriteBuffer* buf,
282              const uint64_t file_off,
283              const std::function<void()> callback) override;
284 
285  private:
286   void ThreadMain();
287   void DispatchIO(const IO& io);
288 
289   const size_t io_size_ = 0;
290   BoundedQueue<IO> q_;
291   std::vector<port::Thread> threads_;
292 };
293 
294 }  // namespace ROCKSDB_NAMESPACE
295 
296 #endif
297