1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/concurrency/Monitor.h>
25 #include <thrift/concurrency/Util.h>
26 
27 #include <thrift/stdcxx.h>
28 
#include <deque>
#include <map>
#include <set>
#include <stdexcept>
32 
33 namespace apache {
34 namespace thrift {
35 namespace concurrency {
36 
37 using stdcxx::shared_ptr;
38 using stdcxx::dynamic_pointer_cast;
39 
40 /**
41  * ThreadManager class
42  *
 * This class manages a pool of worker threads. Worker threads are created
 * through a ThreadFactory (addWorker) and retired on request (removeWorker,
 * stop). The manager tracks the number of idle and active worker threads, the
 * pending task backlog, and the number of tasks that expired before running.
47  *
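 * Three monitors signal different conditions, and all of them share mutex_:
 * monitor_ wakes idle workers when a task arrives or workers must exit,
 * maxMonitor_ wakes add() callers blocked on a full task queue, and
 * workerMonitor_ signals changes in the worker count.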
50  *
51  * @version $Id:$
52  */
53 class ThreadManager::Impl : public ThreadManager {
54 
55 public:
Impl()56   Impl()
57     : workerCount_(0),
58       workerMaxCount_(0),
59       idleCount_(0),
60       pendingTaskCountMax_(0),
61       expiredCount_(0),
62       state_(ThreadManager::UNINITIALIZED),
63       monitor_(&mutex_),
64       maxMonitor_(&mutex_),
65       workerMonitor_(&mutex_) {}
66 
~Impl()67   ~Impl() { stop(); }
68 
69   void start();
70   void stop();
71 
state() const72   ThreadManager::STATE state() const { return state_; }
73 
threadFactory() const74   shared_ptr<ThreadFactory> threadFactory() const {
75     Guard g(mutex_);
76     return threadFactory_;
77   }
78 
threadFactory(shared_ptr<ThreadFactory> value)79   void threadFactory(shared_ptr<ThreadFactory> value) {
80     Guard g(mutex_);
81     if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82       throw InvalidArgumentException();
83     }
84     threadFactory_ = value;
85   }
86 
87   void addWorker(size_t value);
88 
89   void removeWorker(size_t value);
90 
idleWorkerCount() const91   size_t idleWorkerCount() const { return idleCount_; }
92 
workerCount() const93   size_t workerCount() const {
94     Guard g(mutex_);
95     return workerCount_;
96   }
97 
pendingTaskCount() const98   size_t pendingTaskCount() const {
99     Guard g(mutex_);
100     return tasks_.size();
101   }
102 
totalTaskCount() const103   size_t totalTaskCount() const {
104     Guard g(mutex_);
105     return tasks_.size() + workerCount_ - idleCount_;
106   }
107 
pendingTaskCountMax() const108   size_t pendingTaskCountMax() const {
109     Guard g(mutex_);
110     return pendingTaskCountMax_;
111   }
112 
expiredTaskCount()113   size_t expiredTaskCount() {
114     Guard g(mutex_);
115     return expiredCount_;
116   }
117 
pendingTaskCountMax(const size_t value)118   void pendingTaskCountMax(const size_t value) {
119     Guard g(mutex_);
120     pendingTaskCountMax_ = value;
121   }
122 
123   void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
124 
125   void remove(shared_ptr<Runnable> task);
126 
127   shared_ptr<Runnable> removeNextPending();
128 
removeExpiredTasks()129   void removeExpiredTasks() {
130     removeExpired(false);
131   }
132 
133   void setExpireCallback(ExpireCallback expireCallback);
134 
135 private:
136   /**
137    * Remove one or more expired tasks.
138    * \param[in]  justOne  if true, try to remove just one task and return
139    */
140   void removeExpired(bool justOne);
141 
142   /**
143    * \returns whether it is acceptable to block, depending on the current thread id
144    */
145   bool canSleep() const;
146 
147   /**
148    * Lowers the maximum worker count and blocks until enough worker threads complete
149    * to get to the new maximum worker limit.  The caller is responsible for acquiring
150    * a lock on the class mutex_.
151    */
152   void removeWorkersUnderLock(size_t value);
153 
154   size_t workerCount_;
155   size_t workerMaxCount_;
156   size_t idleCount_;
157   size_t pendingTaskCountMax_;
158   size_t expiredCount_;
159   ExpireCallback expireCallback_;
160 
161   ThreadManager::STATE state_;
162   shared_ptr<ThreadFactory> threadFactory_;
163 
164   friend class ThreadManager::Task;
165   typedef std::deque<shared_ptr<Task> > TaskQueue;
166   TaskQueue tasks_;
167   Mutex mutex_;
168   Monitor monitor_;
169   Monitor maxMonitor_;
170   Monitor workerMonitor_;       // used to synchronize changes in worker count
171 
172   friend class ThreadManager::Worker;
173   std::set<shared_ptr<Thread> > workers_;
174   std::set<shared_ptr<Thread> > deadWorkers_;
175   std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
176 };
177 
178 class ThreadManager::Task : public Runnable {
179 
180 public:
181   enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
182 
Task(shared_ptr<Runnable> runnable,int64_t expiration=0LL)183   Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
184     : runnable_(runnable),
185       state_(WAITING),
186       expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
187 
~Task()188   ~Task() {}
189 
run()190   void run() {
191     if (state_ == EXECUTING) {
192       runnable_->run();
193       state_ = COMPLETE;
194     }
195   }
196 
getRunnable()197   shared_ptr<Runnable> getRunnable() { return runnable_; }
198 
getExpireTime() const199   int64_t getExpireTime() const { return expireTime_; }
200 
201 private:
202   shared_ptr<Runnable> runnable_;
203   friend class ThreadManager::Worker;
204   STATE state_;
205   int64_t expireTime_;
206 };
207 
208 class ThreadManager::Worker : public Runnable {
209   enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
210 
211 public:
Worker(ThreadManager::Impl * manager)212   Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
213 
~Worker()214   ~Worker() {}
215 
216 private:
isActive() const217   bool isActive() const {
218     return (manager_->workerCount_ <= manager_->workerMaxCount_)
219            || (manager_->state_ == JOINING && !manager_->tasks_.empty());
220   }
221 
222 public:
223   /**
224    * Worker entry point
225    *
226    * As long as worker thread is running, pull tasks off the task queue and
227    * execute.
228    */
run()229   void run() {
230     Guard g(manager_->mutex_);
231 
232     /**
233      * This method has three parts; one is to check for and account for
234      * admitting a task which happens under a lock.  Then the lock is released
235      * and the task itself is executed.  Finally we do some accounting
236      * under lock again when the task completes.
237      */
238 
239     /**
240      * Admitting
241      */
242 
243     /**
244      * Increment worker semaphore and notify manager if worker count reached
245      * desired max
246      */
247     bool active = manager_->workerCount_ < manager_->workerMaxCount_;
248     if (active) {
249       if (++manager_->workerCount_ == manager_->workerMaxCount_) {
250         manager_->workerMonitor_.notify();
251       }
252     }
253 
254     while (active) {
255       /**
256         * While holding manager monitor block for non-empty task queue (Also
257         * check that the thread hasn't been requested to stop). Once the queue
258         * is non-empty, dequeue a task, release monitor, and execute. If the
259         * worker max count has been decremented such that we exceed it, mark
260         * ourself inactive, decrement the worker count and notify the manager
261         * (technically we're notifying the next blocked thread but eventually
262         * the manager will see it.
263         */
264       active = isActive();
265 
266       while (active && manager_->tasks_.empty()) {
267         manager_->idleCount_++;
268         manager_->monitor_.wait();
269         active = isActive();
270         manager_->idleCount_--;
271       }
272 
273       shared_ptr<ThreadManager::Task> task;
274 
275       if (active) {
276         if (!manager_->tasks_.empty()) {
277           task = manager_->tasks_.front();
278           manager_->tasks_.pop_front();
279           if (task->state_ == ThreadManager::Task::WAITING) {
280             // If the state is changed to anything other than EXECUTING or TIMEDOUT here
281             // then the execution loop needs to be changed below.
282             task->state_ =
283                 (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
284                     ThreadManager::Task::TIMEDOUT :
285                     ThreadManager::Task::EXECUTING;
286           }
287         }
288 
289         /* If we have a pending task max and we just dropped below it, wakeup any
290             thread that might be blocked on add. */
291         if (manager_->pendingTaskCountMax_ != 0
292             && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
293           manager_->maxMonitor_.notify();
294         }
295       }
296 
297       /**
298        * Execution - not holding a lock
299        */
300       if (task) {
301         if (task->state_ == ThreadManager::Task::EXECUTING) {
302 
303           // Release the lock so we can run the task without blocking the thread manager
304           manager_->mutex_.unlock();
305 
306           try {
307             task->run();
308           } catch (const std::exception& e) {
309             GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
310           } catch (...) {
311             GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
312           }
313 
314           // Re-acquire the lock to proceed in the thread manager
315           manager_->mutex_.lock();
316 
317         } else if (manager_->expireCallback_) {
318           // The only other state the task could have been in is TIMEDOUT (see above)
319           manager_->expireCallback_(task->getRunnable());
320           manager_->expiredCount_++;
321         }
322       }
323     }
324 
325     /**
326      * Final accounting for the worker thread that is done working
327      */
328     manager_->deadWorkers_.insert(this->thread());
329     if (--manager_->workerCount_ == manager_->workerMaxCount_) {
330       manager_->workerMonitor_.notify();
331     }
332   }
333 
334 private:
335   ThreadManager::Impl* manager_;
336   friend class ThreadManager::Impl;
337   STATE state_;
338 };
339 
addWorker(size_t value)340 void ThreadManager::Impl::addWorker(size_t value) {
341   std::set<shared_ptr<Thread> > newThreads;
342   for (size_t ix = 0; ix < value; ix++) {
343     shared_ptr<ThreadManager::Worker> worker
344         = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
345     newThreads.insert(threadFactory_->newThread(worker));
346   }
347 
348   Guard g(mutex_);
349   workerMaxCount_ += value;
350   workers_.insert(newThreads.begin(), newThreads.end());
351 
352   for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
353        ++ix) {
354     shared_ptr<ThreadManager::Worker> worker
355         = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
356     worker->state_ = ThreadManager::Worker::STARTING;
357     (*ix)->start();
358     idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
359   }
360 
361   while (workerCount_ != workerMaxCount_) {
362     workerMonitor_.wait();
363   }
364 }
365 
start()366 void ThreadManager::Impl::start() {
367   Guard g(mutex_);
368   if (state_ == ThreadManager::STOPPED) {
369     return;
370   }
371 
372   if (state_ == ThreadManager::UNINITIALIZED) {
373     if (!threadFactory_) {
374       throw InvalidArgumentException();
375     }
376     state_ = ThreadManager::STARTED;
377     monitor_.notifyAll();
378   }
379 
380   while (state_ == STARTING) {
381     monitor_.wait();
382   }
383 }
384 
stop()385 void ThreadManager::Impl::stop() {
386   Guard g(mutex_);
387   bool doStop = false;
388 
389   if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
390       && state_ != ThreadManager::STOPPED) {
391     doStop = true;
392     state_ = ThreadManager::JOINING;
393   }
394 
395   if (doStop) {
396     removeWorkersUnderLock(workerCount_);
397   }
398 
399   state_ = ThreadManager::STOPPED;
400 }
401 
removeWorker(size_t value)402 void ThreadManager::Impl::removeWorker(size_t value) {
403   Guard g(mutex_);
404   removeWorkersUnderLock(value);
405 }
406 
removeWorkersUnderLock(size_t value)407 void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
408   if (value > workerMaxCount_) {
409     throw InvalidArgumentException();
410   }
411 
412   workerMaxCount_ -= value;
413 
414   if (idleCount_ > value) {
415     // There are more idle workers than we need to remove,
416     // so notify enough of them so they can terminate.
417     for (size_t ix = 0; ix < value; ix++) {
418       monitor_.notify();
419     }
420   } else {
421     // There are as many or less idle workers than we need to remove,
422     // so just notify them all so they can terminate.
423     monitor_.notifyAll();
424   }
425 
426   while (workerCount_ != workerMaxCount_) {
427     workerMonitor_.wait();
428   }
429 
430   for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
431        ix != deadWorkers_.end();
432        ++ix) {
433 
434     // when used with a joinable thread factory, we join the threads as we remove them
435     if (!threadFactory_->isDetached()) {
436       (*ix)->join();
437     }
438 
439     idMap_.erase((*ix)->getId());
440     workers_.erase(*ix);
441   }
442 
443   deadWorkers_.clear();
444 }
445 
canSleep() const446 bool ThreadManager::Impl::canSleep() const {
447   const Thread::id_t id = threadFactory_->getCurrentThreadId();
448   return idMap_.find(id) == idMap_.end();
449 }
450 
add(shared_ptr<Runnable> value,int64_t timeout,int64_t expiration)451 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
452   Guard g(mutex_, timeout);
453 
454   if (!g) {
455     throw TimedOutException();
456   }
457 
458   if (state_ != ThreadManager::STARTED) {
459     throw IllegalStateException(
460         "ThreadManager::Impl::add ThreadManager "
461         "not started");
462   }
463 
464   // if we're at a limit, remove an expired task to see if the limit clears
465   if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
466     removeExpired(true);
467   }
468 
469   if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
470     if (canSleep() && timeout >= 0) {
471       while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
472         // This is thread safe because the mutex is shared between monitors.
473         maxMonitor_.wait(timeout);
474       }
475     } else {
476       throw TooManyPendingTasksException();
477     }
478   }
479 
480   tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
481 
482   // If idle thread is available notify it, otherwise all worker threads are
483   // running and will get around to this task in time.
484   if (idleCount_ > 0) {
485     monitor_.notify();
486   }
487 }
488 
remove(shared_ptr<Runnable> task)489 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
490   Guard g(mutex_);
491   if (state_ != ThreadManager::STARTED) {
492     throw IllegalStateException(
493         "ThreadManager::Impl::remove ThreadManager not "
494         "started");
495   }
496 
497   for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ++it)
498   {
499     if ((*it)->getRunnable() == task)
500     {
501       tasks_.erase(it);
502       return;
503     }
504   }
505 }
506 
removeNextPending()507 stdcxx::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
508   Guard g(mutex_);
509   if (state_ != ThreadManager::STARTED) {
510     throw IllegalStateException(
511         "ThreadManager::Impl::removeNextPending "
512         "ThreadManager not started");
513   }
514 
515   if (tasks_.empty()) {
516     return stdcxx::shared_ptr<Runnable>();
517   }
518 
519   shared_ptr<ThreadManager::Task> task = tasks_.front();
520   tasks_.pop_front();
521 
522   return task->getRunnable();
523 }
524 
removeExpired(bool justOne)525 void ThreadManager::Impl::removeExpired(bool justOne) {
526   // this is always called under a lock
527   int64_t now = 0LL;
528 
529   for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
530   {
531     if (now == 0LL) {
532       now = Util::currentTime();
533     }
534 
535     if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
536       if (expireCallback_) {
537         expireCallback_((*it)->getRunnable());
538       }
539       it = tasks_.erase(it);
540       ++expiredCount_;
541       if (justOne) {
542         return;
543       }
544     }
545     else
546     {
547       ++it;
548     }
549   }
550 }
551 
setExpireCallback(ExpireCallback expireCallback)552 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
553   Guard g(mutex_);
554   expireCallback_ = expireCallback;
555 }
556 
557 class SimpleThreadManager : public ThreadManager::Impl {
558 
559 public:
SimpleThreadManager(size_t workerCount=4,size_t pendingTaskCountMax=0)560   SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
561     : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
562 
start()563   void start() {
564     ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
565     ThreadManager::Impl::start();
566     addWorker(workerCount_);
567   }
568 
569 private:
570   const size_t workerCount_;
571   const size_t pendingTaskCountMax_;
572 };
573 
newThreadManager()574 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
575   return shared_ptr<ThreadManager>(new ThreadManager::Impl());
576 }
577 
newSimpleThreadManager(size_t count,size_t pendingTaskCountMax)578 shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
579                                                                 size_t pendingTaskCountMax) {
580   return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
581 }
582 }
583 }
584 } // apache::thrift::concurrency
585