1 /* Copyright 2017-present Facebook, Inc. 2 * Licensed under the Apache License, Version 2.0 */ 3 #include "ThreadPool.h" 4 #include "watchman_log.h" 5 6 namespace watchman { 7 getThreadPool()8ThreadPool& getThreadPool() { 9 static ThreadPool pool; 10 return pool; 11 } 12 ~ThreadPool()13ThreadPool::~ThreadPool() { 14 stop(); 15 } 16 start(size_t numWorkers,size_t maxItems)17void ThreadPool::start(size_t numWorkers, size_t maxItems) { 18 std::unique_lock<std::mutex> lock(mutex_); 19 if (!workers_.empty()) { 20 throw std::runtime_error("ThreadPool already started"); 21 } 22 if (stopping_) { 23 throw std::runtime_error("Cannot restart a stopped pool"); 24 } 25 maxItems_ = maxItems; 26 27 for (auto i = 0U; i < numWorkers; ++i) { 28 workers_.emplace_back([this, i] { 29 w_set_thread_name("ThreadPool-%i", i); 30 runWorker(); 31 }); 32 } 33 } 34 runWorker()35void ThreadPool::runWorker() { 36 while (true) { 37 std::function<void()> task; 38 39 { 40 std::unique_lock<std::mutex> lock(mutex_); 41 condition_.wait(lock, [this] { return stopping_ || !tasks_.empty(); }); 42 if (stopping_ && tasks_.empty()) { 43 return; 44 } 45 task = std::move(tasks_.front()); 46 tasks_.pop_front(); 47 } 48 49 task(); 50 } 51 } 52 stop(bool join)53void ThreadPool::stop(bool join) { 54 { 55 std::unique_lock<std::mutex> lock(mutex_); 56 stopping_ = true; 57 } 58 condition_.notify_all(); 59 60 if (join) { 61 for (auto& worker : workers_) { 62 worker.join(); 63 } 64 } 65 } 66 run(std::function<void ()> && func)67void ThreadPool::run(std::function<void()>&& func) { 68 { 69 std::unique_lock<std::mutex> lock(mutex_); 70 if (stopping_) { 71 throw std::runtime_error("cannot add tasks after pool has stopped"); 72 } 73 if (tasks_.size() + 1 >= maxItems_) { 74 throw std::runtime_error("thread pool queue is full"); 75 } 76 77 tasks_.emplace_back(std::move(func)); 78 } 79 80 condition_.notify_one(); 81 } 82 } 83