1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <thrift/lib/cpp/concurrency/TimerManager.h>
18
19 #include <assert.h>
20 #include <iostream>
21 #include <set>
22
23 #include <thrift/lib/cpp/concurrency/Exception.h>
24 #include <thrift/lib/cpp/concurrency/Util.h>
25
26 namespace apache {
27 namespace thrift {
28 namespace concurrency {
29
30 using std::shared_ptr;
31
32 /**
33 * TimerManager class
34 *
35 * @version $Id:$
36 */
37 class TimerManager::Task : public Runnable {
38 public:
39 enum STATE {
40 WAITING,
41 EXECUTING,
42 CANCELLED,
43 COMPLETE,
44 };
45
Task(shared_ptr<Runnable> runnable)46 Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
47
~Task()48 ~Task() override {}
49
run()50 void run() override {
51 if (state_ == EXECUTING) {
52 runnable_->run();
53 state_ = COMPLETE;
54 }
55 }
56
57 private:
58 shared_ptr<Runnable> runnable_;
59 friend class TimerManager::Dispatcher;
60 STATE state_;
61 };
62
63 class TimerManager::Dispatcher : public Runnable {
64 public:
Dispatcher(TimerManager * manager)65 Dispatcher(TimerManager* manager) : manager_(manager) {}
66
~Dispatcher()67 ~Dispatcher() override {}
68
69 /**
70 * Dispatcher entry point
71 *
72 * As long as dispatcher thread is running, pull tasks off the task taskMap_
73 * and execute.
74 */
run()75 void run() override {
76 {
77 std::unique_lock<std::mutex> l(manager_->mutex_);
78 if (manager_->state_ == TimerManager::STARTING) {
79 manager_->state_ = TimerManager::STARTED;
80 manager_->cond_.notify_all();
81 }
82 }
83
84 do {
85 std::set<shared_ptr<TimerManager::Task>> expiredTasks;
86 {
87 std::unique_lock<std::mutex> l(manager_->mutex_);
88 task_iterator expiredTaskEnd;
89 int64_t now = Util::currentTime();
90 while (manager_->state_ == TimerManager::STARTED &&
91 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) ==
92 manager_->taskMap_.begin()) {
93 int64_t timeout = 0LL;
94 if (!manager_->taskMap_.empty()) {
95 timeout = manager_->taskMap_.begin()->first - now;
96 }
97 auto count = manager_->taskCount_.load(std::memory_order_relaxed);
98 (void)count;
99 assert((timeout != 0 && count > 0) || (timeout == 0 && count == 0));
100 if (timeout == 0) {
101 manager_->cond_.wait(l);
102 } else {
103 manager_->cond_.wait_for(l, std::chrono::milliseconds(timeout));
104 }
105 now = Util::currentTime();
106 }
107
108 if (manager_->state_ == TimerManager::STARTED) {
109 for (task_iterator ix = manager_->taskMap_.begin();
110 ix != expiredTaskEnd;
111 ix++) {
112 shared_ptr<TimerManager::Task> task = ix->second;
113 expiredTasks.insert(task);
114 if (task->state_ == TimerManager::Task::WAITING) {
115 task->state_ = TimerManager::Task::EXECUTING;
116 }
117 auto count = manager_->taskCount_.load(std::memory_order_relaxed);
118 manager_->taskCount_.store(count - 1, std::memory_order_relaxed);
119 }
120 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
121 }
122 }
123
124 for (std::set<shared_ptr<Task>>::iterator ix = expiredTasks.begin();
125 ix != expiredTasks.end();
126 ix++) {
127 (*ix)->run();
128 }
129
130 } while (manager_->state_ == TimerManager::STARTED);
131
132 {
133 std::unique_lock<std::mutex> l(manager_->mutex_);
134 if (manager_->state_ == TimerManager::STOPPING) {
135 manager_->state_ = TimerManager::STOPPED;
136 manager_->cond_.notify_one();
137 }
138 }
139 return;
140 }
141
142 private:
143 TimerManager* manager_;
144 friend class TimerManager;
145 };
146
TimerManager()147 TimerManager::TimerManager()
148 : taskCount_(0),
149 state_(TimerManager::UNINITIALIZED),
150 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {}
151
~TimerManager()152 TimerManager::~TimerManager() {
153 // If we haven't been explicitly stopped, do so now. We don't need to grab
154 // the monitor here, since stop already takes care of reentrancy.
155
156 if (state_ != STOPPED) {
157 stop();
158 }
159 }
160
start()161 void TimerManager::start() {
162 bool doStart = false;
163 {
164 std::unique_lock<std::mutex> l(mutex_);
165 if (threadFactory_ == nullptr) {
166 throw InvalidArgumentException();
167 }
168 if (state_ == TimerManager::UNINITIALIZED) {
169 state_ = TimerManager::STARTING;
170 doStart = true;
171 }
172 }
173
174 if (doStart) {
175 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
176 dispatcherThread_->start();
177 }
178
179 {
180 std::unique_lock<std::mutex> l(mutex_);
181 while (state_ == TimerManager::STARTING) {
182 cond_.wait(l);
183 }
184 assert(state_ != TimerManager::STARTING);
185 }
186 }
187
stop()188 void TimerManager::stop() {
189 bool doStop = false;
190 {
191 std::unique_lock<std::mutex> l(mutex_);
192 if (state_ == TimerManager::UNINITIALIZED) {
193 state_ = TimerManager::STOPPED;
194 } else if (state_ != STOPPING && state_ != STOPPED) {
195 doStop = true;
196 state_ = STOPPING;
197 cond_.notify_all();
198 }
199 while (state_ != STOPPED) {
200 cond_.wait(l);
201 }
202 }
203
204 if (doStop) {
205 // Clean up any outstanding tasks
206 taskMap_.clear();
207
208 // Remove dispatcher's reference to us.
209 dispatcher_->manager_ = nullptr;
210 }
211 }
212
threadFactory() const213 shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
214 std::unique_lock<std::mutex> l(mutex_);
215 return threadFactory_;
216 }
217
threadFactory(shared_ptr<const ThreadFactory> value)218 void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
219 std::unique_lock<std::mutex> l(mutex_);
220 threadFactory_ = value;
221 }
222
taskCount() const223 size_t TimerManager::taskCount() const {
224 return taskCount_.load(std::memory_order_relaxed);
225 }
226
add(shared_ptr<Runnable> task,int64_t timeout)227 void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
228 int64_t now = Util::currentTime();
229 timeout += now;
230
231 {
232 std::unique_lock<std::mutex> l(mutex_);
233 if (state_ != TimerManager::STARTED) {
234 throw IllegalStateException();
235 }
236
237 // If the task map was empty, or if we have an expiration that is earlier
238 // than any previously seen, kick the dispatcher so it can update its
239 // timeout. Do this before inserting to limit comparisons to current tasks
240 auto const count = taskCount_.load(std::memory_order_relaxed);
241 if (count == 0 || timeout < taskMap_.begin()->first) {
242 cond_.notify_one();
243 }
244
245 taskCount_.store(count + 1, std::memory_order_relaxed);
246 taskMap_.insert({timeout, std::make_shared<Task>(std::move(task))});
247 }
248 }
249
add(shared_ptr<Runnable> task,const struct timespec & value)250 void TimerManager::add(
251 shared_ptr<Runnable> task, const struct timespec& value) {
252 int64_t expiration;
253 Util::toMilliseconds(expiration, value);
254
255 int64_t now = Util::currentTime();
256
257 if (expiration < now) {
258 throw InvalidArgumentException();
259 }
260
261 add(task, expiration - now);
262 }
263
remove(shared_ptr<Runnable>)264 void TimerManager::remove(shared_ptr<Runnable> /*task*/) {
265 std::unique_lock<std::mutex> l(mutex_);
266 if (state_ != TimerManager::STARTED) {
267 throw IllegalStateException();
268 }
269 }
270
state() const271 TimerManager::STATE TimerManager::state() const {
272 return state_;
273 }
274
275 } // namespace concurrency
276 } // namespace thrift
277 } // namespace apache
278