1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
#include <thrift/concurrency/TimerManager.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Util.h>

#include <assert.h>
#include <iostream>
#include <set>
#include <vector>
27
28 namespace apache {
29 namespace thrift {
30 namespace concurrency {
31
32 using stdcxx::shared_ptr;
33 using stdcxx::weak_ptr;
34
35 /**
36 * TimerManager class
37 *
38 * @version $Id:$
39 */
40 class TimerManager::Task : public Runnable {
41
42 public:
43 enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
44
Task(shared_ptr<Runnable> runnable)45 Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
46
~Task()47 ~Task() {}
48
run()49 void run() {
50 if (state_ == EXECUTING) {
51 runnable_->run();
52 state_ = COMPLETE;
53 }
54 }
55
operator ==(const shared_ptr<Runnable> & runnable) const56 bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
57
58 task_iterator it_;
59
60 private:
61 shared_ptr<Runnable> runnable_;
62 friend class TimerManager::Dispatcher;
63 STATE state_;
64 };
65
66 class TimerManager::Dispatcher : public Runnable {
67
68 public:
Dispatcher(TimerManager * manager)69 Dispatcher(TimerManager* manager) : manager_(manager) {}
70
~Dispatcher()71 ~Dispatcher() {}
72
73 /**
74 * Dispatcher entry point
75 *
76 * As long as dispatcher thread is running, pull tasks off the task taskMap_
77 * and execute.
78 */
run()79 void run() {
80 {
81 Synchronized s(manager_->monitor_);
82 if (manager_->state_ == TimerManager::STARTING) {
83 manager_->state_ = TimerManager::STARTED;
84 manager_->monitor_.notifyAll();
85 }
86 }
87
88 do {
89 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
90 {
91 Synchronized s(manager_->monitor_);
92 task_iterator expiredTaskEnd;
93 int64_t now = Util::currentTime();
94 while (manager_->state_ == TimerManager::STARTED
95 && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
96 == manager_->taskMap_.begin()) {
97 int64_t timeout = 0LL;
98 if (!manager_->taskMap_.empty()) {
99 timeout = manager_->taskMap_.begin()->first - now;
100 }
101 assert((timeout != 0 && manager_->taskCount_ > 0)
102 || (timeout == 0 && manager_->taskCount_ == 0));
103 try {
104 manager_->monitor_.wait(timeout);
105 } catch (TimedOutException&) {
106 }
107 now = Util::currentTime();
108 }
109
110 if (manager_->state_ == TimerManager::STARTED) {
111 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
112 shared_ptr<TimerManager::Task> task = ix->second;
113 expiredTasks.insert(task);
114 task->it_ = manager_->taskMap_.end();
115 if (task->state_ == TimerManager::Task::WAITING) {
116 task->state_ = TimerManager::Task::EXECUTING;
117 }
118 manager_->taskCount_--;
119 }
120 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
121 }
122 }
123
124 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin();
125 ix != expiredTasks.end();
126 ++ix) {
127 (*ix)->run();
128 }
129
130 } while (manager_->state_ == TimerManager::STARTED);
131
132 {
133 Synchronized s(manager_->monitor_);
134 if (manager_->state_ == TimerManager::STOPPING) {
135 manager_->state_ = TimerManager::STOPPED;
136 manager_->monitor_.notify();
137 }
138 }
139 return;
140 }
141
142 private:
143 TimerManager* manager_;
144 friend class TimerManager;
145 };
146
147 #if defined(_MSC_VER)
148 #pragma warning(push)
149 #pragma warning(disable : 4355) // 'this' used in base member initializer list
150 #endif
151
TimerManager()152 TimerManager::TimerManager()
153 : taskCount_(0),
154 state_(TimerManager::UNINITIALIZED),
155 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
156 }
157
158 #if defined(_MSC_VER)
159 #pragma warning(pop)
160 #endif
161
~TimerManager()162 TimerManager::~TimerManager() {
163
164 // If we haven't been explicitly stopped, do so now. We don't need to grab
165 // the monitor here, since stop already takes care of reentrancy.
166
167 if (state_ != STOPPED) {
168 try {
169 stop();
170 } catch (...) {
171 // We're really hosed.
172 }
173 }
174 }
175
start()176 void TimerManager::start() {
177 bool doStart = false;
178 {
179 Synchronized s(monitor_);
180 if (!threadFactory_) {
181 throw InvalidArgumentException();
182 }
183 if (state_ == TimerManager::UNINITIALIZED) {
184 state_ = TimerManager::STARTING;
185 doStart = true;
186 }
187 }
188
189 if (doStart) {
190 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
191 dispatcherThread_->start();
192 }
193
194 {
195 Synchronized s(monitor_);
196 while (state_ == TimerManager::STARTING) {
197 monitor_.wait();
198 }
199 assert(state_ != TimerManager::STARTING);
200 }
201 }
202
stop()203 void TimerManager::stop() {
204 bool doStop = false;
205 {
206 Synchronized s(monitor_);
207 if (state_ == TimerManager::UNINITIALIZED) {
208 state_ = TimerManager::STOPPED;
209 } else if (state_ != STOPPING && state_ != STOPPED) {
210 doStop = true;
211 state_ = STOPPING;
212 monitor_.notifyAll();
213 }
214 while (state_ != STOPPED) {
215 monitor_.wait();
216 }
217 }
218
219 if (doStop) {
220 // Clean up any outstanding tasks
221 taskMap_.clear();
222
223 // Remove dispatcher's reference to us.
224 dispatcher_->manager_ = NULL;
225 }
226 }
227
threadFactory() const228 shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
229 Synchronized s(monitor_);
230 return threadFactory_;
231 }
232
threadFactory(shared_ptr<const ThreadFactory> value)233 void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
234 Synchronized s(monitor_);
235 threadFactory_ = value;
236 }
237
taskCount() const238 size_t TimerManager::taskCount() const {
239 return taskCount_;
240 }
241
add(shared_ptr<Runnable> task,int64_t timeout)242 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
243 int64_t now = Util::currentTime();
244 timeout += now;
245
246 {
247 Synchronized s(monitor_);
248 if (state_ != TimerManager::STARTED) {
249 throw IllegalStateException();
250 }
251
252 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
253 // if the expiration time is shorter than the current value. Need to test before we insert,
254 // because the new task might insert at the front.
255 bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
256
257 shared_ptr<Task> timer(new Task(task));
258 taskCount_++;
259 timer->it_ = taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, timer));
260
261 // If the task map was empty, or if we have an expiration that is earlier
262 // than any previously seen, kick the dispatcher so it can update its
263 // timeout
264 if (notifyRequired) {
265 monitor_.notify();
266 }
267
268 return timer;
269 }
270 }
271
add(shared_ptr<Runnable> task,const struct THRIFT_TIMESPEC & value)272 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
273 const struct THRIFT_TIMESPEC& value) {
274
275 int64_t expiration;
276 Util::toMilliseconds(expiration, value);
277
278 int64_t now = Util::currentTime();
279
280 if (expiration < now) {
281 throw InvalidArgumentException();
282 }
283
284 return add(task, expiration - now);
285 }
286
add(shared_ptr<Runnable> task,const struct timeval & value)287 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
288 const struct timeval& value) {
289
290 int64_t expiration;
291 Util::toMilliseconds(expiration, value);
292
293 int64_t now = Util::currentTime();
294
295 if (expiration < now) {
296 throw InvalidArgumentException();
297 }
298
299 return add(task, expiration - now);
300 }
301
remove(shared_ptr<Runnable> task)302 void TimerManager::remove(shared_ptr<Runnable> task) {
303 Synchronized s(monitor_);
304 if (state_ != TimerManager::STARTED) {
305 throw IllegalStateException();
306 }
307 bool found = false;
308 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end();) {
309 if (*ix->second == task) {
310 found = true;
311 taskCount_--;
312 taskMap_.erase(ix++);
313 } else {
314 ++ix;
315 }
316 }
317 if (!found) {
318 throw NoSuchTaskException();
319 }
320 }
321
remove(Timer handle)322 void TimerManager::remove(Timer handle) {
323 Synchronized s(monitor_);
324 if (state_ != TimerManager::STARTED) {
325 throw IllegalStateException();
326 }
327
328 shared_ptr<Task> task = handle.lock();
329 if (!task) {
330 throw NoSuchTaskException();
331 }
332
333 if (task->it_ == taskMap_.end()) {
334 // Task is being executed
335 throw UncancellableTaskException();
336 }
337
338 taskMap_.erase(task->it_);
339 taskCount_--;
340 }
341
state() const342 TimerManager::STATE TimerManager::state() const {
343 return state_;
344 }
345 }
346 }
347 } // apache::thrift::concurrency
348