1 // Copyright 2019 The libgav1 Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/utils/threadpool.h"
16 
17 #if defined(_MSC_VER)
18 #include <process.h>
19 #include <windows.h>
20 #else  // defined(_MSC_VER)
21 #include <pthread.h>
22 #endif  // defined(_MSC_VER)
23 #if defined(__ANDROID__) || defined(__GLIBC__)
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
#include <algorithm>
#include <cassert>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <new>
#include <utility>
35 
36 #if defined(__ANDROID__)
37 #include <chrono>  // NOLINT (unapproved c++11 header)
38 #endif
39 
// GetTid() returns the calling thread's ID as reported by the gettid() system
// call on Linux. It is only defined on Android and glibc-based systems; on
// those platforms SetupName() uses it as the numeric suffix of a worker
// thread's name.
#if defined(__ANDROID__)
static pid_t GetTid() { return gettid(); }
#elif defined(__GLIBC__)
// The glibc wrapper for the gettid() system call was added in glibc 2.30.
// Emulate it for older versions of glibc.
#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 30)
static pid_t GetTid() { return gettid(); }
#else  // Older than glibc 2.30
#include <sys/syscall.h>

// No libc wrapper is available: issue the system call directly via syscall()
// and narrow its result to pid_t.
static pid_t GetTid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
#endif  // glibc 2.30 or later.
#endif  // defined(__GLIBC__)
55 
56 namespace libgav1 {
57 
#if defined(__ANDROID__)
namespace {

// Monotonic clock used to time the short busy-wait that an idle worker performs
// on Android before blocking (see ThreadPool::WorkerFunction()).
using Clock = std::chrono::steady_clock;
using Duration = Clock::duration;

// How long an idle worker spins looking for a new job: 2 milliseconds,
// expressed in Clock ticks.
constexpr Duration kBusyWaitDuration =
    std::chrono::duration_cast<Duration>(std::chrono::milliseconds(2));

}  // namespace
#endif  // defined(__ANDROID__)
68 
69 // static
Create(int num_threads)70 std::unique_ptr<ThreadPool> ThreadPool::Create(int num_threads) {
71   return Create(/*name_prefix=*/"", num_threads);
72 }
73 
74 // static
Create(const char name_prefix[],int num_threads)75 std::unique_ptr<ThreadPool> ThreadPool::Create(const char name_prefix[],
76                                                int num_threads) {
77   if (name_prefix == nullptr || num_threads <= 0) return nullptr;
78   std::unique_ptr<WorkerThread*[]> threads(new (std::nothrow)
79                                                WorkerThread*[num_threads]);
80   if (threads == nullptr) return nullptr;
81   std::unique_ptr<ThreadPool> pool(new (std::nothrow) ThreadPool(
82       name_prefix, std::move(threads), num_threads));
83   if (pool != nullptr && !pool->StartWorkers()) {
84     pool = nullptr;
85   }
86   return pool;
87 }
88 
ThreadPool(const char name_prefix[],std::unique_ptr<WorkerThread * []> threads,int num_threads)89 ThreadPool::ThreadPool(const char name_prefix[],
90                        std::unique_ptr<WorkerThread*[]> threads,
91                        int num_threads)
92     : threads_(std::move(threads)), num_threads_(num_threads) {
93   threads_[0] = nullptr;
94   assert(name_prefix != nullptr);
95   const size_t name_prefix_len =
96       std::min(strlen(name_prefix), sizeof(name_prefix_) - 1);
97   memcpy(name_prefix_, name_prefix, name_prefix_len);
98   name_prefix_[name_prefix_len] = '\0';
99 }
100 
~ThreadPool()101 ThreadPool::~ThreadPool() { Shutdown(); }
102 
Schedule(std::function<void ()> closure)103 void ThreadPool::Schedule(std::function<void()> closure) {
104   LockMutex();
105   if (!queue_.GrowIfNeeded()) {
106     // queue_ is full and we can't grow it. Run |closure| directly.
107     UnlockMutex();
108     closure();
109     return;
110   }
111   queue_.Push(std::move(closure));
112   UnlockMutex();
113   SignalOne();
114 }
115 
num_threads() const116 int ThreadPool::num_threads() const { return num_threads_; }
117 
118 // A simple implementation that mirrors the non-portable Thread.  We may
119 // choose to expand this in the future as a portable implementation of
120 // Thread, or replace it at such a time as one is implemented.
121 class ThreadPool::WorkerThread : public Allocable {
122  public:
123   // Creates and starts a thread that runs pool->WorkerFunction().
124   explicit WorkerThread(ThreadPool* pool);
125 
126   // Not copyable or movable.
127   WorkerThread(const WorkerThread&) = delete;
128   WorkerThread& operator=(const WorkerThread&) = delete;
129 
130   // REQUIRES: Join() must have been called if Start() was called and
131   // succeeded.
132   ~WorkerThread() = default;
133 
134   LIBGAV1_MUST_USE_RESULT bool Start();
135 
136   // Joins with the running thread.
137   void Join();
138 
139  private:
140 #if defined(_MSC_VER)
141   static unsigned int __stdcall ThreadBody(void* arg);
142 #else
143   static void* ThreadBody(void* arg);
144 #endif
145 
146   void SetupName();
147   void Run();
148 
149   ThreadPool* pool_;
150 #if defined(_MSC_VER)
151   HANDLE handle_;
152 #else
153   pthread_t thread_;
154 #endif
155 };
156 
WorkerThread(ThreadPool * pool)157 ThreadPool::WorkerThread::WorkerThread(ThreadPool* pool) : pool_(pool) {}
158 
159 #if defined(_MSC_VER)
160 
Start()161 bool ThreadPool::WorkerThread::Start() {
162   // Since our code calls the C run-time library (CRT), use _beginthreadex
163   // rather than CreateThread. Microsoft documentation says "If a thread
164   // created using CreateThread calls the CRT, the CRT may terminate the
165   // process in low-memory conditions."
166   uintptr_t handle = _beginthreadex(
167       /*security=*/nullptr, /*stack_size=*/0, ThreadBody, this,
168       /*initflag=*/CREATE_SUSPENDED, /*thrdaddr=*/nullptr);
169   if (handle == 0) return false;
170   handle_ = reinterpret_cast<HANDLE>(handle);
171   ResumeThread(handle_);
172   return true;
173 }
174 
Join()175 void ThreadPool::WorkerThread::Join() {
176   WaitForSingleObject(handle_, INFINITE);
177   CloseHandle(handle_);
178 }
179 
ThreadBody(void * arg)180 unsigned int ThreadPool::WorkerThread::ThreadBody(void* arg) {
181   auto* thread = static_cast<WorkerThread*>(arg);
182   thread->Run();
183   return 0;
184 }
185 
SetupName()186 void ThreadPool::WorkerThread::SetupName() {
187   // Not currently supported on Windows.
188 }
189 
190 #else  // defined(_MSC_VER)
191 
Start()192 bool ThreadPool::WorkerThread::Start() {
193   return pthread_create(&thread_, nullptr, ThreadBody, this) == 0;
194 }
195 
Join()196 void ThreadPool::WorkerThread::Join() { pthread_join(thread_, nullptr); }
197 
ThreadBody(void * arg)198 void* ThreadPool::WorkerThread::ThreadBody(void* arg) {
199   auto* thread = static_cast<WorkerThread*>(arg);
200   thread->Run();
201   return nullptr;
202 }
203 
SetupName()204 void ThreadPool::WorkerThread::SetupName() {
205   if (pool_->name_prefix_[0] != '\0') {
206 #if defined(__APPLE__)
207     // Apple's version of pthread_setname_np takes one argument and operates on
208     // the current thread only. Also, pthread_mach_thread_np is Apple-specific.
209     // The maximum size of the |name| buffer was noted in the Chromium source
210     // code and was confirmed by experiments.
211     char name[64];
212     mach_port_t id = pthread_mach_thread_np(pthread_self());
213     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
214                       static_cast<int64_t>(id));
215     assert(rv >= 0);
216     rv = pthread_setname_np(name);
217     assert(rv == 0);
218     static_cast<void>(rv);
219 #elif defined(__ANDROID__) || defined(__GLIBC__)
220     // If the |name| buffer is longer than 16 bytes, pthread_setname_np fails
221     // with error 34 (ERANGE) on Android.
222     char name[16];
223     pid_t id = GetTid();
224     int rv = snprintf(name, sizeof(name), "%s/%" PRId64, pool_->name_prefix_,
225                       static_cast<int64_t>(id));
226     assert(rv >= 0);
227     rv = pthread_setname_np(pthread_self(), name);
228     assert(rv == 0);
229     static_cast<void>(rv);
230 #endif
231   }
232 }
233 
234 #endif  // defined(_MSC_VER)
235 
Run()236 void ThreadPool::WorkerThread::Run() {
237   SetupName();
238   pool_->WorkerFunction();
239 }
240 
StartWorkers()241 bool ThreadPool::StartWorkers() {
242   if (!queue_.Init()) return false;
243   for (int i = 0; i < num_threads_; ++i) {
244     threads_[i] = new (std::nothrow) WorkerThread(this);
245     if (threads_[i] == nullptr) return false;
246     if (!threads_[i]->Start()) {
247       delete threads_[i];
248       threads_[i] = nullptr;
249       return false;
250     }
251   }
252   return true;
253 }
254 
WorkerFunction()255 void ThreadPool::WorkerFunction() {
256   LockMutex();
257   while (true) {
258     if (queue_.Empty()) {
259       if (exit_threads_) {
260         break;  // Queue is empty and exit was requested.
261       }
262 #if defined(__ANDROID__)
263       // On android, if we go to a conditional wait right away, the CPU governor
264       // kicks in and starts shutting the cores down. So we do a very small busy
265       // wait to see if we get our next job within that period. This
266       // significantly improves the performance of common cases of tile parallel
267       // decoding. If we don't receive a job in the busy wait time, we then go
268       // to an actual conditional wait as usual.
269       UnlockMutex();
270       bool found_job = false;
271       const auto wait_start = Clock::now();
272       while (Clock::now() - wait_start < kBusyWaitDuration) {
273         LockMutex();
274         if (!queue_.Empty()) {
275           found_job = true;
276           break;
277         }
278         UnlockMutex();
279       }
280       // If |found_job| is true, we simply continue since we already hold the
281       // mutex and we know for sure that the |queue_| is not empty.
282       if (found_job) continue;
283       // Since |found_job_| was false, the mutex is not being held at this
284       // point.
285       LockMutex();
286       // Ensure that the queue is still empty.
287       if (!queue_.Empty()) continue;
288       if (exit_threads_) {
289         break;  // Queue is empty and exit was requested.
290       }
291 #endif  // defined(__ANDROID__)
292       // Queue is still empty, wait for signal or broadcast.
293       Wait();
294     } else {
295       // Take a job from the queue.
296       std::function<void()> job = std::move(queue_.Front());
297       queue_.Pop();
298 
299       UnlockMutex();
300       // Note that it is good practice to surround this with a try/catch so
301       // the thread pool doesn't go to hell if the job throws an exception.
302       // This is omitted here because Google3 doesn't like exceptions.
303       std::move(job)();
304       job = nullptr;
305 
306       LockMutex();
307     }
308   }
309   UnlockMutex();
310 }
311 
Shutdown()312 void ThreadPool::Shutdown() {
313   // Tell worker threads how to exit.
314   LockMutex();
315   exit_threads_ = true;
316   UnlockMutex();
317   SignalAll();
318 
319   // Join all workers. This will block.
320   for (int i = 0; i < num_threads_; ++i) {
321     if (threads_[i] == nullptr) break;
322     threads_[i]->Join();
323     delete threads_[i];
324   }
325 }
326 
327 }  // namespace libgav1
328