1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 #include "llvm/Support/ManagedStatic.h"
12 #include "llvm/Support/Threading.h"
13 
14 #include <atomic>
15 #include <deque>
16 #include <future>
17 #include <thread>
18 #include <vector>
19 
// Process-wide strategy consulted when the default executor is first created
// (see ThreadPoolExecutor::Creator below) and by TaskGroup/parallelFor to
// decide whether to run in parallel at all (ThreadsRequested != 1).
llvm::ThreadPoolStrategy llvm::parallel::strategy;
21 
22 namespace llvm {
23 namespace parallel {
24 #if LLVM_ENABLE_THREADS
25 
#ifdef _WIN32
// Per-thread index of the worker inside the default executor; UINT_MAX means
// "not a worker thread" (see TaskGroup's constructor, which uses this to
// detect nesting). On Windows the variable is kept internal and exposed via
// the getThreadIndex() accessor, whose body is supplied by the
// GET_THREAD_INDEX_IMPL macro — presumably to cope with thread_local /
// DLL-export limitations; see the header for the macro's definition.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Elsewhere the thread_local itself is the public symbol.
thread_local unsigned threadIndex = UINT_MAX;
#endif
33 
34 namespace detail {
35 
36 namespace {
37 
/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;

  /// Enqueue \p func for asynchronous execution. When \p Sequential is true
  /// the closure goes onto a separate queue whose tasks are run one at a
  /// time, in submission order (see ThreadPoolExecutor::add/work).
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;

  /// Number of worker threads owned by this executor.
  virtual size_t getThreadCount() const = 0;

  /// Lazily-created, process-wide executor shared by all TaskGroups.
  static Executor *getDefaultExecutor();
};
47 
/// An implementation of an Executor that runs closures on a thread pool
///   in filo order (general tasks are popped from the back of the deque they
///   were pushed onto, i.e. LIFO; sequential tasks are pushed at the front
///   and popped from the back, i.e. FIFO).
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in "safe libc++" mode.
    // Note: reserve() above guarantees no reallocation, so this reference and
    // the emplace_back calls in the spawner thread below stay valid.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      // Signal stop()/~ThreadPoolExecutor that all spawning is done; only
      // after this point is it safe for the destructor to walk Threads.
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // Request shutdown: set Stop under the lock (so sleeping workers can't miss
  // it), wake everyone, then wait until thread spawning has finished so that
  // no new workers appear after we return. Idempotent.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    // If the destructor runs on one of our own workers, that thread cannot
    // join itself — detach it instead.
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  // Factory/teardown hooks for the ManagedStatic in getDefaultExecutor().
  // Deleter only stops the pool (fast-exit path); actual destruction happens
  // via the static unique_ptr on a normal full exit.
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  // Enqueue a task and wake one worker. Sequential tasks are pushed to the
  // front of their deque so that popping from the back yields FIFO order.
  void add(std::function<void()> F, bool Sequential = false) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Sequential)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
  // A sequential task is runnable only while no other sequential task is in
  // flight; SequentialQueueIsLocked enforces the one-at-a-time guarantee.
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  // Worker loop: record this thread's index, apply the pinning/affinity
  // strategy, then pop and run tasks until stop() is called.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      // Sequential tasks take priority over general ones when runnable.
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      // Run the task without holding the lock so other workers can proceed.
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }

  std::atomic<bool> Stop{false};
  std::atomic<bool> SequentialQueueIsLocked{false};
  std::deque<std::function<void()>> WorkQueue;           // LIFO; guarded by Mutex.
  std::deque<std::function<void()>> WorkQueueSequential; // FIFO; guarded by Mutex.
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated; // Fulfilled once all workers are spawned.
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
};
157 
// Returns the lazily-constructed, process-wide ThreadPoolExecutor. Thread-safe
// by virtue of C++11 "magic statics" initialization of the two locals below.
Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  // The unique_ptr takes ownership of the same object; it is what ultimately
  // runs ~ThreadPoolExecutor (joining the workers) on a normal full exit.
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
183 } // namespace
184 } // namespace detail
185 
186 size_t getThreadCount() {
187   return detail::Executor::getDefaultExecutor()->getThreadCount();
188 }
189 #endif
190 
// Latch::sync() called by the dtor may cause one thread to block. It would be
// a deadlock if all threads in the default executor are blocked. To prevent
// that deadlock, only allow the root TaskGroup to run tasks in parallel. In
// the scenario of nested parallel_for_each(), only the outermost one runs
// parallelly.
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    // Parallel iff the strategy asks for more than one thread AND we are not
    // already running on an executor worker (workers set threadIndex in
    // ThreadPoolExecutor::work; UINT_MAX means "not a worker").
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
TaskGroup::~TaskGroup() {
  // Block until every task spawned into this group has finished (each spawn
  // does L.inc() and the wrapped task does L.dec() on completion), so the
  // latch — and any state the tasks reference — outlives the tasks.
  L.sync();
}
207 
208 void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
209 #if LLVM_ENABLE_THREADS
210   if (Parallel) {
211     L.inc();
212     detail::Executor::getDefaultExecutor()->add(
213         [&, F = std::move(F)] {
214           F();
215           L.dec();
216         },
217         Sequential);
218     return;
219   }
220 #endif
221   F();
222 }
223 
224 } // namespace parallel
225 } // namespace llvm
226 
227 void llvm::parallelFor(size_t Begin, size_t End,
228                        llvm::function_ref<void(size_t)> Fn) {
229 #if LLVM_ENABLE_THREADS
230   if (parallel::strategy.ThreadsRequested != 1) {
231     auto NumItems = End - Begin;
232     // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
233     // overhead on large inputs.
234     auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
235     if (TaskSize == 0)
236       TaskSize = 1;
237 
238     parallel::TaskGroup TG;
239     for (; Begin + TaskSize < End; Begin += TaskSize) {
240       TG.spawn([=, &Fn] {
241         for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
242           Fn(I);
243       });
244     }
245     if (Begin != End) {
246       TG.spawn([=, &Fn] {
247         for (size_t I = Begin; I != End; ++I)
248           Fn(I);
249       });
250     }
251     return;
252   }
253 #endif
254 
255   for (; Begin != End; ++Begin)
256     Fn(Begin);
257 }
258