1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "rocksdb/env.h"
11
12 #include <thread>
13
14 #include "env/composite_env_wrapper.h"
15 #include "env/emulated_clock.h"
16 #include "env/unique_id_gen.h"
17 #include "logging/env_logger.h"
18 #include "memory/arena.h"
19 #include "options/db_options.h"
20 #include "port/port.h"
21 #include "rocksdb/convenience.h"
22 #include "rocksdb/options.h"
23 #include "rocksdb/system_clock.h"
24 #include "rocksdb/utilities/customizable_util.h"
25 #include "rocksdb/utilities/object_registry.h"
26 #include "rocksdb/utilities/options_type.h"
27 #include "util/autovector.h"
28 #include "util/string_util.h"
29
30 namespace ROCKSDB_NAMESPACE {
31 namespace {
32 class LegacySystemClock : public SystemClock {
33 private:
34 Env* env_;
35
36 public:
LegacySystemClock(Env * env)37 explicit LegacySystemClock(Env* env) : env_(env) {}
Name() const38 const char* Name() const override { return "Legacy System Clock"; }
39
40 // Returns the number of micro-seconds since some fixed point in time.
41 // It is often used as system time such as in GenericRateLimiter
42 // and other places so a port needs to return system time in order to work.
NowMicros()43 uint64_t NowMicros() override { return env_->NowMicros(); }
44
45 // Returns the number of nano-seconds since some fixed point in time. Only
46 // useful for computing deltas of time in one run.
47 // Default implementation simply relies on NowMicros.
48 // In platform-specific implementations, NowNanos() should return time points
49 // that are MONOTONIC.
NowNanos()50 uint64_t NowNanos() override { return env_->NowNanos(); }
51
CPUMicros()52 uint64_t CPUMicros() override { return CPUNanos() / 1000; }
CPUNanos()53 uint64_t CPUNanos() override { return env_->NowCPUNanos(); }
54
55 // Sleep/delay the thread for the prescribed number of micro-seconds.
SleepForMicroseconds(int micros)56 void SleepForMicroseconds(int micros) override {
57 env_->SleepForMicroseconds(micros);
58 }
59
60 // Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC).
61 // Only overwrites *unix_time on success.
GetCurrentTime(int64_t * unix_time)62 Status GetCurrentTime(int64_t* unix_time) override {
63 return env_->GetCurrentTime(unix_time);
64 }
65 // Converts seconds-since-Jan-01-1970 to a printable string
TimeToString(uint64_t time)66 std::string TimeToString(uint64_t time) override {
67 return env_->TimeToString(time);
68 }
69 };
70
71 class LegacySequentialFileWrapper : public FSSequentialFile {
72 public:
LegacySequentialFileWrapper(std::unique_ptr<SequentialFile> && _target)73 explicit LegacySequentialFileWrapper(
74 std::unique_ptr<SequentialFile>&& _target)
75 : target_(std::move(_target)) {}
76
Read(size_t n,const IOOptions &,Slice * result,char * scratch,IODebugContext *)77 IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result,
78 char* scratch, IODebugContext* /*dbg*/) override {
79 return status_to_io_status(target_->Read(n, result, scratch));
80 }
Skip(uint64_t n)81 IOStatus Skip(uint64_t n) override {
82 return status_to_io_status(target_->Skip(n));
83 }
use_direct_io() const84 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const85 size_t GetRequiredBufferAlignment() const override {
86 return target_->GetRequiredBufferAlignment();
87 }
InvalidateCache(size_t offset,size_t length)88 IOStatus InvalidateCache(size_t offset, size_t length) override {
89 return status_to_io_status(target_->InvalidateCache(offset, length));
90 }
PositionedRead(uint64_t offset,size_t n,const IOOptions &,Slice * result,char * scratch,IODebugContext *)91 IOStatus PositionedRead(uint64_t offset, size_t n,
92 const IOOptions& /*options*/, Slice* result,
93 char* scratch, IODebugContext* /*dbg*/) override {
94 return status_to_io_status(
95 target_->PositionedRead(offset, n, result, scratch));
96 }
97
98 private:
99 std::unique_ptr<SequentialFile> target_;
100 };
101
102 class LegacyRandomAccessFileWrapper : public FSRandomAccessFile {
103 public:
LegacyRandomAccessFileWrapper(std::unique_ptr<RandomAccessFile> && target)104 explicit LegacyRandomAccessFileWrapper(
105 std::unique_ptr<RandomAccessFile>&& target)
106 : target_(std::move(target)) {}
107
Read(uint64_t offset,size_t n,const IOOptions &,Slice * result,char * scratch,IODebugContext *) const108 IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
109 Slice* result, char* scratch,
110 IODebugContext* /*dbg*/) const override {
111 return status_to_io_status(target_->Read(offset, n, result, scratch));
112 }
113
MultiRead(FSReadRequest * fs_reqs,size_t num_reqs,const IOOptions &,IODebugContext *)114 IOStatus MultiRead(FSReadRequest* fs_reqs, size_t num_reqs,
115 const IOOptions& /*options*/,
116 IODebugContext* /*dbg*/) override {
117 std::vector<ReadRequest> reqs;
118 Status status;
119
120 reqs.reserve(num_reqs);
121 for (size_t i = 0; i < num_reqs; ++i) {
122 ReadRequest req;
123
124 req.offset = fs_reqs[i].offset;
125 req.len = fs_reqs[i].len;
126 req.scratch = fs_reqs[i].scratch;
127 req.status = Status::OK();
128
129 reqs.emplace_back(req);
130 }
131 status = target_->MultiRead(reqs.data(), num_reqs);
132 for (size_t i = 0; i < num_reqs; ++i) {
133 fs_reqs[i].result = reqs[i].result;
134 fs_reqs[i].status = status_to_io_status(std::move(reqs[i].status));
135 }
136 return status_to_io_status(std::move(status));
137 }
138
Prefetch(uint64_t offset,size_t n,const IOOptions &,IODebugContext *)139 IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& /*options*/,
140 IODebugContext* /*dbg*/) override {
141 return status_to_io_status(target_->Prefetch(offset, n));
142 }
GetUniqueId(char * id,size_t max_size) const143 size_t GetUniqueId(char* id, size_t max_size) const override {
144 return target_->GetUniqueId(id, max_size);
145 }
Hint(AccessPattern pattern)146 void Hint(AccessPattern pattern) override {
147 target_->Hint((RandomAccessFile::AccessPattern)pattern);
148 }
use_direct_io() const149 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const150 size_t GetRequiredBufferAlignment() const override {
151 return target_->GetRequiredBufferAlignment();
152 }
InvalidateCache(size_t offset,size_t length)153 IOStatus InvalidateCache(size_t offset, size_t length) override {
154 return status_to_io_status(target_->InvalidateCache(offset, length));
155 }
156
157 private:
158 std::unique_ptr<RandomAccessFile> target_;
159 };
160
161 class LegacyRandomRWFileWrapper : public FSRandomRWFile {
162 public:
LegacyRandomRWFileWrapper(std::unique_ptr<RandomRWFile> && target)163 explicit LegacyRandomRWFileWrapper(std::unique_ptr<RandomRWFile>&& target)
164 : target_(std::move(target)) {}
165
use_direct_io() const166 bool use_direct_io() const override { return target_->use_direct_io(); }
GetRequiredBufferAlignment() const167 size_t GetRequiredBufferAlignment() const override {
168 return target_->GetRequiredBufferAlignment();
169 }
Write(uint64_t offset,const Slice & data,const IOOptions &,IODebugContext *)170 IOStatus Write(uint64_t offset, const Slice& data,
171 const IOOptions& /*options*/,
172 IODebugContext* /*dbg*/) override {
173 return status_to_io_status(target_->Write(offset, data));
174 }
Read(uint64_t offset,size_t n,const IOOptions &,Slice * result,char * scratch,IODebugContext *) const175 IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/,
176 Slice* result, char* scratch,
177 IODebugContext* /*dbg*/) const override {
178 return status_to_io_status(target_->Read(offset, n, result, scratch));
179 }
Flush(const IOOptions &,IODebugContext *)180 IOStatus Flush(const IOOptions& /*options*/,
181 IODebugContext* /*dbg*/) override {
182 return status_to_io_status(target_->Flush());
183 }
Sync(const IOOptions &,IODebugContext *)184 IOStatus Sync(const IOOptions& /*options*/,
185 IODebugContext* /*dbg*/) override {
186 return status_to_io_status(target_->Sync());
187 }
Fsync(const IOOptions &,IODebugContext *)188 IOStatus Fsync(const IOOptions& /*options*/,
189 IODebugContext* /*dbg*/) override {
190 return status_to_io_status(target_->Fsync());
191 }
Close(const IOOptions &,IODebugContext *)192 IOStatus Close(const IOOptions& /*options*/,
193 IODebugContext* /*dbg*/) override {
194 return status_to_io_status(target_->Close());
195 }
196
197 private:
198 std::unique_ptr<RandomRWFile> target_;
199 };
200
201 class LegacyWritableFileWrapper : public FSWritableFile {
202 public:
LegacyWritableFileWrapper(std::unique_ptr<WritableFile> && _target)203 explicit LegacyWritableFileWrapper(std::unique_ptr<WritableFile>&& _target)
204 : target_(std::move(_target)) {}
205
Append(const Slice & data,const IOOptions &,IODebugContext *)206 IOStatus Append(const Slice& data, const IOOptions& /*options*/,
207 IODebugContext* /*dbg*/) override {
208 return status_to_io_status(target_->Append(data));
209 }
Append(const Slice & data,const IOOptions &,const DataVerificationInfo &,IODebugContext *)210 IOStatus Append(const Slice& data, const IOOptions& /*options*/,
211 const DataVerificationInfo& /*verification_info*/,
212 IODebugContext* /*dbg*/) override {
213 return status_to_io_status(target_->Append(data));
214 }
PositionedAppend(const Slice & data,uint64_t offset,const IOOptions &,IODebugContext *)215 IOStatus PositionedAppend(const Slice& data, uint64_t offset,
216 const IOOptions& /*options*/,
217 IODebugContext* /*dbg*/) override {
218 return status_to_io_status(target_->PositionedAppend(data, offset));
219 }
PositionedAppend(const Slice & data,uint64_t offset,const IOOptions &,const DataVerificationInfo &,IODebugContext *)220 IOStatus PositionedAppend(const Slice& data, uint64_t offset,
221 const IOOptions& /*options*/,
222 const DataVerificationInfo& /*verification_info*/,
223 IODebugContext* /*dbg*/) override {
224 return status_to_io_status(target_->PositionedAppend(data, offset));
225 }
Truncate(uint64_t size,const IOOptions &,IODebugContext *)226 IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
227 IODebugContext* /*dbg*/) override {
228 return status_to_io_status(target_->Truncate(size));
229 }
Close(const IOOptions &,IODebugContext *)230 IOStatus Close(const IOOptions& /*options*/,
231 IODebugContext* /*dbg*/) override {
232 return status_to_io_status(target_->Close());
233 }
Flush(const IOOptions &,IODebugContext *)234 IOStatus Flush(const IOOptions& /*options*/,
235 IODebugContext* /*dbg*/) override {
236 return status_to_io_status(target_->Flush());
237 }
Sync(const IOOptions &,IODebugContext *)238 IOStatus Sync(const IOOptions& /*options*/,
239 IODebugContext* /*dbg*/) override {
240 return status_to_io_status(target_->Sync());
241 }
Fsync(const IOOptions &,IODebugContext *)242 IOStatus Fsync(const IOOptions& /*options*/,
243 IODebugContext* /*dbg*/) override {
244 return status_to_io_status(target_->Fsync());
245 }
IsSyncThreadSafe() const246 bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
247
use_direct_io() const248 bool use_direct_io() const override { return target_->use_direct_io(); }
249
GetRequiredBufferAlignment() const250 size_t GetRequiredBufferAlignment() const override {
251 return target_->GetRequiredBufferAlignment();
252 }
253
SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint)254 void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
255 target_->SetWriteLifeTimeHint(hint);
256 }
257
GetWriteLifeTimeHint()258 Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
259 return target_->GetWriteLifeTimeHint();
260 }
261
GetFileSize(const IOOptions &,IODebugContext *)262 uint64_t GetFileSize(const IOOptions& /*options*/,
263 IODebugContext* /*dbg*/) override {
264 return target_->GetFileSize();
265 }
266
SetPreallocationBlockSize(size_t size)267 void SetPreallocationBlockSize(size_t size) override {
268 target_->SetPreallocationBlockSize(size);
269 }
270
GetPreallocationStatus(size_t * block_size,size_t * last_allocated_block)271 void GetPreallocationStatus(size_t* block_size,
272 size_t* last_allocated_block) override {
273 target_->GetPreallocationStatus(block_size, last_allocated_block);
274 }
275
GetUniqueId(char * id,size_t max_size) const276 size_t GetUniqueId(char* id, size_t max_size) const override {
277 return target_->GetUniqueId(id, max_size);
278 }
279
InvalidateCache(size_t offset,size_t length)280 IOStatus InvalidateCache(size_t offset, size_t length) override {
281 return status_to_io_status(target_->InvalidateCache(offset, length));
282 }
283
RangeSync(uint64_t offset,uint64_t nbytes,const IOOptions &,IODebugContext *)284 IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
285 const IOOptions& /*options*/,
286 IODebugContext* /*dbg*/) override {
287 return status_to_io_status(target_->RangeSync(offset, nbytes));
288 }
289
PrepareWrite(size_t offset,size_t len,const IOOptions &,IODebugContext *)290 void PrepareWrite(size_t offset, size_t len, const IOOptions& /*options*/,
291 IODebugContext* /*dbg*/) override {
292 target_->PrepareWrite(offset, len);
293 }
294
Allocate(uint64_t offset,uint64_t len,const IOOptions &,IODebugContext *)295 IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& /*options*/,
296 IODebugContext* /*dbg*/) override {
297 return status_to_io_status(target_->Allocate(offset, len));
298 }
299
300 private:
301 std::unique_ptr<WritableFile> target_;
302 };
303
304 class LegacyDirectoryWrapper : public FSDirectory {
305 public:
LegacyDirectoryWrapper(std::unique_ptr<Directory> && target)306 explicit LegacyDirectoryWrapper(std::unique_ptr<Directory>&& target)
307 : target_(std::move(target)) {}
308
Fsync(const IOOptions &,IODebugContext *)309 IOStatus Fsync(const IOOptions& /*options*/,
310 IODebugContext* /*dbg*/) override {
311 return status_to_io_status(target_->Fsync());
312 }
GetUniqueId(char * id,size_t max_size) const313 size_t GetUniqueId(char* id, size_t max_size) const override {
314 return target_->GetUniqueId(id, max_size);
315 }
316
317 private:
318 std::unique_ptr<Directory> target_;
319 };
320
321 class LegacyFileSystemWrapper : public FileSystem {
322 public:
323 // Initialize an EnvWrapper that delegates all calls to *t
LegacyFileSystemWrapper(Env * t)324 explicit LegacyFileSystemWrapper(Env* t) : target_(t) {}
~LegacyFileSystemWrapper()325 ~LegacyFileSystemWrapper() override {}
326
Name() const327 const char* Name() const override { return "Legacy File System"; }
328
329 // Return the target to which this Env forwards all calls
target() const330 Env* target() const { return target_; }
331
332 // The following text is boilerplate that forwards all methods to target()
NewSequentialFile(const std::string & f,const FileOptions & file_opts,std::unique_ptr<FSSequentialFile> * r,IODebugContext *)333 IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
334 std::unique_ptr<FSSequentialFile>* r,
335 IODebugContext* /*dbg*/) override {
336 std::unique_ptr<SequentialFile> file;
337 Status s = target_->NewSequentialFile(f, &file, file_opts);
338 if (s.ok()) {
339 r->reset(new LegacySequentialFileWrapper(std::move(file)));
340 }
341 return status_to_io_status(std::move(s));
342 }
NewRandomAccessFile(const std::string & f,const FileOptions & file_opts,std::unique_ptr<FSRandomAccessFile> * r,IODebugContext *)343 IOStatus NewRandomAccessFile(const std::string& f,
344 const FileOptions& file_opts,
345 std::unique_ptr<FSRandomAccessFile>* r,
346 IODebugContext* /*dbg*/) override {
347 std::unique_ptr<RandomAccessFile> file;
348 Status s = target_->NewRandomAccessFile(f, &file, file_opts);
349 if (s.ok()) {
350 r->reset(new LegacyRandomAccessFileWrapper(std::move(file)));
351 }
352 return status_to_io_status(std::move(s));
353 }
NewWritableFile(const std::string & f,const FileOptions & file_opts,std::unique_ptr<FSWritableFile> * r,IODebugContext *)354 IOStatus NewWritableFile(const std::string& f, const FileOptions& file_opts,
355 std::unique_ptr<FSWritableFile>* r,
356 IODebugContext* /*dbg*/) override {
357 std::unique_ptr<WritableFile> file;
358 Status s = target_->NewWritableFile(f, &file, file_opts);
359 if (s.ok()) {
360 r->reset(new LegacyWritableFileWrapper(std::move(file)));
361 }
362 return status_to_io_status(std::move(s));
363 }
ReopenWritableFile(const std::string & fname,const FileOptions & file_opts,std::unique_ptr<FSWritableFile> * result,IODebugContext *)364 IOStatus ReopenWritableFile(const std::string& fname,
365 const FileOptions& file_opts,
366 std::unique_ptr<FSWritableFile>* result,
367 IODebugContext* /*dbg*/) override {
368 std::unique_ptr<WritableFile> file;
369 Status s = target_->ReopenWritableFile(fname, &file, file_opts);
370 if (s.ok()) {
371 result->reset(new LegacyWritableFileWrapper(std::move(file)));
372 }
373 return status_to_io_status(std::move(s));
374 }
ReuseWritableFile(const std::string & fname,const std::string & old_fname,const FileOptions & file_opts,std::unique_ptr<FSWritableFile> * r,IODebugContext *)375 IOStatus ReuseWritableFile(const std::string& fname,
376 const std::string& old_fname,
377 const FileOptions& file_opts,
378 std::unique_ptr<FSWritableFile>* r,
379 IODebugContext* /*dbg*/) override {
380 std::unique_ptr<WritableFile> file;
381 Status s = target_->ReuseWritableFile(fname, old_fname, &file, file_opts);
382 if (s.ok()) {
383 r->reset(new LegacyWritableFileWrapper(std::move(file)));
384 }
385 return status_to_io_status(std::move(s));
386 }
NewRandomRWFile(const std::string & fname,const FileOptions & file_opts,std::unique_ptr<FSRandomRWFile> * result,IODebugContext *)387 IOStatus NewRandomRWFile(const std::string& fname,
388 const FileOptions& file_opts,
389 std::unique_ptr<FSRandomRWFile>* result,
390 IODebugContext* /*dbg*/) override {
391 std::unique_ptr<RandomRWFile> file;
392 Status s = target_->NewRandomRWFile(fname, &file, file_opts);
393 if (s.ok()) {
394 result->reset(new LegacyRandomRWFileWrapper(std::move(file)));
395 }
396 return status_to_io_status(std::move(s));
397 }
NewMemoryMappedFileBuffer(const std::string & fname,std::unique_ptr<MemoryMappedFileBuffer> * result)398 IOStatus NewMemoryMappedFileBuffer(
399 const std::string& fname,
400 std::unique_ptr<MemoryMappedFileBuffer>* result) override {
401 return status_to_io_status(
402 target_->NewMemoryMappedFileBuffer(fname, result));
403 }
NewDirectory(const std::string & name,const IOOptions &,std::unique_ptr<FSDirectory> * result,IODebugContext *)404 IOStatus NewDirectory(const std::string& name, const IOOptions& /*io_opts*/,
405 std::unique_ptr<FSDirectory>* result,
406 IODebugContext* /*dbg*/) override {
407 std::unique_ptr<Directory> dir;
408 Status s = target_->NewDirectory(name, &dir);
409 if (s.ok()) {
410 result->reset(new LegacyDirectoryWrapper(std::move(dir)));
411 }
412 return status_to_io_status(std::move(s));
413 }
FileExists(const std::string & f,const IOOptions &,IODebugContext *)414 IOStatus FileExists(const std::string& f, const IOOptions& /*io_opts*/,
415 IODebugContext* /*dbg*/) override {
416 return status_to_io_status(target_->FileExists(f));
417 }
GetChildren(const std::string & dir,const IOOptions &,std::vector<std::string> * r,IODebugContext *)418 IOStatus GetChildren(const std::string& dir, const IOOptions& /*io_opts*/,
419 std::vector<std::string>* r,
420 IODebugContext* /*dbg*/) override {
421 return status_to_io_status(target_->GetChildren(dir, r));
422 }
GetChildrenFileAttributes(const std::string & dir,const IOOptions &,std::vector<FileAttributes> * result,IODebugContext *)423 IOStatus GetChildrenFileAttributes(const std::string& dir,
424 const IOOptions& /*options*/,
425 std::vector<FileAttributes>* result,
426 IODebugContext* /*dbg*/) override {
427 return status_to_io_status(target_->GetChildrenFileAttributes(dir, result));
428 }
DeleteFile(const std::string & f,const IOOptions &,IODebugContext *)429 IOStatus DeleteFile(const std::string& f, const IOOptions& /*options*/,
430 IODebugContext* /*dbg*/) override {
431 return status_to_io_status(target_->DeleteFile(f));
432 }
Truncate(const std::string & fname,size_t size,const IOOptions &,IODebugContext *)433 IOStatus Truncate(const std::string& fname, size_t size,
434 const IOOptions& /*options*/,
435 IODebugContext* /*dbg*/) override {
436 return status_to_io_status(target_->Truncate(fname, size));
437 }
CreateDir(const std::string & d,const IOOptions &,IODebugContext *)438 IOStatus CreateDir(const std::string& d, const IOOptions& /*options*/,
439 IODebugContext* /*dbg*/) override {
440 return status_to_io_status(target_->CreateDir(d));
441 }
CreateDirIfMissing(const std::string & d,const IOOptions &,IODebugContext *)442 IOStatus CreateDirIfMissing(const std::string& d,
443 const IOOptions& /*options*/,
444 IODebugContext* /*dbg*/) override {
445 return status_to_io_status(target_->CreateDirIfMissing(d));
446 }
DeleteDir(const std::string & d,const IOOptions &,IODebugContext *)447 IOStatus DeleteDir(const std::string& d, const IOOptions& /*options*/,
448 IODebugContext* /*dbg*/) override {
449 return status_to_io_status(target_->DeleteDir(d));
450 }
GetFileSize(const std::string & f,const IOOptions &,uint64_t * s,IODebugContext *)451 IOStatus GetFileSize(const std::string& f, const IOOptions& /*options*/,
452 uint64_t* s, IODebugContext* /*dbg*/) override {
453 return status_to_io_status(target_->GetFileSize(f, s));
454 }
455
GetFileModificationTime(const std::string & fname,const IOOptions &,uint64_t * file_mtime,IODebugContext *)456 IOStatus GetFileModificationTime(const std::string& fname,
457 const IOOptions& /*options*/,
458 uint64_t* file_mtime,
459 IODebugContext* /*dbg*/) override {
460 return status_to_io_status(
461 target_->GetFileModificationTime(fname, file_mtime));
462 }
463
GetAbsolutePath(const std::string & db_path,const IOOptions &,std::string * output_path,IODebugContext *)464 IOStatus GetAbsolutePath(const std::string& db_path,
465 const IOOptions& /*options*/,
466 std::string* output_path,
467 IODebugContext* /*dbg*/) override {
468 return status_to_io_status(target_->GetAbsolutePath(db_path, output_path));
469 }
470
RenameFile(const std::string & s,const std::string & t,const IOOptions &,IODebugContext *)471 IOStatus RenameFile(const std::string& s, const std::string& t,
472 const IOOptions& /*options*/,
473 IODebugContext* /*dbg*/) override {
474 return status_to_io_status(target_->RenameFile(s, t));
475 }
476
LinkFile(const std::string & s,const std::string & t,const IOOptions &,IODebugContext *)477 IOStatus LinkFile(const std::string& s, const std::string& t,
478 const IOOptions& /*options*/,
479 IODebugContext* /*dbg*/) override {
480 return status_to_io_status(target_->LinkFile(s, t));
481 }
482
NumFileLinks(const std::string & fname,const IOOptions &,uint64_t * count,IODebugContext *)483 IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*options*/,
484 uint64_t* count, IODebugContext* /*dbg*/) override {
485 return status_to_io_status(target_->NumFileLinks(fname, count));
486 }
487
AreFilesSame(const std::string & first,const std::string & second,const IOOptions &,bool * res,IODebugContext *)488 IOStatus AreFilesSame(const std::string& first, const std::string& second,
489 const IOOptions& /*options*/, bool* res,
490 IODebugContext* /*dbg*/) override {
491 return status_to_io_status(target_->AreFilesSame(first, second, res));
492 }
493
LockFile(const std::string & f,const IOOptions &,FileLock ** l,IODebugContext *)494 IOStatus LockFile(const std::string& f, const IOOptions& /*options*/,
495 FileLock** l, IODebugContext* /*dbg*/) override {
496 return status_to_io_status(target_->LockFile(f, l));
497 }
498
UnlockFile(FileLock * l,const IOOptions &,IODebugContext *)499 IOStatus UnlockFile(FileLock* l, const IOOptions& /*options*/,
500 IODebugContext* /*dbg*/) override {
501 return status_to_io_status(target_->UnlockFile(l));
502 }
503
GetTestDirectory(const IOOptions &,std::string * path,IODebugContext *)504 IOStatus GetTestDirectory(const IOOptions& /*options*/, std::string* path,
505 IODebugContext* /*dbg*/) override {
506 return status_to_io_status(target_->GetTestDirectory(path));
507 }
NewLogger(const std::string & fname,const IOOptions &,std::shared_ptr<Logger> * result,IODebugContext *)508 IOStatus NewLogger(const std::string& fname, const IOOptions& /*options*/,
509 std::shared_ptr<Logger>* result,
510 IODebugContext* /*dbg*/) override {
511 return status_to_io_status(target_->NewLogger(fname, result));
512 }
513
SanitizeFileOptions(FileOptions * opts) const514 void SanitizeFileOptions(FileOptions* opts) const override {
515 target_->SanitizeEnvOptions(opts);
516 }
517
OptimizeForLogRead(const FileOptions & file_options) const518 FileOptions OptimizeForLogRead(
519 const FileOptions& file_options) const override {
520 return target_->OptimizeForLogRead(file_options);
521 }
OptimizeForManifestRead(const FileOptions & file_options) const522 FileOptions OptimizeForManifestRead(
523 const FileOptions& file_options) const override {
524 return target_->OptimizeForManifestRead(file_options);
525 }
OptimizeForLogWrite(const FileOptions & file_options,const DBOptions & db_options) const526 FileOptions OptimizeForLogWrite(const FileOptions& file_options,
527 const DBOptions& db_options) const override {
528 return target_->OptimizeForLogWrite(file_options, db_options);
529 }
OptimizeForManifestWrite(const FileOptions & file_options) const530 FileOptions OptimizeForManifestWrite(
531 const FileOptions& file_options) const override {
532 return target_->OptimizeForManifestWrite(file_options);
533 }
OptimizeForCompactionTableWrite(const FileOptions & file_options,const ImmutableDBOptions & immutable_ops) const534 FileOptions OptimizeForCompactionTableWrite(
535 const FileOptions& file_options,
536 const ImmutableDBOptions& immutable_ops) const override {
537 return target_->OptimizeForCompactionTableWrite(file_options,
538 immutable_ops);
539 }
OptimizeForCompactionTableRead(const FileOptions & file_options,const ImmutableDBOptions & db_options) const540 FileOptions OptimizeForCompactionTableRead(
541 const FileOptions& file_options,
542 const ImmutableDBOptions& db_options) const override {
543 return target_->OptimizeForCompactionTableRead(file_options, db_options);
544 }
OptimizeForBlobFileRead(const FileOptions & file_options,const ImmutableDBOptions & db_options) const545 FileOptions OptimizeForBlobFileRead(
546 const FileOptions& file_options,
547 const ImmutableDBOptions& db_options) const override {
548 return target_->OptimizeForBlobFileRead(file_options, db_options);
549 }
550
551 #ifdef GetFreeSpace
552 #undef GetFreeSpace
553 #endif
GetFreeSpace(const std::string & path,const IOOptions &,uint64_t * diskfree,IODebugContext *)554 IOStatus GetFreeSpace(const std::string& path, const IOOptions& /*options*/,
555 uint64_t* diskfree, IODebugContext* /*dbg*/) override {
556 return status_to_io_status(target_->GetFreeSpace(path, diskfree));
557 }
IsDirectory(const std::string & path,const IOOptions &,bool * is_dir,IODebugContext *)558 IOStatus IsDirectory(const std::string& path, const IOOptions& /*options*/,
559 bool* is_dir, IODebugContext* /*dbg*/) override {
560 return status_to_io_status(target_->IsDirectory(path, is_dir));
561 }
562
563 private:
564 Env* target_;
565 };
566 } // end anonymous namespace
567
Env()568 Env::Env() : thread_status_updater_(nullptr) {
569 file_system_ = std::make_shared<LegacyFileSystemWrapper>(this);
570 system_clock_ = std::make_shared<LegacySystemClock>(this);
571 }
572
Env(const std::shared_ptr<FileSystem> & fs)573 Env::Env(const std::shared_ptr<FileSystem>& fs)
574 : thread_status_updater_(nullptr), file_system_(fs) {
575 system_clock_ = std::make_shared<LegacySystemClock>(this);
576 }
577
Env(const std::shared_ptr<FileSystem> & fs,const std::shared_ptr<SystemClock> & clock)578 Env::Env(const std::shared_ptr<FileSystem>& fs,
579 const std::shared_ptr<SystemClock>& clock)
580 : thread_status_updater_(nullptr), file_system_(fs), system_clock_(clock) {}
581
~Env()582 Env::~Env() {
583 }
584
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)585 Status Env::NewLogger(const std::string& fname,
586 std::shared_ptr<Logger>* result) {
587 return NewEnvLogger(fname, this, result);
588 }
589
LoadEnv(const std::string & value,Env ** result)590 Status Env::LoadEnv(const std::string& value, Env** result) {
591 return CreateFromString(ConfigOptions(), value, result);
592 }
593
CreateFromString(const ConfigOptions & config_options,const std::string & value,Env ** result)594 Status Env::CreateFromString(const ConfigOptions& config_options,
595 const std::string& value, Env** result) {
596 Env* env = *result;
597 Status s;
598 #ifndef ROCKSDB_LITE
599 (void)config_options;
600 s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
601 #else
602 (void)config_options;
603 s = Status::NotSupported("Cannot load environment in LITE mode", value);
604 #endif
605 if (s.ok()) {
606 *result = env;
607 }
608 return s;
609 }
610
LoadEnv(const std::string & value,Env ** result,std::shared_ptr<Env> * guard)611 Status Env::LoadEnv(const std::string& value, Env** result,
612 std::shared_ptr<Env>* guard) {
613 return CreateFromString(ConfigOptions(), value, result, guard);
614 }
615
CreateFromString(const ConfigOptions & config_options,const std::string & value,Env ** result,std::shared_ptr<Env> * guard)616 Status Env::CreateFromString(const ConfigOptions& config_options,
617 const std::string& value, Env** result,
618 std::shared_ptr<Env>* guard) {
619 assert(result);
620 if (value.empty()) {
621 *result = Env::Default();
622 return Status::OK();
623 }
624 Status s;
625 #ifndef ROCKSDB_LITE
626 Env* env = nullptr;
627 std::unique_ptr<Env> uniq_guard;
628 std::string err_msg;
629 assert(guard != nullptr);
630 (void)config_options;
631 env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
632 &err_msg);
633 if (!env) {
634 s = Status::NotSupported(std::string("Cannot load ") + Env::Type() + ": " +
635 value);
636 env = Env::Default();
637 }
638 if (s.ok() && uniq_guard) {
639 guard->reset(uniq_guard.release());
640 *result = guard->get();
641 } else {
642 *result = env;
643 }
644 #else
645 (void)config_options;
646 (void)result;
647 (void)guard;
648 s = Status::NotSupported("Cannot load environment in LITE mode", value);
649 #endif
650 return s;
651 }
652
CreateFromUri(const ConfigOptions & config_options,const std::string & env_uri,const std::string & fs_uri,Env ** result,std::shared_ptr<Env> * guard)653 Status Env::CreateFromUri(const ConfigOptions& config_options,
654 const std::string& env_uri, const std::string& fs_uri,
655 Env** result, std::shared_ptr<Env>* guard) {
656 *result = config_options.env;
657 if (env_uri.empty() && fs_uri.empty()) {
658 // Neither specified. Use the default
659 guard->reset();
660 return Status::OK();
661 } else if (!env_uri.empty() && !fs_uri.empty()) {
662 // Both specified. Cannot choose. Return Invalid
663 return Status::InvalidArgument("cannot specify both fs_uri and env_uri");
664 } else if (fs_uri.empty()) { // Only have an ENV URI. Create an Env from it
665 return CreateFromString(config_options, env_uri, result, guard);
666 } else {
667 std::shared_ptr<FileSystem> fs;
668 Status s = FileSystem::CreateFromString(config_options, fs_uri, &fs);
669 if (s.ok()) {
670 guard->reset(new CompositeEnvWrapper(*result, fs));
671 *result = guard->get();
672 }
673 return s;
674 }
675 }
676
PriorityToString(Env::Priority priority)677 std::string Env::PriorityToString(Env::Priority priority) {
678 switch (priority) {
679 case Env::Priority::BOTTOM:
680 return "Bottom";
681 case Env::Priority::LOW:
682 return "Low";
683 case Env::Priority::HIGH:
684 return "High";
685 case Env::Priority::USER:
686 return "User";
687 case Env::Priority::TOTAL:
688 assert(false);
689 }
690 return "Invalid";
691 }
692
GetThreadID() const693 uint64_t Env::GetThreadID() const {
694 std::hash<std::thread::id> hasher;
695 return hasher(std::this_thread::get_id());
696 }
697
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)698 Status Env::ReuseWritableFile(const std::string& fname,
699 const std::string& old_fname,
700 std::unique_ptr<WritableFile>* result,
701 const EnvOptions& options) {
702 Status s = RenameFile(old_fname, fname);
703 if (!s.ok()) {
704 return s;
705 }
706 return NewWritableFile(fname, result, options);
707 }
708
GetChildrenFileAttributes(const std::string & dir,std::vector<FileAttributes> * result)709 Status Env::GetChildrenFileAttributes(const std::string& dir,
710 std::vector<FileAttributes>* result) {
711 assert(result != nullptr);
712 std::vector<std::string> child_fnames;
713 Status s = GetChildren(dir, &child_fnames);
714 if (!s.ok()) {
715 return s;
716 }
717 result->resize(child_fnames.size());
718 size_t result_size = 0;
719 for (size_t i = 0; i < child_fnames.size(); ++i) {
720 const std::string path = dir + "/" + child_fnames[i];
721 if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) {
722 if (FileExists(path).IsNotFound()) {
723 // The file may have been deleted since we listed the directory
724 continue;
725 }
726 return s;
727 }
728 (*result)[result_size].name = std::move(child_fnames[i]);
729 result_size++;
730 }
731 result->resize(result_size);
732 return Status::OK();
733 }
734
GetHostNameString(std::string * result)735 Status Env::GetHostNameString(std::string* result) {
736 std::array<char, kMaxHostNameLen> hostname_buf;
737 Status s = GetHostName(hostname_buf.data(), hostname_buf.size());
738 if (s.ok()) {
739 hostname_buf[hostname_buf.size() - 1] = '\0';
740 result->assign(hostname_buf.data());
741 }
742 return s;
743 }
744
GenerateUniqueId()745 std::string Env::GenerateUniqueId() {
746 std::string result;
747 bool success = port::GenerateRfcUuid(&result);
748 if (!success) {
749 // Fall back on our own way of generating a unique ID and adapt it to
750 // RFC 4122 variant 1 version 4 (a random ID).
751 // https://en.wikipedia.org/wiki/Universally_unique_identifier
752 // We already tried GenerateRfcUuid so no need to try it again in
753 // GenerateRawUniqueId
754 constexpr bool exclude_port_uuid = true;
755 uint64_t upper, lower;
756 GenerateRawUniqueId(&upper, &lower, exclude_port_uuid);
757
758 // Set 4-bit version to 4
759 upper = (upper & (~uint64_t{0xf000})) | 0x4000;
760 // Set unary-encoded variant to 1 (0b10)
761 lower = (lower & (~(uint64_t{3} << 62))) | (uint64_t{2} << 62);
762
763 // Use 36 character format of RFC 4122
764 result.resize(36U);
765 char* buf = &result[0];
766 PutBaseChars<16>(&buf, 8, upper >> 32, /*!uppercase*/ false);
767 *(buf++) = '-';
768 PutBaseChars<16>(&buf, 4, upper >> 16, /*!uppercase*/ false);
769 *(buf++) = '-';
770 PutBaseChars<16>(&buf, 4, upper, /*!uppercase*/ false);
771 *(buf++) = '-';
772 PutBaseChars<16>(&buf, 4, lower >> 48, /*!uppercase*/ false);
773 *(buf++) = '-';
774 PutBaseChars<16>(&buf, 12, lower, /*!uppercase*/ false);
775 assert(buf == &result[36]);
776
777 // Verify variant 1 version 4
778 assert(result[14] == '4');
779 assert(result[19] == '8' || result[19] == '9' || result[19] == 'a' ||
780 result[19] == 'b');
781 }
782 return result;
783 }
784
~SequentialFile()785 SequentialFile::~SequentialFile() {
786 }
787
~RandomAccessFile()788 RandomAccessFile::~RandomAccessFile() {
789 }
790
~WritableFile()791 WritableFile::~WritableFile() {
792 }
793
~MemoryMappedFileBuffer()794 MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
795
~Logger()796 Logger::~Logger() {}
797
Close()798 Status Logger::Close() {
799 if (!closed_) {
800 closed_ = true;
801 return CloseImpl();
802 } else {
803 return Status::OK();
804 }
805 }
806
CloseImpl()807 Status Logger::CloseImpl() { return Status::NotSupported(); }
808
~FileLock()809 FileLock::~FileLock() {
810 }
811
LogFlush(Logger * info_log)812 void LogFlush(Logger *info_log) {
813 if (info_log) {
814 info_log->Flush();
815 }
816 }
817
Logv(Logger * info_log,const char * format,va_list ap)818 static void Logv(Logger *info_log, const char* format, va_list ap) {
819 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
820 info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
821 }
822 }
823
Log(Logger * info_log,const char * format,...)824 void Log(Logger* info_log, const char* format, ...) {
825 va_list ap;
826 va_start(ap, format);
827 Logv(info_log, format, ap);
828 va_end(ap);
829 }
830
Logv(const InfoLogLevel log_level,const char * format,va_list ap)831 void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
832 static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
833 "ERROR", "FATAL" };
834 if (log_level < log_level_) {
835 return;
836 }
837
838 if (log_level == InfoLogLevel::INFO_LEVEL) {
839 // Doesn't print log level if it is INFO level.
840 // This is to avoid unexpected performance regression after we add
841 // the feature of log level. All the logs before we add the feature
842 // are INFO level. We don't want to add extra costs to those existing
843 // logging.
844 Logv(format, ap);
845 } else if (log_level == InfoLogLevel::HEADER_LEVEL) {
846 LogHeader(format, ap);
847 } else {
848 char new_format[500];
849 snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
850 kInfoLogLevelNames[log_level], format);
851 Logv(new_format, ap);
852 }
853
854 if (log_level >= InfoLogLevel::WARN_LEVEL &&
855 log_level != InfoLogLevel::HEADER_LEVEL) {
856 // Log messages with severity of warning or higher should be rare and are
857 // sometimes followed by an unclean crash. We want to be sure important
858 // messages are not lost in an application buffer when that happens.
859 Flush();
860 }
861 }
862
Logv(const InfoLogLevel log_level,Logger * info_log,const char * format,va_list ap)863 static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
864 if (info_log && info_log->GetInfoLogLevel() <= log_level) {
865 if (log_level == InfoLogLevel::HEADER_LEVEL) {
866 info_log->LogHeader(format, ap);
867 } else {
868 info_log->Logv(log_level, format, ap);
869 }
870 }
871 }
872
Log(const InfoLogLevel log_level,Logger * info_log,const char * format,...)873 void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
874 ...) {
875 va_list ap;
876 va_start(ap, format);
877 Logv(log_level, info_log, format, ap);
878 va_end(ap);
879 }
880
Headerv(Logger * info_log,const char * format,va_list ap)881 static void Headerv(Logger *info_log, const char *format, va_list ap) {
882 if (info_log) {
883 info_log->LogHeader(format, ap);
884 }
885 }
886
Header(Logger * info_log,const char * format,...)887 void Header(Logger* info_log, const char* format, ...) {
888 va_list ap;
889 va_start(ap, format);
890 Headerv(info_log, format, ap);
891 va_end(ap);
892 }
893
Debugv(Logger * info_log,const char * format,va_list ap)894 static void Debugv(Logger* info_log, const char* format, va_list ap) {
895 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
896 info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
897 }
898 }
899
Debug(Logger * info_log,const char * format,...)900 void Debug(Logger* info_log, const char* format, ...) {
901 va_list ap;
902 va_start(ap, format);
903 Debugv(info_log, format, ap);
904 va_end(ap);
905 }
906
Infov(Logger * info_log,const char * format,va_list ap)907 static void Infov(Logger* info_log, const char* format, va_list ap) {
908 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
909 info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
910 }
911 }
912
Info(Logger * info_log,const char * format,...)913 void Info(Logger* info_log, const char* format, ...) {
914 va_list ap;
915 va_start(ap, format);
916 Infov(info_log, format, ap);
917 va_end(ap);
918 }
919
Warnv(Logger * info_log,const char * format,va_list ap)920 static void Warnv(Logger* info_log, const char* format, va_list ap) {
921 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) {
922 info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
923 }
924 }
925
Warn(Logger * info_log,const char * format,...)926 void Warn(Logger* info_log, const char* format, ...) {
927 va_list ap;
928 va_start(ap, format);
929 Warnv(info_log, format, ap);
930 va_end(ap);
931 }
932
Errorv(Logger * info_log,const char * format,va_list ap)933 static void Errorv(Logger* info_log, const char* format, va_list ap) {
934 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) {
935 info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
936 }
937 }
938
Error(Logger * info_log,const char * format,...)939 void Error(Logger* info_log, const char* format, ...) {
940 va_list ap;
941 va_start(ap, format);
942 Errorv(info_log, format, ap);
943 va_end(ap);
944 }
945
Fatalv(Logger * info_log,const char * format,va_list ap)946 static void Fatalv(Logger* info_log, const char* format, va_list ap) {
947 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) {
948 info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
949 }
950 }
951
Fatal(Logger * info_log,const char * format,...)952 void Fatal(Logger* info_log, const char* format, ...) {
953 va_list ap;
954 va_start(ap, format);
955 Fatalv(info_log, format, ap);
956 va_end(ap);
957 }
958
LogFlush(const std::shared_ptr<Logger> & info_log)959 void LogFlush(const std::shared_ptr<Logger>& info_log) {
960 LogFlush(info_log.get());
961 }
962
Log(const InfoLogLevel log_level,const std::shared_ptr<Logger> & info_log,const char * format,...)963 void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
964 const char* format, ...) {
965 va_list ap;
966 va_start(ap, format);
967 Logv(log_level, info_log.get(), format, ap);
968 va_end(ap);
969 }
970
Header(const std::shared_ptr<Logger> & info_log,const char * format,...)971 void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
972 va_list ap;
973 va_start(ap, format);
974 Headerv(info_log.get(), format, ap);
975 va_end(ap);
976 }
977
Debug(const std::shared_ptr<Logger> & info_log,const char * format,...)978 void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
979 va_list ap;
980 va_start(ap, format);
981 Debugv(info_log.get(), format, ap);
982 va_end(ap);
983 }
984
Info(const std::shared_ptr<Logger> & info_log,const char * format,...)985 void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
986 va_list ap;
987 va_start(ap, format);
988 Infov(info_log.get(), format, ap);
989 va_end(ap);
990 }
991
Warn(const std::shared_ptr<Logger> & info_log,const char * format,...)992 void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
993 va_list ap;
994 va_start(ap, format);
995 Warnv(info_log.get(), format, ap);
996 va_end(ap);
997 }
998
Error(const std::shared_ptr<Logger> & info_log,const char * format,...)999 void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
1000 va_list ap;
1001 va_start(ap, format);
1002 Errorv(info_log.get(), format, ap);
1003 va_end(ap);
1004 }
1005
Fatal(const std::shared_ptr<Logger> & info_log,const char * format,...)1006 void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
1007 va_list ap;
1008 va_start(ap, format);
1009 Fatalv(info_log.get(), format, ap);
1010 va_end(ap);
1011 }
1012
Log(const std::shared_ptr<Logger> & info_log,const char * format,...)1013 void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
1014 va_list ap;
1015 va_start(ap, format);
1016 Logv(info_log.get(), format, ap);
1017 va_end(ap);
1018 }
1019
WriteStringToFile(Env * env,const Slice & data,const std::string & fname,bool should_sync)1020 Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
1021 bool should_sync) {
1022 const auto& fs = env->GetFileSystem();
1023 return WriteStringToFile(fs.get(), data, fname, should_sync);
1024 }
1025
ReadFileToString(Env * env,const std::string & fname,std::string * data)1026 Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
1027 const auto& fs = env->GetFileSystem();
1028 return ReadFileToString(fs.get(), fname, data);
1029 }
1030
~EnvWrapper()1031 EnvWrapper::~EnvWrapper() {
1032 }
1033
1034 namespace { // anonymous namespace
1035
AssignEnvOptions(EnvOptions * env_options,const DBOptions & options)1036 void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
1037 env_options->use_mmap_reads = options.allow_mmap_reads;
1038 env_options->use_mmap_writes = options.allow_mmap_writes;
1039 env_options->use_direct_reads = options.use_direct_reads;
1040 env_options->set_fd_cloexec = options.is_fd_close_on_exec;
1041 env_options->bytes_per_sync = options.bytes_per_sync;
1042 env_options->compaction_readahead_size = options.compaction_readahead_size;
1043 env_options->random_access_max_buffer_size =
1044 options.random_access_max_buffer_size;
1045 env_options->rate_limiter = options.rate_limiter.get();
1046 env_options->writable_file_max_buffer_size =
1047 options.writable_file_max_buffer_size;
1048 env_options->allow_fallocate = options.allow_fallocate;
1049 env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
1050 options.env->SanitizeEnvOptions(env_options);
1051 }
1052
1053 }
1054
OptimizeForLogWrite(const EnvOptions & env_options,const DBOptions & db_options) const1055 EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
1056 const DBOptions& db_options) const {
1057 EnvOptions optimized_env_options(env_options);
1058 optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync;
1059 optimized_env_options.writable_file_max_buffer_size =
1060 db_options.writable_file_max_buffer_size;
1061 return optimized_env_options;
1062 }
1063
OptimizeForManifestWrite(const EnvOptions & env_options) const1064 EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
1065 return env_options;
1066 }
1067
OptimizeForLogRead(const EnvOptions & env_options) const1068 EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
1069 EnvOptions optimized_env_options(env_options);
1070 optimized_env_options.use_direct_reads = false;
1071 return optimized_env_options;
1072 }
1073
OptimizeForManifestRead(const EnvOptions & env_options) const1074 EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
1075 EnvOptions optimized_env_options(env_options);
1076 optimized_env_options.use_direct_reads = false;
1077 return optimized_env_options;
1078 }
1079
OptimizeForCompactionTableWrite(const EnvOptions & env_options,const ImmutableDBOptions & db_options) const1080 EnvOptions Env::OptimizeForCompactionTableWrite(
1081 const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
1082 EnvOptions optimized_env_options(env_options);
1083 optimized_env_options.use_direct_writes =
1084 db_options.use_direct_io_for_flush_and_compaction;
1085 return optimized_env_options;
1086 }
1087
OptimizeForCompactionTableRead(const EnvOptions & env_options,const ImmutableDBOptions & db_options) const1088 EnvOptions Env::OptimizeForCompactionTableRead(
1089 const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
1090 EnvOptions optimized_env_options(env_options);
1091 optimized_env_options.use_direct_reads = db_options.use_direct_reads;
1092 return optimized_env_options;
1093 }
OptimizeForBlobFileRead(const EnvOptions & env_options,const ImmutableDBOptions & db_options) const1094 EnvOptions Env::OptimizeForBlobFileRead(
1095 const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
1096 EnvOptions optimized_env_options(env_options);
1097 optimized_env_options.use_direct_reads = db_options.use_direct_reads;
1098 return optimized_env_options;
1099 }
1100
EnvOptions(const DBOptions & options)1101 EnvOptions::EnvOptions(const DBOptions& options) {
1102 AssignEnvOptions(this, options);
1103 }
1104
EnvOptions()1105 EnvOptions::EnvOptions() {
1106 DBOptions options;
1107 AssignEnvOptions(this, options);
1108 }
1109
NewEnvLogger(const std::string & fname,Env * env,std::shared_ptr<Logger> * result)1110 Status NewEnvLogger(const std::string& fname, Env* env,
1111 std::shared_ptr<Logger>* result) {
1112 FileOptions options;
1113 // TODO: Tune the buffer size.
1114 options.writable_file_max_buffer_size = 1024 * 1024;
1115 std::unique_ptr<FSWritableFile> writable_file;
1116 const auto status = env->GetFileSystem()->NewWritableFile(
1117 fname, options, &writable_file, nullptr);
1118 if (!status.ok()) {
1119 return status;
1120 }
1121
1122 *result = std::make_shared<EnvLogger>(std::move(writable_file), fname,
1123 options, env);
1124 return Status::OK();
1125 }
1126
GetFileSystem() const1127 const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
1128 return file_system_;
1129 }
1130
GetSystemClock() const1131 const std::shared_ptr<SystemClock>& Env::GetSystemClock() const {
1132 return system_clock_;
1133 }
1134 namespace {
1135 static std::unordered_map<std::string, OptionTypeInfo> sc_wrapper_type_info = {
1136 #ifndef ROCKSDB_LITE
1137 {"target",
1138 OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
1139 0, OptionVerificationType::kByName, OptionTypeFlags::kDontSerialize)},
1140 #endif // ROCKSDB_LITE
1141 };
1142
1143 } // namespace
SystemClockWrapper(const std::shared_ptr<SystemClock> & t)1144 SystemClockWrapper::SystemClockWrapper(const std::shared_ptr<SystemClock>& t)
1145 : target_(t) {
1146 RegisterOptions("", &target_, &sc_wrapper_type_info);
1147 }
1148
PrepareOptions(const ConfigOptions & options)1149 Status SystemClockWrapper::PrepareOptions(const ConfigOptions& options) {
1150 if (target_ == nullptr) {
1151 target_ = SystemClock::Default();
1152 }
1153 return SystemClock::PrepareOptions(options);
1154 }
1155
1156 #ifndef ROCKSDB_LITE
SerializeOptions(const ConfigOptions & config_options,const std::string & header) const1157 std::string SystemClockWrapper::SerializeOptions(
1158 const ConfigOptions& config_options, const std::string& header) const {
1159 auto parent = SystemClock::SerializeOptions(config_options, "");
1160 if (config_options.IsShallow() || target_ == nullptr ||
1161 target_->IsInstanceOf(SystemClock::kDefaultName())) {
1162 return parent;
1163 } else {
1164 std::string result = header;
1165 if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
1166 result.append(OptionTypeInfo::kIdPropName()).append("=");
1167 }
1168 result.append(parent);
1169 if (!EndsWith(result, config_options.delimiter)) {
1170 result.append(config_options.delimiter);
1171 }
1172 result.append("target=").append(target_->ToString(config_options));
1173 return result;
1174 }
1175 }
1176 #endif // ROCKSDB_LITE
1177
1178 #ifndef ROCKSDB_LITE
RegisterBuiltinSystemClocks(ObjectLibrary & library,const std::string &)1179 static int RegisterBuiltinSystemClocks(ObjectLibrary& library,
1180 const std::string& /*arg*/) {
1181 library.Register<SystemClock>(
1182 EmulatedSystemClock::kClassName(),
1183 [](const std::string& /*uri*/, std::unique_ptr<SystemClock>* guard,
1184 std::string* /* errmsg */) {
1185 guard->reset(new EmulatedSystemClock(SystemClock::Default()));
1186 return guard->get();
1187 });
1188 size_t num_types;
1189 return static_cast<int>(library.GetFactoryCount(&num_types));
1190 }
1191 #endif // ROCKSDB_LITE
1192
CreateFromString(const ConfigOptions & config_options,const std::string & value,std::shared_ptr<SystemClock> * result)1193 Status SystemClock::CreateFromString(const ConfigOptions& config_options,
1194 const std::string& value,
1195 std::shared_ptr<SystemClock>* result) {
1196 auto clock = SystemClock::Default();
1197 if (clock->IsInstanceOf(value)) {
1198 *result = clock;
1199 return Status::OK();
1200 } else {
1201 #ifndef ROCKSDB_LITE
1202 static std::once_flag once;
1203 std::call_once(once, [&]() {
1204 RegisterBuiltinSystemClocks(*(ObjectLibrary::Default().get()), "");
1205 });
1206 #endif // ROCKSDB_LITE
1207 return LoadSharedObject<SystemClock>(config_options, value, nullptr,
1208 result);
1209 }
1210 }
1211 } // namespace ROCKSDB_NAMESPACE
1212