/**
 * @file   thread_pool.cc
 *
 * @section LICENSE
 *
 * The MIT License
 *
 * @copyright Copyright (c) 2018-2021 TileDB, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * @section DESCRIPTION
 *
 * This file defines the ThreadPool class.
 */

#include <cassert>

#include "tiledb/common/logger.h"
#include "tiledb/common/thread_pool.h"

namespace tiledb {
namespace common {

// Define the static ThreadPool member variables.
std::unordered_map<std::thread::id, ThreadPool*> ThreadPool::tp_index_;
std::mutex ThreadPool::tp_index_lock_;
std::unordered_map<std::thread::id, tdb_shared_ptr<ThreadPool::PackagedTask>>
    ThreadPool::task_index_;
std::mutex ThreadPool::task_index_lock_;
ThreadPool::ThreadPool()
    : concurrency_level_(0)
    , task_stack_clock_(0)
    , idle_threads_(0)
    , should_terminate_(false) {
}

ThreadPool::~ThreadPool() {
  terminate();
}

Status ThreadPool::init(const uint64_t concurrency_level) {
  if (concurrency_level == 0) {
    return Status::ThreadPoolError(
        "Unable to initialize a thread pool with a concurrency level of 0.");
  }

  Status st = Status::Ok();

  // We allocate one less thread than `concurrency_level` because
  // the `wait_all*()` routines may service tasks concurrently with
  // the worker threads.
  const uint64_t num_threads = concurrency_level - 1;
  for (uint64_t i = 0; i < num_threads; i++) {
    try {
      threads_.emplace_back([this]() { worker(*this); });
    } catch (const std::exception& e) {
      st = Status::ThreadPoolError(
          "Error initializing thread pool of concurrency level " +
          std::to_string(concurrency_level) + "; " + e.what());
      LOG_STATUS(st);
      break;
    }
  }

  if (!st.ok()) {
    // Join any created threads on error.
    terminate();
    return st;
  }

  // Save the concurrency level.
  concurrency_level_ = concurrency_level;

  // Add index entries mapping each of this pool's thread ids to this
  // ThreadPool instance.
  add_tp_index();

  // Add task indexes for each thread in this thread pool.
  add_task_index();

  return st;
}

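// A minimal usage sketch (illustrative only, not code from this file): a
// pool initialized with a concurrency level of 4 creates 3 worker threads,
// and the caller's own thread may service tasks inside the `wait_all*()`
// routines.
//
//   ThreadPool pool;
//   Status st = pool.init(4);  // 3 workers + the calling thread
//   if (!st.ok())
//     return st;
//   std::vector<ThreadPool::Task> tasks;
//   tasks.emplace_back(pool.execute([]() { return Status::Ok(); }));
//   st = pool.wait_all(tasks);  // may run queued tasks on this thread
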
ThreadPool::Task ThreadPool::execute(std::function<Status()>&& function) {
  if (concurrency_level_ == 0) {
    Task invalid_future;
    LOG_ERROR("Cannot execute task; thread pool uninitialized.");
    return invalid_future;
  }

  if (!function) {
    Task invalid_future;
    LOG_ERROR("Cannot execute task; invalid function.");
    return invalid_future;
  }

  std::unique_lock<std::mutex> ul(task_stack_mutex_);

  if (should_terminate_) {
    Task invalid_future;
    LOG_ERROR("Cannot execute task; thread pool has terminated.");
    return invalid_future;
  }

  // Locate the currently executing task, which may be null.
  const std::thread::id tid = std::this_thread::get_id();
  tdb_shared_ptr<PackagedTask> parent_task = lookup_task(tid);

  // Create the packaged task.
  tdb_shared_ptr<PackagedTask> task = tdb_make_shared(
      PackagedTask, std::move(function), std::move(parent_task));

  // Fetch the future from the packaged task.
  ThreadPool::Task future = task->get_future();

  // When we have a concurrency level > 1, we will have at least
  // one thread available to pick up the task. For a concurrency
  // level == 1, we have no worker threads available. When no
  // worker threads are available, execute the task on this
  // thread.
  if (concurrency_level_ == 1) {
    ul.unlock();
    exec_packaged_task(task);
  } else {
    // Look up the thread pool that this thread belongs to. If it
    // does not belong to a thread pool, `lookup_tp` will return
    // `nullptr`.
    ThreadPool* const tp = lookup_tp(tid);

    // As both an optimization and a means of breaking deadlock,
    // execute the task if this thread belongs to `this`. Otherwise,
    // queue it for a worker thread.
    if (tp == this && idle_threads_ == 0) {
      ul.unlock();
      exec_packaged_task(task);
    } else {
      // Add `task` to the stack of pending tasks.
      task_stack_.emplace_back(std::move(task));
      task_stack_cv_.notify_one();

      // Increment the logical clock to indicate that the
      // `task_stack_` has been modified.
      ++task_stack_clock_;

      // The `ul` protects `task_stack_`, `task_stack_clock_`, and
      // `idle_threads_`. Save a copy of `idle_threads_` before releasing
      // the lock.
      const uint64_t idle_threads_cp = idle_threads_;
      ul.unlock();

      // If all threads are busy, signal a thread in `this` that is
      // blocked waiting on another task. This wakes up one of those
      // threads to service the `task` that we just added to `task_stack_`.
      // There is a race here on `idle_threads_` because we just released
      // the lock. If a thread became idle and picked up `task`, we have
      // spuriously woken a thread in the `wait` path. It will find
      // that the `task_stack_` is empty and re-enter its wait.
      if (idle_threads_cp == 0) {
        blocked_tasks_mutex_.lock();
        if (!blocked_tasks_.empty()) {
          // Signal the first blocked task to wake up and check the task
          // stack for a task to execute.
          tdb_shared_ptr<TaskState> blocked_task = *blocked_tasks_.begin();
          {
            std::lock_guard<std::mutex> lg(blocked_task->return_st_mutex_);
            blocked_task->check_task_stack_ = true;
          }
          blocked_task->cv_.notify_all();
          blocked_tasks_.erase(blocked_task);
        }
        blocked_tasks_mutex_.unlock();
      }
    }
  }

  assert(future.valid());
  return future;
}

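// Illustrative sketch of recursive submission (hypothetical caller code,
// not part of this file): a task may submit work to the same pool from
// inside the pool. When the submitting thread belongs to `this` and no
// workers are idle, the child runs inline on the submitting thread, which
// avoids deadlocking a fully-occupied pool.
//
//   auto parent = pool.execute([&pool]() {
//     std::vector<ThreadPool::Task> children;
//     children.emplace_back(pool.execute([]() { return Status::Ok(); }));
//     return pool.wait_all(children);
//   });
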
uint64_t ThreadPool::concurrency_level() const {
  return concurrency_level_;
}

Status ThreadPool::wait_all(std::vector<Task>& tasks) {
  auto statuses = wait_all_status(tasks);
  for (auto& st : statuses) {
    if (!st.ok()) {
      return st;
    }
  }
  return Status::Ok();
}

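// For example (hypothetical statuses): if three tasks complete with Ok,
// Error("A"), and Error("B"), `wait_all` still waits on all three via
// `wait_all_status` below, then returns the first non-OK status,
// Error("A"). Callers that need every status should use
// `wait_all_status` directly.
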
std::vector<Status> ThreadPool::wait_all_status(std::vector<Task>& tasks) {
  std::vector<Status> statuses;
  for (auto& task : tasks) {
    if (!task.valid()) {
      LOG_ERROR("Waiting on invalid task future.");
      statuses.push_back(Status::ThreadPoolError("Invalid task future"));
    } else {
      Status status = wait_or_work(std::move(task));
      if (!status.ok()) {
        LOG_STATUS(status);
      }
      statuses.push_back(status);
    }
  }
  return statuses;
}

Status ThreadPool::wait_or_work(Task&& task) {
  // Records the last read value from `task_stack_clock_`.
  uint64_t last_task_stack_clock = 0;

  do {
    if (task.done())
      break;

    // Lock the `task_stack_` to receive the next task to work on.
    task_stack_mutex_.lock();

    // Determine if `task_stack_` has been modified since our
    // last loop. This is always true for the first iteration
    // of this loop. Note that `task_stack_clock_` may overflow,
    // producing a false positive. In that scenario, we will
    // perform one spurious loop iteration, which does not affect
    // the correctness of this routine.
    const bool task_stack_modified = last_task_stack_clock == 0 ||
                                     last_task_stack_clock != task_stack_clock_;

    // If there are no pending tasks or the stack of pending tasks has
    // not changed since our last inspection, we will wait for `task`
    // to complete.
    if (task_stack_.empty() || !task_stack_modified) {
      task_stack_mutex_.unlock();

      // Add `task` to `blocked_tasks_` so that the `execute()` path can
      // signal it when a new pending task is available.
      blocked_tasks_mutex_.lock();
      blocked_tasks_.insert(task.task_state_);
      blocked_tasks_mutex_.unlock();

      // Block until the task is signaled. It will be signaled when it
      // has completed or when there is new work to execute on `task_stack_`.
      task.wait();

      // Remove `task` from `blocked_tasks_`.
      blocked_tasks_mutex_.lock();
      if (blocked_tasks_.count(task.task_state_) > 0)
        blocked_tasks_.erase(task.task_state_);
      blocked_tasks_mutex_.unlock();

      // After the task has been woken up, check to see if it has completed.
      if (task.done()) {
        break;
      }

      // The task did not complete. This task has been signaled because a new
      // pending task was added to `task_stack_`. Reset the `check_task_stack_`
      // flag.
      {
        std::lock_guard<std::mutex> lg(task.task_state_->return_st_mutex_);
        task.task_state_->check_task_stack_ = false;
      }

      // Lock the `task_stack_` again before checking for the next pending task.
      task_stack_mutex_.lock();
    }

    // We may have released and re-acquired the `task_stack_mutex_`. We must
    // re-check that the `task_stack_` is still non-empty.
    if (!task_stack_.empty()) {
      // Pull the next task off of the task stack. We specifically use a LIFO
      // ordering to prevent overflowing the call stack. We will skip tasks
      // that are not descendents of the task we are currently executing in.
      const std::thread::id tid = std::this_thread::get_id();
      tdb_shared_ptr<PackagedTask> current_task = lookup_task(tid);
      tdb_shared_ptr<PackagedTask> descendent_task = nullptr;
      if (current_task == nullptr) {
        // We are not executing in the context of a thread pool task, so we
        // do not have any restriction on which task we can execute. Select
        // the next one.
        descendent_task = task_stack_.back();
        task_stack_.pop_back();
      } else {
        // Find the next pending task that is a descendent of `current_task`.
        for (auto riter = task_stack_.rbegin(); riter != task_stack_.rend();
             ++riter) {
          // Determine if the task pointed to by `riter` is a descendent
          // of `current_task`.
          const PackagedTask* tmp_task = riter->get();
          while (tmp_task != nullptr) {
            const PackagedTask* const tmp_task_parent = tmp_task->get_parent();
            if (tmp_task_parent == current_task.get()) {
              descendent_task = *riter;
              break;
            }
            tmp_task = tmp_task_parent;
          }

          // If we found a descendent task, erase it from the task stack
          // and break.
          if (descendent_task != nullptr) {
            task_stack_.erase(std::next(riter).base());
            break;
          }
        }
      }

      // Save the current state of `task_stack_clock_`. We may mutate it below.
      last_task_stack_clock = task_stack_clock_;

      // If `descendent_task` is non-null, we must have removed it from
      // the `task_stack_`. In this scenario, increment the logical clock
      // to indicate that the `task_stack_` has been modified.
      if (descendent_task != nullptr)
        ++task_stack_clock_;

      // We're done mutating `task_stack_` and `task_stack_clock_`.
      task_stack_mutex_.unlock();

      // Execute the descendent task if we found one.
      if (descendent_task != nullptr)
        exec_packaged_task(descendent_task);
    } else {
      // The task stack is now empty; retry.
      task_stack_mutex_.unlock();
    }
  } while (true);

  // The task has completed and will not block.
  assert(task.done());
  return task.get();
}

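// Sketch of the descendent restriction above (illustrative scenario, not
// from this file): suppose the waiting thread is executing task P, and
// `task_stack_` holds [X, C1, C2] where C1 and C2 descend from P but X
// does not. Scanning from the top (LIFO), the loop runs C2 and then C1
// inline, but never X: running unrelated work inline could block P's
// completion behind work that P does not depend on.
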
void ThreadPool::terminate() {
  {
    std::unique_lock<std::mutex> ul(task_stack_mutex_);
    should_terminate_ = true;
    task_stack_cv_.notify_all();
  }

  remove_tp_index();

  for (auto& t : threads_) {
    t.join();
  }

  remove_task_index();

  threads_.clear();
}

void ThreadPool::worker(ThreadPool& pool) {
  while (true) {
    tdb_shared_ptr<PackagedTask> task = nullptr;

    {
      // Wait until there's work to do.
      std::unique_lock<std::mutex> ul(pool.task_stack_mutex_);
      ++pool.idle_threads_;
      pool.task_stack_cv_.wait(ul, [&pool]() {
        return pool.should_terminate_ || !pool.task_stack_.empty();
      });

      if (!pool.task_stack_.empty()) {
        task = std::move(pool.task_stack_.back());
        pool.task_stack_.pop_back();
        --pool.idle_threads_;
      } else {
        // The task stack was empty; ensure `task` is invalid.
        task = nullptr;
      }
    }

    if (task != nullptr)
      exec_packaged_task(task);

    if (pool.should_terminate_)
      break;
  }
}

void ThreadPool::add_tp_index() {
  std::lock_guard<std::mutex> lock(tp_index_lock_);
  for (const auto& thread : threads_)
    tp_index_[thread.get_id()] = this;
}

void ThreadPool::remove_tp_index() {
  std::lock_guard<std::mutex> lock(tp_index_lock_);
  for (const auto& thread : threads_)
    tp_index_.erase(thread.get_id());
}

ThreadPool* ThreadPool::lookup_tp(const std::thread::id tid) {
  std::lock_guard<std::mutex> lock(tp_index_lock_);
  if (tp_index_.count(tid) == 1)
    return tp_index_[tid];
  return nullptr;
}

void ThreadPool::add_task_index() {
  std::lock_guard<std::mutex> lock(task_index_lock_);
  for (const auto& thread : threads_)
    task_index_[thread.get_id()] = nullptr;
}

void ThreadPool::remove_task_index() {
  std::lock_guard<std::mutex> lock(task_index_lock_);
  for (const auto& thread : threads_)
    task_index_.erase(thread.get_id());
}

tdb_shared_ptr<ThreadPool::PackagedTask> ThreadPool::lookup_task(
    const std::thread::id tid) {
  std::lock_guard<std::mutex> lock(task_index_lock_);
  if (task_index_.count(tid) == 1)
    return task_index_[tid];
  return nullptr;
}

void ThreadPool::exec_packaged_task(tdb_shared_ptr<PackagedTask> const task) {
  const std::thread::id tid = std::this_thread::get_id();

  // Before we execute `task`, we must update `task_index_` to map
  // this thread id to the executing task. Note that we only lock
  // `task_index_lock_` to protect the container itself. Each element
  // in the map is only ever accessed by the thread whose id it is
  // keyed on, so per-element access is implicitly thread-safe.
  tdb_shared_ptr<PackagedTask> tmp_task = nullptr;
  std::unique_lock<std::mutex> ul(task_index_lock_);
  if (task_index_.count(tid) == 1)
    tmp_task = task_index_[tid];
  task_index_[tid] = task;
  ul.unlock();

  // Execute `task`.
  (*task)();

  // Restore this thread's entry in `task_index_` to the task it was
  // previously executing, which may be null.
  ul.lock();
  task_index_[tid] = tmp_task;
  ul.unlock();
}

}  // namespace common
}  // namespace tiledb