1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9 #include <dirent.h>
10 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
11 #include <dlfcn.h>
12 #endif
13 #include <errno.h>
14 #include <fcntl.h>
15
16 #if defined(OS_LINUX)
17 #include <linux/fs.h>
18 #endif
19 #if defined(ROCKSDB_IOURING_PRESENT)
20 #include <liburing.h>
21 #endif
22 #include <pthread.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/ioctl.h>
28 #include <sys/mman.h>
29 #include <sys/stat.h>
30 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
31 #include <sys/statfs.h>
32 #include <sys/syscall.h>
33 #include <sys/sysmacros.h>
34 #endif
35 #include <sys/statvfs.h>
36 #include <sys/time.h>
37 #include <sys/types.h>
38 #if defined(ROCKSDB_IOURING_PRESENT)
39 #include <sys/uio.h>
40 #endif
41 #include <time.h>
42 #include <algorithm>
43 // Get nano time includes
44 #if defined(OS_LINUX) || defined(OS_FREEBSD)
45 #elif defined(__MACH__)
46 #include <Availability.h>
47 #include <mach/clock.h>
48 #include <mach/mach.h>
49 #else
50 #include <chrono>
51 #endif
52 #include <deque>
53 #include <set>
54 #include <vector>
55
56 #include "env/composite_env_wrapper.h"
57 #include "env/io_posix.h"
58 #include "logging/logging.h"
59 #include "logging/posix_logger.h"
60 #include "monitoring/iostats_context_imp.h"
61 #include "monitoring/thread_status_updater.h"
62 #include "port/port.h"
63 #include "rocksdb/options.h"
64 #include "rocksdb/slice.h"
65 #include "test_util/sync_point.h"
66 #include "util/coding.h"
67 #include "util/compression_context_cache.h"
68 #include "util/random.h"
69 #include "util/string_util.h"
70 #include "util/thread_local.h"
71 #include "util/threadpool_imp.h"
72
// Fallback definitions for filesystem magic numbers, used only when the
// included system headers have not already defined them.
#ifndef TMPFS_MAGIC
#define TMPFS_MAGIC 0x01021994
#endif

#ifndef XFS_SUPER_MAGIC
#define XFS_SUPER_MAGIC 0x58465342
#endif

