1
2 #ifndef _THREADPOOL_H
3 #define _THREADPOOL_H
4
5
6 #ifndef THREADPOOL
7
8 //#include <boost/thread/future.hpp>
9 #include <functional>
10
namespace ThreadPool {
	// Single-threaded stand-ins used when THREADPOOL is not defined.
	// They mirror the function set declared by the threaded build so code that
	// calls into ThreadPool compiles in both configurations.

	//template<class F, class... Args>
	//static inline auto enqueue(F&& f, Args&&... args)
	//-> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>> {}

	/// No-op: there is no worker pool to resize.
	static inline void SetThreadCount(int /*num*/) {}
	/// No-op: there are no worker threads whose spin time could be tuned.
	static inline void SetThreadSpinTime(int /*milliSeconds*/) {}
	/// Only the calling thread exists; it always reports index 0.
	static inline int GetThreadNum() { return 0; }
	static inline int GetMaxThreads() { return 1; }
	static inline int GetNumThreads() { return 1; }
	/// Counterpart of the threaded build's HasThreads(): this build never has worker threads.
	static inline bool HasThreads() { return false; }
	/// No-op: there are no workers to wake up.
	static inline void NotifyWorkerThreads() {}
}
23
24
/**
 * Single-threaded fallback of for_mt: calls f(i) for i = start, start+step, ...
 * while i < end, in ascending order, on the calling thread.
 *
 * An empty range (end <= start) or a non-positive step performs no calls.
 */
static inline void for_mt(int start, int end, int step, const std::function<void(const int i)>&& f)
{
	// a step <= 0 would never move i towards `end` and loop forever
	if (end <= start || step <= 0)
		return;

	for (int i = start; i < end; i += step) {
		f(i);
	}
}
31
32
for_mt(int start,int end,const std::function<void (const int i)> && f)33 static inline void for_mt(int start, int end, const std::function<void(const int i)>&& f)
34 {
35 for_mt(start, end, 1, std::move(f));
36 }
37
38
/// Single-threaded fallback of parallel(): invokes f exactly once on the calling thread.
static inline void parallel(const std::function<void()>&& f)
{
	return f();
}
43
44
/**
 * Single-threaded fallback of parallel_reduce(): with a single "thread" there is
 * nothing to combine, so f is called once and its result returned directly.
 * The reduction functor g is accepted for interface compatibility but never invoked.
 */
