1 /**
2 * @file thread_pool.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2018-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file defines the ThreadPool class.
31 */
32
33 #include <cassert>
34
35 #include "tiledb/common/logger.h"
36 #include "tiledb/common/thread_pool.h"
37
38 namespace tiledb {
39 namespace common {
40
41 // Define the static ThreadPool member variables.
42 std::unordered_map<std::thread::id, ThreadPool*> ThreadPool::tp_index_;
43 std::mutex ThreadPool::tp_index_lock_;
44 std::unordered_map<std::thread::id, tdb_shared_ptr<ThreadPool::PackagedTask>>
45 ThreadPool::task_index_;
46 std::mutex ThreadPool::task_index_lock_;
47
ThreadPool()48 ThreadPool::ThreadPool()
49 : concurrency_level_(0)
50 , task_stack_clock_(0)
51 , idle_threads_(0)
52 , should_terminate_(false) {
53 }
54
~ThreadPool()55 ThreadPool::~ThreadPool() {
56 terminate();
57 }
58
init(const uint64_t concurrency_level)59 Status ThreadPool::init(const uint64_t concurrency_level) {
60 if (concurrency_level == 0) {
61 return Status::ThreadPoolError(
62 "Unable to initialize a thread pool with a concurrency level of 0.");
63 }
64
65 Status st = Status::Ok();
66
67 // We allocate one less thread than `concurrency_level` because
68 // the `wait_all*()` routines may service tasks concurrently with
69 // the worker threads.
70 const uint64_t num_threads = concurrency_level - 1;
71 for (uint64_t i = 0; i < num_threads; i++) {
72 try {
73 threads_.emplace_back([this]() { worker(*this); });
74 } catch (const std::exception& e) {
75 st = Status::ThreadPoolError(
76 "Error initializing thread pool of concurrency level " +
77 std::to_string(concurrency_level) + "; " + e.what());
78 LOG_STATUS(st);
79 break;
80 }
81 }
82
83 if (!st.ok()) {
84 // Join any created threads on error.
85 terminate();
86 return st;
87 }
88
89 // Save the concurrency level.
90 concurrency_level_ = concurrency_level;
91
92 // Add indexes from this ThreadPool instance from all of its thread ids.
93 add_tp_index();
94
95 // Add task indexes for each thread in this thread pool.
96 add_task_index();
97
98 return st;
99 }
100
execute(std::function<Status ()> && function)101 ThreadPool::Task ThreadPool::execute(std::function<Status()>&& function) {
102 if (concurrency_level_ == 0) {
103 Task invalid_future;
104 LOG_ERROR("Cannot execute task; thread pool uninitialized.");
105 return invalid_future;
106 }
107
108 if (!function) {
109 Task invalid_future;
110 LOG_ERROR("Cannot execute task; invalid function.");
111 return invalid_future;
112 }
113
114 std::unique_lock<std::mutex> ul(task_stack_mutex_);
115
116 if (should_terminate_) {
117 Task invalid_future;
118 LOG_ERROR("Cannot execute task; thread pool has terminated.");
119 return invalid_future;
120 }
121
122 // Locate the currently executing task, which may be null.
123 const std::thread::id tid = std::this_thread::get_id();
124 tdb_shared_ptr<PackagedTask> parent_task = lookup_task(tid);
125
126 // Create the packaged task.
127 tdb_shared_ptr<PackagedTask> task = tdb_make_shared(
128 PackagedTask, std::move(function), std::move(parent_task));
129
130 // Fetch the future from the packaged task.
131 ThreadPool::Task future = task->get_future();
132
133 // When we have a concurrency level > 1, we will have at least
134 // one thread available to pick up the task. For a concurrency
135 // level == 1, we have no worker threads available. When no
136 // worker threads are available, execute the task on this
137 // thread.
138 if (concurrency_level_ == 1) {
139 ul.unlock();
140 exec_packaged_task(task);
141 } else {
142 // Lookup the thread pool that this thread belongs to. If it
143 // does not belong to a thread pool, `lookup_tp` will return
144 // `nullptr`.
145 ThreadPool* const tp = lookup_tp(tid);
146
147 // As both an optimization and a means of breaking deadlock,
148 // execute the task if this thread belongs to `this`. Otherwise,
149 // queue it for a worker thread.
150 if (tp == this && idle_threads_ == 0) {
151 ul.unlock();
152 exec_packaged_task(task);
153 } else {
154 // Add `task` to the stack of pending tasks.
155 task_stack_.emplace_back(std::move(task));
156 task_stack_cv_.notify_one();
157
158 // Increment the logical clock to indicate that the
159 // `task_stack_` has been modified.
160 ++task_stack_clock_;
161
162 // The `ul` protects `task_stack_`, `task_stack_clock_`, and
163 // `idle_threads_`. Save a copy of `idle_threads_` before releasing
164 // the lock.
165 const uint64_t idle_threads_cp = idle_threads_;
166 ul.unlock();
167
168 // If all threads are busy, signal a thread in `this` that is
169 // blocked waiting on another task. This wakes up one of those
170 // threads to service the `task` that we just added to `task_stack_`.
171 // There is a race here on `idle_threads_` because we just released
172 // the lock. If a thread became idle and picks up `task`, we have
173 // spuriously unlocked a thread in the `wait` path. It will find
174 // that the `task_stack_` is empty and re-enter its wait.
175 if (idle_threads_cp == 0) {
176 blocked_tasks_mutex_.lock();
177 if (!blocked_tasks_.empty()) {
178 // Signal the first blocked task to wake up and check the task
179 // stack for a task to execute.
180 tdb_shared_ptr<TaskState> blocked_task = *blocked_tasks_.begin();
181 {
182 std::lock_guard<std::mutex> lg(blocked_task->return_st_mutex_);
183 blocked_task->check_task_stack_ = true;
184 }
185 blocked_task->cv_.notify_all();
186 blocked_tasks_.erase(blocked_task);
187 }
188 blocked_tasks_mutex_.unlock();
189 }
190 }
191 }
192
193 assert(future.valid());
194 return future;
195 }
196
concurrency_level() const197 uint64_t ThreadPool::concurrency_level() const {
198 return concurrency_level_;
199 }
200
wait_all(std::vector<Task> & tasks)201 Status ThreadPool::wait_all(std::vector<Task>& tasks) {
202 auto statuses = wait_all_status(tasks);
203 for (auto& st : statuses) {
204 if (!st.ok()) {
205 return st;
206 }
207 }
208 return Status::Ok();
209 }
210
wait_all_status(std::vector<Task> & tasks)211 std::vector<Status> ThreadPool::wait_all_status(std::vector<Task>& tasks) {
212 std::vector<Status> statuses;
213 for (auto& task : tasks) {
214 if (!task.valid()) {
215 LOG_ERROR("Waiting on invalid task future.");
216 statuses.push_back(Status::ThreadPoolError("Invalid task future"));
217 } else {
218 Status status = wait_or_work(std::move(task));
219 if (!status.ok()) {
220 LOG_STATUS(status);
221 }
222 statuses.push_back(status);
223 }
224 }
225 return statuses;
226 }
227
wait_or_work(Task && task)228 Status ThreadPool::wait_or_work(Task&& task) {
229 // Records the last read value from `task_stack_clock_`.
230 uint64_t last_task_stack_clock = 0;
231
232 do {
233 if (task.done())
234 break;
235
236 // Lock the `task_stack_` to receive the next task to work on.
237 task_stack_mutex_.lock();
238
239 // Determine if `task_stack_` has been modified since our
240 // last loop. This is always true for the first iteration
241 // in this loop. Note that `task_stack_clock_` may overflow,
242 // producing a false-positive. In that scenario, we will
243 // perform one spurious loop but will not affect the
244 // correctness of this routine.
245 const bool task_stack_modified = last_task_stack_clock == 0 ||
246 last_task_stack_clock != task_stack_clock_;
247
248 // If there are no pending tasks or the stack of pending tasks has
249 // not changed since our last inspection, we will wait for `task`
250 // to complete.
251 if (task_stack_.empty() || !task_stack_modified) {
252 task_stack_mutex_.unlock();
253
254 // Add `task` to `blocked_tasks_` so that the `execute()` path can
255 // signal it when a new pending task is available.
256 blocked_tasks_mutex_.lock();
257 blocked_tasks_.insert(task.task_state_);
258 blocked_tasks_mutex_.unlock();
259
260 // Block until the task is signaled. It will be signaled when it
261 // has completed or when there is new work to execute on `task_stack_`.
262 task.wait();
263
264 // Remove `task` from `blocked_tasks_`.
265 blocked_tasks_mutex_.lock();
266 if (blocked_tasks_.count(task.task_state_) > 0)
267 blocked_tasks_.erase(task.task_state_);
268 blocked_tasks_mutex_.unlock();
269
270 // After the task has been woken up, check to see if it has completed.
271 if (task.done()) {
272 break;
273 }
274
275 // The task did not complete. This task has been signaled because a new
276 // pending task was added to `task_stack_`. Reset the `check_task_stack_`
277 // flag.
278 {
279 std::lock_guard<std::mutex> lg(task.task_state_->return_st_mutex_);
280 task.task_state_->check_task_stack_ = false;
281 }
282
283 // Lock the `task_stack_` again before checking for the next pending task.
284 task_stack_mutex_.lock();
285 }
286
287 // We may have released and re-aquired the `task_stack_mutex_`. We must
288 // check if it is still non-empty.
289 if (!task_stack_.empty()) {
290 // Pull the next task off of the task stack. We specifically use a LIFO
291 // ordering to prevent overflowing the call stack. We will skip tasks
292 // that are not descendents of the task we are currently executing in.
293 const std::thread::id tid = std::this_thread::get_id();
294 tdb_shared_ptr<PackagedTask> current_task = lookup_task(tid);
295 tdb_shared_ptr<PackagedTask> descendent_task = nullptr;
296 if (current_task == nullptr) {
297 // We are not executing in the context of a threadpool task, we do
298 // not have any restriction on which task we can execute. Select
299 // the next one.
300 descendent_task = task_stack_.back();
301 task_stack_.pop_back();
302 } else {
303 // Find the next pending task that is a descendent of `current_task`.
304 for (auto riter = task_stack_.rbegin(); riter != task_stack_.rend();
305 ++riter) {
306 // Determine if the task pointed to by `riter` is a descendent
307 // of `current_task`.
308 const PackagedTask* tmp_task = riter->get();
309 while (tmp_task != nullptr) {
310 const PackagedTask* const tmp_task_parent = tmp_task->get_parent();
311 if (tmp_task_parent == current_task.get()) {
312 descendent_task = *riter;
313 break;
314 }
315 tmp_task = tmp_task_parent;
316 }
317
318 // If we found a descendent task, erase it from the task stack
319 // and break.
320 if (descendent_task != nullptr) {
321 task_stack_.erase(std::next(riter).base());
322 break;
323 }
324 }
325 }
326
327 // Save the current state of `task_stack_clock_`. We may mutate it below.
328 last_task_stack_clock = task_stack_clock_;
329
330 // If `descendent_task` is non-null, we must have removed it from
331 // the `task_stack_`. In this scenario, increment the logical clock
332 // to indicate that the `task_stack_` has been modified.
333 if (descendent_task != nullptr)
334 ++task_stack_clock_;
335
336 // We're done mutating `task_stack_` and `task_stack_clock_`.
337 task_stack_mutex_.unlock();
338
339 // Execute the descendent task if we found one.
340 if (descendent_task != nullptr)
341 exec_packaged_task(descendent_task);
342 } else {
343 // The task stack is now empty, retry.
344 task_stack_mutex_.unlock();
345 }
346 } while (true);
347
348 // The task has completed and will not block.
349 assert(task.done());
350 return task.get();
351 }
352
terminate()353 void ThreadPool::terminate() {
354 {
355 std::unique_lock<std::mutex> ul(task_stack_mutex_);
356 should_terminate_ = true;
357 task_stack_cv_.notify_all();
358 }
359
360 remove_tp_index();
361
362 for (auto& t : threads_) {
363 t.join();
364 }
365
366 remove_task_index();
367
368 threads_.clear();
369 }
370
worker(ThreadPool & pool)371 void ThreadPool::worker(ThreadPool& pool) {
372 while (true) {
373 tdb_shared_ptr<PackagedTask> task = nullptr;
374
375 {
376 // Wait until there's work to do.
377 std::unique_lock<std::mutex> ul(pool.task_stack_mutex_);
378 ++pool.idle_threads_;
379 pool.task_stack_cv_.wait(ul, [&pool]() {
380 return pool.should_terminate_ || !pool.task_stack_.empty();
381 });
382
383 if (!pool.task_stack_.empty()) {
384 task = std::move(pool.task_stack_.back());
385 pool.task_stack_.pop_back();
386 --pool.idle_threads_;
387 } else {
388 // The task stack was empty, ensure `task` is invalid.
389 task = nullptr;
390 }
391 }
392
393 if (task != nullptr)
394 exec_packaged_task(task);
395
396 if (pool.should_terminate_)
397 break;
398 }
399 }
400
add_tp_index()401 void ThreadPool::add_tp_index() {
402 std::lock_guard<std::mutex> lock(tp_index_lock_);
403 for (const auto& thread : threads_)
404 tp_index_[thread.get_id()] = this;
405 }
406
remove_tp_index()407 void ThreadPool::remove_tp_index() {
408 std::lock_guard<std::mutex> lock(tp_index_lock_);
409 for (const auto& thread : threads_)
410 tp_index_.erase(thread.get_id());
411 }
412
lookup_tp(const std::thread::id tid)413 ThreadPool* ThreadPool::lookup_tp(const std::thread::id tid) {
414 std::lock_guard<std::mutex> lock(tp_index_lock_);
415 if (tp_index_.count(tid) == 1)
416 return tp_index_[tid];
417 return nullptr;
418 }
419
add_task_index()420 void ThreadPool::add_task_index() {
421 std::lock_guard<std::mutex> lock(task_index_lock_);
422 for (const auto& thread : threads_)
423 task_index_[thread.get_id()] = nullptr;
424 }
425
remove_task_index()426 void ThreadPool::remove_task_index() {
427 std::lock_guard<std::mutex> lock(task_index_lock_);
428 for (const auto& thread : threads_)
429 task_index_.erase(thread.get_id());
430 }
431
lookup_task(const std::thread::id tid)432 tdb_shared_ptr<ThreadPool::PackagedTask> ThreadPool::lookup_task(
433 const std::thread::id tid) {
434 std::lock_guard<std::mutex> lock(task_index_lock_);
435 if (task_index_.count(tid) == 1)
436 return task_index_[tid];
437 return nullptr;
438 }
439
exec_packaged_task(tdb_shared_ptr<PackagedTask> const task)440 void ThreadPool::exec_packaged_task(tdb_shared_ptr<PackagedTask> const task) {
441 const std::thread::id tid = std::this_thread::get_id();
442
443 // Before we execute `task`, we must update `task_index_` to map
444 // this thread-id to the executing task. Note that we only lock
445 // `task_index_` to protect the container itself. The elements
446 // on the map are only ever accessed by the thread that the
447 // element is keyed on, making it implicitly safe.
448 tdb_shared_ptr<PackagedTask> tmp_task = nullptr;
449 std::unique_lock<std::mutex> ul(task_index_lock_);
450 if (task_index_.count(tid) == 1)
451 tmp_task = task_index_[tid];
452 task_index_[tid] = task;
453 ul.unlock();
454
455 // Execute `task`.
456 (*task)();
457
458 // Restore `task_index_` to the task that it was previously
459 // executing, which may be null.
460 ul.lock();
461 task_index_[tid] = tmp_task;
462 ul.unlock();
463 }
464
465 } // namespace common
466 } // namespace tiledb
467