1 /**
2  * @file   cancelable_tasks.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2018-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file defines the CancelableTasks class.
31  */
32 
33 #include "tiledb/sm/misc/cancelable_tasks.h"
34 #include "tiledb/common/thread_pool.h"
35 
36 using namespace tiledb::common;
37 
38 namespace tiledb {
39 namespace sm {
40 
CancelableTasks()41 CancelableTasks::CancelableTasks()
42     : outstanding_tasks_(0)
43     , should_cancel_(false) {
44 }
45 
execute(ThreadPool * const thread_pool,std::function<Status ()> && fn,std::function<void ()> && on_cancel)46 ThreadPool::Task CancelableTasks::execute(
47     ThreadPool* const thread_pool,
48     std::function<Status()>&& fn,
49     std::function<void()>&& on_cancel) {
50   std::function<Status()> wrapped_fn =
51       std::bind(&CancelableTasks::fn_wrapper, this, fn, on_cancel);
52 
53   ThreadPool::Task task = thread_pool->execute(std::move(wrapped_fn));
54   if (task.valid()) {
55     std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
56     ++outstanding_tasks_;
57   }
58 
59   return task;
60 }
61 
cancel_all_tasks()62 void CancelableTasks::cancel_all_tasks() {
63   {
64     std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
65     should_cancel_ = true;
66   }
67 
68   // Wait for all outstanding tasks to cancel.
69   {
70     std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
71     outstanding_tasks_cv_.wait(
72         lck, [this]() { return outstanding_tasks_ == 0; });
73     should_cancel_ = false;
74   }
75 }
76 
fn_wrapper(const std::function<Status ()> & fn,const std::function<void ()> & on_cancel)77 Status CancelableTasks::fn_wrapper(
78     const std::function<Status()>& fn, const std::function<void()>& on_cancel) {
79   std::unique_lock<std::mutex> lck(outstanding_tasks_mutex_);
80   if (should_cancel_) {
81     if (on_cancel) {
82       lck.unlock();
83       on_cancel();
84       lck.lock();
85     }
86     if (--outstanding_tasks_ == 0) {
87       outstanding_tasks_cv_.notify_all();
88     }
89     return Status::Error("Task cancelled before execution.");
90   } else {
91     lck.unlock();
92     Status st = fn();
93     lck.lock();
94     --outstanding_tasks_;
95     // If 'should_cancel_' became true when the lock was released to execute
96     // 'fn', we need to signal the CV.
97     if (should_cancel_ && outstanding_tasks_ == 0) {
98       outstanding_tasks_cv_.notify_all();
99     }
100     return st;
101   }
102 }
103 
104 }  // namespace sm
105 }  // namespace tiledb
106