1 /**
2 * @file thread_pool.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2018-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file defines the ThreadPool class.
31 */
32
33 #include "tiledb/sm/misc/cancelable_tasks.h"
34 #include "tiledb/common/thread_pool.h"
35
36 using namespace tiledb::common;
37
38 namespace tiledb {
39 namespace sm {
40
CancelableTasks()41 CancelableTasks::CancelableTasks()
42 : outstanding_tasks_(0)
43 , should_cancel_(false) {
44 }
45
execute(ThreadPool * const thread_pool,std::function<Status ()> && fn,std::function<void ()> && on_cancel)46 ThreadPool::Task CancelableTasks::execute(
47 ThreadPool* const thread_pool,
48 std::function<Status()>&& fn,
49 std::function<void()>&& on_cancel) {
50 std::function<Status()> wrapped_fn =
51 std::bind(&CancelableTasks::fn_wrapper, this, fn, on_cancel);
52
53 ThreadPool::Task task = thread_pool->execute(std::move(wrapped_fn));
54 if (task.valid()) {
55 std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
56 ++outstanding_tasks_;
57 }
58
59 return task;
60 }
61
cancel_all_tasks()62 void CancelableTasks::cancel_all_tasks() {
63 {
64 std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
65 should_cancel_ = true;
66 }
67
68 // Wait for all outstanding tasks to cancel.
69 {
70 std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
71 outstanding_tasks_cv_.wait(
72 lck, [this]() { return outstanding_tasks_ == 0; });
73 should_cancel_ = false;
74 }
75 }
76
fn_wrapper(const std::function<Status ()> & fn,const std::function<void ()> & on_cancel)77 Status CancelableTasks::fn_wrapper(
78 const std::function<Status()>& fn, const std::function<void()>& on_cancel) {
79 std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
80 if (should_cancel_) {
81 if (on_cancel) {
82 lck.unlock();
83 on_cancel();
84 lck.lock();
85 }
86 if (--outstanding_tasks_ == 0) {
87 outstanding_tasks_cv_.notify_all();
88 }
89 return Status::Error("Task cancelled before execution.");
90 } else {
91 lck.unlock();
92 Status st = fn();
93 lck.lock();
94 --outstanding_tasks_;
95 // If 'should_cancel_' became true when the lock was released to execute
96 // 'fn', we need to signal the CV.
97 if (should_cancel_ && outstanding_tasks_ == 0) {
98 outstanding_tasks_cv_.notify_all();
99 }
100 return st;
101 }
102 }
103
104 } // namespace sm
105 } // namespace tiledb
106