1 // Copyright 2019 The libgav1 Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/utils/threadpool.h"
16 
17 #if defined(_MSC_VER)
18 #include <process.h>
19 #include <windows.h>
20 #else  // defined(_MSC_VER)
21 #include <pthread.h>
22 #endif  // defined(_MSC_VER)
23 #if defined(__ANDROID__) || defined(__GLIBC__)
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
27 #include <algorithm>
28 #include <cassert>
29 #include <cinttypes>
30 #include <cstddef>
31 #include <cstdint>
32 #include <cstring>
33 #include <new>
34 #include <utility>
35 
36 #if defined(__ANDROID__)
37 #include <chrono>  // NOLINT (unapproved c++11 header)
38 #endif
39 
40 // The glibc wrapper for the gettid() system call was added in glibc 2.30.
41 // Emulate it for older versions of glibc.
42 #if defined(__GLIBC_PREREQ)
43 #if !__GLIBC_PREREQ(2, 30)
44 
45 #include <sys/syscall.h>
46 
gettid()47 static pid_t gettid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
48 
49 #endif
50 #endif  // defined(__GLIBC_PREREQ)
51 
52 namespace libgav1 {
53 
54 #if defined(__ANDROID__)
namespace {

// Monotonic clock used to time an idle worker's busy-wait.
using Clock = std::chrono::steady_clock;
using Duration = Clock::duration;

// How long an idle worker polls the queue before falling back to Wait()
// (see WorkerFunction()): 2 milliseconds, expressed in Clock ticks.
constexpr Duration kBusyWaitDuration =
    std::chrono::duration_cast<Duration>(std::chrono::milliseconds(2));

}  // namespace
63 #endif  // defined(__ANDROID__)
64 
65 // static
Create(int num_threads)66 std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) {
67   return Create(/*name_prefix=*/"", num_threads);
68 }
69 
70 // static
Create(const char name_prefix[],int num_threads)71 std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[],
72                                                int num_threads) {
73   if (name_prefix == nullptr || num_threads <= 0) return nullptr;
74   std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow)
75                                                WorkerThread*[num_threads]);
76   if (threads == nullptr) return nullptr;
77   std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool(
78       name_prefix, std::move(threads), num_threads));
79   if (pool != nullptr && !pool->StartWorkers()) {
80     pool = nullptr;
81   }
82   return pool;
83 }
84 
ThreadPool(const char name_prefix[],std::unique_ptr<WorkerThread * []> threads,int num_threads)85 ThreadPool::ThreadPool(const char name_prefix[],
86                        std::unique_ptr<WorkerThread*[]> threads,
87                        int num_threads)
88     : threads_(std::move(threads)), num_threads_(num_threads) {
89   threads_[0] = nullptr;
90   assert(name_prefix != nullptr);
91   const size_t name_prefix_len =
92       std::min(strlen(name_prefix), sizeof(name_prefix_) - 1);
93   memcpy(name_prefix_, name_prefix, name_prefix_len);
94   name_prefix_[name_prefix_len] = '\0';
95 }
96 
~ThreadPool()97 ThreadPool::~ThreadPool() { Shutdown(); }
98 
Schedule(std::function<void ()> closure)99 void ThreadPool::Schedule(std::function<void()> closure) {
100   LockMutex();
101   if (!queue_.GrowIfNeeded()) {
102     // queue_ is full and we can't grow it. Run |closure| directly.
103     UnlockMutex();
104     closure();
105     return;
106   }
107   queue_.Push(std::move(closure));
108   UnlockMutex();
109   SignalOne();
110 }
111 
num_threads() const112 int ThreadPool::num_threads() const { return num_threads_; }
113 
114 // A simple implementation that mirrors the non-portable Thread.  We may
115 // choose to expand this in the future as a portable implementation of
116 // Thread, or replace it at such a time as one is implemented.
117 class ThreadPool::WorkerThread : public Allocable {
118  public:
119   // Creates and starts a thread that runs pool->WorkerFunction().
120   explicit WorkerThread(ThreadPool* pool);
121 
122   // Not copyable or movable.
123   WorkerThread(const WorkerThread&) = delete;
124   WorkerThread& operator=(const WorkerThread&) = delete;
125 
126   // REQUIRES: Join() must have been called if Start() was called and
127   // succeeded.
128   ~WorkerThread() = default;
129 
130   LIBGAV1_MUST_USE_RESULT bool Start();
131 
132   // Joins with the running thread.
133   void Join();
134 
135  private:
136 #if defined(_MSC_VER)
137   static unsigned int __stdcall ThreadBody(void* arg);
138 #else
139   static void* ThreadBody(void* arg);
140 #endif
141 
142   void SetupName();
143   void Run();
144 
145   ThreadPool* pool_;
146 #if defined(_MSC_VER)
147   HANDLE handle_;
148 #else
149   pthread_t thread_;
150 #endif
151 };
152 
WorkerThread(ThreadPool * pool)153 ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool) : pool_(pool) {}
154 
155 #if defined(_MSC_VER)
156 
Start()157 bool ThreadPool::WorkerThread::Start() {
158   // Since our code calls the C run-time library (CRT), use _beginthreadex
159   // rather than CreateThread. Microsoft documentation says "If a thread
160   // created using CreateThread calls the CRT, the CRT may terminate the
161   // process in low-memory conditions."
162   uintptr_t handle = _beginthreadex(
163       /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this,
164       /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr);
165   if (handle == 0) return false;
166   handle_ = reinterpret_cast<HANDLE>(handle);
167   ResumeThread(handle_);
168   return true;
169 }
170 
Join()171 void ThreadPool::WorkerThread::Join() {
172   WaitForSingleObject(handle_, INFINITE);
173   CloseHandle(handle_);
174 }
175 
ThreadBody(void * arg)176 unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) {
177   auto* thread = static_cast<WorkerThread*>(arg);
178   thread->Run();
179   return 0;
180 }
181 
SetupName()182 void ThreadPool::WorkerThread::SetupName() {
183   // Not currently supported on Windows.
184 }
185 
186 #else  // defined(_MSC_VER)
187 
Start()188 bool ThreadPool::WorkerThread::Start() {
189   return pthread_create(&thread_, nullptr, ThreadBody, this) == 0;
190 }
191 
Join()192 void ThreadPool::WorkerThread::Join() { pthread_join(thread_, nullptr); }
193 
ThreadBody(void * arg)194 void* ThreadPool::WorkerThread::ThreadBody(void* arg) {
195   auto* thread = static_cast<WorkerThread*>(arg);
196   thread->Run();
197   return nullptr;
198 }
199 
SetupName()200 void ThreadPool::WorkerThread::SetupName() {
201   if (pool_->name_prefix_[0] != '\0') {
202 #if defined(__APPLE__)
203     // Apple's version of pthread_setname_np takes one argument and operates on
204     // the current thread only. Also, pthread_mach_thread_np is Apple-specific.
205     // The maximum size of the |name| buffer was noted in the Chromium source
206     // code and was confirmed by experiments.
207     char name[64];
208     mach_port_t id = pthread_mach_thread_np(pthread_self());
209     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
210                       static_cast<int64_t>(id));
211     assert(rv >= 0);
212     rv = pthread_setname_np(name);
213     assert(rv == 0);
214     static_cast<void>(rv);
215 #elif defined(__ANDROID__) || defined(__GLIBC__)
216     // If the |name| buffer is longer than 16 bytes, pthread_setname_np fails
217     // with error 34 (ERANGE) on Android.
218     char name[16];
219     pid_t id = gettid();
220     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
221                       static_cast<int64_t>(id));
222     assert(rv >= 0);
223     rv = pthread_setname_np(pthread_self(), name);
224     assert(rv == 0);
225     static_cast<void>(rv);
226 #endif
227   }
228 }
229 
230 #endif  // defined(_MSC_VER)
231 
Run()232 void ThreadPool::WorkerThread::Run() {
233   SetupName();
234   pool_->WorkerFunction();
235 }
236 
StartWorkers()237 bool ThreadPool::StartWorkers() {
238   if (!queue_.Init()) return false;
239   for (int i = 0; i < num_threads_; ++i) {
240     threads_[i] = new (std::nothrow) WorkerThread(this);
241     if (threads_[i] == nullptr) return false;
242     if (!threads_[i]->Start()) {
243       delete threads_[i];
244       threads_[i] = nullptr;
245       return false;
246     }
247   }
248   return true;
249 }
250 
// Main loop executed by every worker thread (entered via WorkerThread::Run()).
//
// The mutex is held at the top of each loop iteration. While |queue_| is
// non-empty, the worker pops the front job and runs it with the mutex
// released. When the queue is empty, the worker returns if |exit_threads_| is
// set; otherwise it blocks in Wait() (after a short busy-wait on Android).
// Emptiness is checked before |exit_threads_|, so jobs still queued when
// Shutdown() is requested are drained before the workers exit.
// NOTE(review): Wait() is assumed to behave like a condition-variable wait
// (release the mutex while blocked, reacquire it before returning) — confirm
// against the header.
void ThreadPool::WorkerFunction() {
  LockMutex();
  while (true) {
    if (queue_.Empty()) {
      if (exit_threads_) {
        break;  // Queue is empty and exit was requested.
      }
#if defined(__ANDROID__)
      // On Android, if we go to a conditional wait right away, the CPU
      // governor kicks in and starts shutting the cores down. So we do a very
      // small busy wait to see if we get our next job within that period.
      // This significantly improves the performance of common cases of tile
      // parallel decoding. If we don't receive a job in the busy wait time, we
      // then go to an actual conditional wait as usual.
      UnlockMutex();
      bool found_job = false;
      const auto wait_start = Clock::now();
      // Poll the queue, holding the mutex only for each individual check,
      // until kBusyWaitDuration has elapsed. Note that |exit_threads_| is not
      // checked here, so a shutdown request is noticed only after this loop.
      while (Clock::now() - wait_start < kBusyWaitDuration) {
        LockMutex();
        if (!queue_.Empty()) {
          found_job = true;
          break;
        }
        UnlockMutex();
      }
      // If |found_job| is true, we simply continue since we already hold the
      // mutex and we know for sure that the |queue_| is not empty.
      if (found_job) continue;
      // Since |found_job| was false, the mutex is not being held at this
      // point.
      LockMutex();
      // Ensure that the queue is still empty.
      if (!queue_.Empty()) continue;
      if (exit_threads_) {
        break;  // Queue is empty and exit was requested.
      }
#endif  // defined(__ANDROID__)
      // Queue is still empty, wait for signal or broadcast.
      Wait();
    } else {
      // Take a job from the queue.
      std::function<void()> job = std::move(queue_.Front());
      queue_.Pop();

      // Run the job without holding the mutex so other workers and
      // Schedule() can make progress meanwhile.
      UnlockMutex();
      // It would be good practice to wrap this call in try/catch so that an
      // exception thrown by a job does not take down the worker thread. This
      // is omitted here because Google3 doesn't like exceptions.
      std::move(job)();
      // Destroy the job (and anything it captured) before reacquiring the
      // mutex.
      job = nullptr;

      LockMutex();
    }
  }
  UnlockMutex();
}
307 
Shutdown()308 void ThreadPool::Shutdown() {
309   // Tell worker threads how to exit.
310   LockMutex();
311   exit_threads_ = true;
312   UnlockMutex();
313   SignalAll();
314 
315   // Join all workers. This will block.
316   for (int i = 0; i < num_threads_; ++i) {
317     if (threads_[i] == nullptr) break;
318     threads_[i]->Join();
319     delete threads_[i];
320   }
321 }
322 
323 }  // namespace libgav1
324