1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/concurrency/TimerManager.h>
21 #include <thrift/concurrency/Exception.h>
22 #include <thrift/concurrency/Util.h>
23 
24 #include <assert.h>
25 #include <iostream>
26 #include <set>
27 
28 namespace apache {
29 namespace thrift {
30 namespace concurrency {
31 
32 using stdcxx::shared_ptr;
33 using stdcxx::weak_ptr;
34 
35 /**
36  * TimerManager class
37  *
38  * @version $Id:$
39  */
40 class TimerManager::Task : public Runnable {
41 
42 public:
43   enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
44 
Task(shared_ptr<Runnable> runnable)45   Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
46 
~Task()47   ~Task() {}
48 
run()49   void run() {
50     if (state_ == EXECUTING) {
51       runnable_->run();
52       state_ = COMPLETE;
53     }
54   }
55 
operator ==(const shared_ptr<Runnable> & runnable) const56   bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
57 
58   task_iterator it_;
59 
60 private:
61   shared_ptr<Runnable> runnable_;
62   friend class TimerManager::Dispatcher;
63   STATE state_;
64 };
65 
66 class TimerManager::Dispatcher : public Runnable {
67 
68 public:
Dispatcher(TimerManager * manager)69   Dispatcher(TimerManager* manager) : manager_(manager) {}
70 
~Dispatcher()71   ~Dispatcher() {}
72 
73   /**
74    * Dispatcher entry point
75    *
76    * As long as dispatcher thread is running, pull tasks off the task taskMap_
77    * and execute.
78    */
run()79   void run() {
80     {
81       Synchronized s(manager_->monitor_);
82       if (manager_->state_ == TimerManager::STARTING) {
83         manager_->state_ = TimerManager::STARTED;
84         manager_->monitor_.notifyAll();
85       }
86     }
87 
88     do {
89       std::set<shared_ptr<TimerManager::Task> > expiredTasks;
90       {
91         Synchronized s(manager_->monitor_);
92         task_iterator expiredTaskEnd;
93         int64_t now = Util::currentTime();
94         while (manager_->state_ == TimerManager::STARTED
95                && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
96                   == manager_->taskMap_.begin()) {
97           int64_t timeout = 0LL;
98           if (!manager_->taskMap_.empty()) {
99             timeout = manager_->taskMap_.begin()->first - now;
100           }
101           assert((timeout != 0 && manager_->taskCount_ > 0)
102                  || (timeout == 0 && manager_->taskCount_ == 0));
103           try {
104             manager_->monitor_.wait(timeout);
105           } catch (TimedOutException&) {
106           }
107           now = Util::currentTime();
108         }
109 
110         if (manager_->state_ == TimerManager::STARTED) {
111           for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
112             shared_ptr<TimerManager::Task> task = ix->second;
113             expiredTasks.insert(task);
114             task->it_ = manager_->taskMap_.end();
115             if (task->state_ == TimerManager::Task::WAITING) {
116               task->state_ = TimerManager::Task::EXECUTING;
117             }
118             manager_->taskCount_--;
119           }
120           manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
121         }
122       }
123 
124       for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin();
125            ix != expiredTasks.end();
126            ++ix) {
127         (*ix)->run();
128       }
129 
130     } while (manager_->state_ == TimerManager::STARTED);
131 
132     {
133       Synchronized s(manager_->monitor_);
134       if (manager_->state_ == TimerManager::STOPPING) {
135         manager_->state_ = TimerManager::STOPPED;
136         manager_->monitor_.notify();
137       }
138     }
139     return;
140   }
141 
142 private:
143   TimerManager* manager_;
144   friend class TimerManager;
145 };
146 
147 #if defined(_MSC_VER)
148 #pragma warning(push)
149 #pragma warning(disable : 4355) // 'this' used in base member initializer list
150 #endif
151 
TimerManager()152 TimerManager::TimerManager()
153   : taskCount_(0),
154     state_(TimerManager::UNINITIALIZED),
155     dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
156 }
157 
158 #if defined(_MSC_VER)
159 #pragma warning(pop)
160 #endif
161 
~TimerManager()162 TimerManager::~TimerManager() {
163 
164   // If we haven't been explicitly stopped, do so now.  We don't need to grab
165   // the monitor here, since stop already takes care of reentrancy.
166 
167   if (state_ != STOPPED) {
168     try {
169       stop();
170     } catch (...) {
171       // We're really hosed.
172     }
173   }
174 }
175 
start()176 void TimerManager::start() {
177   bool doStart = false;
178   {
179     Synchronized s(monitor_);
180     if (!threadFactory_) {
181       throw InvalidArgumentException();
182     }
183     if (state_ == TimerManager::UNINITIALIZED) {
184       state_ = TimerManager::STARTING;
185       doStart = true;
186     }
187   }
188 
189   if (doStart) {
190     dispatcherThread_ = threadFactory_->newThread(dispatcher_);
191     dispatcherThread_->start();
192   }
193 
194   {
195     Synchronized s(monitor_);
196     while (state_ == TimerManager::STARTING) {
197       monitor_.wait();
198     }
199     assert(state_ != TimerManager::STARTING);
200   }
201 }
202 
stop()203 void TimerManager::stop() {
204   bool doStop = false;
205   {
206     Synchronized s(monitor_);
207     if (state_ == TimerManager::UNINITIALIZED) {
208       state_ = TimerManager::STOPPED;
209     } else if (state_ != STOPPING && state_ != STOPPED) {
210       doStop = true;
211       state_ = STOPPING;
212       monitor_.notifyAll();
213     }
214     while (state_ != STOPPED) {
215       monitor_.wait();
216     }
217   }
218 
219   if (doStop) {
220     // Clean up any outstanding tasks
221     taskMap_.clear();
222 
223     // Remove dispatcher's reference to us.
224     dispatcher_->manager_ = NULL;
225   }
226 }
227 
threadFactory() const228 shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
229   Synchronized s(monitor_);
230   return threadFactory_;
231 }
232 
threadFactory(shared_ptr<const ThreadFactory> value)233 void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
234   Synchronized s(monitor_);
235   threadFactory_ = value;
236 }
237 
taskCount() const238 size_t TimerManager::taskCount() const {
239   return taskCount_;
240 }
241 
add(shared_ptr<Runnable> task,int64_t timeout)242 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
243   int64_t now = Util::currentTime();
244   timeout += now;
245 
246   {
247     Synchronized s(monitor_);
248     if (state_ != TimerManager::STARTED) {
249       throw IllegalStateException();
250     }
251 
252     // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
253     // if the expiration time is shorter than the current value. Need to test before we insert,
254     // because the new task might insert at the front.
255     bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
256 
257     shared_ptr<Task> timer(new Task(task));
258     taskCount_++;
259     timer->it_ = taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, timer));
260 
261     // If the task map was empty, or if we have an expiration that is earlier
262     // than any previously seen, kick the dispatcher so it can update its
263     // timeout
264     if (notifyRequired) {
265       monitor_.notify();
266     }
267 
268     return timer;
269   }
270 }
271 
add(shared_ptr<Runnable> task,const struct THRIFT_TIMESPEC & value)272 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
273     const struct THRIFT_TIMESPEC& value) {
274 
275   int64_t expiration;
276   Util::toMilliseconds(expiration, value);
277 
278   int64_t now = Util::currentTime();
279 
280   if (expiration < now) {
281     throw InvalidArgumentException();
282   }
283 
284   return add(task, expiration - now);
285 }
286 
add(shared_ptr<Runnable> task,const struct timeval & value)287 TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
288     const struct timeval& value) {
289 
290   int64_t expiration;
291   Util::toMilliseconds(expiration, value);
292 
293   int64_t now = Util::currentTime();
294 
295   if (expiration < now) {
296     throw InvalidArgumentException();
297   }
298 
299   return add(task, expiration - now);
300 }
301 
remove(shared_ptr<Runnable> task)302 void TimerManager::remove(shared_ptr<Runnable> task) {
303   Synchronized s(monitor_);
304   if (state_ != TimerManager::STARTED) {
305     throw IllegalStateException();
306   }
307   bool found = false;
308   for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end();) {
309     if (*ix->second == task) {
310       found = true;
311       taskCount_--;
312       taskMap_.erase(ix++);
313     } else {
314       ++ix;
315     }
316   }
317   if (!found) {
318     throw NoSuchTaskException();
319   }
320 }
321 
remove(Timer handle)322 void TimerManager::remove(Timer handle) {
323   Synchronized s(monitor_);
324   if (state_ != TimerManager::STARTED) {
325     throw IllegalStateException();
326   }
327 
328   shared_ptr<Task> task = handle.lock();
329   if (!task) {
330     throw NoSuchTaskException();
331   }
332 
333   if (task->it_ == taskMap_.end()) {
334     // Task is being executed
335     throw UncancellableTaskException();
336   }
337 
338   taskMap_.erase(task->it_);
339   taskCount_--;
340 }
341 
state() const342 TimerManager::STATE TimerManager::state() const {
343   return state_;
344 }
345 }
346 }
347 } // apache::thrift::concurrency
348