1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9 
10 #include "port/lang.h"
11 #if !defined(OS_WIN)
12 
13 #include <dirent.h>
14 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
15 #include <dlfcn.h>
16 #endif
17 #include <errno.h>
18 #include <fcntl.h>
19 
20 #if defined(ROCKSDB_IOURING_PRESENT)
21 #include <liburing.h>
22 #endif
23 #include <pthread.h>
24 #include <signal.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/mman.h>
29 #include <sys/stat.h>
30 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #if defined(ROCKSDB_IOURING_PRESENT)
37 #include <sys/uio.h>
38 #endif
39 #include <time.h>
40 #include <algorithm>
41 // Get nano time includes
42 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
43 #elif defined(__MACH__)
44 #include <Availability.h>
45 #include <mach/clock.h>
46 #include <mach/mach.h>
47 #else
48 #include <chrono>
49 #endif
50 #include <deque>
51 #include <set>
52 #include <vector>
53 
54 #include "env/composite_env_wrapper.h"
55 #include "env/io_posix.h"
56 #include "logging/posix_logger.h"
57 #include "monitoring/iostats_context_imp.h"
58 #include "monitoring/thread_status_updater.h"
59 #include "port/port.h"
60 #include "rocksdb/env.h"
61 #include "rocksdb/options.h"
62 #include "rocksdb/slice.h"
63 #include "rocksdb/system_clock.h"
64 #include "test_util/sync_point.h"
65 #include "util/coding.h"
66 #include "util/compression_context_cache.h"
67 #include "util/random.h"
68 #include "util/string_util.h"
69 #include "util/thread_local.h"
70 #include "util/threadpool_imp.h"
71 
72 #if !defined(TMPFS_MAGIC)
73 #define TMPFS_MAGIC 0x01021994
74 #endif
75 #if !defined(XFS_SUPER_MAGIC)
76 #define XFS_SUPER_MAGIC 0x58465342
77 #endif
78 #if !defined(EXT4_SUPER_MAGIC)
79 #define EXT4_SUPER_MAGIC 0xEF53
80 #endif
81 
82 namespace ROCKSDB_NAMESPACE {
// Platform-dependent constants used when locating shared libraries:
//   kSharedLibExt  - file-name extension appended to a library name
//   kPathSeparator - character separating directories in a search path
#if defined(OS_WIN)
static const char kPathSeparator = ';';
static const std::string kSharedLibExt = ".dll";
#elif defined(OS_MACOSX)
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".dylib";
#else
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".so";
#endif
94 
95 namespace {
96 
CreateThreadStatusUpdater()97 ThreadStatusUpdater* CreateThreadStatusUpdater() {
98   return new ThreadStatusUpdater();
99 }
100 
101 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
102 class PosixDynamicLibrary : public DynamicLibrary {
103  public:
PosixDynamicLibrary(const std::string & name,void * handle)104   PosixDynamicLibrary(const std::string& name, void* handle)
105       : name_(name), handle_(handle) {}
~PosixDynamicLibrary()106   ~PosixDynamicLibrary() override { dlclose(handle_); }
107 
LoadSymbol(const std::string & sym_name,void ** func)108   Status LoadSymbol(const std::string& sym_name, void** func) override {
109     assert(nullptr != func);
110     dlerror();  // Clear any old error
111     *func = dlsym(handle_, sym_name.c_str());
112     if (*func != nullptr) {
113       return Status::OK();
114     } else {
115       char* err = dlerror();
116       return Status::NotFound("Error finding symbol: " + sym_name, err);
117     }
118   }
119 
Name() const120   const char* Name() const override { return name_.c_str(); }
121 
122  private:
123   std::string name_;
124   void* handle_;
125 };
126 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
127 
128 class PosixClock : public SystemClock {
129  public:
kClassName()130   static const char* kClassName() { return "PosixClock"; }
Name() const131   const char* Name() const override { return kClassName(); }
NickName() const132   const char* NickName() const override { return kDefaultName(); }
133 
NowMicros()134   uint64_t NowMicros() override {
135     struct timeval tv;
136     gettimeofday(&tv, nullptr);
137     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
138   }
139 
NowNanos()140   uint64_t NowNanos() override {
141 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
142     defined(OS_AIX)
143     struct timespec ts;
144     clock_gettime(CLOCK_MONOTONIC, &ts);
145     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
146 #elif defined(OS_SOLARIS)
147     return gethrtime();
148 #elif defined(__MACH__)
149     clock_serv_t cclock;
150     mach_timespec_t ts;
151     host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
152     clock_get_time(cclock, &ts);
153     mach_port_deallocate(mach_task_self(), cclock);
154     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
155 #else
156     return std::chrono::duration_cast<std::chrono::nanoseconds>(
157                std::chrono::steady_clock::now().time_since_epoch())
158         .count();
159 #endif
160   }
161 
CPUMicros()162   uint64_t CPUMicros() override {
163 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
164     defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
165     struct timespec ts;
166     clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
167     return static_cast<uint64_t>(ts.tv_sec) * 1000000000;
168 #endif
169     return 0;
170   }
171 
CPUNanos()172   uint64_t CPUNanos() override {
173 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
174     defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
175     struct timespec ts;
176     clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
177     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
178 #endif
179     return 0;
180   }
181 
SleepForMicroseconds(int micros)182   void SleepForMicroseconds(int micros) override { usleep(micros); }
183 
GetCurrentTime(int64_t * unix_time)184   Status GetCurrentTime(int64_t* unix_time) override {
185     time_t ret = time(nullptr);
186     if (ret == (time_t)-1) {
187       return IOError("GetCurrentTime", "", errno);
188     }
189     *unix_time = (int64_t)ret;
190     return Status::OK();
191   }
192 
TimeToString(uint64_t secondsSince1970)193   std::string TimeToString(uint64_t secondsSince1970) override {
194     const time_t seconds = (time_t)secondsSince1970;
195     struct tm t;
196     int maxsize = 64;
197     std::string dummy;
198     dummy.reserve(maxsize);
199     dummy.resize(maxsize);
200     char* p = &dummy[0];
201     localtime_r(&seconds, &t);
202     snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", t.tm_year + 1900,
203              t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec);
204     return dummy;
205   }
206 };
207 
208 class PosixEnv : public CompositeEnv {
209  public:
210   PosixEnv(const PosixEnv* default_env, const std::shared_ptr<FileSystem>& fs);
~PosixEnv()211   ~PosixEnv() override {
212     if (this == Env::Default()) {
213       for (const auto tid : threads_to_join_) {
214         pthread_join(tid, nullptr);
215       }
216       for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
217         thread_pools_[pool_id].JoinAllThreads();
218       }
219       // Do not delete the thread_status_updater_ in order to avoid the
220       // free after use when Env::Default() is destructed while some other
221       // child threads are still trying to update thread status. All
222       // PosixEnv instances use the same thread_status_updater_, so never
223       // explicitly delete it.
224     }
225   }
226 
SetFD_CLOEXEC(int fd,const EnvOptions * options)227   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
228     if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
229       fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
230     }
231   }
232 
233 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
234   // Loads the named library into the result.
235   // If the input name is empty, the current executable is loaded
236   // On *nix systems, a "lib" prefix is added to the name if one is not supplied
237   // Comparably, the appropriate shared library extension is added to the name
238   // if not supplied. If search_path is not specified, the shared library will
239   // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
240   // specified, the shared library will be searched for in the directories
241   // provided by the search path
LoadLibrary(const std::string & name,const std::string & path,std::shared_ptr<DynamicLibrary> * result)242   Status LoadLibrary(const std::string& name, const std::string& path,
243                      std::shared_ptr<DynamicLibrary>* result) override {
244     assert(result != nullptr);
245     if (name.empty()) {
246       void* hndl = dlopen(NULL, RTLD_NOW);
247       if (hndl != nullptr) {
248         result->reset(new PosixDynamicLibrary(name, hndl));
249         return Status::OK();
250       }
251     } else {
252       std::string library_name = name;
253       if (library_name.find(kSharedLibExt) == std::string::npos) {
254         library_name = library_name + kSharedLibExt;
255       }
256 #if !defined(OS_WIN)
257       if (library_name.find('/') == std::string::npos &&
258           library_name.compare(0, 3, "lib") != 0) {
259         library_name = "lib" + library_name;
260       }
261 #endif
262       if (path.empty()) {
263         void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
264         if (hndl != nullptr) {
265           result->reset(new PosixDynamicLibrary(library_name, hndl));
266           return Status::OK();
267         }
268       } else {
269         std::string local_path;
270         std::stringstream ss(path);
271         while (getline(ss, local_path, kPathSeparator)) {
272           if (!path.empty()) {
273             std::string full_name = local_path + "/" + library_name;
274             void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
275             if (hndl != nullptr) {
276               result->reset(new PosixDynamicLibrary(full_name, hndl));
277               return Status::OK();
278             }
279           }
280         }
281       }
282     }
283     return Status::IOError(
284         IOErrorMsg("Failed to open shared library: xs", name), dlerror());
285   }
286 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
287 
288   void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
289                 void* tag = nullptr,
290                 void (*unschedFunction)(void* arg) = nullptr) override;
291 
292   int UnSchedule(void* arg, Priority pri) override;
293 
294   void StartThread(void (*function)(void* arg), void* arg) override;
295 
296   void WaitForJoin() override;
297 
298   unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
299 
GetThreadList(std::vector<ThreadStatus> * thread_list)300   Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
301     assert(thread_status_updater_);
302     return thread_status_updater_->GetThreadList(thread_list);
303   }
304 
gettid(pthread_t tid)305   static uint64_t gettid(pthread_t tid) {
306     uint64_t thread_id = 0;
307     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
308     return thread_id;
309   }
310 
gettid()311   static uint64_t gettid() {
312     pthread_t tid = pthread_self();
313     return gettid(tid);
314   }
315 
GetThreadID() const316   uint64_t GetThreadID() const override { return gettid(pthread_self()); }
317 
GetHostName(char * name,uint64_t len)318   Status GetHostName(char* name, uint64_t len) override {
319     int ret = gethostname(name, static_cast<size_t>(len));
320     if (ret < 0) {
321       if (errno == EFAULT || errno == EINVAL) {
322         return Status::InvalidArgument(errnoStr(errno).c_str());
323       } else {
324         return IOError("GetHostName", name, errno);
325       }
326     }
327     return Status::OK();
328   }
329 
GetThreadStatusUpdater() const330   ThreadStatusUpdater* GetThreadStatusUpdater() const override {
331     return Env::GetThreadStatusUpdater();
332   }
333 
GenerateUniqueId()334   std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
335 
336   // Allow increasing the number of worker threads.
SetBackgroundThreads(int num,Priority pri)337   void SetBackgroundThreads(int num, Priority pri) override {
338     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
339     thread_pools_[pri].SetBackgroundThreads(num);
340   }
341 
GetBackgroundThreads(Priority pri)342   int GetBackgroundThreads(Priority pri) override {
343     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
344     return thread_pools_[pri].GetBackgroundThreads();
345   }
346 
SetAllowNonOwnerAccess(bool allow_non_owner_access)347   Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
348     allow_non_owner_access_ = allow_non_owner_access;
349     return Status::OK();
350   }
351 
352   // Allow increasing the number of worker threads.
IncBackgroundThreadsIfNeeded(int num,Priority pri)353   void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
354     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
355     thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
356   }
357 
LowerThreadPoolIOPriority(Priority pool)358   void LowerThreadPoolIOPriority(Priority pool) override {
359     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
360 #ifdef OS_LINUX
361     thread_pools_[pool].LowerIOPriority();
362 #else
363     (void)pool;
364 #endif
365   }
366 
LowerThreadPoolCPUPriority(Priority pool)367   void LowerThreadPoolCPUPriority(Priority pool) override {
368     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
369     thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
370   }
371 
LowerThreadPoolCPUPriority(Priority pool,CpuPriority pri)372   Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
373     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
374     thread_pools_[pool].LowerCPUPriority(pri);
375     return Status::OK();
376   }
377 
378  private:
379   friend Env* Env::Default();
380   // Constructs the default Env, a singleton
381   PosixEnv();
382 
383   // The below 4 members are only used by the default PosixEnv instance.
384   // Non-default instances simply maintain references to the backing
385   // members in te default instance
386   std::vector<ThreadPoolImpl> thread_pools_storage_;
387   pthread_mutex_t mu_storage_;
388   std::vector<pthread_t> threads_to_join_storage_;
389   bool allow_non_owner_access_storage_;
390 
391   std::vector<ThreadPoolImpl>& thread_pools_;
392   pthread_mutex_t& mu_;
393   std::vector<pthread_t>& threads_to_join_;
394   // If true, allow non owner read access for db files. Otherwise, non-owner
395   //  has no access to db files.
396   bool& allow_non_owner_access_;
397 };
398 
PosixEnv()399 PosixEnv::PosixEnv()
400     : CompositeEnv(FileSystem::Default(), SystemClock::Default()),
401       thread_pools_storage_(Priority::TOTAL),
402       allow_non_owner_access_storage_(true),
403       thread_pools_(thread_pools_storage_),
404       mu_(mu_storage_),
405       threads_to_join_(threads_to_join_storage_),
406       allow_non_owner_access_(allow_non_owner_access_storage_) {
407   ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
408   for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
409     thread_pools_[pool_id].SetThreadPriority(
410         static_cast<Env::Priority>(pool_id));
411     // This allows later initializing the thread-local-env of each thread.
412     thread_pools_[pool_id].SetHostEnv(this);
413   }
414   thread_status_updater_ = CreateThreadStatusUpdater();
415 }
416 
// Constructs an env on top of `fs` whose reference members alias the state
// referenced by `default_env`; the clock and the thread status updater are
// taken from `default_env` as well.
PosixEnv::PosixEnv(const PosixEnv* default_env,
                   const std::shared_ptr<FileSystem>& fs)
    : CompositeEnv(fs, default_env->GetSystemClock()),
      thread_pools_(default_env->thread_pools_),
      mu_(default_env->mu_),
      threads_to_join_(default_env->threads_to_join_),
      allow_non_owner_access_(default_env->allow_non_owner_access_) {
  this->thread_status_updater_ = default_env->thread_status_updater_;
}
426 
Schedule(void (* function)(void * arg1),void * arg,Priority pri,void * tag,void (* unschedFunction)(void * arg))427 void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
428                         void* tag, void (*unschedFunction)(void* arg)) {
429   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
430   thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
431 }
432 
UnSchedule(void * arg,Priority pri)433 int PosixEnv::UnSchedule(void* arg, Priority pri) {
434   return thread_pools_[pri].UnSchedule(arg);
435 }
436 
GetThreadPoolQueueLen(Priority pri) const437 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
438   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
439   return thread_pools_[pri].GetQueueLen();
440 }
441 
// Hand-off record for a new thread: the function to run and its argument.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};

