1 // Copyright (c) 2019-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #include "env/composite_env_wrapper.h"
7 
8 namespace ROCKSDB_NAMESPACE {
9 namespace {
10 // The CompositeEnvWrapper class provides an interface that is compatible
11 // with the old monolithic Env API, and an implementation that wraps around
12 // the new Env that provides threading and other OS related functionality, and
13 // the new FileSystem API that provides storage functionality. By
14 // providing the old Env interface, it allows the rest of RocksDB code to
15 // be agnostic of whether the underlying Env implementation is a monolithic
16 // Env or an Env + FileSystem. In the former case, the user will specify
17 // Options::env only, whereas in the latter case, the user will specify
18 // Options::env and Options::file_system.
19 
20 class CompositeSequentialFileWrapper : public SequentialFile {
21  public:
CompositeSequentialFileWrapper(std::unique_ptr<FSSequentialFile> & target)22   explicit CompositeSequentialFileWrapper(
23       std::unique_ptr<FSSequentialFile>& target)
24       : target_(std::move(target)) {}
25 
Read(size_t n,Slice * result,char * scratch)26   Status Read(size_t n, Slice* result, char* scratch) override {
27     IOOptions io_opts;
28     IODebugContext dbg;
29     return target_->Read(n, io_opts, result, scratch, &dbg);
30   }
Skip(uint64_t n)31   Status Skip(uint64_t n) override { return target_->Skip(n); }
use_direct_io() const32   bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const33   size_t GetRequiredBufferAlignment() const override {
34     return target_->GetRequiredBufferAlignment();
35   }
InvalidateCache(size_t offset,size_t length)36   Status InvalidateCache(size_t offset, size_t length) override {
37     return target_->InvalidateCache(offset, length);
38   }
PositionedRead(uint64_t offset,size_t n,Slice * result,char * scratch)39   Status PositionedRead(uint64_t offset, size_t n, Slice* result,
40                         char* scratch) override {
41     IOOptions io_opts;
42     IODebugContext dbg;
43     return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
44   }
45 
46  private:
47   std::unique_ptr<FSSequentialFile> target_;
48 };
49 
50 class CompositeRandomAccessFileWrapper : public RandomAccessFile {
51  public:
CompositeRandomAccessFileWrapper(std::unique_ptr<FSRandomAccessFile> & target)52   explicit CompositeRandomAccessFileWrapper(
53       std::unique_ptr<FSRandomAccessFile>& target)
54       : target_(std::move(target)) {}
55 
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const56   Status Read(uint64_t offset, size_t n, Slice* result,
57               char* scratch) const override {
58     IOOptions io_opts;
59     IODebugContext dbg;
60     return target_->Read(offset, n, io_opts, result, scratch, &dbg);
61   }
MultiRead(ReadRequest * reqs,size_t num_reqs)62   Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
63     IOOptions io_opts;
64     IODebugContext dbg;
65     std::vector<FSReadRequest> fs_reqs;
66     Status status;
67 
68     fs_reqs.resize(num_reqs);
69     for (size_t i = 0; i < num_reqs; ++i) {
70       fs_reqs[i].offset = reqs[i].offset;
71       fs_reqs[i].len = reqs[i].len;
72       fs_reqs[i].scratch = reqs[i].scratch;
73       fs_reqs[i].status = IOStatus::OK();
74     }
75     status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
76     for (size_t i = 0; i < num_reqs; ++i) {
77       reqs[i].result = fs_reqs[i].result;
78       reqs[i].status = fs_reqs[i].status;
79     }
80     return status;
81   }
Prefetch(uint64_t offset,size_t n)82   Status Prefetch(uint64_t offset, size_t n) override {
83     IOOptions io_opts;
84     IODebugContext dbg;
85     return target_->Prefetch(offset, n, io_opts, &dbg);
86   }
GetUniqueId(char * id,size_t max_size) const87   size_t GetUniqueId(char* id, size_t max_size) const override {
88     return target_->GetUniqueId(id, max_size);
89   }
Hint(AccessPattern pattern)90   void Hint(AccessPattern pattern) override {
91     target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
92   }
use_direct_io() const93   bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const94   size_t GetRequiredBufferAlignment() const override {
95     return target_->GetRequiredBufferAlignment();
96   }
InvalidateCache(size_t offset,size_t length)97   Status InvalidateCache(size_t offset, size_t length) override {
98     return target_->InvalidateCache(offset, length);
99   }
100 
101  private:
102   std::unique_ptr<FSRandomAccessFile> target_;
103 };
104 
105 class CompositeWritableFileWrapper : public WritableFile {
106  public:
CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile> & t)107   explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
108       : target_(std::move(t)) {}
109 
Append(const Slice & data)110   Status Append(const Slice& data) override {
111     IOOptions io_opts;
112     IODebugContext dbg;
113     return target_->Append(data, io_opts, &dbg);
114   }
Append(const Slice & data,const DataVerificationInfo & verification_info)115   Status Append(const Slice& data,
116                 const DataVerificationInfo& verification_info) override {
117     IOOptions io_opts;
118     IODebugContext dbg;
119     return target_->Append(data, io_opts, verification_info, &dbg);
120   }
PositionedAppend(const Slice & data,uint64_t offset)121   Status PositionedAppend(const Slice& data, uint64_t offset) override {
122     IOOptions io_opts;
123     IODebugContext dbg;
124     return target_->PositionedAppend(data, offset, io_opts, &dbg);
125   }
PositionedAppend(const Slice & data,uint64_t offset,const DataVerificationInfo & verification_info)126   Status PositionedAppend(
127       const Slice& data, uint64_t offset,
128       const DataVerificationInfo& verification_info) override {
129     IOOptions io_opts;
130     IODebugContext dbg;
131     return target_->PositionedAppend(data, offset, io_opts, verification_info,
132                                      &dbg);
133   }
Truncate(uint64_t size)134   Status Truncate(uint64_t size) override {
135     IOOptions io_opts;
136     IODebugContext dbg;
137     return target_->Truncate(size, io_opts, &dbg);
138   }
Close()139   Status Close() override {
140     IOOptions io_opts;
141     IODebugContext dbg;
142     return target_->Close(io_opts, &dbg);
143   }
Flush()144   Status Flush() override {
145     IOOptions io_opts;
146     IODebugContext dbg;
147     return target_->Flush(io_opts, &dbg);
148   }
Sync()149   Status Sync() override {
150     IOOptions io_opts;
151     IODebugContext dbg;
152     return target_->Sync(io_opts, &dbg);
153   }
Fsync()154   Status Fsync() override {
155     IOOptions io_opts;
156     IODebugContext dbg;
157     return target_->Fsync(io_opts, &dbg);
158   }
IsSyncThreadSafe() const159   bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
160 
use_direct_io() const161   bool use_direct_io() const override { return target_->use_direct_io(); }
162 
GetRequiredBufferAlignment() const163   size_t GetRequiredBufferAlignment() const override {
164     return target_->GetRequiredBufferAlignment();
165   }
166 
SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint)167   void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
168     target_->SetWriteLifeTimeHint(hint);
169   }
170 
GetWriteLifeTimeHint()171   Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
172     return target_->GetWriteLifeTimeHint();
173   }
174 
GetFileSize()175   uint64_t GetFileSize() override {
176     IOOptions io_opts;
177     IODebugContext dbg;
178     return target_->GetFileSize(io_opts, &dbg);
179   }
180 
SetPreallocationBlockSize(size_t size)181   void SetPreallocationBlockSize(size_t size) override {
182     target_->SetPreallocationBlockSize(size);
183   }
184 
GetPreallocationStatus(size_t * block_size,size_t * last_allocated_block)185   void GetPreallocationStatus(size_t* block_size,
186                               size_t* last_allocated_block) override {
187     target_->GetPreallocationStatus(block_size, last_allocated_block);
188   }
189 
GetUniqueId(char * id,size_t max_size) const190   size_t GetUniqueId(char* id, size_t max_size) const override {
191     return target_->GetUniqueId(id, max_size);
192   }
193 
InvalidateCache(size_t offset,size_t length)194   Status InvalidateCache(size_t offset, size_t length) override {
195     return target_->InvalidateCache(offset, length);
196   }
197 
RangeSync(uint64_t offset,uint64_t nbytes)198   Status RangeSync(uint64_t offset, uint64_t nbytes) override {
199     IOOptions io_opts;
200     IODebugContext dbg;
201     return target_->RangeSync(offset, nbytes, io_opts, &dbg);
202   }
203 
PrepareWrite(size_t offset,size_t len)204   void PrepareWrite(size_t offset, size_t len) override {
205     IOOptions io_opts;
206     IODebugContext dbg;
207     target_->PrepareWrite(offset, len, io_opts, &dbg);
208   }
209 
Allocate(uint64_t offset,uint64_t len)210   Status Allocate(uint64_t offset, uint64_t len) override {
211     IOOptions io_opts;
212     IODebugContext dbg;
213     return target_->Allocate(offset, len, io_opts, &dbg);
214   }
215 
target()216   std::unique_ptr<FSWritableFile>* target() { return &target_; }
217 
218  private:
219   std::unique_ptr<FSWritableFile> target_;
220 };
221 
222 class CompositeRandomRWFileWrapper : public RandomRWFile {
223  public:
CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile> & target)224   explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
225       : target_(std::move(target)) {}
226 
use_direct_io() const227   bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const228   size_t GetRequiredBufferAlignment() const override {
229     return target_->GetRequiredBufferAlignment();
230   }
Write(uint64_t offset,const Slice & data)231   Status Write(uint64_t offset, const Slice& data) override {
232     IOOptions io_opts;
233     IODebugContext dbg;
234     return target_->Write(offset, data, io_opts, &dbg);
235   }
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const236   Status Read(uint64_t offset, size_t n, Slice* result,
237               char* scratch) const override {
238     IOOptions io_opts;
239     IODebugContext dbg;
240     return target_->Read(offset, n, io_opts, result, scratch, &dbg);
241   }
Flush()242   Status Flush() override {
243     IOOptions io_opts;
244     IODebugContext dbg;
245     return target_->Flush(io_opts, &dbg);
246   }
Sync()247   Status Sync() override {
248     IOOptions io_opts;
249     IODebugContext dbg;
250     return target_->Sync(io_opts, &dbg);
251   }
Fsync()252   Status Fsync() override {
253     IOOptions io_opts;
254     IODebugContext dbg;
255     return target_->Fsync(io_opts, &dbg);
256   }
Close()257   Status Close() override {
258     IOOptions io_opts;
259     IODebugContext dbg;
260     return target_->Close(io_opts, &dbg);
261   }
262 
263  private:
264   std::unique_ptr<FSRandomRWFile> target_;
265 };
266 
267 class CompositeDirectoryWrapper : public Directory {
268  public:
CompositeDirectoryWrapper(std::unique_ptr<FSDirectory> & target)269   explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
270       : target_(std::move(target)) {}
271 
Fsync()272   Status Fsync() override {
273     IOOptions io_opts;
274     IODebugContext dbg;
275     return target_->Fsync(io_opts, &dbg);
276   }
GetUniqueId(char * id,size_t max_size) const277   size_t GetUniqueId(char* id, size_t max_size) const override {
278     return target_->GetUniqueId(id, max_size);
279   }
280 
281  private:
282   std::unique_ptr<FSDirectory> target_;
283 };
284 }  // namespace
285 
NewSequentialFile(const std::string & f,std::unique_ptr<SequentialFile> * r,const EnvOptions & options)286 Status CompositeEnv::NewSequentialFile(const std::string& f,
287                                        std::unique_ptr<SequentialFile>* r,
288                                        const EnvOptions& options) {
289   IODebugContext dbg;
290   std::unique_ptr<FSSequentialFile> file;
291   Status status;
292   status =
293       file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
294   if (status.ok()) {
295     r->reset(new CompositeSequentialFileWrapper(file));
296   }
297   return status;
298 }
299 
NewRandomAccessFile(const std::string & f,std::unique_ptr<RandomAccessFile> * r,const EnvOptions & options)300 Status CompositeEnv::NewRandomAccessFile(const std::string& f,
301                                          std::unique_ptr<RandomAccessFile>* r,
302                                          const EnvOptions& options) {
303   IODebugContext dbg;
304   std::unique_ptr<FSRandomAccessFile> file;
305   Status status;
306   status =
307       file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
308   if (status.ok()) {
309     r->reset(new CompositeRandomAccessFileWrapper(file));
310   }
311   return status;
312 }
313 
NewWritableFile(const std::string & f,std::unique_ptr<WritableFile> * r,const EnvOptions & options)314 Status CompositeEnv::NewWritableFile(const std::string& f,
315                                      std::unique_ptr<WritableFile>* r,
316                                      const EnvOptions& options) {
317   IODebugContext dbg;
318   std::unique_ptr<FSWritableFile> file;
319   Status status;
320   status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
321   if (status.ok()) {
322     r->reset(new CompositeWritableFileWrapper(file));
323   }
324   return status;
325 }
326 
ReopenWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)327 Status CompositeEnv::ReopenWritableFile(const std::string& fname,
328                                         std::unique_ptr<WritableFile>* result,
329                                         const EnvOptions& options) {
330   IODebugContext dbg;
331   Status status;
332   std::unique_ptr<FSWritableFile> file;
333   status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
334                                             &dbg);
335   if (status.ok()) {
336     result->reset(new CompositeWritableFileWrapper(file));
337   }
338   return status;
339 }
340 
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * r,const EnvOptions & options)341 Status CompositeEnv::ReuseWritableFile(const std::string& fname,
342                                        const std::string& old_fname,
343                                        std::unique_ptr<WritableFile>* r,
344                                        const EnvOptions& options) {
345   IODebugContext dbg;
346   Status status;
347   std::unique_ptr<FSWritableFile> file;
348   status = file_system_->ReuseWritableFile(fname, old_fname,
349                                            FileOptions(options), &file, &dbg);
350   if (status.ok()) {
351     r->reset(new CompositeWritableFileWrapper(file));
352   }
353   return status;
354 }
355 
NewRandomRWFile(const std::string & fname,std::unique_ptr<RandomRWFile> * result,const EnvOptions & options)356 Status CompositeEnv::NewRandomRWFile(const std::string& fname,
357                                      std::unique_ptr<RandomRWFile>* result,
358                                      const EnvOptions& options) {
359   IODebugContext dbg;
360   std::unique_ptr<FSRandomRWFile> file;
361   Status status;
362   status =
363       file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
364   if (status.ok()) {
365     result->reset(new CompositeRandomRWFileWrapper(file));
366   }
367   return status;
368 }
369 
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)370 Status CompositeEnv::NewDirectory(const std::string& name,
371                                   std::unique_ptr<Directory>* result) {
372   IOOptions io_opts;
373   IODebugContext dbg;
374   std::unique_ptr<FSDirectory> dir;
375   Status status;
376   status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
377   if (status.ok()) {
378     result->reset(new CompositeDirectoryWrapper(dir));
379   }
380   return status;
381 }
382 
383 }  // namespace ROCKSDB_NAMESPACE
384