1 /*****************************************************************************
2 * Copyright (c) 2014-2020 OpenRCT2 developers
3 *
4 * For a complete list of all authors, please refer to contributors.md
5 * Interested in contributing? Visit https://github.com/OpenRCT2/OpenRCT2
6 *
7 * OpenRCT2 is licensed under the GNU General Public License version 3.
8 *****************************************************************************/
9
10 #include "JobPool.h"
11
12 #include <algorithm>
13 #include <cassert>
14
TaskData(std::function<void ()> workFn,std::function<void ()> completionFn)15 JobPool::TaskData::TaskData(std::function<void()> workFn, std::function<void()> completionFn)
16 : WorkFn(workFn)
17 , CompletionFn(completionFn)
18 {
19 }
20
JobPool(size_t maxThreads)21 JobPool::JobPool(size_t maxThreads)
22 {
23 maxThreads = std::min<size_t>(maxThreads, std::thread::hardware_concurrency());
24 for (size_t n = 0; n < maxThreads; n++)
25 {
26 _threads.emplace_back(&JobPool::ProcessQueue, this);
27 }
28 }
29
~JobPool()30 JobPool::~JobPool()
31 {
32 {
33 unique_lock lock(_mutex);
34 _shouldStop = true;
35 _condPending.notify_all();
36 }
37
38 for (auto& th : _threads)
39 {
40 assert(th.joinable() != false);
41 th.join();
42 }
43 }
44
AddTask(std::function<void ()> workFn,std::function<void ()> completionFn)45 void JobPool::AddTask(std::function<void()> workFn, std::function<void()> completionFn)
46 {
47 unique_lock lock(_mutex);
48 _pending.emplace_back(workFn, completionFn);
49 _condPending.notify_one();
50 }
51
Join(std::function<void ()> reportFn)52 void JobPool::Join(std::function<void()> reportFn)
53 {
54 unique_lock lock(_mutex);
55 while (true)
56 {
57 // Wait for the queue to become empty or having completed tasks.
58 _condComplete.wait(lock, [this]() { return (_pending.empty() && _processing == 0) || !_completed.empty(); });
59
60 // Dispatch all completion callbacks if there are any.
61 while (!_completed.empty())
62 {
63 auto taskData = _completed.front();
64 _completed.pop_front();
65
66 if (taskData.CompletionFn)
67 {
68 lock.unlock();
69
70 taskData.CompletionFn();
71
72 lock.lock();
73 }
74 }
75
76 if (reportFn)
77 {
78 lock.unlock();
79
80 reportFn();
81
82 lock.lock();
83 }
84
85 // If everything is empty and no more work has to be done we can stop waiting.
86 if (_completed.empty() && _pending.empty() && _processing == 0)
87 {
88 break;
89 }
90 }
91 }
92
CountPending()93 size_t JobPool::CountPending()
94 {
95 return _pending.size();
96 }
97
ProcessQueue()98 void JobPool::ProcessQueue()
99 {
100 unique_lock lock(_mutex);
101 do
102 {
103 // Wait for work or cancellation.
104 _condPending.wait(lock, [this]() { return _shouldStop || !_pending.empty(); });
105
106 if (!_pending.empty())
107 {
108 _processing++;
109
110 auto taskData = _pending.front();
111 _pending.pop_front();
112
113 lock.unlock();
114
115 taskData.WorkFn();
116
117 lock.lock();
118
119 _completed.push_back(std::move(taskData));
120
121 _processing--;
122 _condComplete.notify_one();
123 }
124 } while (!_shouldStop);
125 }
126