1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 #include "llvm/Support/ManagedStatic.h"
12 #include "llvm/Support/Threading.h"
13
14 #include <atomic>
15 #include <deque>
16 #include <future>
17 #include <thread>
18 #include <vector>
19
20 llvm::ThreadPoolStrategy llvm::parallel::strategy;
21
22 namespace llvm {
23 namespace parallel {
24 #if LLVM_ENABLE_THREADS
25
26 #ifdef _WIN32
27 static thread_local unsigned threadIndex = UINT_MAX;
28
getThreadIndex()29 unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
30 #else
31 thread_local unsigned threadIndex = UINT_MAX;
32 #endif
33
34 namespace detail {
35
36 namespace {
37
38 /// An abstract class that takes closures and runs them asynchronously.
39 class Executor {
40 public:
41 virtual ~Executor() = default;
42 virtual void add(std::function<void()> func, bool Sequential = false) = 0;
43 virtual size_t getThreadCount() const = 0;
44
45 static Executor *getDefaultExecutor();
46 };
47
48 /// An implementation of an Executor that runs closures on a thread pool
49 /// in filo order.
50 class ThreadPoolExecutor : public Executor {
51 public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())52 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
53 ThreadCount = S.compute_thread_count();
54 // Spawn all but one of the threads in another thread as spawning threads
55 // can take a while.
56 Threads.reserve(ThreadCount);
57 Threads.resize(1);
58 std::lock_guard<std::mutex> Lock(Mutex);
59 // Use operator[] before creating the thread to avoid data race in .size()
60 // in “safe libc++” mode.
61 auto &Thread0 = Threads[0];
62 Thread0 = std::thread([this, S] {
63 for (unsigned I = 1; I < ThreadCount; ++I) {
64 Threads.emplace_back([=] { work(S, I); });
65 if (Stop)
66 break;
67 }
68 ThreadsCreated.set_value();
69 work(S, 0);
70 });
71 }
72
stop()73 void stop() {
74 {
75 std::lock_guard<std::mutex> Lock(Mutex);
76 if (Stop)
77 return;
78 Stop = true;
79 }
80 Cond.notify_all();
81 ThreadsCreated.get_future().wait();
82 }
83
~ThreadPoolExecutor()84 ~ThreadPoolExecutor() override {
85 stop();
86 std::thread::id CurrentThreadId = std::this_thread::get_id();
87 for (std::thread &T : Threads)
88 if (T.get_id() == CurrentThreadId)
89 T.detach();
90 else
91 T.join();
92 }
93
94 struct Creator {
callllvm::parallel::detail::__anon1037b7310111::ThreadPoolExecutor::Creator95 static void *call() { return new ThreadPoolExecutor(strategy); }
96 };
97 struct Deleter {
callllvm::parallel::detail::__anon1037b7310111::ThreadPoolExecutor::Deleter98 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
99 };
100
add(std::function<void ()> F,bool Sequential=false)101 void add(std::function<void()> F, bool Sequential = false) override {
102 {
103 std::lock_guard<std::mutex> Lock(Mutex);
104 if (Sequential)
105 WorkQueueSequential.emplace_front(std::move(F));
106 else
107 WorkQueue.emplace_back(std::move(F));
108 }
109 Cond.notify_one();
110 }
111
getThreadCount() const112 size_t getThreadCount() const override { return ThreadCount; }
113
114 private:
hasSequentialTasks() const115 bool hasSequentialTasks() const {
116 return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
117 }
118
hasGeneralTasks() const119 bool hasGeneralTasks() const { return !WorkQueue.empty(); }
120
work(ThreadPoolStrategy S,unsigned ThreadID)121 void work(ThreadPoolStrategy S, unsigned ThreadID) {
122 threadIndex = ThreadID;
123 S.apply_thread_strategy(ThreadID);
124 while (true) {
125 std::unique_lock<std::mutex> Lock(Mutex);
126 Cond.wait(Lock, [&] {
127 return Stop || hasGeneralTasks() || hasSequentialTasks();
128 });
129 if (Stop)
130 break;
131 bool Sequential = hasSequentialTasks();
132 if (Sequential)
133 SequentialQueueIsLocked = true;
134 else
135 assert(hasGeneralTasks());
136
137 auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
138 auto Task = std::move(Queue.back());
139 Queue.pop_back();
140 Lock.unlock();
141 Task();
142 if (Sequential)
143 SequentialQueueIsLocked = false;
144 }
145 }
146
147 std::atomic<bool> Stop{false};
148 std::atomic<bool> SequentialQueueIsLocked{false};
149 std::deque<std::function<void()>> WorkQueue;
150 std::deque<std::function<void()>> WorkQueueSequential;
151 std::mutex Mutex;
152 std::condition_variable Cond;
153 std::promise<void> ThreadsCreated;
154 std::vector<std::thread> Threads;
155 unsigned ThreadCount;
156 };
157
getDefaultExecutor()158 Executor *Executor::getDefaultExecutor() {
159 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
160 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
161 // stops the thread pool and waits for any worker thread creation to complete
162 // but does not wait for the threads to finish. The wait for worker thread
163 // creation to complete is important as it prevents intermittent crashes on
164 // Windows due to a race condition between thread creation and process exit.
165 //
166 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
167 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
168 // destructor ensures it has been stopped and waits for worker threads to
169 // finish. The wait is important as it prevents intermittent crashes on
170 // Windows when the process is doing a full exit.
171 //
172 // The Windows crashes appear to only occur with the MSVC static runtimes and
173 // are more frequent with the debug static runtime.
174 //
175 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
176
177 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
178 ThreadPoolExecutor::Deleter>
179 ManagedExec;
180 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
181 return Exec.get();
182 }
183 } // namespace
184 } // namespace detail
185
getThreadCount()186 size_t getThreadCount() {
187 return detail::Executor::getDefaultExecutor()->getThreadCount();
188 }
189 #endif
190
191 // Latch::sync() called by the dtor may cause one thread to block. If is a dead
192 // lock if all threads in the default executor are blocked. To prevent the dead
193 // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
194 // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()195 TaskGroup::TaskGroup()
196 #if LLVM_ENABLE_THREADS
197 : Parallel((parallel::strategy.ThreadsRequested != 1) &&
198 (threadIndex == UINT_MAX)) {}
199 #else
200 : Parallel(false) {}
201 #endif
~TaskGroup()202 TaskGroup::~TaskGroup() {
203 // We must ensure that all the workloads have finished before decrementing the
204 // instances count.
205 L.sync();
206 }
207
spawn(std::function<void ()> F,bool Sequential)208 void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
209 #if LLVM_ENABLE_THREADS
210 if (Parallel) {
211 L.inc();
212 detail::Executor::getDefaultExecutor()->add(
213 [&, F = std::move(F)] {
214 F();
215 L.dec();
216 },
217 Sequential);
218 return;
219 }
220 #endif
221 F();
222 }
223
224 } // namespace parallel
225 } // namespace llvm
226
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)227 void llvm::parallelFor(size_t Begin, size_t End,
228 llvm::function_ref<void(size_t)> Fn) {
229 #if LLVM_ENABLE_THREADS
230 if (parallel::strategy.ThreadsRequested != 1) {
231 auto NumItems = End - Begin;
232 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
233 // overhead on large inputs.
234 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
235 if (TaskSize == 0)
236 TaskSize = 1;
237
238 parallel::TaskGroup TG;
239 for (; Begin + TaskSize < End; Begin += TaskSize) {
240 TG.spawn([=, &Fn] {
241 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
242 Fn(I);
243 });
244 }
245 if (Begin != End) {
246 TG.spawn([=, &Fn] {
247 for (size_t I = Begin; I != End; ++I)
248 Fn(I);
249 });
250 }
251 return;
252 }
253 #endif
254
255 for (; Begin != End; ++Begin)
256 Fn(Begin);
257 }
258