template<class F, class G>
static inline auto parallel_reduce(F&& f, G&& g) -> typename std::result_of<F()>::type
{
	(void)g; // unused in the single-threaded build
	return f();
}
50
51 #else
52
53 #include "TimeProfiler.h"
54 #include "System/Log/ILog.h"
55 #include "System/Platform/Threading.h"
56
57 #include <deque>
58 #include <vector>
59 #include <list>
60 #include <boost/optional.hpp>
61 #include <numeric>
62 #include <atomic>
63
64 // mingw is missing c++11 thread support atm, so for KISS always prefer boost atm
65 #include <boost/thread/future.hpp>
66 #undef gt
67 #include <boost/chrono/include.hpp>
68 #include <boost/utility.hpp>
69 #include <memory>
70
71 #ifdef UNITSYNC
72 #undef SCOPED_MT_TIMER
73 #define SCOPED_MT_TIMER(x)
74 #endif
75
76
77
78 class ITaskGroup
79 {
80 public:
~ITaskGroup()81 virtual ~ITaskGroup() {}
82
83 virtual boost::optional<std::function<void()>> GetTask() = 0;
84 virtual bool IsFinished() const = 0;
85 virtual bool IsEmpty() const = 0;
86
87 virtual int RemainingTasks() const = 0;
88
89 template< class Rep, class Period >
wait_for(const boost::chrono::duration<Rep,Period> & rel_time)90 bool wait_for(const boost::chrono::duration<Rep, Period>& rel_time) const {
91 const auto end = boost::chrono::high_resolution_clock::now() + rel_time;
92 while (!IsFinished() && (boost::chrono::high_resolution_clock::now() < end)) {
93 }
94 return IsFinished();
95 }
96 private:
97 //virtual void FinishedATask() = 0;
98 };
99
100
101 namespace ThreadPool {
102 template<class F, class... Args>
103 static auto enqueue(F&& f, Args&&... args)
104 -> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>>;
105
106 void PushTaskGroup(std::shared_ptr<ITaskGroup> taskgroup);
107 void WaitForFinished(std::shared_ptr<ITaskGroup> taskgroup);
108
109 template<typename T>
PushTaskGroup(std::shared_ptr<T> taskgroup)110 inline void PushTaskGroup(std::shared_ptr<T> taskgroup) { PushTaskGroup(std::static_pointer_cast<ITaskGroup>(taskgroup)); }
111 template<typename T>
WaitForFinished(std::shared_ptr<T> taskgroup)112 inline void WaitForFinished(std::shared_ptr<T> taskgroup) { WaitForFinished(std::static_pointer_cast<ITaskGroup>(taskgroup)); }
113
114 void SetThreadCount(int num);
115 void SetThreadSpinTime(int milliSeconds);
116 int GetThreadNum();
117 bool HasThreads();
118 int GetMaxThreads();
119 int GetNumThreads();
120 void NotifyWorkerThreads();
121 }
122
123
124 template<class F, class... Args>
125 class SingleTask : public ITaskGroup
126 {
127 public:
128 typedef typename std::result_of<F(Args...)>::type return_type;
129
SingleTask(F && f,Args &&...args)130 SingleTask(F&& f, Args&&... args) : finished(false), done(false) {
131 auto p = std::make_shared<boost::packaged_task<return_type>>(
132 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
133 );
134 result = std::make_shared<boost::unique_future<return_type>>(p->get_future());
135 task = [&,p]{ (*p)(); finished = true; };
136 }
137
GetTask()138 boost::optional<std::function<void()>> GetTask() { return (!std::atomic_exchange(&done, true)) ? boost::optional<std::function<void()>>(task) : boost::optional<std::function<void()>>(); }
139
IsEmpty()140 bool IsEmpty() const { return done; }
IsFinished()141 bool IsFinished() const { return finished; }
RemainingTasks()142 int RemainingTasks() const { return done ? 0 : 1; }
GetFuture()143 std::shared_ptr<boost::unique_future<return_type>> GetFuture() { assert(result->valid()); return std::move(result); } //FIXME rethrow exceptions some time
144
145 private:
146 //void FinishedATask() { finished = true; }
147
148 public:
149 std::atomic<bool> finished;
150 std::atomic<bool> done;
151 std::function<void()> task;
152 std::shared_ptr<boost::unique_future<return_type>> result;
153 };
154
155
156 template<class F, class... Args>
157 class TaskGroup : public ITaskGroup
158 {
159 public:
160 TaskGroup(const int num = 0) : remainingTasks(0), curtask(0), latency(0) {
161 //start = boost::chrono::high_resolution_clock::now();
162 results.reserve(num);
163 tasks.reserve(num);
164 }
165
~TaskGroup()166 virtual ~TaskGroup() {}
167
168 typedef typename std::result_of<F(Args...)>::type return_type;
169
enqueue(F & f,Args &...args)170 void enqueue(F& f, Args&... args)
171 {
172 auto task = std::make_shared<boost::packaged_task<return_type>>(
173 std::bind(f, args ...)
174 );
175 results.emplace_back(task->get_future());
176 // workaround a Fedora gcc bug else it reports in the lambda below:
177 // error: no 'operator--(int)' declared for postfix '--'
178 auto* atomicCounter = &remainingTasks;
179 tasks.emplace_back([task,atomicCounter]{ (*task)(); (*atomicCounter)--; });
180 remainingTasks++;
181 }
182
enqueue(F && f,Args &&...args)183 void enqueue(F&& f, Args&&... args)
184 {
185 auto task = std::make_shared< boost::packaged_task<return_type> >(
186 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
187 );
188 results.emplace_back(task->get_future());
189 // workaround a Fedora gcc bug else it reports in the lambda below:
190 // error: no 'operator--(int)' declared for postfix '--'
191 auto* atomicCounter = &remainingTasks;
192 tasks.emplace_back([task,atomicCounter]{ (*task)(); (*atomicCounter)--; });
193 remainingTasks++;
194 }
195
196
GetTask()197 virtual boost::optional<std::function<void()>> GetTask()
198 {
199 const int pos = curtask++;
200 if (pos < tasks.size()) {
201 /*if (latency.count() == 0) {
202 auto now = boost::chrono::high_resolution_clock::now();
203 latency = (now - start);
204 LOG("latency %fms", latency.count() / 1000000.f);
205 }*/
206 return tasks[pos];
207 }
208 return boost::optional<std::function<void()>>();
209 }
210
IsEmpty()211 virtual bool IsEmpty() const { return curtask >= tasks.size() /*tasks.empty()*/; }
IsFinished()212 bool IsFinished() const { return (remainingTasks == 0); }
RemainingTasks()213 int RemainingTasks() const { return remainingTasks; }
214
215 template<typename G>
GetResult(const G && g)216 return_type GetResult(const G&& g) {
217 return std::accumulate(results.begin(), results.end(), 0, g);
218 }
219
220 private:
221 //void FinishedATask() { remainingTasks--; }
222
223 public:
224 std::atomic<int> remainingTasks;
225 std::atomic<int> curtask;
226 std::vector<std::function<void()>> tasks;
227 std::vector<boost::unique_future<return_type>> results;
228
229 boost::chrono::time_point<boost::chrono::high_resolution_clock> start; // use for latency profiling!
230 boost::chrono::nanoseconds latency;
231 };
232
233
234 template<class F, class... Args>
235 class ParallelTaskGroup : public TaskGroup<F,Args...>
236 {
237 public:
238 ParallelTaskGroup(const int num = 0) : TaskGroup<F,Args...>(num) {
239 uniqueTasks.resize(ThreadPool::GetNumThreads());
240 }
241
242 typedef typename std::result_of<F(Args...)>::type return_type;
243
enqueue_unique(const int threadNum,F & f,Args &...args)244 void enqueue_unique(const int threadNum, F& f, Args&... args)
245 {
246 auto task = std::make_shared< boost::packaged_task<return_type> >(
247 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
248 );
249 this->results.emplace_back(task->get_future());
250 uniqueTasks[threadNum].emplace_back([&,task]{ (*task)(); (this->remainingTasks)--; });
251 this->remainingTasks++;
252 }
253
enqueue_unique(const int threadNum,F && f,Args &&...args)254 void enqueue_unique(const int threadNum, F&& f, Args&&... args)
255 {
256 auto task = std::make_shared< boost::packaged_task<return_type> >(
257 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
258 );
259 this->results.emplace_back(task->get_future());
260 uniqueTasks[threadNum].emplace_back([&,task]{ (*task)(); (this->remainingTasks)--; });
261 this->remainingTasks++;
262 }
263
264
GetTask()265 boost::optional<std::function<void()>> GetTask()
266 {
267 auto& ut = uniqueTasks[ThreadPool::GetThreadNum()];
268 if (!ut.empty()) {
269 // no need to make threadsafe cause each thread got its own container
270 auto t = ut.front();
271 ut.pop_front();
272 return t;
273 }
274
275 return TaskGroup<F,Args...>::GetTask();
276 }
277
IsEmpty()278 bool IsEmpty() const {
279 for(auto& ut: uniqueTasks) { if (!ut.empty()) return false; }
280 return TaskGroup<F,Args...>::IsEmpty();
281 }
282
283 public:
284 std::vector<std::deque<std::function<void()>>> uniqueTasks;
285 };
286
287
288
for_mt(int start,int end,int step,const std::function<void (const int i)> && f)289 static inline void for_mt(int start, int end, int step, const std::function<void(const int i)>&& f)
290 {
291 if (end <= start)
292 return;
293
294 if ((end - start) < step) {
295 // single iteration -> directly process
296 f(start);
297 return;
298 }
299
300 // do not use HasThreads because that counts main as a worker
301 if (!ThreadPool::HasThreads()) {
302 for (int i = start; i < end; i += step) {
303 f(i);
304 }
305 return;
306 }
307
308 ThreadPool::NotifyWorkerThreads();
309 SCOPED_MT_TIMER("::ThreadWorkers (real)");
310 auto taskgroup = std::make_shared<TaskGroup<const std::function<void(const int)>, const int>>((end-start)/step);
311 for (int i = start; i < end; i += step) { //FIXME optimize worksize (group tasks in bigger ones than 1-steps)
312 taskgroup->enqueue(f, i);
313 }
314 ThreadPool::PushTaskGroup(taskgroup);
315 ThreadPool::WaitForFinished(taskgroup);
316 }
317
318
for_mt(int start,int end,const std::function<void (const int i)> && f)319 static inline void for_mt(int start, int end, const std::function<void(const int i)>&& f)
320 {
321 for_mt(start, end, 1, std::move(f));
322 }
323
324
parallel(const std::function<void ()> && f)325 static inline void parallel(const std::function<void()>&& f)
326 {
327 if (!ThreadPool::HasThreads())
328 return f();
329
330 ThreadPool::NotifyWorkerThreads();
331 SCOPED_MT_TIMER("::ThreadWorkers (real)");
332
333 auto taskgroup = std::make_shared<ParallelTaskGroup<const std::function<void()>>>();
334 for (int i = 0; i < ThreadPool::GetNumThreads(); ++i) {
335 taskgroup->enqueue_unique(i, f);
336 }
337 ThreadPool::PushTaskGroup(taskgroup);
338 ThreadPool::WaitForFinished(taskgroup);
339 }
340
341
342 template<class F, class G>
343 static inline auto parallel_reduce(F&& f, G&& g) -> typename std::result_of<F()>::type
344 {
345 if (!ThreadPool::HasThreads())
346 return f();
347
348 ThreadPool::NotifyWorkerThreads();
349 SCOPED_MT_TIMER("::ThreadWorkers (real)");
350
351 auto taskgroup = std::make_shared<ParallelTaskGroup<F>>();
352 for (int i = 0; i < ThreadPool::GetNumThreads(); ++i) {
353 taskgroup->enqueue_unique(i, f);
354 }
355
356 ThreadPool::PushTaskGroup(taskgroup);
357 ThreadPool::WaitForFinished(taskgroup);
358 return taskgroup->GetResult(std::move(g));
359 }
360
361
362 namespace ThreadPool {
363 template<class F, class... Args>
364 static inline auto enqueue(F&& f, Args&&... args)
365 -> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>>
366 {
367 typedef typename std::result_of<F(Args...)>::type return_type;
368
369 if (!ThreadPool::HasThreads()) {
370 // directly process when there are no worker threads
371 auto task = std::make_shared< boost::packaged_task<return_type> >(std::bind(f, args ...));
372 auto fut = std::make_shared<boost::unique_future<return_type>>(task->get_future());
373 (*task)();
374 return fut;
375 }
376
377 auto singletask = std::make_shared<SingleTask<F, Args...>>(std::forward<F>(f), std::forward<Args>(args)...);
378 ThreadPool::PushTaskGroup(singletask);
379 return singletask->GetFuture();
380 }
381 }
382
383 #endif
384 #endif
385