//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/threadpool_imp.h"

#include "monitoring/thread_status_util.h"
#include "port/port.h"

#ifndef OS_WIN
#  include <unistd.h>
#endif

#ifdef OS_LINUX
#  include <sys/syscall.h>
#  include <sys/resource.h>
#endif

#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

namespace ROCKSDB_NAMESPACE {

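// Abort the process if a pthread-style call fails. 'label' names the call for
// the error message and 'result' is its errno-style return value.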
void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    abort();
  }
}

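// Private implementation (pimpl) of ThreadPoolImpl. It owns the job queue,
// the worker threads, and the mutex/condition variable that guard them;
// ThreadPoolImpl itself only forwards calls to this struct.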
struct ThreadPoolImpl::Impl {

  Impl();
  ~Impl();

  void JoinThreads(bool wait_for_jobs_to_complete);

  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  void LowerIOPriority();

  void LowerCPUPriority();

  void WakeUpAllThreads() {
    bgsignal_.notify_all();
  }

  void BGThread(size_t thread_id);

  void StartBGThreads();

  void Submit(std::function<void()>&& schedule,
    std::function<void()>&& unschedule, void* tag);

  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that was added last, even if there is
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

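  // Return true iff thread_id is at or beyond the current thread limit,
  // making the thread excess capacity.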
  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority of this pool.
  // This allows the pool's worker threads to know their priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
 private:
  static void BGThreadWrapper(void* arg);

  bool low_io_priority_;
  bool low_cpu_priority_;
  Env::Priority priority_;
  Env* env_;

  int total_threads_limit_;
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
  bool exit_all_threads_;
  bool wait_for_jobs_to_complete_;

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue       queue_;

  std::mutex               mu_;
  std::condition_variable  bgsignal_;
  std::vector<port::Thread> bgthreads_;
};

inline
ThreadPoolImpl::Impl::Impl()
    : low_io_priority_(false),
      low_cpu_priority_(false),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {
}

inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }

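// Signal all background threads to exit and join them. If
// wait_for_jobs_to_complete is true, threads drain the queue before exiting;
// otherwise they stop after the job they are currently running.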
void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  exit_all_threads_ = true;
  // prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  total_threads_limit_ = 0;

  lock.unlock();

  bgsignal_.notify_all();

  for (auto& th : bgthreads_) {
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}

inline
void ThreadPoolImpl::Impl::LowerIOPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_io_priority_ = true;
}

inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_cpu_priority_ = true;
}

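// Main loop of a background worker. The thread sleeps until a job is queued
// or it is asked to exit, terminates itself if it is the newest excess
// thread, lazily lowers its I/O and CPU priority when requested, and runs
// each job with the mutex released.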
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  bool low_cpu_priority = false;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
      bgsignal_.wait(lock);
    }

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
        break;
      }
    }

    if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive threads in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least one more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
    }

    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
    lock.unlock();

#ifdef OS_LINUX
    if (decrease_cpu_priority) {
      setpriority(
          PRIO_PROCESS,
          // Current thread.
          0,
          // Lowest priority possible.
          19);
      low_cpu_priority = true;
    }

    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put this thread into the IOPRIO_CLASS_IDLE class (lowest priority).
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As of
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change the scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/scheduler
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,       // IOPRIO_WHO_PROCESS
              0,                       // current thread
              IOPRIO_PRIO_VALUE(3, 0)  // class 3 == IOPRIO_CLASS_IDLE
      );
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
    (void)decrease_cpu_priority;
#endif
    func();
  }
}

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;
  size_t thread_id_;  // Index of the thread in the pool.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};

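// Entry point for each worker thread: registers the thread with the thread
// status tracker (when ROCKSDB_USING_THREAD_STATUS is defined), frees the
// metadata passed in, runs the BGThread() loop until it returns, then
// unregisters the thread.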
void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // Initialize the thread type because the compiler cannot prove that it is
  // never used uninitialized.
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      assert(false);
      return;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return;
}

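// Adjust the target number of background threads. The limit only shrinks
// when allow_reduce is true; after updating it, all threads are woken so any
// now-excess thread can exit, and new threads are started if the pool grew.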
void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
  bool allow_reduce) {
  std::lock_guard<std::mutex> lock(mu_);
  if (exit_all_threads_) {
    return;
  }
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}

int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::unique_lock<std::mutex> lock(mu_);
  return total_threads_limit_;
}

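// Create background threads until the pool size reaches total_threads_limit_.
// On glibc >= 2.12 each thread is named "rocksdb:<priority><index>" to aid
// debugging.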
void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background threads if necessary
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
      new BGThreadMetadata(this, bgthreads_.size()));

// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
    thread_name_stream << bgthreads_.size();
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}

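// Enqueue a job (with an optional unschedule callback identified by 'tag'),
// update the queue-length statistic, and wake a worker. All workers are woken
// when excess threads exist, so the notification is not lost on a thread that
// is about to terminate.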
void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
  std::function<void()>&& unschedule, void* tag) {
  std::lock_guard<std::mutex> lock(mu_);

  if (exit_all_threads_) {
    return;
  }

  StartBGThreads();

  // Add the job to the queue
  queue_.push_back(BGItem());

  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
    std::memory_order_relaxed);

  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else {
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}

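// Remove every queued job whose tag matches 'arg'. The associated unschedule
// callbacks are collected under the mutex but invoked after it is released;
// returns the number of jobs removed. Jobs already running are not affected.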
int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  int count = 0;

  std::vector<std::function<void()>> candidates;
  {
    std::lock_guard<std::mutex> lock(mu_);

    // Remove matching jobs from the queue
    BGQueue::iterator it = queue_.begin();
    while (it != queue_.end()) {
      if (arg == (*it).tag) {
        if (it->unschedFunction) {
          candidates.push_back(std::move(it->unschedFunction));
        }
        it = queue_.erase(it);
        count++;
      } else {
        ++it;
      }
    }
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
      std::memory_order_relaxed);
  }

  // Run the unschedule functions outside the mutex
  for (auto& f : candidates) {
    f();
  }

  return count;
}

ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {
}

ThreadPoolImpl::~ThreadPoolImpl() {
}

void ThreadPoolImpl::JoinAllThreads() {
  impl_->JoinThreads(false);
}

void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

void ThreadPoolImpl::LowerIOPriority() {
  impl_->LowerIOPriority();
}

void ThreadPoolImpl::LowerCPUPriority() {
  impl_->LowerCPUPriority();
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}

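// The const-reference overload copies the job so the caller's std::function
// is left intact; the rvalue overload below moves it straight into the queue.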
void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
  void* tag, void(*unschedFunction)(void* arg)) {
  if (unschedFunction == nullptr) {
    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  } else {
    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
                  tag);
  }
}

int ThreadPoolImpl::UnSchedule(void* arg) {
  return impl_->UnSchedule(arg);
}

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority of this pool.
// This allows the pool's worker threads to know their priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}

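// Factory for a standalone thread pool with 'num_threads' background threads.
// A minimal usage sketch, assuming the base ThreadPool interface exposes the
// SubmitJob/WaitForJobsAndJoinAllThreads methods forwarded above:
//
//   std::unique_ptr<ThreadPool> pool(NewThreadPool(4));
//   pool->SubmitJob([] { /* background work */ });
//   pool->WaitForJobsAndJoinAllThreads();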
ThreadPool* NewThreadPool(int num_threads) {
  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  thread_pool->SetBackgroundThreads(num_threads);
  return thread_pool;
}

}  // namespace ROCKSDB_NAMESPACE