1 //===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 // 9 // This file defines a crude C++11 based thread pool. 10 // 11 //===----------------------------------------------------------------------===// 12 13 #ifndef LLVM_SUPPORT_THREADPOOL_H 14 #define LLVM_SUPPORT_THREADPOOL_H 15 16 #include "llvm/ADT/DenseMap.h" 17 #include "llvm/Config/llvm-config.h" 18 #include "llvm/Support/RWMutex.h" 19 #include "llvm/Support/Threading.h" 20 #include "llvm/Support/thread.h" 21 22 #include <future> 23 24 #include <condition_variable> 25 #include <deque> 26 #include <functional> 27 #include <memory> 28 #include <mutex> 29 #include <utility> 30 31 namespace llvm { 32 33 class ThreadPoolTaskGroup; 34 35 /// A ThreadPool for asynchronous parallel execution on a defined number of 36 /// threads. 37 /// 38 /// The pool keeps a vector of threads alive, waiting on a condition variable 39 /// for some work to become available. 40 /// 41 /// It is possible to reuse one thread pool for different groups of tasks 42 /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using 43 /// the same queue, but it is possible to wait only for a specific group of 44 /// tasks to finish. 45 /// 46 /// It is also possible for worker threads to submit new tasks and wait for 47 /// them. Note that this may result in a deadlock in cases such as when a task 48 /// (directly or indirectly) tries to wait for its own completion, or when all 49 /// available threads are used up by tasks waiting for a task that has no thread 50 /// left to run on (this includes waiting on the returned future). It should be 51 /// generally safe to wait() for a group as long as groups do not form a cycle. 52 class ThreadPool { 53 public: 54 /// Construct a pool using the hardware strategy \p S for mapping hardware 55 /// execution resources (threads, cores, CPUs) 56 /// Defaults to using the maximum execution resources in the system, but 57 /// accounting for the affinity mask. 58 ThreadPool(ThreadPoolStrategy S = hardware_concurrency()); 59 60 /// Blocking destructor: the pool will wait for all the threads to complete. 61 ~ThreadPool(); 62 63 /// Asynchronous submission of a task to the pool. The returned future can be 64 /// used to wait for the task to finish and is *non-blocking* on destruction. 65 template <typename Function, typename... Args> 66 auto async(Function &&F, Args &&...ArgList) { 67 auto Task = 68 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); 69 return async(std::move(Task)); 70 } 71 72 /// Overload, task will be in the given task group. 73 template <typename Function, typename... Args> 74 auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) { 75 auto Task = 76 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); 77 return async(Group, std::move(Task)); 78 } 79 80 /// Asynchronous submission of a task to the pool. The returned future can be 81 /// used to wait for the task to finish and is *non-blocking* on destruction. 82 template <typename Func> 83 auto async(Func &&F) -> std::shared_future<decltype(F())> { 84 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), 85 nullptr); 86 } 87 88 template <typename Func> 89 auto async(ThreadPoolTaskGroup &Group, Func &&F) 90 -> std::shared_future<decltype(F())> { 91 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), 92 &Group); 93 } 94 95 /// Blocking wait for all the threads to complete and the queue to be empty. 96 /// It is an error to try to add new tasks while blocking on this call. 97 /// Calling wait() from a task would deadlock waiting for itself. 98 void wait(); 99 100 /// Blocking wait for only all the threads in the given group to complete. 101 /// It is possible to wait even inside a task, but waiting (directly or 102 /// indirectly) on itself will deadlock. If called from a task running on a 103 /// worker thread, the call may process pending tasks while waiting in order 104 /// not to waste the thread. 105 void wait(ThreadPoolTaskGroup &Group); 106 107 // TODO: misleading legacy name warning! 108 // Returns the maximum number of worker threads in the pool, not the current 109 // number of threads! 110 unsigned getThreadCount() const { return MaxThreadCount; } 111 112 /// Returns true if the current thread is a worker thread of this thread pool. 113 bool isWorkerThread() const; 114 115 private: 116 /// Helpers to create a promise and a callable wrapper of \p Task that sets 117 /// the result of the promise. Returns the callable and a future to access the 118 /// result. 119 template <typename ResTy> 120 static std::pair<std::function<void()>, std::future<ResTy>> 121 createTaskAndFuture(std::function<ResTy()> Task) { 122 std::shared_ptr<std::promise<ResTy>> Promise = 123 std::make_shared<std::promise<ResTy>>(); 124 auto F = Promise->get_future(); 125 return { 126 [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); }, 127 std::move(F)}; 128 } 129 static std::pair<std::function<void()>, std::future<void>> 130 createTaskAndFuture(std::function<void()> Task) { 131 std::shared_ptr<std::promise<void>> Promise = 132 std::make_shared<std::promise<void>>(); 133 auto F = Promise->get_future(); 134 return {[Promise = std::move(Promise), Task]() { 135 Task(); 136 Promise->set_value(); 137 }, 138 std::move(F)}; 139 } 140 141 /// Returns true if all tasks in the given group have finished (nullptr means 142 /// all tasks regardless of their group). QueueLock must be locked. 143 bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const; 144 145 /// Asynchronous submission of a task to the pool. The returned future can be 146 /// used to wait for the task to finish and is *non-blocking* on destruction. 147 template <typename ResTy> 148 std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task, 149 ThreadPoolTaskGroup *Group) { 150 151 #if LLVM_ENABLE_THREADS 152 /// Wrap the Task in a std::function<void()> that sets the result of the 153 /// corresponding future. 154 auto R = createTaskAndFuture(Task); 155 156 int requestedThreads; 157 { 158 // Lock the queue and push the new task 159 std::unique_lock<std::mutex> LockGuard(QueueLock); 160 161 // Don't allow enqueueing after disabling the pool 162 assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); 163 Tasks.emplace_back(std::make_pair(std::move(R.first), Group)); 164 requestedThreads = ActiveThreads + Tasks.size(); 165 } 166 QueueCondition.notify_one(); 167 grow(requestedThreads); 168 return R.second.share(); 169 170 #else // LLVM_ENABLE_THREADS Disabled 171 172 // Get a Future with launch::deferred execution using std::async 173 auto Future = std::async(std::launch::deferred, std::move(Task)).share(); 174 // Wrap the future so that both ThreadPool::wait() can operate and the 175 // returned future can be sync'ed on. 176 Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group)); 177 return Future; 178 #endif 179 } 180 181 #if LLVM_ENABLE_THREADS 182 // Grow to ensure that we have at least `requested` Threads, but do not go 183 // over MaxThreadCount. 184 void grow(int requested); 185 186 void processTasks(ThreadPoolTaskGroup *WaitingForGroup); 187 #endif 188 189 /// Threads in flight 190 std::vector<llvm::thread> Threads; 191 /// Lock protecting access to the Threads vector. 192 mutable llvm::sys::RWMutex ThreadsLock; 193 194 /// Tasks waiting for execution in the pool. 195 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks; 196 197 /// Locking and signaling for accessing the Tasks queue. 198 std::mutex QueueLock; 199 std::condition_variable QueueCondition; 200 201 /// Signaling for job completion (all tasks or all tasks in a group). 202 std::condition_variable CompletionCondition; 203 204 /// Keep track of the number of thread actually busy 205 unsigned ActiveThreads = 0; 206 /// Number of threads active for tasks in the given group (only non-zero). 207 DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups; 208 209 #if LLVM_ENABLE_THREADS // avoids warning for unused variable 210 /// Signal for the destruction of the pool, asking thread to exit. 211 bool EnableFlag = true; 212 #endif 213 214 const ThreadPoolStrategy Strategy; 215 216 /// Maximum number of threads to potentially grow this pool to. 217 const unsigned MaxThreadCount; 218 }; 219 220 /// A group of tasks to be run on a thread pool. Thread pool tasks in different 221 /// groups can run on the same threadpool but can be waited for separately. 222 /// It is even possible for tasks of one group to submit and wait for tasks 223 /// of another group, as long as this does not form a loop. 224 class ThreadPoolTaskGroup { 225 public: 226 /// The ThreadPool argument is the thread pool to forward calls to. 227 ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {} 228 229 /// Blocking destructor: will wait for all the tasks in the group to complete 230 /// by calling ThreadPool::wait(). 231 ~ThreadPoolTaskGroup() { wait(); } 232 233 /// Calls ThreadPool::async() for this group. 234 template <typename Function, typename... Args> 235 inline auto async(Function &&F, Args &&...ArgList) { 236 return Pool.async(*this, std::forward<Function>(F), 237 std::forward<Args>(ArgList)...); 238 } 239 240 /// Calls ThreadPool::wait() for this group. 241 void wait() { Pool.wait(*this); } 242 243 private: 244 ThreadPool &Pool; 245 }; 246 247 } // namespace llvm 248 249 #endif // LLVM_SUPPORT_THREADPOOL_H 250