#ifndef EXT4_SUPER_MAGIC
#define EXT4_SUPER_MAGIC 0xEF53
#endif
82
83 namespace ROCKSDB_NAMESPACE {
// Platform-specific shared-library file extension and the separator used
// between directories in a library search path.
#if defined(OS_WIN)
static const char kPathSeparator = ';';
static const std::string kSharedLibExt = ".dll";
#elif defined(OS_MACOSX)
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".dylib";
#else
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".so";
#endif
95
96 namespace {
97
CreateThreadStatusUpdater()98 ThreadStatusUpdater* CreateThreadStatusUpdater() {
99 return new ThreadStatusUpdater();
100 }
101
102 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
103 class PosixDynamicLibrary : public DynamicLibrary {
104 public:
PosixDynamicLibrary(const std::string & name,void * handle)105 PosixDynamicLibrary(const std::string& name, void* handle)
106 : name_(name), handle_(handle) {}
~PosixDynamicLibrary()107 ~PosixDynamicLibrary() override { dlclose(handle_); }
108
LoadSymbol(const std::string & sym_name,void ** func)109 Status LoadSymbol(const std::string& sym_name, void** func) override {
110 assert(nullptr != func);
111 dlerror(); // Clear any old error
112 *func = dlsym(handle_, sym_name.c_str());
113 if (*func != nullptr) {
114 return Status::OK();
115 } else {
116 char* err = dlerror();
117 return Status::NotFound("Error finding symbol: " + sym_name, err);
118 }
119 }
120
Name() const121 const char* Name() const override { return name_.c_str(); }
122
123 private:
124 std::string name_;
125 void* handle_;
126 };
127 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
128
129 class PosixEnv : public CompositeEnvWrapper {
130 public:
131 PosixEnv();
132
~PosixEnv()133 ~PosixEnv() override {
134 for (const auto tid : threads_to_join_) {
135 pthread_join(tid, nullptr);
136 }
137 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
138 thread_pools_[pool_id].JoinAllThreads();
139 }
140 // Delete the thread_status_updater_ only when the current Env is not
141 // Env::Default(). This is to avoid the free-after-use error when
142 // Env::Default() is destructed while some other child threads are
143 // still trying to update thread status.
144 if (this != Env::Default()) {
145 delete thread_status_updater_;
146 }
147 }
148
SetFD_CLOEXEC(int fd,const EnvOptions * options)149 void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
150 if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
151 fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
152 }
153 }
154
155 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
156 // Loads the named library into the result.
157 // If the input name is empty, the current executable is loaded
158 // On *nix systems, a "lib" prefix is added to the name if one is not supplied
159 // Comparably, the appropriate shared library extension is added to the name
160 // if not supplied. If search_path is not specified, the shared library will
161 // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
162 // specified, the shared library will be searched for in the directories
163 // provided by the search path
LoadLibrary(const std::string & name,const std::string & path,std::shared_ptr<DynamicLibrary> * result)164 Status LoadLibrary(const std::string& name, const std::string& path,
165 std::shared_ptr<DynamicLibrary>* result) override {
166 Status status;
167 assert(result != nullptr);
168 if (name.empty()) {
169 void* hndl = dlopen(NULL, RTLD_NOW);
170 if (hndl != nullptr) {
171 result->reset(new PosixDynamicLibrary(name, hndl));
172 return Status::OK();
173 }
174 } else {
175 std::string library_name = name;
176 if (library_name.find(kSharedLibExt) == std::string::npos) {
177 library_name = library_name + kSharedLibExt;
178 }
179 #if !defined(OS_WIN)
180 if (library_name.find('/') == std::string::npos &&
181 library_name.compare(0, 3, "lib") != 0) {
182 library_name = "lib" + library_name;
183 }
184 #endif
185 if (path.empty()) {
186 void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
187 if (hndl != nullptr) {
188 result->reset(new PosixDynamicLibrary(library_name, hndl));
189 return Status::OK();
190 }
191 } else {
192 std::string local_path;
193 std::stringstream ss(path);
194 while (getline(ss, local_path, kPathSeparator)) {
195 if (!path.empty()) {
196 std::string full_name = local_path + "/" + library_name;
197 void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
198 if (hndl != nullptr) {
199 result->reset(new PosixDynamicLibrary(full_name, hndl));
200 return Status::OK();
201 }
202 }
203 }
204 }
205 }
206 return Status::IOError(
207 IOErrorMsg("Failed to open shared library: xs", name), dlerror());
208 }
209 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
210
211 void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
212 void* tag = nullptr,
213 void (*unschedFunction)(void* arg) = nullptr) override;
214
215 int UnSchedule(void* arg, Priority pri) override;
216
217 void StartThread(void (*function)(void* arg), void* arg) override;
218
219 void WaitForJoin() override;
220
221 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
222
GetTestDirectory(std::string * result)223 Status GetTestDirectory(std::string* result) override {
224 const char* env = getenv("TEST_TMPDIR");
225 if (env && env[0] != '\0') {
226 *result = env;
227 } else {
228 char buf[100];
229 snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
230 *result = buf;
231 }
232 // Directory may already exist
233 CreateDir(*result);
234 return Status::OK();
235 }
236
GetThreadList(std::vector<ThreadStatus> * thread_list)237 Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
238 assert(thread_status_updater_);
239 return thread_status_updater_->GetThreadList(thread_list);
240 }
241
gettid(pthread_t tid)242 static uint64_t gettid(pthread_t tid) {
243 uint64_t thread_id = 0;
244 memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
245 return thread_id;
246 }
247
gettid()248 static uint64_t gettid() {
249 pthread_t tid = pthread_self();
250 return gettid(tid);
251 }
252
GetThreadID() const253 uint64_t GetThreadID() const override { return gettid(pthread_self()); }
254
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)255 Status NewLogger(const std::string& fname,
256 std::shared_ptr<Logger>* result) override {
257 FILE* f;
258 {
259 IOSTATS_TIMER_GUARD(open_nanos);
260 f = fopen(fname.c_str(),
261 "w"
262 #ifdef __GLIBC_PREREQ
263 #if __GLIBC_PREREQ(2, 7)
264 "e" // glibc extension to enable O_CLOEXEC
265 #endif
266 #endif
267 );
268 }
269 if (f == nullptr) {
270 result->reset();
271 return IOError("when fopen a file for new logger", fname, errno);
272 } else {
273 int fd = fileno(f);
274 #ifdef ROCKSDB_FALLOCATE_PRESENT
275 fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
276 #endif
277 SetFD_CLOEXEC(fd, nullptr);
278 result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
279 return Status::OK();
280 }
281 }
282
NowMicros()283 uint64_t NowMicros() override {
284 struct timeval tv;
285 gettimeofday(&tv, nullptr);
286 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
287 }
288
NowNanos()289 uint64_t NowNanos() override {
290 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
291 struct timespec ts;
292 clock_gettime(CLOCK_MONOTONIC, &ts);
293 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
294 #elif defined(OS_SOLARIS)
295 return gethrtime();
296 #elif defined(__MACH__)
297 clock_serv_t cclock;
298 mach_timespec_t ts;
299 host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
300 clock_get_time(cclock, &ts);
301 mach_port_deallocate(mach_task_self(), cclock);
302 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
303 #else
304 return std::chrono::duration_cast<std::chrono::nanoseconds>(
305 std::chrono::steady_clock::now().time_since_epoch()).count();
306 #endif
307 }
308
NowCPUNanos()309 uint64_t NowCPUNanos() override {
310 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
311 (defined(__MACH__) && defined(__MAC_10_12))
312 struct timespec ts;
313 clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
314 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
315 #endif
316 return 0;
317 }
318
SleepForMicroseconds(int micros)319 void SleepForMicroseconds(int micros) override { usleep(micros); }
320
GetHostName(char * name,uint64_t len)321 Status GetHostName(char* name, uint64_t len) override {
322 int ret = gethostname(name, static_cast<size_t>(len));
323 if (ret < 0) {
324 if (errno == EFAULT || errno == EINVAL) {
325 return Status::InvalidArgument(strerror(errno));
326 } else {
327 return IOError("GetHostName", name, errno);
328 }
329 }
330 return Status::OK();
331 }
332
GetCurrentTime(int64_t * unix_time)333 Status GetCurrentTime(int64_t* unix_time) override {
334 time_t ret = time(nullptr);
335 if (ret == (time_t) -1) {
336 return IOError("GetCurrentTime", "", errno);
337 }
338 *unix_time = (int64_t) ret;
339 return Status::OK();
340 }
341
GetThreadStatusUpdater() const342 ThreadStatusUpdater* GetThreadStatusUpdater() const override {
343 return Env::GetThreadStatusUpdater();
344 }
345
GenerateUniqueId()346 std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
347
348 // Allow increasing the number of worker threads.
SetBackgroundThreads(int num,Priority pri)349 void SetBackgroundThreads(int num, Priority pri) override {
350 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
351 thread_pools_[pri].SetBackgroundThreads(num);
352 }
353
GetBackgroundThreads(Priority pri)354 int GetBackgroundThreads(Priority pri) override {
355 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
356 return thread_pools_[pri].GetBackgroundThreads();
357 }
358
SetAllowNonOwnerAccess(bool allow_non_owner_access)359 Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
360 allow_non_owner_access_ = allow_non_owner_access;
361 return Status::OK();
362 }
363
364 // Allow increasing the number of worker threads.
IncBackgroundThreadsIfNeeded(int num,Priority pri)365 void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
366 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
367 thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
368 }
369
LowerThreadPoolIOPriority(Priority pool=LOW)370 void LowerThreadPoolIOPriority(Priority pool = LOW) override {
371 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
372 #ifdef OS_LINUX
373 thread_pools_[pool].LowerIOPriority();
374 #else
375 (void)pool;
376 #endif
377 }
378
LowerThreadPoolCPUPriority(Priority pool=LOW)379 void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
380 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
381 #ifdef OS_LINUX
382 thread_pools_[pool].LowerCPUPriority();
383 #else
384 (void)pool;
385 #endif
386 }
387
TimeToString(uint64_t secondsSince1970)388 std::string TimeToString(uint64_t secondsSince1970) override {
389 const time_t seconds = (time_t)secondsSince1970;
390 struct tm t;
391 int maxsize = 64;
392 std::string dummy;
393 dummy.reserve(maxsize);
394 dummy.resize(maxsize);
395 char* p = &dummy[0];
396 localtime_r(&seconds, &t);
397 snprintf(p, maxsize,
398 "%04d/%02d/%02d-%02d:%02d:%02d ",
399 t.tm_year + 1900,
400 t.tm_mon + 1,
401 t.tm_mday,
402 t.tm_hour,
403 t.tm_min,
404 t.tm_sec);
405 return dummy;
406 }
407
408 private:
409 std::vector<ThreadPoolImpl> thread_pools_;
410 pthread_mutex_t mu_;
411 std::vector<pthread_t> threads_to_join_;
412 // If true, allow non owner read access for db files. Otherwise, non-owner
413 // has no access to db files.
414 bool allow_non_owner_access_;
415 };
416
PosixEnv()417 PosixEnv::PosixEnv()
418 : CompositeEnvWrapper(this, FileSystem::Default().get()),
419 thread_pools_(Priority::TOTAL),
420 allow_non_owner_access_(true) {
421 ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
422 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
423 thread_pools_[pool_id].SetThreadPriority(
424 static_cast<Env::Priority>(pool_id));
425 // This allows later initializing the thread-local-env of each thread.
426 thread_pools_[pool_id].SetHostEnv(this);
427 }
428 thread_status_updater_ = CreateThreadStatusUpdater();
429 }
430
Schedule(void (* function)(void * arg1),void * arg,Priority pri,void * tag,void (* unschedFunction)(void * arg))431 void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
432 void* tag, void (*unschedFunction)(void* arg)) {
433 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
434 thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
435 }
436
UnSchedule(void * arg,Priority pri)437 int PosixEnv::UnSchedule(void* arg, Priority pri) {
438 return thread_pools_[pri].UnSchedule(arg);
439 }
440
GetThreadPoolQueueLen(Priority pri) const441 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
442 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
443 return thread_pools_[pri].GetQueueLen();
444 }
445
// Heap-allocated payload handed to a new thread: the function to run and
// the argument to pass to it.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};

