1 
2 #ifndef _THREADPOOL_H
3 #define _THREADPOOL_H
4 
5 
6 #ifndef THREADPOOL
7 
8 //#include <boost/thread/future.hpp>
9 #include  <functional>
10 
namespace ThreadPool {
	// Serial stand-ins used when THREADPOOL is not defined: there are no
	// worker threads, so the setters do nothing and the getters return the
	// constants of a pool that consists of the calling thread only.

	//template<class F, class... Args>
	//static inline auto enqueue(F&& f, Args&&... args)
	//-> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>> {}

	// no-op, the requested thread count is ignored
	static inline void SetThreadCount(int /*num*/) {}
	// no-op, the requested spin time is ignored
	static inline void SetThreadSpinTime(int /*milliSeconds*/) {}
	// always 0
	static inline int GetThreadNum() { return 0; }
	// always 1
	static inline int GetMaxThreads() { return 1; }
	// always 1
	static inline int GetNumThreads() { return 1; }
	// always false: mirrors the HasThreads() of the threaded variant so code
	// that branches on it also compiles without THREADPOOL
	static inline bool HasThreads() { return false; }
	// no-op, there is nobody to notify
	static inline void NotifyWorkerThreads() {}
}
23 
24 
// Serial fallback of for_mt: calls f(i) for i = start, start + step, ...
// as long as i < end, in ascending order on the calling thread.
// A non-positive step can never reach end, so it is treated as an empty
// range instead of looping forever.
static inline void for_mt(int start, int end, int step, const std::function<void(const int i)>&& f)
{
	if (step <= 0)
		return;

	for (int i = start; i < end; i += step) {
		f(i);
	}
}
31 
32 
for_mt(int start,int end,const std::function<void (const int i)> && f)33 static inline void for_mt(int start, int end, const std::function<void(const int i)>&& f)
34 {
35 	for_mt(start, end, 1, std::move(f));
36 }
37 
38 
// Serial fallback of parallel(): f is invoked exactly once, on the
// calling thread.
static inline void parallel(const std::function<void()>&& f)
{
	const std::function<void()>& job = f;
	job();
}
43 
44 
// Serial fallback of parallel_reduce(): f runs once on the calling thread
// and its result is returned as is.
template<class F, class G>
static inline auto parallel_reduce(F&& f, G&& g) -> typename std::result_of<F()>::type
{
	// a single result needs no combining, so the reducer is never called
	(void) g;
	return f();
}
50 
51 #else
52 
53 #include "TimeProfiler.h"
54 #include "System/Log/ILog.h"
55 #include "System/Platform/Threading.h"
56 
57 #include <deque>
58 #include <vector>
59 #include <list>
60 #include <boost/optional.hpp>
61 #include <numeric>
62 #include <atomic>
63 
// MinGW currently lacks C++11 thread support, so to keep things simple Boost is always used for now
65 #include <boost/thread/future.hpp>
66 #undef gt
67 #include <boost/chrono/include.hpp>
68 #include <boost/utility.hpp>
69 #include <memory>
70 
#ifdef UNITSYNC
	// when UNITSYNC is defined, SCOPED_MT_TIMER is replaced by a macro that
	// expands to nothing, so the SCOPED_MT_TIMER(...) statements further
	// down in this header compile away
	#undef SCOPED_MT_TIMER
	#define SCOPED_MT_TIMER(x)
