1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include <thrift/lib/cpp/concurrency/TimerManager.h>

#include <assert.h>
#include <iostream>
#include <set>
#include <vector>

#include <thrift/lib/cpp/concurrency/Exception.h>
#include <thrift/lib/cpp/concurrency/Util.h>
25 
26 namespace apache {
27 namespace thrift {
28 namespace concurrency {
29 
30 using std::shared_ptr;
31 
32 /**
33  * TimerManager class
34  *
35  * @version $Id:$
36  */
37 class TimerManager::Task : public Runnable {
38  public:
39   enum STATE {
40     WAITING,
41     EXECUTING,
42     CANCELLED,
43     COMPLETE,
44   };
45 
Task(shared_ptr<Runnable> runnable)46   Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
47 
~Task()48   ~Task() override {}
49 
run()50   void run() override {
51     if (state_ == EXECUTING) {
52       runnable_->run();
53       state_ = COMPLETE;
54     }
55   }
56 
57  private:
58   shared_ptr<Runnable> runnable_;
59   friend class TimerManager::Dispatcher;
60   STATE state_;
61 };
62 
63 class TimerManager::Dispatcher : public Runnable {
64  public:
Dispatcher(TimerManager * manager)65   Dispatcher(TimerManager* manager) : manager_(manager) {}
66 
~Dispatcher()67   ~Dispatcher() override {}
68 
69   /**
70    * Dispatcher entry point
71    *
72    * As long as dispatcher thread is running, pull tasks off the task taskMap_
73    * and execute.
74    */
run()75   void run() override {
76     {
77       std::unique_lock<std::mutex> l(manager_->mutex_);
78       if (manager_->state_ == TimerManager::STARTING) {
79         manager_->state_ = TimerManager::STARTED;
80         manager_->cond_.notify_all();
81       }
82     }
83 
84     do {
85       std::set<shared_ptr<TimerManager::Task>> expiredTasks;
86       {
87         std::unique_lock<std::mutex> l(manager_->mutex_);
88         task_iterator expiredTaskEnd;
89         int64_t now = Util::currentTime();
90         while (manager_->state_ == TimerManager::STARTED &&
91                (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) ==
92                    manager_->taskMap_.begin()) {
93           int64_t timeout = 0LL;
94           if (!manager_->taskMap_.empty()) {
95             timeout = manager_->taskMap_.begin()->first - now;
96           }
97           auto count = manager_->taskCount_.load(std::memory_order_relaxed);
98           (void)count;
99           assert((timeout != 0 && count > 0) || (timeout == 0 && count == 0));
100           if (timeout == 0) {
101             manager_->cond_.wait(l);
102           } else {
103             manager_->cond_.wait_for(l, std::chrono::milliseconds(timeout));
104           }
105           now = Util::currentTime();
106         }
107 
108         if (manager_->state_ == TimerManager::STARTED) {
109           for (task_iterator ix = manager_->taskMap_.begin();
110                ix != expiredTaskEnd;
111                ix++) {
112             shared_ptr<TimerManager::Task> task = ix->second;
113             expiredTasks.insert(task);
114             if (task->state_ == TimerManager::Task::WAITING) {
115               task->state_ = TimerManager::Task::EXECUTING;
116             }
117             auto count = manager_->taskCount_.load(std::memory_order_relaxed);
118             manager_->taskCount_.store(count - 1, std::memory_order_relaxed);
119           }
120           manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
121         }
122       }
123 
124       for (std::set<shared_ptr<Task>>::iterator ix = expiredTasks.begin();
125            ix != expiredTasks.end();
126            ix++) {
127         (*ix)->run();
128       }
129 
130     } while (manager_->state_ == TimerManager::STARTED);
131 
132     {
133       std::unique_lock<std::mutex> l(manager_->mutex_);
134       if (manager_->state_ == TimerManager::STOPPING) {
135         manager_->state_ = TimerManager::STOPPED;
136         manager_->cond_.notify_one();
137       }
138     }
139     return;
140   }
141 
142  private:
143   TimerManager* manager_;
144   friend class TimerManager;
145 };
146 
TimerManager()147 TimerManager::TimerManager()
148     : taskCount_(0),
149       state_(TimerManager::UNINITIALIZED),
150       dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {}
151 
~TimerManager()152 TimerManager::~TimerManager() {
153   // If we haven't been explicitly stopped, do so now.  We don't need to grab
154   // the monitor here, since stop already takes care of reentrancy.
155 
156   if (state_ != STOPPED) {
157     stop();
158   }
159 }
160 
start()161 void TimerManager::start() {
162   bool doStart = false;
163   {
164     std::unique_lock<std::mutex> l(mutex_);
165     if (threadFactory_ == nullptr) {
166       throw InvalidArgumentException();
167     }
168     if (state_ == TimerManager::UNINITIALIZED) {
169       state_ = TimerManager::STARTING;
170       doStart = true;
171     }
172   }
173 
174   if (doStart) {
175     dispatcherThread_ = threadFactory_->newThread(dispatcher_);
176     dispatcherThread_->start();
177   }
178 
179   {
180     std::unique_lock<std::mutex> l(mutex_);
181     while (state_ == TimerManager::STARTING) {
182       cond_.wait(l);
183     }
184     assert(state_ != TimerManager::STARTING);
185   }
186 }
187 
stop()188 void TimerManager::stop() {
189   bool doStop = false;
190   {
191     std::unique_lock<std::mutex> l(mutex_);
192     if (state_ == TimerManager::UNINITIALIZED) {
193       state_ = TimerManager::STOPPED;
194     } else if (state_ != STOPPING && state_ != STOPPED) {
195       doStop = true;
196       state_ = STOPPING;
197       cond_.notify_all();
198     }
199     while (state_ != STOPPED) {
200       cond_.wait(l);
201     }
202   }
203 
204   if (doStop) {
205     // Clean up any outstanding tasks
206     taskMap_.clear();
207 
208     // Remove dispatcher's reference to us.
209     dispatcher_->manager_ = nullptr;
210   }
211 }
212 
threadFactory() const213 shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
214   std::unique_lock<std::mutex> l(mutex_);
215   return threadFactory_;
216 }
217 
threadFactory(shared_ptr<const ThreadFactory> value)218 void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
219   std::unique_lock<std::mutex> l(mutex_);
220   threadFactory_ = value;
221 }
222 
taskCount() const223 size_t TimerManager::taskCount() const {
224   return taskCount_.load(std::memory_order_relaxed);
225 }
226 
add(shared_ptr<Runnable> task,int64_t timeout)227 void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
228   int64_t now = Util::currentTime();
229   timeout += now;
230 
231   {
232     std::unique_lock<std::mutex> l(mutex_);
233     if (state_ != TimerManager::STARTED) {
234       throw IllegalStateException();
235     }
236 
237     // If the task map was empty, or if we have an expiration that is earlier
238     // than any previously seen, kick the dispatcher so it can update its
239     // timeout. Do this before inserting to limit comparisons to current tasks
240     auto const count = taskCount_.load(std::memory_order_relaxed);
241     if (count == 0 || timeout < taskMap_.begin()->first) {
242       cond_.notify_one();
243     }
244 
245     taskCount_.store(count + 1, std::memory_order_relaxed);
246     taskMap_.insert({timeout, std::make_shared<Task>(std::move(task))});
247   }
248 }
249 
add(shared_ptr<Runnable> task,const struct timespec & value)250 void TimerManager::add(
251     shared_ptr<Runnable> task, const struct timespec& value) {
252   int64_t expiration;
253   Util::toMilliseconds(expiration, value);
254 
255   int64_t now = Util::currentTime();
256 
257   if (expiration < now) {
258     throw InvalidArgumentException();
259   }
260 
261   add(task, expiration - now);
262 }
263 
remove(shared_ptr<Runnable>)264 void TimerManager::remove(shared_ptr<Runnable> /*task*/) {
265   std::unique_lock<std::mutex> l(mutex_);
266   if (state_ != TimerManager::STARTED) {
267     throw IllegalStateException();
268   }
269 }
270 
state() const271 TimerManager::STATE TimerManager::state() const {
272   return state_;
273 }
274 
275 } // namespace concurrency
276 } // namespace thrift
277 } // namespace apache
278