1 /*****************************************************************************
2  * Copyright (c) 2014-2020 OpenRCT2 developers
3  *
4  * For a complete list of all authors, please refer to contributors.md
5  * Interested in contributing? Visit https://github.com/OpenRCT2/OpenRCT2
6  *
7  * OpenRCT2 is licensed under the GNU General Public License version 3.
8  *****************************************************************************/
9 
10 #include "JobPool.h"
11 
12 #include <algorithm>
13 #include <cassert>
14 
TaskData(std::function<void ()> workFn,std::function<void ()> completionFn)15 JobPool::TaskData::TaskData(std::function<void()> workFn, std::function<void()> completionFn)
16     : WorkFn(workFn)
17     , CompletionFn(completionFn)
18 {
19 }
20 
JobPool(size_t maxThreads)21 JobPool::JobPool(size_t maxThreads)
22 {
23     maxThreads = std::min<size_t>(maxThreads, std::thread::hardware_concurrency());
24     for (size_t n = 0; n < maxThreads; n++)
25     {
26         _threads.emplace_back(&JobPool::ProcessQueue, this);
27     }
28 }
29 
~JobPool()30 JobPool::~JobPool()
31 {
32     {
33         unique_lock lock(_mutex);
34         _shouldStop = true;
35         _condPending.notify_all();
36     }
37 
38     for (auto& th : _threads)
39     {
40         assert(th.joinable() != false);
41         th.join();
42     }
43 }
44 
AddTask(std::function<void ()> workFn,std::function<void ()> completionFn)45 void JobPool::AddTask(std::function<void()> workFn, std::function<void()> completionFn)
46 {
47     unique_lock lock(_mutex);
48     _pending.emplace_back(workFn, completionFn);
49     _condPending.notify_one();
50 }
51 
Join(std::function<void ()> reportFn)52 void JobPool::Join(std::function<void()> reportFn)
53 {
54     unique_lock lock(_mutex);
55     while (true)
56     {
57         // Wait for the queue to become empty or having completed tasks.
58         _condComplete.wait(lock, [this]() { return (_pending.empty() && _processing == 0) || !_completed.empty(); });
59 
60         // Dispatch all completion callbacks if there are any.
61         while (!_completed.empty())
62         {
63             auto taskData = _completed.front();
64             _completed.pop_front();
65 
66             if (taskData.CompletionFn)
67             {
68                 lock.unlock();
69 
70                 taskData.CompletionFn();
71 
72                 lock.lock();
73             }
74         }
75 
76         if (reportFn)
77         {
78             lock.unlock();
79 
80             reportFn();
81 
82             lock.lock();
83         }
84 
85         // If everything is empty and no more work has to be done we can stop waiting.
86         if (_completed.empty() && _pending.empty() && _processing == 0)
87         {
88             break;
89         }
90     }
91 }
92 
CountPending()93 size_t JobPool::CountPending()
94 {
95     return _pending.size();
96 }
97 
ProcessQueue()98 void JobPool::ProcessQueue()
99 {
100     unique_lock lock(_mutex);
101     do
102     {
103         // Wait for work or cancellation.
104         _condPending.wait(lock, [this]() { return _shouldStop || !_pending.empty(); });
105 
106         if (!_pending.empty())
107         {
108             _processing++;
109 
110             auto taskData = _pending.front();
111             _pending.pop_front();
112 
113             lock.unlock();
114 
115             taskData.WorkFn();
116 
117             lock.lock();
118 
119             _completed.push_back(std::move(taskData));
120 
121             _processing--;
122             _condComplete.notify_one();
123         }
124     } while (!_shouldStop);
125 }
126