// pthread entry point. `arg` must point to a StartThreadState allocated
// with `new`; the wrapper runs the user function, frees the state and
// returns nullptr.
static void* StartThreadWrapper(void* arg) {
  auto* const thread_state = static_cast<StartThreadState*>(arg);
  (*thread_state->user_function)(thread_state->arg);
  delete thread_state;
  return nullptr;
}
457
StartThread(void (* function)(void * arg),void * arg)458 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
459 pthread_t t;
460 StartThreadState* state = new StartThreadState;
461 state->user_function = function;
462 state->arg = arg;
463 ThreadPoolImpl::PthreadCall(
464 "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
465 ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
466 threads_to_join_.push_back(t);
467 ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
468 }
469
WaitForJoin()470 void PosixEnv::WaitForJoin() {
471 for (const auto tid : threads_to_join_) {
472 pthread_join(tid, nullptr);
473 }
474 threads_to_join_.clear();
475 }
476
477 } // namespace
478
GenerateUniqueId()479 std::string Env::GenerateUniqueId() {
480 std::string uuid_file = "/proc/sys/kernel/random/uuid";
481
482 Status s = FileExists(uuid_file);
483 if (s.ok()) {
484 std::string uuid;
485 s = ReadFileToString(this, uuid_file, &uuid);
486 if (s.ok()) {
487 return uuid;
488 }
489 }
490 // Could not read uuid_file - generate uuid using "nanos-random"
491 Random64 r(time(nullptr));
492 uint64_t random_uuid_portion =
493 r.Uniform(std::numeric_limits<uint64_t>::max());
494 uint64_t nanos_uuid_portion = NowNanos();
495 char uuid2[200];
496 snprintf(uuid2,
497 200,
498 "%lx-%lx",
499 (unsigned long)nanos_uuid_portion,
500 (unsigned long)random_uuid_portion);
501 return uuid2;
502 }
503
504 //
505 // Default Posix Env
506 //
Default()507 Env* Env::Default() {
508 // The following function call initializes the singletons of ThreadLocalPtr
509 // right before the static default_env. This guarantees default_env will
510 // always being destructed before the ThreadLocalPtr singletons get
511 // destructed as C++ guarantees that the destructions of static variables
512 // is in the reverse order of their constructions.
513 //
514 // Since static members are destructed in the reverse order
515 // of their construction, having this call here guarantees that
516 // the destructor of static PosixEnv will go first, then the
517 // the singletons of ThreadLocalPtr.
518 ThreadLocalPtr::InitSingletons();
519 CompressionContextCache::InitSingleton();
520 INIT_SYNC_POINT_SINGLETONS();
521 static PosixEnv default_env;
522 static CompositeEnvWrapper composite_env(&default_env,
523 FileSystem::Default().get());
524 return &composite_env;
525 }
526
527 } // namespace ROCKSDB_NAMESPACE
528