#endif
75 
76 
77 
78 class ITaskGroup
79 {
80 public:
~ITaskGroup()81 	virtual ~ITaskGroup() {}
82 
83 	virtual boost::optional<std::function<void()>> GetTask() = 0;
84 	virtual bool IsFinished() const = 0;
85 	virtual bool IsEmpty() const = 0;
86 
87 	virtual int RemainingTasks() const = 0;
88 
89 	template< class Rep, class Period >
wait_for(const boost::chrono::duration<Rep,Period> & rel_time)90 	bool wait_for(const boost::chrono::duration<Rep, Period>& rel_time) const {
91 		const auto end = boost::chrono::high_resolution_clock::now() + rel_time;
92 		while (!IsFinished() && (boost::chrono::high_resolution_clock::now() < end)) {
93 		}
94 		return IsFinished();
95 	}
96 private:
97 	//virtual void FinishedATask() = 0;
98 };
99 
100 
101 namespace ThreadPool {
102 	template<class F, class... Args>
103 	static auto enqueue(F&& f, Args&&... args)
104 	-> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>>;
105 
106 	void PushTaskGroup(std::shared_ptr<ITaskGroup> taskgroup);
107 	void WaitForFinished(std::shared_ptr<ITaskGroup> taskgroup);
108 
109 	template<typename T>
PushTaskGroup(std::shared_ptr<T> taskgroup)110 	inline void PushTaskGroup(std::shared_ptr<T> taskgroup) { PushTaskGroup(std::static_pointer_cast<ITaskGroup>(taskgroup)); }
111 	template<typename T>
WaitForFinished(std::shared_ptr<T> taskgroup)112 	inline void WaitForFinished(std::shared_ptr<T> taskgroup) { WaitForFinished(std::static_pointer_cast<ITaskGroup>(taskgroup)); }
113 
114 	void SetThreadCount(int num);
115 	void SetThreadSpinTime(int milliSeconds);
116 	int GetThreadNum();
117 	bool HasThreads();
118 	int GetMaxThreads();
119 	int GetNumThreads();
120 	void NotifyWorkerThreads();
121 }
122 
123 
124 template<class F, class... Args>
125 class SingleTask : public ITaskGroup
126 {
127 public:
128 	typedef typename std::result_of<F(Args...)>::type return_type;
129 
SingleTask(F && f,Args &&...args)130 	SingleTask(F&& f, Args&&... args) : finished(false), done(false) {
131 		auto p = std::make_shared<boost::packaged_task<return_type>>(
132 			std::bind(std::forward<F>(f), std::forward<Args>(args)...)
133 		);
134 		result = std::make_shared<boost::unique_future<return_type>>(p->get_future());
135 		task = [&,p]{ (*p)(); finished = true; };
136 	}
137 
GetTask()138 	boost::optional<std::function<void()>> GetTask() { return (!std::atomic_exchange(&done, true)) ? boost::optional<std::function<void()>>(task) : boost::optional<std::function<void()>>(); }
139 
IsEmpty()140 	bool IsEmpty() const    { return done; }
IsFinished()141 	bool IsFinished() const { return finished; }
RemainingTasks()142 	int RemainingTasks() const { return done ? 0 : 1; }
GetFuture()143 	std::shared_ptr<boost::unique_future<return_type>> GetFuture() { assert(result->valid()); return std::move(result); } //FIXME rethrow exceptions some time
144 
145 private:
146 	//void FinishedATask() { finished = true; }
147 
148 public:
149 	std::atomic<bool> finished;
150 	std::atomic<bool> done;
151 	std::function<void()> task;
152 	std::shared_ptr<boost::unique_future<return_type>> result;
153 };
154 
155 
156 template<class F, class... Args>
157 class TaskGroup : public ITaskGroup
158 {
159 public:
160 	TaskGroup(const int num = 0) : remainingTasks(0), curtask(0), latency(0) {
161 		//start = boost::chrono::high_resolution_clock::now();
162 		results.reserve(num);
163 		tasks.reserve(num);
164 	}
165 
~TaskGroup()166 	virtual ~TaskGroup() {}
167 
168 	typedef typename std::result_of<F(Args...)>::type return_type;
169 
enqueue(F & f,Args &...args)170 	void enqueue(F& f, Args&... args)
171 	{
172 		auto task = std::make_shared<boost::packaged_task<return_type>>(
173 			std::bind(f, args ...)
174 		);
175 		results.emplace_back(task->get_future());
176 		// workaround a Fedora gcc bug else it reports in the lambda below:
177 		// error: no 'operator--(int)' declared for postfix '--'
178 		auto* atomicCounter = &remainingTasks;
179 		tasks.emplace_back([task,atomicCounter]{ (*task)(); (*atomicCounter)--; });
180 		remainingTasks++;
181 	}
182 
enqueue(F && f,Args &&...args)183 	void enqueue(F&& f, Args&&... args)
184 	{
185 		auto task = std::make_shared< boost::packaged_task<return_type> >(
186 			std::bind(std::forward<F>(f), std::forward<Args>(args)...)
187 		);
188 		results.emplace_back(task->get_future());
189 		// workaround a Fedora gcc bug else it reports in the lambda below:
190 		// error: no 'operator--(int)' declared for postfix '--'
191 		auto* atomicCounter = &remainingTasks;
192 		tasks.emplace_back([task,atomicCounter]{ (*task)(); (*atomicCounter)--; });
193 		remainingTasks++;
194 	}
195 
196 
GetTask()197 	virtual boost::optional<std::function<void()>> GetTask()
198 	{
199 		const int pos = curtask++;
200 		if (pos < tasks.size()) {
201 			/*if (latency.count() == 0) {
202 				auto now = boost::chrono::high_resolution_clock::now();
203 				latency = (now - start);
204 				LOG("latency %fms", latency.count() / 1000000.f);
205 			}*/
206 			return tasks[pos];
207 		}
208 		return boost::optional<std::function<void()>>();
209 	}
210 
IsEmpty()211 	virtual bool IsEmpty() const    { return curtask >= tasks.size() /*tasks.empty()*/; }
IsFinished()212 	bool IsFinished() const { return (remainingTasks == 0); }
RemainingTasks()213 	int RemainingTasks() const { return remainingTasks; }
214 
215 	template<typename G>
GetResult(const G && g)216 	return_type GetResult(const G&& g) {
217 		return std::accumulate(results.begin(), results.end(), 0, g);
218 	}
219 
220 private:
221 	//void FinishedATask() { remainingTasks--; }
222 
223 public:
224 	std::atomic<int> remainingTasks;
225 	std::atomic<int> curtask;
226 	std::vector<std::function<void()>> tasks;
227 	std::vector<boost::unique_future<return_type>> results;
228 
229 	boost::chrono::time_point<boost::chrono::high_resolution_clock> start; // use for latency profiling!
230 	boost::chrono::nanoseconds latency;
231 };
232 
233 
234 template<class F, class... Args>
235 class ParallelTaskGroup : public TaskGroup<F,Args...>
236 {
237 public:
238 	ParallelTaskGroup(const int num = 0) : TaskGroup<F,Args...>(num) {
239 		uniqueTasks.resize(ThreadPool::GetNumThreads());
240 	}
241 
242 	typedef typename std::result_of<F(Args...)>::type return_type;
243 
enqueue_unique(const int threadNum,F & f,Args &...args)244 	void enqueue_unique(const int threadNum, F& f, Args&... args)
245 	{
246 		auto task = std::make_shared< boost::packaged_task<return_type> >(
247 			std::bind(std::forward<F>(f), std::forward<Args>(args)...)
248 		);
249 		this->results.emplace_back(task->get_future());
250 		uniqueTasks[threadNum].emplace_back([&,task]{ (*task)(); (this->remainingTasks)--; });
251 		this->remainingTasks++;
252 	}
253 
enqueue_unique(const int threadNum,F && f,Args &&...args)254 	void enqueue_unique(const int threadNum, F&& f, Args&&... args)
255 	{
256 		auto task = std::make_shared< boost::packaged_task<return_type> >(
257 			std::bind(std::forward<F>(f), std::forward<Args>(args)...)
258 		);
259 		this->results.emplace_back(task->get_future());
260 		uniqueTasks[threadNum].emplace_back([&,task]{ (*task)(); (this->remainingTasks)--; });
261 		this->remainingTasks++;
262 	}
263 
264 
GetTask()265 	boost::optional<std::function<void()>> GetTask()
266 	{
267 		auto& ut = uniqueTasks[ThreadPool::GetThreadNum()];
268 		if (!ut.empty()) {
269 			// no need to make threadsafe cause each thread got its own container
270 			auto t = ut.front();
271 			ut.pop_front();
272 			return t;
273 		}
274 
275 		return TaskGroup<F,Args...>::GetTask();
276 	}
277 
IsEmpty()278 	bool IsEmpty() const {
279 		for(auto& ut: uniqueTasks) { if (!ut.empty()) return false; }
280 		return TaskGroup<F,Args...>::IsEmpty();
281 	}
282 
283 public:
284 	std::vector<std::deque<std::function<void()>>> uniqueTasks;
285 };
286 
287 
288 
for_mt(int start,int end,int step,const std::function<void (const int i)> && f)289 static inline void for_mt(int start, int end, int step, const std::function<void(const int i)>&& f)
290 {
291 	if (end <= start)
292 		return;
293 
294 	if ((end - start) < step) {
295 		// single iteration -> directly process
296 		f(start);
297 		return;
298 	}
299 
300 	// do not use HasThreads because that counts main as a worker
301 	if (!ThreadPool::HasThreads()) {
302 		for (int i = start; i < end; i += step) {
303 			f(i);
304 		}
305 		return;
306 	}
307 
308 	ThreadPool::NotifyWorkerThreads();
309 	SCOPED_MT_TIMER("::ThreadWorkers (real)");
310 	auto taskgroup = std::make_shared<TaskGroup<const std::function<void(const int)>, const int>>((end-start)/step);
311 	for (int i = start; i < end; i += step) { //FIXME optimize worksize (group tasks in bigger ones than 1-steps)
312 		taskgroup->enqueue(f, i);
313 	}
314 	ThreadPool::PushTaskGroup(taskgroup);
315 	ThreadPool::WaitForFinished(taskgroup);
316 }
317 
318 
for_mt(int start,int end,const std::function<void (const int i)> && f)319 static inline void for_mt(int start, int end, const std::function<void(const int i)>&& f)
320 {
321 	for_mt(start, end, 1, std::move(f));
322 }
323 
324 
parallel(const std::function<void ()> && f)325 static inline void parallel(const std::function<void()>&& f)
326 {
327 	if (!ThreadPool::HasThreads())
328 		return f();
329 
330 	ThreadPool::NotifyWorkerThreads();
331 	SCOPED_MT_TIMER("::ThreadWorkers (real)");
332 
333 	auto taskgroup = std::make_shared<ParallelTaskGroup<const std::function<void()>>>();
334 	for (int i = 0; i < ThreadPool::GetNumThreads(); ++i) {
335 		taskgroup->enqueue_unique(i, f);
336 	}
337 	ThreadPool::PushTaskGroup(taskgroup);
338 	ThreadPool::WaitForFinished(taskgroup);
339 }
340 
341 
342 template<class F, class G>
343 static inline auto parallel_reduce(F&& f, G&& g) -> typename std::result_of<F()>::type
344 {
345 	if (!ThreadPool::HasThreads())
346 		return f();
347 
348 	ThreadPool::NotifyWorkerThreads();
349 	SCOPED_MT_TIMER("::ThreadWorkers (real)");
350 
351 	auto taskgroup = std::make_shared<ParallelTaskGroup<F>>();
352 	for (int i = 0; i < ThreadPool::GetNumThreads(); ++i) {
353 		taskgroup->enqueue_unique(i, f);
354 	}
355 
356 	ThreadPool::PushTaskGroup(taskgroup);
357 	ThreadPool::WaitForFinished(taskgroup);
358 	return taskgroup->GetResult(std::move(g));
359 }
360 
361 
362 namespace ThreadPool {
363 	template<class F, class... Args>
364 	static inline auto enqueue(F&& f, Args&&... args)
365 	-> std::shared_ptr<boost::unique_future<typename std::result_of<F(Args...)>::type>>
366 	{
367 		typedef typename std::result_of<F(Args...)>::type return_type;
368 
369 		if (!ThreadPool::HasThreads()) {
370 			// directly process when there are no worker threads
371 			auto task = std::make_shared< boost::packaged_task<return_type> >(std::bind(f, args ...));
372 			auto fut = std::make_shared<boost::unique_future<return_type>>(task->get_future());
373 			(*task)();
374 			return fut;
375 		}
376 
377 		auto singletask = std::make_shared<SingleTask<F, Args...>>(std::forward<F>(f), std::forward<Args>(args)...);
378 		ThreadPool::PushTaskGroup(singletask);
379 		return singletask->GetFuture();
380 	}
381 }
382 
383 #endif
384 #endif
385