1 //===- llvm/Support/Parallel.h - Parallel algorithms ----------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #ifndef LLVM_SUPPORT_PARALLEL_H
10 #define LLVM_SUPPORT_PARALLEL_H
11 
12 #include "llvm/ADT/STLExtras.h"
13 #include "llvm/Config/llvm-config.h"
14 #include "llvm/Support/Error.h"
15 #include "llvm/Support/MathExtras.h"
16 #include "llvm/Support/Threading.h"
17 
18 #include <algorithm>
19 #include <condition_variable>
20 #include <functional>
21 #include <mutex>
22 
23 namespace llvm {
24 
25 namespace parallel {
26 
27 // Strategy for the default executor used by the parallel routines provided by
28 // this file. It defaults to using all hardware threads and should be
29 // initialized before the first use of parallel routines.
30 extern ThreadPoolStrategy strategy;
31 
32 namespace detail {
33 
34 #if LLVM_ENABLE_THREADS
35 
36 class Latch {
37   uint32_t Count;
38   mutable std::mutex Mutex;
39   mutable std::condition_variable Cond;
40 
41 public:
Count(Count)42   explicit Latch(uint32_t Count = 0) : Count(Count) {}
~Latch()43   ~Latch() {
44     // Ensure at least that sync() was called.
45     assert(Count == 0);
46   }
47 
inc()48   void inc() {
49     std::lock_guard<std::mutex> lock(Mutex);
50     ++Count;
51   }
52 
dec()53   void dec() {
54     std::lock_guard<std::mutex> lock(Mutex);
55     if (--Count == 0)
56       Cond.notify_all();
57   }
58 
sync()59   void sync() const {
60     std::unique_lock<std::mutex> lock(Mutex);
61     Cond.wait(lock, [&] { return Count == 0; });
62   }
63 };
64 
65 class TaskGroup {
66   Latch L;
67   bool Parallel;
68 
69 public:
70   TaskGroup();
71   ~TaskGroup();
72 
73   void spawn(std::function<void()> f);
74 
sync()75   void sync() const { L.sync(); }
76 };
77 
78 const ptrdiff_t MinParallelSize = 1024;
79 
80 /// Inclusive median.
81 template <class RandomAccessIterator, class Comparator>
medianOf3(RandomAccessIterator Start,RandomAccessIterator End,const Comparator & Comp)82 RandomAccessIterator medianOf3(RandomAccessIterator Start,
83                                RandomAccessIterator End,
84                                const Comparator &Comp) {
85   RandomAccessIterator Mid = Start + (std::distance(Start, End) / 2);
86   return Comp(*Start, *(End - 1))
87              ? (Comp(*Mid, *(End - 1)) ? (Comp(*Start, *Mid) ? Mid : Start)
88                                        : End - 1)
89              : (Comp(*Mid, *Start) ? (Comp(*(End - 1), *Mid) ? Mid : End - 1)
90                                    : Start);
91 }
92 
93 template <class RandomAccessIterator, class Comparator>
parallel_quick_sort(RandomAccessIterator Start,RandomAccessIterator End,const Comparator & Comp,TaskGroup & TG,size_t Depth)94 void parallel_quick_sort(RandomAccessIterator Start, RandomAccessIterator End,
95                          const Comparator &Comp, TaskGroup &TG, size_t Depth) {
96   // Do a sequential sort for small inputs.
97   if (std::distance(Start, End) < detail::MinParallelSize || Depth == 0) {
98     llvm::sort(Start, End, Comp);
99     return;
100   }
101 
102   // Partition.
103   auto Pivot = medianOf3(Start, End, Comp);
104   // Move Pivot to End.
105   std::swap(*(End - 1), *Pivot);
106   Pivot = std::partition(Start, End - 1, [&Comp, End](decltype(*Start) V) {
107     return Comp(V, *(End - 1));
108   });
109   // Move Pivot to middle of partition.
110   std::swap(*Pivot, *(End - 1));
111 
112   // Recurse.
113   TG.spawn([=, &Comp, &TG] {
114     parallel_quick_sort(Start, Pivot, Comp, TG, Depth - 1);
115   });
116   parallel_quick_sort(Pivot + 1, End, Comp, TG, Depth - 1);
117 }
118 
119 template <class RandomAccessIterator, class Comparator>
parallel_sort(RandomAccessIterator Start,RandomAccessIterator End,const Comparator & Comp)120 void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End,
121                    const Comparator &Comp) {
122   TaskGroup TG;
123   parallel_quick_sort(Start, End, Comp, TG,
124                       llvm::Log2_64(std::distance(Start, End)) + 1);
125 }
126 
127 // TaskGroup has a relatively high overhead, so we want to reduce
128 // the number of spawn() calls. We'll create up to 1024 tasks here.
129 // (Note that 1024 is an arbitrary number. This code probably needs
130 // improving to take the number of available cores into account.)
131 enum { MaxTasksPerGroup = 1024 };
132 
133 template <class IterTy, class FuncTy>
parallel_for_each(IterTy Begin,IterTy End,FuncTy Fn)134 void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
135   // If we have zero or one items, then do not incur the overhead of spinning up
136   // a task group.  They are surprisingly expensive, and because they do not
137   // support nested parallelism, a single entry task group can block parallel
138   // execution underneath them.
139   auto NumItems = std::distance(Begin, End);
140   if (NumItems <= 1) {
141     if (NumItems)
142       Fn(*Begin);
143     return;
144   }
145 
146   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
147   // overhead on large inputs.
148   ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup;
149   if (TaskSize == 0)
150     TaskSize = 1;
151 
152   TaskGroup TG;
153   while (TaskSize < std::distance(Begin, End)) {
154     TG.spawn([=, &Fn] { std::for_each(Begin, Begin + TaskSize, Fn); });
155     Begin += TaskSize;
156   }
157   std::for_each(Begin, End, Fn);
158 }
159 
160 template <class IndexTy, class FuncTy>
parallel_for_each_n(IndexTy Begin,IndexTy End,FuncTy Fn)161 void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
162   // If we have zero or one items, then do not incur the overhead of spinning up
163   // a task group.  They are surprisingly expensive, and because they do not
164   // support nested parallelism, a single entry task group can block parallel
165   // execution underneath them.
166   auto NumItems = End - Begin;
167   if (NumItems <= 1) {
168     if (NumItems)
169       Fn(Begin);
170     return;
171   }
172 
173   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
174   // overhead on large inputs.
175   ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup;
176   if (TaskSize == 0)
177     TaskSize = 1;
178 
179   TaskGroup TG;
180   IndexTy I = Begin;
181   for (; I + TaskSize < End; I += TaskSize) {
182     TG.spawn([=, &Fn] {
183       for (IndexTy J = I, E = I + TaskSize; J != E; ++J)
184         Fn(J);
185     });
186   }
187   for (IndexTy J = I; J < End; ++J)
188     Fn(J);
189 }
190 
191 template <class IterTy, class ResultTy, class ReduceFuncTy,
192           class TransformFuncTy>
parallel_transform_reduce(IterTy Begin,IterTy End,ResultTy Init,ReduceFuncTy Reduce,TransformFuncTy Transform)193 ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
194                                    ReduceFuncTy Reduce,
195                                    TransformFuncTy Transform) {
196   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
197   // overhead on large inputs.
198   size_t NumInputs = std::distance(Begin, End);
199   if (NumInputs == 0)
200     return std::move(Init);
201   size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
202   std::vector<ResultTy> Results(NumTasks, Init);
203   {
204     // Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
205     // remaining after dividing them equally amongst tasks are distributed as
206     // one extra input over the first tasks.
207     TaskGroup TG;
208     size_t TaskSize = NumInputs / NumTasks;
209     size_t RemainingInputs = NumInputs % NumTasks;
210     IterTy TBegin = Begin;
211     for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
212       IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
213       TG.spawn([=, &Transform, &Reduce, &Results] {
214         // Reduce the result of transformation eagerly within each task.
215         ResultTy R = Init;
216         for (IterTy It = TBegin; It != TEnd; ++It)
217           R = Reduce(R, Transform(*It));
218         Results[TaskId] = R;
219       });
220       TBegin = TEnd;
221     }
222     assert(TBegin == End);
223   }
224 
225   // Do a final reduction. There are at most 1024 tasks, so this only adds
226   // constant single-threaded overhead for large inputs. Hopefully most
227   // reductions are cheaper than the transformation.
228   ResultTy FinalResult = std::move(Results.front());
229   for (ResultTy &PartialResult :
230        makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
231     FinalResult = Reduce(FinalResult, std::move(PartialResult));
232   return std::move(FinalResult);
233 }
234 
235 #endif
236 
237 } // namespace detail
238 } // namespace parallel
239 
240 template <class RandomAccessIterator,
241           class Comparator = std::less<
242               typename std::iterator_traits<RandomAccessIterator>::value_type>>
243 void parallelSort(RandomAccessIterator Start, RandomAccessIterator End,
244                   const Comparator &Comp = Comparator()) {
245 #if LLVM_ENABLE_THREADS
246   if (parallel::strategy.ThreadsRequested != 1) {
247     parallel::detail::parallel_sort(Start, End, Comp);
248     return;
249   }
250 #endif
251   llvm::sort(Start, End, Comp);
252 }
253 
254 template <class IterTy, class FuncTy>
parallelForEach(IterTy Begin,IterTy End,FuncTy Fn)255 void parallelForEach(IterTy Begin, IterTy End, FuncTy Fn) {
256 #if LLVM_ENABLE_THREADS
257   if (parallel::strategy.ThreadsRequested != 1) {
258     parallel::detail::parallel_for_each(Begin, End, Fn);
259     return;
260   }
261 #endif
262   std::for_each(Begin, End, Fn);
263 }
264 
265 template <class FuncTy>
parallelForEachN(size_t Begin,size_t End,FuncTy Fn)266 void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) {
267 #if LLVM_ENABLE_THREADS
268   if (parallel::strategy.ThreadsRequested != 1) {
269     parallel::detail::parallel_for_each_n(Begin, End, Fn);
270     return;
271   }
272 #endif
273   for (size_t I = Begin; I != End; ++I)
274     Fn(I);
275 }
276 
277 template <class IterTy, class ResultTy, class ReduceFuncTy,
278           class TransformFuncTy>
parallelTransformReduce(IterTy Begin,IterTy End,ResultTy Init,ReduceFuncTy Reduce,TransformFuncTy Transform)279 ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
280                                  ReduceFuncTy Reduce,
281                                  TransformFuncTy Transform) {
282 #if LLVM_ENABLE_THREADS
283   if (parallel::strategy.ThreadsRequested != 1) {
284     return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce,
285                                                        Transform);
286   }
287 #endif
288   for (IterTy I = Begin; I != End; ++I)
289     Init = Reduce(std::move(Init), Transform(*I));
290   return std::move(Init);
291 }
292 
293 // Range wrappers.
294 template <class RangeTy,
295           class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
296 void parallelSort(RangeTy &&R, const Comparator &Comp = Comparator()) {
297   parallelSort(std::begin(R), std::end(R), Comp);
298 }
299 
300 template <class RangeTy, class FuncTy>
parallelForEach(RangeTy && R,FuncTy Fn)301 void parallelForEach(RangeTy &&R, FuncTy Fn) {
302   parallelForEach(std::begin(R), std::end(R), Fn);
303 }
304 
305 template <class RangeTy, class ResultTy, class ReduceFuncTy,
306           class TransformFuncTy>
parallelTransformReduce(RangeTy && R,ResultTy Init,ReduceFuncTy Reduce,TransformFuncTy Transform)307 ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
308                                  ReduceFuncTy Reduce,
309                                  TransformFuncTy Transform) {
310   return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce,
311                                  Transform);
312 }
313 
314 // Parallel for-each, but with error handling.
315 template <class RangeTy, class FuncTy>
parallelForEachError(RangeTy && R,FuncTy Fn)316 Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
317   // The transform_reduce algorithm requires that the initial value be copyable.
318   // Error objects are uncopyable. We only need to copy initial success values,
319   // so work around this mismatch via the C API. The C API represents success
320   // values with a null pointer. The joinErrors discards null values and joins
321   // multiple errors into an ErrorList.
322   return unwrap(parallelTransformReduce(
323       std::begin(R), std::end(R), wrap(Error::success()),
324       [](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
325         return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
326       },
327       [&Fn](auto &&V) { return wrap(Fn(V)); }));
328 }
329 
330 } // namespace llvm
331 
332 #endif // LLVM_SUPPORT_PARALLEL_H
333