1 //
2 // Copyright 2016 The ANGLE Project Authors. All rights reserved.
3 // Use of this source code is governed by a BSD-style license that can be
4 // found in the LICENSE file.
5 //
6 // WorkerThread:
7 //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
8 //   Might be implemented differently depending on platform.
9 //
10 
11 #include "libANGLE/WorkerThread.h"
12 
13 #include "libANGLE/trace.h"
14 
15 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
16 #    include <condition_variable>
17 #    include <future>
18 #    include <mutex>
19 #    include <queue>
20 #    include <thread>
21 #endif  // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
22 
23 namespace angle
24 {
25 
26 WaitableEvent::WaitableEvent()  = default;
27 WaitableEvent::~WaitableEvent() = default;
28 
wait()29 void WaitableEventDone::wait() {}
30 
isReady()31 bool WaitableEventDone::isReady()
32 {
33     return true;
34 }
35 
36 WorkerThreadPool::WorkerThreadPool()  = default;
37 WorkerThreadPool::~WorkerThreadPool() = default;
38 
39 class SingleThreadedWaitableEvent final : public WaitableEvent
40 {
41   public:
42     SingleThreadedWaitableEvent()           = default;
43     ~SingleThreadedWaitableEvent() override = default;
44 
45     void wait() override;
46     bool isReady() override;
47 };
48 
wait()49 void SingleThreadedWaitableEvent::wait() {}
50 
isReady()51 bool SingleThreadedWaitableEvent::isReady()
52 {
53     return true;
54 }
55 
56 class SingleThreadedWorkerPool final : public WorkerThreadPool
57 {
58   public:
59     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
60     void setMaxThreads(size_t maxThreads) override;
61     bool isAsync() override;
62 };
63 
64 // SingleThreadedWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)65 std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
66     std::shared_ptr<Closure> task)
67 {
68     (*task)();
69     return std::make_shared<SingleThreadedWaitableEvent>();
70 }
71 
setMaxThreads(size_t maxThreads)72 void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
73 
isAsync()74 bool SingleThreadedWorkerPool::isAsync()
75 {
76     return false;
77 }
78 
79 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
80 class AsyncWaitableEvent final : public WaitableEvent
81 {
82   public:
AsyncWaitableEvent()83     AsyncWaitableEvent() : mIsPending(true) {}
84     ~AsyncWaitableEvent() override = default;
85 
86     void wait() override;
87     bool isReady() override;
88 
89   private:
90     friend class AsyncWorkerPool;
91     void setFuture(std::future<void> &&future);
92 
93     // To block wait() when the task is still in queue to be run.
94     // Also to protect the concurrent accesses from both main thread and
95     // background threads to the member fields.
96     std::mutex mMutex;
97 
98     bool mIsPending;
99     std::condition_variable mCondition;
100     std::future<void> mFuture;
101 };
102 
setFuture(std::future<void> && future)103 void AsyncWaitableEvent::setFuture(std::future<void> &&future)
104 {
105     mFuture = std::move(future);
106 }
107 
wait()108 void AsyncWaitableEvent::wait()
109 {
110     ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
111     {
112         std::unique_lock<std::mutex> lock(mMutex);
113         mCondition.wait(lock, [this] { return !mIsPending; });
114     }
115 
116     ASSERT(mFuture.valid());
117     mFuture.wait();
118 }
119 
isReady()120 bool AsyncWaitableEvent::isReady()
121 {
122     std::lock_guard<std::mutex> lock(mMutex);
123     if (mIsPending)
124     {
125         return false;
126     }
127     ASSERT(mFuture.valid());
128     return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
129 }
130 
131 class AsyncWorkerPool final : public WorkerThreadPool
132 {
133   public:
AsyncWorkerPool(size_t maxThreads)134     AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
135     ~AsyncWorkerPool() override = default;
136 
137     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
138     void setMaxThreads(size_t maxThreads) override;
139     bool isAsync() override;
140 
141   private:
142     void checkToRunPendingTasks();
143 
144     // To protect the concurrent accesses from both main thread and background
145     // threads to the member fields.
146     std::mutex mMutex;
147 
148     size_t mMaxThreads;
149     size_t mRunningThreads;
150     std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
151 };
152 
153 // AsyncWorkerPool implementation.
postWorkerTask(std::shared_ptr<Closure> task)154 std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
155 {
156     ASSERT(mMaxThreads > 0);
157 
158     auto waitable = std::make_shared<AsyncWaitableEvent>();
159     {
160         std::lock_guard<std::mutex> lock(mMutex);
161         mTaskQueue.push(std::make_pair(waitable, task));
162     }
163     checkToRunPendingTasks();
164     return std::move(waitable);
165 }
166 
setMaxThreads(size_t maxThreads)167 void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
168 {
169     {
170         std::lock_guard<std::mutex> lock(mMutex);
171         mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
172     }
173     checkToRunPendingTasks();
174 }
175 
isAsync()176 bool AsyncWorkerPool::isAsync()
177 {
178     return true;
179 }
180 
checkToRunPendingTasks()181 void AsyncWorkerPool::checkToRunPendingTasks()
182 {
183     std::lock_guard<std::mutex> lock(mMutex);
184     while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
185     {
186         auto task = mTaskQueue.front();
187         mTaskQueue.pop();
188         auto waitable = task.first;
189         auto closure  = task.second;
190 
191         auto future = std::async(std::launch::async, [closure, this] {
192             {
193                 ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
194                 (*closure)();
195             }
196             {
197                 std::lock_guard<std::mutex> lock(mMutex);
198                 ASSERT(mRunningThreads != 0);
199                 --mRunningThreads;
200             }
201             checkToRunPendingTasks();
202         });
203 
204         ++mRunningThreads;
205 
206         {
207             std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
208             waitable->mIsPending = false;
209             waitable->setFuture(std::move(future));
210         }
211         waitable->mCondition.notify_all();
212     }
213 }
214 #endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
215 
216 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
217 class DelegateWaitableEvent final : public WaitableEvent
218 {
219   public:
220     DelegateWaitableEvent()           = default;
221     ~DelegateWaitableEvent() override = default;
222 
223     void wait() override;
224     bool isReady() override;
225 
226     void markAsReady();
227 
228   private:
229     // To protect the concurrent accesses from both main thread and background
230     // threads to the member fields.
231     std::mutex mMutex;
232 
233     bool mIsReady = false;
234     std::condition_variable mCondition;
235 };
236 
markAsReady()237 void DelegateWaitableEvent::markAsReady()
238 {
239     std::lock_guard<std::mutex> lock(mMutex);
240     mIsReady = true;
241     mCondition.notify_all();
242 }
243 
wait()244 void DelegateWaitableEvent::wait()
245 {
246     std::unique_lock<std::mutex> lock(mMutex);
247     mCondition.wait(lock, [this] { return mIsReady; });
248 }
249 
isReady()250 bool DelegateWaitableEvent::isReady()
251 {
252     std::lock_guard<std::mutex> lock(mMutex);
253     return mIsReady;
254 }
255 
256 class DelegateWorkerPool final : public WorkerThreadPool
257 {
258   public:
259     DelegateWorkerPool()           = default;
260     ~DelegateWorkerPool() override = default;
261 
262     std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
263 
264     void setMaxThreads(size_t maxThreads) override;
265     bool isAsync() override;
266 };
267 
268 // A function wrapper to execute the closure and to notify the waitable
269 // event after the execution.
270 class DelegateWorkerTask
271 {
272   public:
DelegateWorkerTask(std::shared_ptr<Closure> task,std::shared_ptr<DelegateWaitableEvent> waitable)273     DelegateWorkerTask(std::shared_ptr<Closure> task,
274                        std::shared_ptr<DelegateWaitableEvent> waitable)
275         : mTask(task), mWaitable(waitable)
276     {}
277     DelegateWorkerTask()                     = delete;
278     DelegateWorkerTask(DelegateWorkerTask &) = delete;
279 
RunTask(void * userData)280     static void RunTask(void *userData)
281     {
282         DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
283         (*workerTask->mTask)();
284         workerTask->mWaitable->markAsReady();
285 
286         // Delete the task after its execution.
287         delete workerTask;
288     }
289 
290   private:
291     ~DelegateWorkerTask() = default;
292 
293     std::shared_ptr<Closure> mTask;
294     std::shared_ptr<DelegateWaitableEvent> mWaitable;
295 };
296 
postWorkerTask(std::shared_ptr<Closure> task)297 std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
298 {
299     auto waitable = std::make_shared<DelegateWaitableEvent>();
300 
301     // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
302     DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
303     auto *platform                 = ANGLEPlatformCurrent();
304     platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
305 
306     return std::move(waitable);
307 }
308 
setMaxThreads(size_t maxThreads)309 void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
310 
isAsync()311 bool DelegateWorkerPool::isAsync()
312 {
313     return true;
314 }
315 #endif
316 
317 // static
Create(bool multithreaded)318 std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
319 {
320     std::shared_ptr<WorkerThreadPool> pool(nullptr);
321 
322 #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
323     const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask;
324     if (hasPostWorkerTaskImpl && multithreaded)
325     {
326         pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
327     }
328 #endif
329 #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
330     if (!pool && multithreaded)
331     {
332         pool = std::shared_ptr<WorkerThreadPool>(
333             new AsyncWorkerPool(std::thread::hardware_concurrency()));
334     }
335 #endif
336     if (!pool)
337     {
338         return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
339     }
340     return pool;
341 }
342 
343 // static
PostWorkerTask(std::shared_ptr<WorkerThreadPool> pool,std::shared_ptr<Closure> task)344 std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
345     std::shared_ptr<WorkerThreadPool> pool,
346     std::shared_ptr<Closure> task)
347 {
348     std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
349     if (event.get())
350     {
351         event->setWorkerThreadPool(pool);
352     }
353     return event;
354 }
355 
356 }  // namespace angle
357