// Thread entry point. `arg` must point to a heap-allocated StartThreadState;
// the user function is invoked first, then the record is deleted.
// Always returns nullptr.
static void* StartThreadWrapper(void* arg) {
  StartThreadState* const start_state = static_cast<StartThreadState*>(arg);
  start_state->user_function(start_state->arg);
  delete start_state;
  return nullptr;
}
453 
StartThread(void (* function)(void * arg),void * arg)454 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
455   pthread_t t;
456   StartThreadState* state = new StartThreadState;
457   state->user_function = function;
458   state->arg = arg;
459   ThreadPoolImpl::PthreadCall(
460       "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
461   ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
462   threads_to_join_.push_back(t);
463   ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
464 }
465 
WaitForJoin()466 void PosixEnv::WaitForJoin() {
467   for (const auto tid : threads_to_join_) {
468     pthread_join(tid, nullptr);
469   }
470   threads_to_join_.clear();
471 }
472 
473 }  // namespace
474 
475 //
476 // Default Posix Env
477 //
Default()478 Env* Env::Default() {
479   // The following function call initializes the singletons of ThreadLocalPtr
480   // right before the static default_env.  This guarantees default_env will
481   // always being destructed before the ThreadLocalPtr singletons get
482   // destructed as C++ guarantees that the destructions of static variables
483   // is in the reverse order of their constructions.
484   //
485   // Since static members are destructed in the reverse order
486   // of their construction, having this call here guarantees that
487   // the destructor of static PosixEnv will go first, then the
488   // the singletons of ThreadLocalPtr.
489   ThreadLocalPtr::InitSingletons();
490   CompressionContextCache::InitSingleton();
491   INIT_SYNC_POINT_SINGLETONS();
492   // ~PosixEnv must be called on exit
493   static PosixEnv default_env;
494   return &default_env;
495 }
496 
NewCompositeEnv(const std::shared_ptr<FileSystem> & fs)497 std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
498   PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
499   return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
500 }
501 
502 //
503 // Default Posix SystemClock
504 //
Default()505 const std::shared_ptr<SystemClock>& SystemClock::Default() {
506   static std::shared_ptr<SystemClock> default_clock =
507       std::make_shared<PosixClock>();
508   return default_clock;
509 }
510 }  // namespace ROCKSDB_NAMESPACE
511 
512 #endif
513