1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/concurrency/Monitor.h>
25 #include <thrift/concurrency/Util.h>
26
27 #include <thrift/stdcxx.h>
28
29 #include <stdexcept>
30 #include <deque>
31 #include <set>
32
33 namespace apache {
34 namespace thrift {
35 namespace concurrency {
36
37 using stdcxx::shared_ptr;
38 using stdcxx::dynamic_pointer_cast;
39
40 /**
41 * ThreadManager class
42 *
43 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
48 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
50 *
51 * @version $Id:$
52 */
53 class ThreadManager::Impl : public ThreadManager {
54
55 public:
Impl()56 Impl()
57 : workerCount_(0),
58 workerMaxCount_(0),
59 idleCount_(0),
60 pendingTaskCountMax_(0),
61 expiredCount_(0),
62 state_(ThreadManager::UNINITIALIZED),
63 monitor_(&mutex_),
64 maxMonitor_(&mutex_),
65 workerMonitor_(&mutex_) {}
66
~Impl()67 ~Impl() { stop(); }
68
69 void start();
70 void stop();
71
state() const72 ThreadManager::STATE state() const { return state_; }
73
threadFactory() const74 shared_ptr<ThreadFactory> threadFactory() const {
75 Guard g(mutex_);
76 return threadFactory_;
77 }
78
threadFactory(shared_ptr<ThreadFactory> value)79 void threadFactory(shared_ptr<ThreadFactory> value) {
80 Guard g(mutex_);
81 if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82 throw InvalidArgumentException();
83 }
84 threadFactory_ = value;
85 }
86
87 void addWorker(size_t value);
88
89 void removeWorker(size_t value);
90
idleWorkerCount() const91 size_t idleWorkerCount() const { return idleCount_; }
92
workerCount() const93 size_t workerCount() const {
94 Guard g(mutex_);
95 return workerCount_;
96 }
97
pendingTaskCount() const98 size_t pendingTaskCount() const {
99 Guard g(mutex_);
100 return tasks_.size();
101 }
102
totalTaskCount() const103 size_t totalTaskCount() const {
104 Guard g(mutex_);
105 return tasks_.size() + workerCount_ - idleCount_;
106 }
107
pendingTaskCountMax() const108 size_t pendingTaskCountMax() const {
109 Guard g(mutex_);
110 return pendingTaskCountMax_;
111 }
112
expiredTaskCount()113 size_t expiredTaskCount() {
114 Guard g(mutex_);
115 return expiredCount_;
116 }
117
pendingTaskCountMax(const size_t value)118 void pendingTaskCountMax(const size_t value) {
119 Guard g(mutex_);
120 pendingTaskCountMax_ = value;
121 }
122
123 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
124
125 void remove(shared_ptr<Runnable> task);
126
127 shared_ptr<Runnable> removeNextPending();
128
removeExpiredTasks()129 void removeExpiredTasks() {
130 removeExpired(false);
131 }
132
133 void setExpireCallback(ExpireCallback expireCallback);
134
135 private:
136 /**
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
139 */
140 void removeExpired(bool justOne);
141
142 /**
143 * \returns whether it is acceptable to block, depending on the current thread id
144 */
145 bool canSleep() const;
146
147 /**
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
151 */
152 void removeWorkersUnderLock(size_t value);
153
154 size_t workerCount_;
155 size_t workerMaxCount_;
156 size_t idleCount_;
157 size_t pendingTaskCountMax_;
158 size_t expiredCount_;
159 ExpireCallback expireCallback_;
160
161 ThreadManager::STATE state_;
162 shared_ptr<ThreadFactory> threadFactory_;
163
164 friend class ThreadManager::Task;
165 typedef std::deque<shared_ptr<Task> > TaskQueue;
166 TaskQueue tasks_;
167 Mutex mutex_;
168 Monitor monitor_;
169 Monitor maxMonitor_;
170 Monitor workerMonitor_; // used to synchronize changes in worker count
171
172 friend class ThreadManager::Worker;
173 std::set<shared_ptr<Thread> > workers_;
174 std::set<shared_ptr<Thread> > deadWorkers_;
175 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
176 };
177
178 class ThreadManager::Task : public Runnable {
179
180 public:
181 enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
182
Task(shared_ptr<Runnable> runnable,int64_t expiration=0LL)183 Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
184 : runnable_(runnable),
185 state_(WAITING),
186 expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
187
~Task()188 ~Task() {}
189
run()190 void run() {
191 if (state_ == EXECUTING) {
192 runnable_->run();
193 state_ = COMPLETE;
194 }
195 }
196
getRunnable()197 shared_ptr<Runnable> getRunnable() { return runnable_; }
198
getExpireTime() const199 int64_t getExpireTime() const { return expireTime_; }
200
201 private:
202 shared_ptr<Runnable> runnable_;
203 friend class ThreadManager::Worker;
204 STATE state_;
205 int64_t expireTime_;
206 };
207
208 class ThreadManager::Worker : public Runnable {
209 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
210
211 public:
Worker(ThreadManager::Impl * manager)212 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
213
~Worker()214 ~Worker() {}
215
216 private:
isActive() const217 bool isActive() const {
218 return (manager_->workerCount_ <= manager_->workerMaxCount_)
219 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
220 }
221
222 public:
223 /**
224 * Worker entry point
225 *
226 * As long as worker thread is running, pull tasks off the task queue and
227 * execute.
228 */
run()229 void run() {
230 Guard g(manager_->mutex_);
231
232 /**
233 * This method has three parts; one is to check for and account for
234 * admitting a task which happens under a lock. Then the lock is released
235 * and the task itself is executed. Finally we do some accounting
236 * under lock again when the task completes.
237 */
238
239 /**
240 * Admitting
241 */
242
243 /**
244 * Increment worker semaphore and notify manager if worker count reached
245 * desired max
246 */
247 bool active = manager_->workerCount_ < manager_->workerMaxCount_;
248 if (active) {
249 if (++manager_->workerCount_ == manager_->workerMaxCount_) {
250 manager_->workerMonitor_.notify();
251 }
252 }
253
254 while (active) {
255 /**
256 * While holding manager monitor block for non-empty task queue (Also
257 * check that the thread hasn't been requested to stop). Once the queue
258 * is non-empty, dequeue a task, release monitor, and execute. If the
259 * worker max count has been decremented such that we exceed it, mark
260 * ourself inactive, decrement the worker count and notify the manager
261 * (technically we're notifying the next blocked thread but eventually
262 * the manager will see it.
263 */
264 active = isActive();
265
266 while (active && manager_->tasks_.empty()) {
267 manager_->idleCount_++;
268 manager_->monitor_.wait();
269 active = isActive();
270 manager_->idleCount_--;
271 }
272
273 shared_ptr<ThreadManager::Task> task;
274
275 if (active) {
276 if (!manager_->tasks_.empty()) {
277 task = manager_->tasks_.front();
278 manager_->tasks_.pop_front();
279 if (task->state_ == ThreadManager::Task::WAITING) {
280 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
281 // then the execution loop needs to be changed below.
282 task->state_ =
283 (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
284 ThreadManager::Task::TIMEDOUT :
285 ThreadManager::Task::EXECUTING;
286 }
287 }
288
289 /* If we have a pending task max and we just dropped below it, wakeup any
290 thread that might be blocked on add. */
291 if (manager_->pendingTaskCountMax_ != 0
292 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
293 manager_->maxMonitor_.notify();
294 }
295 }
296
297 /**
298 * Execution - not holding a lock
299 */
300 if (task) {
301 if (task->state_ == ThreadManager::Task::EXECUTING) {
302
303 // Release the lock so we can run the task without blocking the thread manager
304 manager_->mutex_.unlock();
305
306 try {
307 task->run();
308 } catch (const std::exception& e) {
309 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
310 } catch (...) {
311 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
312 }
313
314 // Re-acquire the lock to proceed in the thread manager
315 manager_->mutex_.lock();
316
317 } else if (manager_->expireCallback_) {
318 // The only other state the task could have been in is TIMEDOUT (see above)
319 manager_->expireCallback_(task->getRunnable());
320 manager_->expiredCount_++;
321 }
322 }
323 }
324
325 /**
326 * Final accounting for the worker thread that is done working
327 */
328 manager_->deadWorkers_.insert(this->thread());
329 if (--manager_->workerCount_ == manager_->workerMaxCount_) {
330 manager_->workerMonitor_.notify();
331 }
332 }
333
334 private:
335 ThreadManager::Impl* manager_;
336 friend class ThreadManager::Impl;
337 STATE state_;
338 };
339
addWorker(size_t value)340 void ThreadManager::Impl::addWorker(size_t value) {
341 std::set<shared_ptr<Thread> > newThreads;
342 for (size_t ix = 0; ix < value; ix++) {
343 shared_ptr<ThreadManager::Worker> worker
344 = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
345 newThreads.insert(threadFactory_->newThread(worker));
346 }
347
348 Guard g(mutex_);
349 workerMaxCount_ += value;
350 workers_.insert(newThreads.begin(), newThreads.end());
351
352 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
353 ++ix) {
354 shared_ptr<ThreadManager::Worker> worker
355 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
356 worker->state_ = ThreadManager::Worker::STARTING;
357 (*ix)->start();
358 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
359 }
360
361 while (workerCount_ != workerMaxCount_) {
362 workerMonitor_.wait();
363 }
364 }
365
start()366 void ThreadManager::Impl::start() {
367 Guard g(mutex_);
368 if (state_ == ThreadManager::STOPPED) {
369 return;
370 }
371
372 if (state_ == ThreadManager::UNINITIALIZED) {
373 if (!threadFactory_) {
374 throw InvalidArgumentException();
375 }
376 state_ = ThreadManager::STARTED;
377 monitor_.notifyAll();
378 }
379
380 while (state_ == STARTING) {
381 monitor_.wait();
382 }
383 }
384
stop()385 void ThreadManager::Impl::stop() {
386 Guard g(mutex_);
387 bool doStop = false;
388
389 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
390 && state_ != ThreadManager::STOPPED) {
391 doStop = true;
392 state_ = ThreadManager::JOINING;
393 }
394
395 if (doStop) {
396 removeWorkersUnderLock(workerCount_);
397 }
398
399 state_ = ThreadManager::STOPPED;
400 }
401
removeWorker(size_t value)402 void ThreadManager::Impl::removeWorker(size_t value) {
403 Guard g(mutex_);
404 removeWorkersUnderLock(value);
405 }
406
removeWorkersUnderLock(size_t value)407 void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
408 if (value > workerMaxCount_) {
409 throw InvalidArgumentException();
410 }
411
412 workerMaxCount_ -= value;
413
414 if (idleCount_ > value) {
415 // There are more idle workers than we need to remove,
416 // so notify enough of them so they can terminate.
417 for (size_t ix = 0; ix < value; ix++) {
418 monitor_.notify();
419 }
420 } else {
421 // There are as many or less idle workers than we need to remove,
422 // so just notify them all so they can terminate.
423 monitor_.notifyAll();
424 }
425
426 while (workerCount_ != workerMaxCount_) {
427 workerMonitor_.wait();
428 }
429
430 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
431 ix != deadWorkers_.end();
432 ++ix) {
433
434 // when used with a joinable thread factory, we join the threads as we remove them
435 if (!threadFactory_->isDetached()) {
436 (*ix)->join();
437 }
438
439 idMap_.erase((*ix)->getId());
440 workers_.erase(*ix);
441 }
442
443 deadWorkers_.clear();
444 }
445
canSleep() const446 bool ThreadManager::Impl::canSleep() const {
447 const Thread::id_t id = threadFactory_->getCurrentThreadId();
448 return idMap_.find(id) == idMap_.end();
449 }
450
add(shared_ptr<Runnable> value,int64_t timeout,int64_t expiration)451 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
452 Guard g(mutex_, timeout);
453
454 if (!g) {
455 throw TimedOutException();
456 }
457
458 if (state_ != ThreadManager::STARTED) {
459 throw IllegalStateException(
460 "ThreadManager::Impl::add ThreadManager "
461 "not started");
462 }
463
464 // if we're at a limit, remove an expired task to see if the limit clears
465 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
466 removeExpired(true);
467 }
468
469 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
470 if (canSleep() && timeout >= 0) {
471 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
472 // This is thread safe because the mutex is shared between monitors.
473 maxMonitor_.wait(timeout);
474 }
475 } else {
476 throw TooManyPendingTasksException();
477 }
478 }
479
480 tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
481
482 // If idle thread is available notify it, otherwise all worker threads are
483 // running and will get around to this task in time.
484 if (idleCount_ > 0) {
485 monitor_.notify();
486 }
487 }
488
remove(shared_ptr<Runnable> task)489 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
490 Guard g(mutex_);
491 if (state_ != ThreadManager::STARTED) {
492 throw IllegalStateException(
493 "ThreadManager::Impl::remove ThreadManager not "
494 "started");
495 }
496
497 for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ++it)
498 {
499 if ((*it)->getRunnable() == task)
500 {
501 tasks_.erase(it);
502 return;
503 }
504 }
505 }
506
removeNextPending()507 stdcxx::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
508 Guard g(mutex_);
509 if (state_ != ThreadManager::STARTED) {
510 throw IllegalStateException(
511 "ThreadManager::Impl::removeNextPending "
512 "ThreadManager not started");
513 }
514
515 if (tasks_.empty()) {
516 return stdcxx::shared_ptr<Runnable>();
517 }
518
519 shared_ptr<ThreadManager::Task> task = tasks_.front();
520 tasks_.pop_front();
521
522 return task->getRunnable();
523 }
524
removeExpired(bool justOne)525 void ThreadManager::Impl::removeExpired(bool justOne) {
526 // this is always called under a lock
527 int64_t now = 0LL;
528
529 for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); )
530 {
531 if (now == 0LL) {
532 now = Util::currentTime();
533 }
534
535 if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) {
536 if (expireCallback_) {
537 expireCallback_((*it)->getRunnable());
538 }
539 it = tasks_.erase(it);
540 ++expiredCount_;
541 if (justOne) {
542 return;
543 }
544 }
545 else
546 {
547 ++it;
548 }
549 }
550 }
551
setExpireCallback(ExpireCallback expireCallback)552 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
553 Guard g(mutex_);
554 expireCallback_ = expireCallback;
555 }
556
557 class SimpleThreadManager : public ThreadManager::Impl {
558
559 public:
SimpleThreadManager(size_t workerCount=4,size_t pendingTaskCountMax=0)560 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
561 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
562
start()563 void start() {
564 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
565 ThreadManager::Impl::start();
566 addWorker(workerCount_);
567 }
568
569 private:
570 const size_t workerCount_;
571 const size_t pendingTaskCountMax_;
572 };
573
newThreadManager()574 shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
575 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
576 }
577
newSimpleThreadManager(size_t count,size_t pendingTaskCountMax)578 shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
579 size_t pendingTaskCountMax) {
580 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
581 }
582 }
583 }
584 } // apache::thrift::concurrency
585