1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9 
10 #if !defined(OS_WIN)
11 
12 #include <dirent.h>
13 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
14 #include <dlfcn.h>
15 #endif
16 #include <errno.h>
17 #include <fcntl.h>
18 
19 #if defined(ROCKSDB_IOURING_PRESENT)
20 #include <liburing.h>
21 #endif
22 #include <pthread.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
30 #include <sys/statfs.h>
31 #endif
32 #include <sys/statvfs.h>
33 #include <sys/time.h>
34 #include <sys/types.h>
35 #if defined(ROCKSDB_IOURING_PRESENT)
36 #include <sys/uio.h>
37 #endif
38 #include <time.h>
39 #include <algorithm>
40 // Get nano time includes
41 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
42 #elif defined(__MACH__)
43 #include <Availability.h>
44 #include <mach/clock.h>
45 #include <mach/mach.h>
46 #else
47 #include <chrono>
48 #endif
49 #include <deque>
50 #include <set>
51 #include <vector>
52 
53 #include "env/composite_env_wrapper.h"
54 #include "env/io_posix.h"
55 #include "logging/posix_logger.h"
56 #include "monitoring/iostats_context_imp.h"
57 #include "monitoring/thread_status_updater.h"
58 #include "port/port.h"
59 #include "rocksdb/env.h"
60 #include "rocksdb/options.h"
61 #include "rocksdb/slice.h"
62 #include "rocksdb/system_clock.h"
63 #include "test_util/sync_point.h"
64 #include "util/coding.h"
65 #include "util/compression_context_cache.h"
66 #include "util/random.h"
67 #include "util/string_util.h"
68 #include "util/thread_local.h"
69 #include "util/threadpool_imp.h"
70 
71 #if !defined(TMPFS_MAGIC)
72 #define TMPFS_MAGIC 0x01021994
73 #endif
74 #if !defined(XFS_SUPER_MAGIC)
75 #define XFS_SUPER_MAGIC 0x58465342
76 #endif
77 #if !defined(EXT4_SUPER_MAGIC)
78 #define EXT4_SUPER_MAGIC 0xEF53
79 #endif
80 
81 namespace ROCKSDB_NAMESPACE {
// Platform-dependent constants used when locating shared libraries:
//   kSharedLibExt  - file extension appended to a library name
//   kPathSeparator - character separating directories in a search path
// NOTE: this whole file is compiled only when OS_WIN is not defined (see the
// guard at the top of the file), so the OS_WIN branch is never selected here.
#if defined(OS_WIN)
static const std::string kSharedLibExt = ".dll";
static const char kPathSeparator = ';';
#else
static const char kPathSeparator = ':';
#if defined(OS_MACOSX)
static const std::string kSharedLibExt = ".dylib";
#else
static const std::string kSharedLibExt = ".so";
#endif
#endif
93 
94 namespace {
95 
CreateThreadStatusUpdater()96 ThreadStatusUpdater* CreateThreadStatusUpdater() {
97   return new ThreadStatusUpdater();
98 }
99 
100 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
101 class PosixDynamicLibrary : public DynamicLibrary {
102  public:
PosixDynamicLibrary(const std::string & name,void * handle)103   PosixDynamicLibrary(const std::string& name, void* handle)
104       : name_(name), handle_(handle) {}
~PosixDynamicLibrary()105   ~PosixDynamicLibrary() override { dlclose(handle_); }
106 
LoadSymbol(const std::string & sym_name,void ** func)107   Status LoadSymbol(const std::string& sym_name, void** func) override {
108     assert(nullptr != func);
109     dlerror();  // Clear any old error
110     *func = dlsym(handle_, sym_name.c_str());
111     if (*func != nullptr) {
112       return Status::OK();
113     } else {
114       char* err = dlerror();
115       return Status::NotFound("Error finding symbol: " + sym_name, err);
116     }
117   }
118 
Name() const119   const char* Name() const override { return name_.c_str(); }
120 
121  private:
122   std::string name_;
123   void* handle_;
124 };
125 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
126 
127 class PosixClock : public SystemClock {
128  public:
Name() const129   const char* Name() const override { return "PosixClock"; }
NowMicros()130   uint64_t NowMicros() override {
131     struct timeval tv;
132     gettimeofday(&tv, nullptr);
133     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
134   }
135 
NowNanos()136   uint64_t NowNanos() override {
137 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
138     defined(OS_AIX)
139     struct timespec ts;
140     clock_gettime(CLOCK_MONOTONIC, &ts);
141     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
142 #elif defined(OS_SOLARIS)
143     return gethrtime();
144 #elif defined(__MACH__)
145     clock_serv_t cclock;
146     mach_timespec_t ts;
147     host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
148     clock_get_time(cclock, &ts);
149     mach_port_deallocate(mach_task_self(), cclock);
150     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
151 #else
152     return std::chrono::duration_cast<std::chrono::nanoseconds>(
153                std::chrono::steady_clock::now().time_since_epoch())
154         .count();
155 #endif
156   }
157 
CPUMicros()158   uint64_t CPUMicros() override {
159 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
160     defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
161     struct timespec ts;
162     clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
163     return static_cast<uint64_t>(ts.tv_sec) * 1000000000;
164 #endif
165     return 0;
166   }
167 
CPUNanos()168   uint64_t CPUNanos() override {
169 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
170     defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
171     struct timespec ts;
172     clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
173     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
174 #endif
175     return 0;
176   }
177 
SleepForMicroseconds(int micros)178   void SleepForMicroseconds(int micros) override { usleep(micros); }
179 
GetCurrentTime(int64_t * unix_time)180   Status GetCurrentTime(int64_t* unix_time) override {
181     time_t ret = time(nullptr);
182     if (ret == (time_t)-1) {
183       return IOError("GetCurrentTime", "", errno);
184     }
185     *unix_time = (int64_t)ret;
186     return Status::OK();
187   }
188 
TimeToString(uint64_t secondsSince1970)189   std::string TimeToString(uint64_t secondsSince1970) override {
190     const time_t seconds = (time_t)secondsSince1970;
191     struct tm t;
192     int maxsize = 64;
193     std::string dummy;
194     dummy.reserve(maxsize);
195     dummy.resize(maxsize);
196     char* p = &dummy[0];
197     localtime_r(&seconds, &t);
198     snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", t.tm_year + 1900,
199              t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec);
200     return dummy;
201   }
202 };
203 
204 class PosixEnv : public CompositeEnv {
205  public:
206   PosixEnv(const PosixEnv* default_env, const std::shared_ptr<FileSystem>& fs);
~PosixEnv()207   ~PosixEnv() override {
208     if (this == Env::Default()) {
209       for (const auto tid : threads_to_join_) {
210         pthread_join(tid, nullptr);
211       }
212       for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
213         thread_pools_[pool_id].JoinAllThreads();
214       }
215       // Do not delete the thread_status_updater_ in order to avoid the
216       // free after use when Env::Default() is destructed while some other
217       // child threads are still trying to update thread status. All
218       // PosixEnv instances use the same thread_status_updater_, so never
219       // explicitly delete it.
220     }
221   }
222 
SetFD_CLOEXEC(int fd,const EnvOptions * options)223   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
224     if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
225       fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
226     }
227   }
228 
229 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
230   // Loads the named library into the result.
231   // If the input name is empty, the current executable is loaded
232   // On *nix systems, a "lib" prefix is added to the name if one is not supplied
233   // Comparably, the appropriate shared library extension is added to the name
234   // if not supplied. If search_path is not specified, the shared library will
235   // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
236   // specified, the shared library will be searched for in the directories
237   // provided by the search path
LoadLibrary(const std::string & name,const std::string & path,std::shared_ptr<DynamicLibrary> * result)238   Status LoadLibrary(const std::string& name, const std::string& path,
239                      std::shared_ptr<DynamicLibrary>* result) override {
240     assert(result != nullptr);
241     if (name.empty()) {
242       void* hndl = dlopen(NULL, RTLD_NOW);
243       if (hndl != nullptr) {
244         result->reset(new PosixDynamicLibrary(name, hndl));
245         return Status::OK();
246       }
247     } else {
248       std::string library_name = name;
249       if (library_name.find(kSharedLibExt) == std::string::npos) {
250         library_name = library_name + kSharedLibExt;
251       }
252 #if !defined(OS_WIN)
253       if (library_name.find('/') == std::string::npos &&
254           library_name.compare(0, 3, "lib") != 0) {
255         library_name = "lib" + library_name;
256       }
257 #endif
258       if (path.empty()) {
259         void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
260         if (hndl != nullptr) {
261           result->reset(new PosixDynamicLibrary(library_name, hndl));
262           return Status::OK();
263         }
264       } else {
265         std::string local_path;
266         std::stringstream ss(path);
267         while (getline(ss, local_path, kPathSeparator)) {
268           if (!path.empty()) {
269             std::string full_name = local_path + "/" + library_name;
270             void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
271             if (hndl != nullptr) {
272               result->reset(new PosixDynamicLibrary(full_name, hndl));
273               return Status::OK();
274             }
275           }
276         }
277       }
278     }
279     return Status::IOError(
280         IOErrorMsg("Failed to open shared library: xs", name), dlerror());
281   }
282 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
283 
284   void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
285                 void* tag = nullptr,
286                 void (*unschedFunction)(void* arg) = nullptr) override;
287 
288   int UnSchedule(void* arg, Priority pri) override;
289 
290   void StartThread(void (*function)(void* arg), void* arg) override;
291 
292   void WaitForJoin() override;
293 
294   unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
295 
GetThreadList(std::vector<ThreadStatus> * thread_list)296   Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
297     assert(thread_status_updater_);
298     return thread_status_updater_->GetThreadList(thread_list);
299   }
300 
gettid(pthread_t tid)301   static uint64_t gettid(pthread_t tid) {
302     uint64_t thread_id = 0;
303     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
304     return thread_id;
305   }
306 
gettid()307   static uint64_t gettid() {
308     pthread_t tid = pthread_self();
309     return gettid(tid);
310   }
311 
GetThreadID() const312   uint64_t GetThreadID() const override { return gettid(pthread_self()); }
313 
GetHostName(char * name,uint64_t len)314   Status GetHostName(char* name, uint64_t len) override {
315     int ret = gethostname(name, static_cast<size_t>(len));
316     if (ret < 0) {
317       if (errno == EFAULT || errno == EINVAL) {
318         return Status::InvalidArgument(errnoStr(errno).c_str());
319       } else {
320         return IOError("GetHostName", name, errno);
321       }
322     }
323     return Status::OK();
324   }
325 
GetThreadStatusUpdater() const326   ThreadStatusUpdater* GetThreadStatusUpdater() const override {
327     return Env::GetThreadStatusUpdater();
328   }
329 
GenerateUniqueId()330   std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
331 
332   // Allow increasing the number of worker threads.
SetBackgroundThreads(int num,Priority pri)333   void SetBackgroundThreads(int num, Priority pri) override {
334     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
335     thread_pools_[pri].SetBackgroundThreads(num);
336   }
337 
GetBackgroundThreads(Priority pri)338   int GetBackgroundThreads(Priority pri) override {
339     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
340     return thread_pools_[pri].GetBackgroundThreads();
341   }
342 
SetAllowNonOwnerAccess(bool allow_non_owner_access)343   Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
344     allow_non_owner_access_ = allow_non_owner_access;
345     return Status::OK();
346   }
347 
348   // Allow increasing the number of worker threads.
IncBackgroundThreadsIfNeeded(int num,Priority pri)349   void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
350     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
351     thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
352   }
353 
LowerThreadPoolIOPriority(Priority pool)354   void LowerThreadPoolIOPriority(Priority pool) override {
355     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
356 #ifdef OS_LINUX
357     thread_pools_[pool].LowerIOPriority();
358 #else
359     (void)pool;
360 #endif
361   }
362 
LowerThreadPoolCPUPriority(Priority pool)363   void LowerThreadPoolCPUPriority(Priority pool) override {
364     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
365     thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
366   }
367 
LowerThreadPoolCPUPriority(Priority pool,CpuPriority pri)368   Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
369     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
370     thread_pools_[pool].LowerCPUPriority(pri);
371     return Status::OK();
372   }
373 
374  private:
375   friend Env* Env::Default();
376   // Constructs the default Env, a singleton
377   PosixEnv();
378 
379   // The below 4 members are only used by the default PosixEnv instance.
380   // Non-default instances simply maintain references to the backing
381   // members in te default instance
382   std::vector<ThreadPoolImpl> thread_pools_storage_;
383   pthread_mutex_t mu_storage_;
384   std::vector<pthread_t> threads_to_join_storage_;
385   bool allow_non_owner_access_storage_;
386 
387   std::vector<ThreadPoolImpl>& thread_pools_;
388   pthread_mutex_t& mu_;
389   std::vector<pthread_t>& threads_to_join_;
390   // If true, allow non owner read access for db files. Otherwise, non-owner
391   //  has no access to db files.
392   bool& allow_non_owner_access_;
393 };
394 
PosixEnv()395 PosixEnv::PosixEnv()
396     : CompositeEnv(FileSystem::Default(), SystemClock::Default()),
397       thread_pools_storage_(Priority::TOTAL),
398       allow_non_owner_access_storage_(true),
399       thread_pools_(thread_pools_storage_),
400       mu_(mu_storage_),
401       threads_to_join_(threads_to_join_storage_),
402       allow_non_owner_access_(allow_non_owner_access_storage_) {
403   ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
404   for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
405     thread_pools_[pool_id].SetThreadPriority(
406         static_cast<Env::Priority>(pool_id));
407     // This allows later initializing the thread-local-env of each thread.
408     thread_pools_[pool_id].SetHostEnv(this);
409   }
410   thread_status_updater_ = CreateThreadStatusUpdater();
411 }
412 
// Constructs a non-default PosixEnv that uses `fs` as its file system and the
// system clock of `default_env`. Every reference member is bound to the
// corresponding member of `default_env`, and the thread status updater
// pointer is copied from it, so this instance shares the thread pools, mutex,
// join list and access flag with `default_env`. This object's own *_storage_
// members are not referenced. `default_env` is dereferenced unconditionally
// and must not be null.
PosixEnv::PosixEnv(const PosixEnv* default_env,
                   const std::shared_ptr<FileSystem>& fs)
    : CompositeEnv(fs, default_env->GetSystemClock()),
      thread_pools_(default_env->thread_pools_),
      mu_(default_env->mu_),
      threads_to_join_(default_env->threads_to_join_),
      allow_non_owner_access_(default_env->allow_non_owner_access_) {
  thread_status_updater_ = default_env->thread_status_updater_;
}
422 
Schedule(void (* function)(void * arg1),void * arg,Priority pri,void * tag,void (* unschedFunction)(void * arg))423 void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
424                         void* tag, void (*unschedFunction)(void* arg)) {
425   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
426   thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
427 }
428 
UnSchedule(void * arg,Priority pri)429 int PosixEnv::UnSchedule(void* arg, Priority pri) {
430   return thread_pools_[pri].UnSchedule(arg);
431 }
432 
GetThreadPoolQueueLen(Priority pri) const433 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
434   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
435   return thread_pools_[pri].GetQueueLen();
436 }
437 
// Bundle passed through pthread_create() to StartThreadWrapper(): the
// function the new thread should run and the argument to call it with.
struct StartThreadState {
  void (*user_function)(void*);  // entry point supplied to StartThread()
  void* arg;                     // opaque argument forwarded to user_function
};
442 
StartThreadWrapper(void * arg)443 static void* StartThreadWrapper(void* arg) {
444   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
445   state->user_function(state->arg);
446   delete state;
447   return nullptr;
448 }
449 
StartThread(void (* function)(void * arg),void * arg)450 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
451   pthread_t t;
452   StartThreadState* state = new StartThreadState;
453   state->user_function = function;
454   state->arg = arg;
455   ThreadPoolImpl::PthreadCall(
456       "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
457   ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
458   threads_to_join_.push_back(t);
459   ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
460 }
461 
WaitForJoin()462 void PosixEnv::WaitForJoin() {
463   for (const auto tid : threads_to_join_) {
464     pthread_join(tid, nullptr);
465   }
466   threads_to_join_.clear();
467 }
468 
469 }  // namespace
470 
GenerateUniqueId()471 std::string Env::GenerateUniqueId() {
472   std::string uuid_file = "/proc/sys/kernel/random/uuid";
473   std::shared_ptr<FileSystem> fs = FileSystem::Default();
474 
475   Status s = fs->FileExists(uuid_file, IOOptions(), nullptr);
476   if (s.ok()) {
477     std::string uuid;
478     s = ReadFileToString(fs.get(), uuid_file, &uuid);
479     if (s.ok()) {
480       return uuid;
481     }
482   }
483   // Could not read uuid_file - generate uuid using "nanos-random"
484   Random64 r(time(nullptr));
485   uint64_t random_uuid_portion =
486     r.Uniform(std::numeric_limits<uint64_t>::max());
487   uint64_t nanos_uuid_portion = NowNanos();
488   char uuid2[200];
489   snprintf(uuid2,
490            200,
491            "%lx-%lx",
492            (unsigned long)nanos_uuid_portion,
493            (unsigned long)random_uuid_portion);
494   return uuid2;
495 }
496 
497 //
498 // Default Posix Env
499 //
// Returns the process-wide default Env: a function-local static PosixEnv
// that is constructed on the first call.
Env* Env::Default() {
  // The following function call initializes the singletons of ThreadLocalPtr
  // right before the static default_env.  This guarantees default_env will
  // always be destructed before the ThreadLocalPtr singletons get
  // destructed as C++ guarantees that the destructions of static variables
  // is in the reverse order of their constructions.
  //
  // Since static members are destructed in the reverse order
  // of their construction, having this call here guarantees that
  // the destructor of static PosixEnv will go first, then the
  // singletons of ThreadLocalPtr.
  //
  // Keep these initialization calls ahead of the static declaration below;
  // the ordering is what the guarantee above relies on.
  ThreadLocalPtr::InitSingletons();
  CompressionContextCache::InitSingleton();
  INIT_SYNC_POINT_SINGLETONS();
  static PosixEnv default_env;
  return &default_env;
}
517 
NewCompositeEnv(const std::shared_ptr<FileSystem> & fs)518 std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
519   PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
520   return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
521 }
522 
523 //
524 // Default Posix SystemClock
525 //
Default()526 const std::shared_ptr<SystemClock>& SystemClock::Default() {
527   static std::shared_ptr<SystemClock> default_clock =
528       std::make_shared<PosixClock>();
529   return default_clock;
530 }
531 }  // namespace ROCKSDB_NAMESPACE
532 
533 #endif
534