1 // Copyright (c) 2019-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 #include "env/composite_env_wrapper.h"
7
8 namespace ROCKSDB_NAMESPACE {
9 namespace {
10 // The CompositeEnvWrapper class provides an interface that is compatible
11 // with the old monolithic Env API, and an implementation that wraps around
12 // the new Env that provides threading and other OS related functionality, and
13 // the new FileSystem API that provides storage functionality. By
14 // providing the old Env interface, it allows the rest of RocksDB code to
15 // be agnostic of whether the underlying Env implementation is a monolithic
16 // Env or an Env + FileSystem. In the former case, the user will specify
17 // Options::env only, whereas in the latter case, the user will specify
18 // Options::env and Options::file_system.
19
20 class CompositeSequentialFileWrapper : public SequentialFile {
21 public:
CompositeSequentialFileWrapper(std::unique_ptr<FSSequentialFile> & target)22 explicit CompositeSequentialFileWrapper(
23 std::unique_ptr<FSSequentialFile>& target)
24 : target_(std::move(target)) {}
25
Read(size_t n,Slice * result,char * scratch)26 Status Read(size_t n, Slice* result, char* scratch) override {
27 IOOptions io_opts;
28 IODebugContext dbg;
29 return target_->Read(n, io_opts, result, scratch, &dbg);
30 }
Skip(uint64_t n)31 Status Skip(uint64_t n) override { return target_->Skip(n); }
use_direct_io() const32 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const33 size_t GetRequiredBufferAlignment() const override {
34 return target_->GetRequiredBufferAlignment();
35 }
InvalidateCache(size_t offset,size_t length)36 Status InvalidateCache(size_t offset, size_t length) override {
37 return target_->InvalidateCache(offset, length);
38 }
PositionedRead(uint64_t offset,size_t n,Slice * result,char * scratch)39 Status PositionedRead(uint64_t offset, size_t n, Slice* result,
40 char* scratch) override {
41 IOOptions io_opts;
42 IODebugContext dbg;
43 return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
44 }
45
46 private:
47 std::unique_ptr<FSSequentialFile> target_;
48 };
49
50 class CompositeRandomAccessFileWrapper : public RandomAccessFile {
51 public:
CompositeRandomAccessFileWrapper(std::unique_ptr<FSRandomAccessFile> & target)52 explicit CompositeRandomAccessFileWrapper(
53 std::unique_ptr<FSRandomAccessFile>& target)
54 : target_(std::move(target)) {}
55
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const56 Status Read(uint64_t offset, size_t n, Slice* result,
57 char* scratch) const override {
58 IOOptions io_opts;
59 IODebugContext dbg;
60 return target_->Read(offset, n, io_opts, result, scratch, &dbg);
61 }
MultiRead(ReadRequest * reqs,size_t num_reqs)62 Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
63 IOOptions io_opts;
64 IODebugContext dbg;
65 std::vector<FSReadRequest> fs_reqs;
66 Status status;
67
68 fs_reqs.resize(num_reqs);
69 for (size_t i = 0; i < num_reqs; ++i) {
70 fs_reqs[i].offset = reqs[i].offset;
71 fs_reqs[i].len = reqs[i].len;
72 fs_reqs[i].scratch = reqs[i].scratch;
73 fs_reqs[i].status = IOStatus::OK();
74 }
75 status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
76 for (size_t i = 0; i < num_reqs; ++i) {
77 reqs[i].result = fs_reqs[i].result;
78 reqs[i].status = fs_reqs[i].status;
79 }
80 return status;
81 }
Prefetch(uint64_t offset,size_t n)82 Status Prefetch(uint64_t offset, size_t n) override {
83 IOOptions io_opts;
84 IODebugContext dbg;
85 return target_->Prefetch(offset, n, io_opts, &dbg);
86 }
GetUniqueId(char * id,size_t max_size) const87 size_t GetUniqueId(char* id, size_t max_size) const override {
88 return target_->GetUniqueId(id, max_size);
89 }
Hint(AccessPattern pattern)90 void Hint(AccessPattern pattern) override {
91 target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
92 }
use_direct_io() const93 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const94 size_t GetRequiredBufferAlignment() const override {
95 return target_->GetRequiredBufferAlignment();
96 }
InvalidateCache(size_t offset,size_t length)97 Status InvalidateCache(size_t offset, size_t length) override {
98 return target_->InvalidateCache(offset, length);
99 }
100
101 private:
102 std::unique_ptr<FSRandomAccessFile> target_;
103 };
104
105 class CompositeWritableFileWrapper : public WritableFile {
106 public:
CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile> & t)107 explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
108 : target_(std::move(t)) {}
109
Append(const Slice & data)110 Status Append(const Slice& data) override {
111 IOOptions io_opts;
112 IODebugContext dbg;
113 return target_->Append(data, io_opts, &dbg);
114 }
Append(const Slice & data,const DataVerificationInfo & verification_info)115 Status Append(const Slice& data,
116 const DataVerificationInfo& verification_info) override {
117 IOOptions io_opts;
118 IODebugContext dbg;
119 return target_->Append(data, io_opts, verification_info, &dbg);
120 }
PositionedAppend(const Slice & data,uint64_t offset)121 Status PositionedAppend(const Slice& data, uint64_t offset) override {
122 IOOptions io_opts;
123 IODebugContext dbg;
124 return target_->PositionedAppend(data, offset, io_opts, &dbg);
125 }
PositionedAppend(const Slice & data,uint64_t offset,const DataVerificationInfo & verification_info)126 Status PositionedAppend(
127 const Slice& data, uint64_t offset,
128 const DataVerificationInfo& verification_info) override {
129 IOOptions io_opts;
130 IODebugContext dbg;
131 return target_->PositionedAppend(data, offset, io_opts, verification_info,
132 &dbg);
133 }
Truncate(uint64_t size)134 Status Truncate(uint64_t size) override {
135 IOOptions io_opts;
136 IODebugContext dbg;
137 return target_->Truncate(size, io_opts, &dbg);
138 }
Close()139 Status Close() override {
140 IOOptions io_opts;
141 IODebugContext dbg;
142 return target_->Close(io_opts, &dbg);
143 }
Flush()144 Status Flush() override {
145 IOOptions io_opts;
146 IODebugContext dbg;
147 return target_->Flush(io_opts, &dbg);
148 }
Sync()149 Status Sync() override {
150 IOOptions io_opts;
151 IODebugContext dbg;
152 return target_->Sync(io_opts, &dbg);
153 }
Fsync()154 Status Fsync() override {
155 IOOptions io_opts;
156 IODebugContext dbg;
157 return target_->Fsync(io_opts, &dbg);
158 }
IsSyncThreadSafe() const159 bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
160
use_direct_io() const161 bool use_direct_io() const override { return target_->use_direct_io(); }
162
GetRequiredBufferAlignment() const163 size_t GetRequiredBufferAlignment() const override {
164 return target_->GetRequiredBufferAlignment();
165 }
166
SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint)167 void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
168 target_->SetWriteLifeTimeHint(hint);
169 }
170
GetWriteLifeTimeHint()171 Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
172 return target_->GetWriteLifeTimeHint();
173 }
174
GetFileSize()175 uint64_t GetFileSize() override {
176 IOOptions io_opts;
177 IODebugContext dbg;
178 return target_->GetFileSize(io_opts, &dbg);
179 }
180
SetPreallocationBlockSize(size_t size)181 void SetPreallocationBlockSize(size_t size) override {
182 target_->SetPreallocationBlockSize(size);
183 }
184
GetPreallocationStatus(size_t * block_size,size_t * last_allocated_block)185 void GetPreallocationStatus(size_t* block_size,
186 size_t* last_allocated_block) override {
187 target_->GetPreallocationStatus(block_size, last_allocated_block);
188 }
189
GetUniqueId(char * id,size_t max_size) const190 size_t GetUniqueId(char* id, size_t max_size) const override {
191 return target_->GetUniqueId(id, max_size);
192 }
193
InvalidateCache(size_t offset,size_t length)194 Status InvalidateCache(size_t offset, size_t length) override {
195 return target_->InvalidateCache(offset, length);
196 }
197
RangeSync(uint64_t offset,uint64_t nbytes)198 Status RangeSync(uint64_t offset, uint64_t nbytes) override {
199 IOOptions io_opts;
200 IODebugContext dbg;
201 return target_->RangeSync(offset, nbytes, io_opts, &dbg);
202 }
203
PrepareWrite(size_t offset,size_t len)204 void PrepareWrite(size_t offset, size_t len) override {
205 IOOptions io_opts;
206 IODebugContext dbg;
207 target_->PrepareWrite(offset, len, io_opts, &dbg);
208 }
209
Allocate(uint64_t offset,uint64_t len)210 Status Allocate(uint64_t offset, uint64_t len) override {
211 IOOptions io_opts;
212 IODebugContext dbg;
213 return target_->Allocate(offset, len, io_opts, &dbg);
214 }
215
target()216 std::unique_ptr<FSWritableFile>* target() { return &target_; }
217
218 private:
219 std::unique_ptr<FSWritableFile> target_;
220 };
221
222 class CompositeRandomRWFileWrapper : public RandomRWFile {
223 public:
CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile> & target)224 explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
225 : target_(std::move(target)) {}
226
use_direct_io() const227 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const228 size_t GetRequiredBufferAlignment() const override {
229 return target_->GetRequiredBufferAlignment();
230 }
Write(uint64_t offset,const Slice & data)231 Status Write(uint64_t offset, const Slice& data) override {
232 IOOptions io_opts;
233 IODebugContext dbg;
234 return target_->Write(offset, data, io_opts, &dbg);
235 }
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const236 Status Read(uint64_t offset, size_t n, Slice* result,
237 char* scratch) const override {
238 IOOptions io_opts;
239 IODebugContext dbg;
240 return target_->Read(offset, n, io_opts, result, scratch, &dbg);
241 }
Flush()242 Status Flush() override {
243 IOOptions io_opts;
244 IODebugContext dbg;
245 return target_->Flush(io_opts, &dbg);
246 }
Sync()247 Status Sync() override {
248 IOOptions io_opts;
249 IODebugContext dbg;
250 return target_->Sync(io_opts, &dbg);
251 }
Fsync()252 Status Fsync() override {
253 IOOptions io_opts;
254 IODebugContext dbg;
255 return target_->Fsync(io_opts, &dbg);
256 }
Close()257 Status Close() override {
258 IOOptions io_opts;
259 IODebugContext dbg;
260 return target_->Close(io_opts, &dbg);
261 }
262
263 private:
264 std::unique_ptr<FSRandomRWFile> target_;
265 };
266
267 class CompositeDirectoryWrapper : public Directory {
268 public:
CompositeDirectoryWrapper(std::unique_ptr<FSDirectory> & target)269 explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
270 : target_(std::move(target)) {}
271
Fsync()272 Status Fsync() override {
273 IOOptions io_opts;
274 IODebugContext dbg;
275 return target_->Fsync(io_opts, &dbg);
276 }
GetUniqueId(char * id,size_t max_size) const277 size_t GetUniqueId(char* id, size_t max_size) const override {
278 return target_->GetUniqueId(id, max_size);
279 }
280
281 private:
282 std::unique_ptr<FSDirectory> target_;
283 };
284 } // namespace
285
NewSequentialFile(const std::string & f,std::unique_ptr<SequentialFile> * r,const EnvOptions & options)286 Status CompositeEnv::NewSequentialFile(const std::string& f,
287 std::unique_ptr<SequentialFile>* r,
288 const EnvOptions& options) {
289 IODebugContext dbg;
290 std::unique_ptr<FSSequentialFile> file;
291 Status status;
292 status =
293 file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
294 if (status.ok()) {
295 r->reset(new CompositeSequentialFileWrapper(file));
296 }
297 return status;
298 }
299
NewRandomAccessFile(const std::string & f,std::unique_ptr<RandomAccessFile> * r,const EnvOptions & options)300 Status CompositeEnv::NewRandomAccessFile(const std::string& f,
301 std::unique_ptr<RandomAccessFile>* r,
302 const EnvOptions& options) {
303 IODebugContext dbg;
304 std::unique_ptr<FSRandomAccessFile> file;
305 Status status;
306 status =
307 file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
308 if (status.ok()) {
309 r->reset(new CompositeRandomAccessFileWrapper(file));
310 }
311 return status;
312 }
313
NewWritableFile(const std::string & f,std::unique_ptr<WritableFile> * r,const EnvOptions & options)314 Status CompositeEnv::NewWritableFile(const std::string& f,
315 std::unique_ptr<WritableFile>* r,
316 const EnvOptions& options) {
317 IODebugContext dbg;
318 std::unique_ptr<FSWritableFile> file;
319 Status status;
320 status = file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
321 if (status.ok()) {
322 r->reset(new CompositeWritableFileWrapper(file));
323 }
324 return status;
325 }
326
ReopenWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)327 Status CompositeEnv::ReopenWritableFile(const std::string& fname,
328 std::unique_ptr<WritableFile>* result,
329 const EnvOptions& options) {
330 IODebugContext dbg;
331 Status status;
332 std::unique_ptr<FSWritableFile> file;
333 status = file_system_->ReopenWritableFile(fname, FileOptions(options), &file,
334 &dbg);
335 if (status.ok()) {
336 result->reset(new CompositeWritableFileWrapper(file));
337 }
338 return status;
339 }
340
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * r,const EnvOptions & options)341 Status CompositeEnv::ReuseWritableFile(const std::string& fname,
342 const std::string& old_fname,
343 std::unique_ptr<WritableFile>* r,
344 const EnvOptions& options) {
345 IODebugContext dbg;
346 Status status;
347 std::unique_ptr<FSWritableFile> file;
348 status = file_system_->ReuseWritableFile(fname, old_fname,
349 FileOptions(options), &file, &dbg);
350 if (status.ok()) {
351 r->reset(new CompositeWritableFileWrapper(file));
352 }
353 return status;
354 }
355
NewRandomRWFile(const std::string & fname,std::unique_ptr<RandomRWFile> * result,const EnvOptions & options)356 Status CompositeEnv::NewRandomRWFile(const std::string& fname,
357 std::unique_ptr<RandomRWFile>* result,
358 const EnvOptions& options) {
359 IODebugContext dbg;
360 std::unique_ptr<FSRandomRWFile> file;
361 Status status;
362 status =
363 file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
364 if (status.ok()) {
365 result->reset(new CompositeRandomRWFileWrapper(file));
366 }
367 return status;
368 }
369
NewDirectory(const std::string & name,std::unique_ptr<Directory> * result)370 Status CompositeEnv::NewDirectory(const std::string& name,
371 std::unique_ptr<Directory>* result) {
372 IOOptions io_opts;
373 IODebugContext dbg;
374 std::unique_ptr<FSDirectory> dir;
375 Status status;
376 status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
377 if (status.ok()) {
378 result->reset(new CompositeDirectoryWrapper(dir));
379 }
380 return status;
381 }
382
383 } // namespace ROCKSDB_NAMESPACE
384