1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/concurrency/Monitor.h>
25 
26 #include <memory>
27 
28 #include <stdexcept>
29 #include <deque>
30 #include <set>
31 
32 namespace apache {
33 namespace thrift {
34 namespace concurrency {
35 
36 using std::shared_ptr;
37 using std::unique_ptr;
38 using std::dynamic_pointer_cast;
39 
40 /**
41  * ThreadManager class
42  *
43  * This class manages a pool of threads. It uses a ThreadFactory to create
44  * threads.  It never actually creates or destroys worker threads, rather
45  * it maintains statistics on number of idle threads, number of active threads,
46  * task backlog, and average wait and service times.
47  *
48  * There are three different monitors used for signaling different conditions
49  * however they all share the same mutex_.
50  *
51  * @version $Id:$
52  */
53 class ThreadManager::Impl : public ThreadManager {
54 
55 public:
Impl()56   Impl()
57     : workerCount_(0),
58       workerMaxCount_(0),
59       idleCount_(0),
60       pendingTaskCountMax_(0),
61       expiredCount_(0),
62       state_(ThreadManager::UNINITIALIZED),
63       monitor_(&mutex_),
64       maxMonitor_(&mutex_),
65       workerMonitor_(&mutex_) {}
66 
~Impl()67   ~Impl() override { stop(); }
68 
69   void start() override;
70   void stop() override;
71 
state() const72   ThreadManager::STATE state() const override { return state_; }
73 
threadFactory() const74   shared_ptr<ThreadFactory> threadFactory() const override {
75     Guard g(mutex_);
76     return threadFactory_;
77   }
78 
threadFactory(shared_ptr<ThreadFactory> value)79   void threadFactory(shared_ptr<ThreadFactory> value) override {
80     Guard g(mutex_);
81     if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82       throw InvalidArgumentException();
83     }
84     threadFactory_ = value;
85   }
86 
87   void addWorker(size_t value) override;
88 
89   void removeWorker(size_t value) override;
90 
idleWorkerCount() const91   size_t idleWorkerCount() const override { return idleCount_; }
92 
workerCount() const93   size_t workerCount() const override {
94     Guard g(mutex_);
95     return workerCount_;
96   }
97 
pendingTaskCount() const98   size_t pendingTaskCount() const override {
99     Guard g(mutex_);
100     return tasks_.size();
101   }
102 
totalTaskCount() const103   size_t totalTaskCount() const override {
104     Guard g(mutex_);
105     return tasks_.size() + workerCount_ - idleCount_;
106   }
107 
pendingTaskCountMax() const108   size_t pendingTaskCountMax() const override {
109     Guard g(mutex_);
110     return pendingTaskCountMax_;
111   }
112 
expiredTaskCount() const113   size_t expiredTaskCount() const override {
114     Guard g(mutex_);
115     return expiredCount_;
116   }
117 
pendingTaskCountMax(const size_t value)118   void pendingTaskCountMax(const size_t value) {
119     Guard g(mutex_);
120     pendingTaskCountMax_ = value;
121   }
122 
123   void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
124 
125   void remove(shared_ptr<Runnable> task) override;
126 
127   shared_ptr<Runnable> removeNextPending() override;
128 
removeExpiredTasks()129   void removeExpiredTasks() override {
130     removeExpired(false);
131   }
132 
133   void setExpireCallback(ExpireCallback expireCallback) override;
134 
135 private:
136   /**
137    * Remove one or more expired tasks.
138    * \param[in]  justOne  if true, try to remove just one task and return
139    */
140   void removeExpired(bool justOne);
141 
142   /**
143    * \returns whether it is acceptable to block, depending on the current thread id
144    */
145   bool canSleep() const;
146 
147   /**
148    * Lowers the maximum worker count and blocks until enough worker threads complete
149    * to get to the new maximum worker limit.  The caller is responsible for acquiring
150    * a lock on the class mutex_.
151    */
152   void removeWorkersUnderLock(size_t value);
153 
154   size_t workerCount_;
155   size_t workerMaxCount_;
156   size_t idleCount_;
157   size_t pendingTaskCountMax_;
158   size_t expiredCount_;
159   ExpireCallback expireCallback_;
160 
161   ThreadManager::STATE state_;
162   shared_ptr<ThreadFactory> threadFactory_;
163 
164   friend class ThreadManager::Task;
165   typedef std::deque<shared_ptr<Task> > TaskQueue;
166   TaskQueue tasks_;
167   Mutex mutex_;
168   Monitor monitor_;
169   Monitor maxMonitor_;
170   Monitor workerMonitor_;       // used to synchronize changes in worker count
171 
172   friend class ThreadManager::Worker;
173   std::set<shared_ptr<Thread> > workers_;
174   std::set<shared_ptr<Thread> > deadWorkers_;
175   std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
176 };
177 
178 class ThreadManager::Task : public Runnable {
179 
180 public:
181   enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
182 
Task(shared_ptr<Runnable> runnable,uint64_t expiration=0ULL)183   Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
184     : runnable_(runnable),
185       state_(WAITING) {
186         if (expiration != 0ULL) {
187           expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
188         }
189     }
190 
191   ~Task() override = default;
192 
run()193   void run() override {
194     if (state_ == EXECUTING) {
195       runnable_->run();
196       state_ = COMPLETE;
197     }
198   }
199 
getRunnable()200   shared_ptr<Runnable> getRunnable() { return runnable_; }
201 
getExpireTime() const202   const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
203 
204 private:
205   shared_ptr<Runnable> runnable_;
206   friend class ThreadManager::Worker;
207   STATE state_;
208   unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
209 };
210 
211 class ThreadManager::Worker : public Runnable {
212   enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
213 
214 public:
Worker(ThreadManager::Impl * manager)215   Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
216 
217   ~Worker() override = default;
218 
219 private:
isActive() const220   bool isActive() const {
221     return (manager_->workerCount_ <= manager_->workerMaxCount_)
222            || (manager_->state_ == JOINING && !manager_->tasks_.empty());
223   }
224 
225 public:
226   /**
227    * Worker entry point
228    *
229    * As long as worker thread is running, pull tasks off the task queue and
230    * execute.
231    */
run()232   void run() override {
233     Guard g(manager_->mutex_);
234 
235     /**
236      * This method has three parts; one is to check for and account for
237      * admitting a task which happens under a lock.  Then the lock is released
238      * and the task itself is executed.  Finally we do some accounting
239      * under lock again when the task completes.
240      */
241 
242     /**
243      * Admitting
244      */
245 
246     /**
247      * Increment worker semaphore and notify manager if worker count reached
248      * desired max
249      */
250     bool active = manager_->workerCount_ < manager_->workerMaxCount_;
251     if (active) {
252       if (++manager_->workerCount_ == manager_->workerMaxCount_) {
253         manager_->workerMonitor_.notify();
254       }
255     }
256 
257     while (active) {
258       /**
259         * While holding manager monitor block for non-empty task queue (Also
260         * check that the thread hasn't been requested to stop). Once the queue
261         * is non-empty, dequeue a task, release monitor, and execute. If the
262         * worker max count has been decremented such that we exceed it, mark
263         * ourself inactive, decrement the worker count and notify the manager
264         * (technically we're notifying the next blocked thread but eventually
265         * the manager will see it.
266         */
267       active = isActive();
268 
269       while (active && manager_->tasks_.empty()) {
270         manager_->idleCount_++;
271         manager_->monitor_.wait();
272         active = isActive();
273         manager_->idleCount_--;
274       }
275 
276       shared_ptr<ThreadManager::Task> task;
277 
278       if (active) {
279         if (!manager_->tasks_.empty()) {
280           task = manager_->tasks_.front();
281           manager_->tasks_.pop_front();
282           if (task->state_ == ThreadManager::Task::WAITING) {
283             // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284             // then the execution loop needs to be changed below.
285             task->state_ =
286                 (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
287                     ThreadManager::Task::TIMEDOUT :
288                     ThreadManager::Task::EXECUTING;
289           }
290         }
291 
292         /* If we have a pending task max and we just dropped below it, wakeup any
293             thread that might be blocked on add. */
294         if (manager_->pendingTaskCountMax_ != 0
295             && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
296           manager_->maxMonitor_.notify();
297         }
298       }
299 
300       /**
301        * Execution - not holding a lock
302        */
303       if (task) {
304         if (task->state_ == ThreadManager::Task::EXECUTING) {
305 
306           // Release the lock so we can run the task without blocking the thread manager
307           manager_->mutex_.unlock();
308 
309           try {
310             task->run();
311           } catch (const std::exception& e) {
312             GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
313           } catch (...) {
314             GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
315           }
316 
317           // Re-acquire the lock to proceed in the thread manager
318           manager_->mutex_.lock();
319 
320         } else if (manager_->expireCallback_) {
321           // The only other state the task could have been in is TIMEDOUT (see above)
322           manager_->mutex_.unlock();
323           manager_->expireCallback_(task->getRunnable());
324           manager_->mutex_.lock();
325           manager_->expiredCount_++;
326         }
327       }
328     }
329 
330     /**
331      * Final accounting for the worker thread that is done working
332      */
333     manager_->deadWorkers_.insert(this->thread());
334     if (--manager_->workerCount_ == manager_->workerMaxCount_) {
335       manager_->workerMonitor_.notify();
336     }
337   }
338 
339 private:
340   ThreadManager::Impl* manager_;
341   friend class ThreadManager::Impl;
342   STATE state_;
343 };
344 
addWorker(size_t value)345 void ThreadManager::Impl::addWorker(size_t value) {
346   std::set<shared_ptr<Thread> > newThreads;
347   for (size_t ix = 0; ix < value; ix++) {
348     shared_ptr<ThreadManager::Worker> worker
349         = std::make_shared<ThreadManager::Worker>(this);
350     newThreads.insert(threadFactory_->newThread(worker));
351   }
352 
353   Guard g(mutex_);
354   workerMaxCount_ += value;
355   workers_.insert(newThreads.begin(), newThreads.end());
356 
357   for (const auto & newThread : newThreads) {
358     shared_ptr<ThreadManager::Worker> worker
359         = dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
360     worker->state_ = ThreadManager::Worker::STARTING;
361     newThread->start();
362     idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
363   }
364 
365   while (workerCount_ != workerMaxCount_) {
366     workerMonitor_.wait();
367   }
368 }
369 
start()370 void ThreadManager::Impl::start() {
371   Guard g(mutex_);
372   if (state_ == ThreadManager::STOPPED) {
373     return;
374   }
375 
376   if (state_ == ThreadManager::UNINITIALIZED) {
377     if (!threadFactory_) {
378       throw InvalidArgumentException();
379     }
380     state_ = ThreadManager::STARTED;
381     monitor_.notifyAll();
382   }
383 
384   while (state_ == STARTING) {
385     monitor_.wait();
386   }
387 }
388 
stop()389 void ThreadManager::Impl::stop() {
390   Guard g(mutex_);
391   bool doStop = false;
392 
393   if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
394       && state_ != ThreadManager::STOPPED) {
395     doStop = true;
396     state_ = ThreadManager::JOINING;
397   }
398 
399   if (doStop) {
400     removeWorkersUnderLock(workerCount_);
401   }
402 
403   state_ = ThreadManager::STOPPED;
404 }
405 
removeWorker(size_t value)406 void ThreadManager::Impl::removeWorker(size_t value) {
407   Guard g(mutex_);
408   removeWorkersUnderLock(value);
409 }
410 
removeWorkersUnderLock(size_t value)411 void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
412   if (value > workerMaxCount_) {
413     throw InvalidArgumentException();
414   }
415 
416   workerMaxCount_ -= value;
417 
418   if (idleCount_ > value) {
419     // There are more idle workers than we need to remove,
420     // so notify enough of them so they can terminate.
421     for (size_t ix = 0; ix < value; ix++) {
422       monitor_.notify();
423     }
424   } else {
425     // There are as many or less idle workers than we need to remove,
426     // so just notify them all so they can terminate.
427     monitor_.notifyAll();
428   }
429 
430   while (workerCount_ != workerMaxCount_) {
431     workerMonitor_.wait();
432   }
433 
434   for (const auto & deadWorker : deadWorkers_) {
435 
436     // when used with a joinable thread factory, we join the threads as we remove them
437     if (!threadFactory_->isDetached()) {
438       deadWorker->join();
439     }
440 
441     idMap_.erase(deadWorker->getId());
442     workers_.erase(deadWorker);
443   }
444 
445   deadWorkers_.clear();
446 }
447 
canSleep() const448 bool ThreadManager::Impl::canSleep() const {
449   const Thread::id_t id = threadFactory_->getCurrentThreadId();
450   return idMap_.find(id) == idMap_.end();
451 }
452 
add(shared_ptr<Runnable> value,int64_t timeout,int64_t expiration)453 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
454   Guard g(mutex_, timeout);
455 
456   if (!g) {
457     throw TimedOutException();
458   }
459 
460   if (state_ != ThreadManager::STARTED) {
461     throw IllegalStateException(
462         "ThreadManager::Impl::add ThreadManager "
463         "not started");
464   }
465 
466   // if we're at a limit, remove an expired task to see if the limit clears
467   if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
468     removeExpired(true);
469   }
470 
471   if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
472     if (canSleep() && timeout >= 0) {
473       while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
474         // This is thread safe because the mutex is shared between monitors.
475         maxMonitor_.wait(timeout);
476       }
477     } else {
478       throw TooManyPendingTasksException();
479     }
480   }
481 
482   tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
483 
484   // If idle thread is available notify it, otherwise all worker threads are
485   // running and will get around to this task in time.
486   if (idleCount_ > 0) {
487     monitor_.notify();
488   }
489 }
490 
remove(shared_ptr<Runnable> task)491 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
492   Guard g(mutex_);
493   if (state_ != ThreadManager::STARTED) {
494     throw IllegalStateException(
495         "ThreadManager::Impl::remove ThreadManager not "
496         "started");
497   }
498 
499   for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
500   {
501     if ((*it)->getRunnable() == task)
502     {
503       tasks_.erase(it);
504       return;
505     }
506   }
507 }
508 
removeNextPending()509 std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
510   Guard g(mutex_);
511   if (state_ != ThreadManager::STARTED) {
512     throw IllegalStateException(
513         "ThreadManager::Impl::removeNextPending "
514         "ThreadManager not started");
515   }
516 
517   if (tasks_.empty()) {
518     return std::shared_ptr<Runnable>();
519   }
520 
521   shared_ptr<ThreadManager::Task> task = tasks_.front();
522   tasks_.pop_front();
523 
524   return task->getRunnable();
525 }
526 
removeExpired(bool justOne)527 void ThreadManager::Impl::removeExpired(bool justOne) {
528   // this is always called under a lock
529   if (tasks_.empty()) {
530     return;
531   }
532   auto now = std::chrono::steady_clock::now();
533 
534   for (auto it = tasks_.begin(); it != tasks_.end(); )
535   {
536     if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
537       if (expireCallback_) {
538         expireCallback_((*it)->getRunnable());
539       }
540       it = tasks_.erase(it);
541       ++expiredCount_;
542       if (justOne) {
543         return;
544       }
545     }
546     else
547     {
548       ++it;
549     }
550   }
551 }
552 
setExpireCallback(ExpireCallback expireCallback)553 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
554   Guard g(mutex_);
555   expireCallback_ = expireCallback;
556 }
557 
558 class SimpleThreadManager : public ThreadManager::Impl {
559 
560 public:
SimpleThreadManager(size_t workerCount=4,size_t pendingTaskCountMax=0)561   SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
562     : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
563 
start()564   void start() override {
565     ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
566     ThreadManager::Impl::start();
567     addWorker(workerCount_);
568   }
569 
570 private:
571   const size_t workerCount_;
572   const size_t pendingTaskCountMax_;
573 };
574 
newThreadManager()575 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
576   return shared_ptr<ThreadManager>(new ThreadManager::Impl());
577 }
578 
newSimpleThreadManager(size_t count,size_t pendingTaskCountMax)579 shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
580                                                                 size_t pendingTaskCountMax) {
581   return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
582 }
583 }
584 }
585 } // apache::thrift::concurrency
586