1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/executors/ThreadPoolExecutor.h>
18
19 #include <ctime>
20
21 #include <folly/executors/GlobalThreadPoolList.h>
22 #include <folly/portability/PThread.h>
23 #include <folly/synchronization/AsymmetricMemoryBarrier.h>
24 #include <folly/tracing/StaticTracepoint.h>
25
26 namespace folly {
27
28 using SyncVecThreadPoolExecutors =
29 folly::Synchronized<std::vector<ThreadPoolExecutor*>>;
30
getSyncVecThreadPoolExecutors()31 SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() {
32 static Indestructible<SyncVecThreadPoolExecutors> storage;
33 return *storage;
34 }
35
registerThreadPoolExecutor(ThreadPoolExecutor * tpe)36 void ThreadPoolExecutor::registerThreadPoolExecutor(ThreadPoolExecutor* tpe) {
37 getSyncVecThreadPoolExecutors().wlock()->push_back(tpe);
38 }
39
deregisterThreadPoolExecutor(ThreadPoolExecutor * tpe)40 void ThreadPoolExecutor::deregisterThreadPoolExecutor(ThreadPoolExecutor* tpe) {
41 getSyncVecThreadPoolExecutors().withWLock([tpe](auto& tpes) {
42 tpes.erase(std::remove(tpes.begin(), tpes.end(), tpe), tpes.end());
43 });
44 }
45
46 DEFINE_int64(
47 threadtimeout_ms,
48 60000,
49 "Idle time before ThreadPoolExecutor threads are joined");
50
ThreadPoolExecutor(size_t,size_t minThreads,std::shared_ptr<ThreadFactory> threadFactory)51 ThreadPoolExecutor::ThreadPoolExecutor(
52 size_t /* maxThreads */,
53 size_t minThreads,
54 std::shared_ptr<ThreadFactory> threadFactory)
55 : threadFactory_(std::move(threadFactory)),
56 taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
57 threadPoolHook_("folly::ThreadPoolExecutor"),
58 minThreads_(minThreads),
59 threadTimeout_(FLAGS_threadtimeout_ms) {
60 namePrefix_ = getNameHelper();
61 }
62
~ThreadPoolExecutor()63 ThreadPoolExecutor::~ThreadPoolExecutor() {
64 joinKeepAliveOnce();
65 CHECK_EQ(0, threadList_.get().size());
66 }
67
Task(Func && func,std::chrono::milliseconds expiration,Func && expireCallback)68 ThreadPoolExecutor::Task::Task(
69 Func&& func, std::chrono::milliseconds expiration, Func&& expireCallback)
70 : func_(std::move(func)),
71 expiration_(expiration),
72 expireCallback_(std::move(expireCallback)),
73 context_(folly::RequestContext::saveContext()) {
74 // Assume that the task in enqueued on creation
75 enqueueTime_ = std::chrono::steady_clock::now();
76 }
77
78 namespace {
79
80 template <class F>
nothrow(const char * name,F && f)81 void nothrow(const char* name, F&& f) {
82 try {
83 f();
84 } catch (const std::exception& e) {
85 LOG(ERROR) << "ThreadPoolExecutor: " << name << " threw unhandled "
86 << typeid(e).name() << " exception: " << e.what();
87 } catch (...) {
88 LOG(ERROR) << "ThreadPoolExecutor: " << name
89 << " threw unhandled non-exception object";
90 }
91 }
92
93 } // namespace
94
runTask(const ThreadPtr & thread,Task && task)95 void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
96 thread->idle.store(false, std::memory_order_relaxed);
97 auto startTime = std::chrono::steady_clock::now();
98 TaskStats stats;
99 stats.enqueueTime = task.enqueueTime_;
100 if (task.context_) {
101 stats.requestId = task.context_->getRootId();
102 }
103 stats.waitTime = startTime - task.enqueueTime_;
104
105 {
106 folly::RequestContextScopeGuard rctx(task.context_);
107 if (task.expiration_ > std::chrono::milliseconds(0) &&
108 stats.waitTime >= task.expiration_) {
109 task.func_ = nullptr;
110 stats.expired = true;
111 if (task.expireCallback_ != nullptr) {
112 invokeCatchingExns(
113 "ThreadPoolExecutor: expireCallback",
114 std::exchange(task.expireCallback_, {}));
115 }
116 } else {
117 invokeCatchingExns(
118 "ThreadPoolExecutor: func", std::exchange(task.func_, {}));
119 task.expireCallback_ = nullptr;
120 }
121 }
122 if (!stats.expired) {
123 stats.runTime = std::chrono::steady_clock::now() - startTime;
124 }
125
126 // Times in this USDT use granularity of std::chrono::steady_clock::duration,
127 // which is platform dependent. On Facebook servers, the granularity is
128 // nanoseconds. We explicitly do not perform any unit conversions to avoid
129 // unnecessary costs and leave it to consumers of this data to know what
130 // effective clock resolution is.
131 FOLLY_SDT(
132 folly,
133 thread_pool_executor_task_stats,
134 namePrefix_.c_str(),
135 stats.requestId,
136 stats.enqueueTime.time_since_epoch().count(),
137 stats.waitTime.count(),
138 stats.runTime.count());
139
140 thread->idle.store(true, std::memory_order_relaxed);
141 thread->lastActiveTime.store(
142 std::chrono::steady_clock::now(), std::memory_order_relaxed);
143 thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
144 *thread->taskStatsCallbacks->inCallback = true;
145 SCOPE_EXIT { *thread->taskStatsCallbacks->inCallback = false; };
146 try {
147 for (auto& callback : callbacks) {
148 callback(stats);
149 }
150 } catch (const std::exception& e) {
151 LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
152 "unhandled "
153 << typeid(e).name() << " exception: " << e.what();
154 } catch (...) {
155 LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
156 "unhandled non-exception object";
157 }
158 });
159 }
160
add(Func,std::chrono::milliseconds,Func)161 void ThreadPoolExecutor::add(Func, std::chrono::milliseconds, Func) {
162 throw std::runtime_error(
163 "add() with expiration is not implemented for this Executor");
164 }
165
numThreads() const166 size_t ThreadPoolExecutor::numThreads() const {
167 return maxThreads_.load(std::memory_order_relaxed);
168 }
169
numActiveThreads() const170 size_t ThreadPoolExecutor::numActiveThreads() const {
171 return activeThreads_.load(std::memory_order_relaxed);
172 }
173
174 // Set the maximum number of running threads.
setNumThreads(size_t numThreads)175 void ThreadPoolExecutor::setNumThreads(size_t numThreads) {
176 /* Since ThreadPoolExecutor may be dynamically adjusting the number of
177 threads, we adjust the relevant variables instead of changing
178 the number of threads directly. Roughly:
179
180 If numThreads < minthreads reset minThreads to numThreads.
181
182 If numThreads < active threads, reduce number of running threads.
183
184 If the number of pending tasks is > 0, then increase the currently
185 active number of threads such that we can run all the tasks, or reach
186 numThreads.
187
188 Note that if there are observers, we actually have to create all
189 the threads, because some observer implementations need to 'observe'
190 all thread creation (see tests for an example of this)
191 */
192
193 size_t numThreadsToJoin = 0;
194 {
195 SharedMutex::WriteHolder w{&threadListLock_};
196 auto pending = getPendingTaskCountImpl();
197 maxThreads_.store(numThreads, std::memory_order_relaxed);
198 auto active = activeThreads_.load(std::memory_order_relaxed);
199 auto minthreads = minThreads_.load(std::memory_order_relaxed);
200 if (numThreads < minthreads) {
201 minthreads = numThreads;
202 minThreads_.store(numThreads, std::memory_order_relaxed);
203 }
204 if (active > numThreads) {
205 numThreadsToJoin = active - numThreads;
206 if (numThreadsToJoin > active - minthreads) {
207 numThreadsToJoin = active - minthreads;
208 }
209 ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
210 activeThreads_.store(
211 active - numThreadsToJoin, std::memory_order_relaxed);
212 } else if (pending > 0 || !observers_.empty() || active < minthreads) {
213 size_t numToAdd = std::min(pending, numThreads - active);
214 if (!observers_.empty()) {
215 numToAdd = numThreads - active;
216 }
217 if (active + numToAdd < minthreads) {
218 numToAdd = minthreads - active;
219 }
220 ThreadPoolExecutor::addThreads(numToAdd);
221 activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
222 }
223 }
224
225 /* We may have removed some threads, attempt to join them */
226 joinStoppedThreads(numThreadsToJoin);
227 }
228
229 // threadListLock_ is writelocked
addThreads(size_t n)230 void ThreadPoolExecutor::addThreads(size_t n) {
231 std::vector<ThreadPtr> newThreads;
232 for (size_t i = 0; i < n; i++) {
233 newThreads.push_back(makeThread());
234 }
235 for (auto& thread : newThreads) {
236 // TODO need a notion of failing to create the thread
237 // and then handling for that case
238 thread->handle = threadFactory_->newThread(
239 std::bind(&ThreadPoolExecutor::threadRun, this, thread));
240 threadList_.add(thread);
241 }
242 for (auto& thread : newThreads) {
243 thread->startupBaton.wait(
244 folly::Baton<>::wait_options().logging_enabled(false));
245 }
246 for (auto& o : observers_) {
247 for (auto& thread : newThreads) {
248 o->threadStarted(thread.get());
249 }
250 }
251 }
252
253 // threadListLock_ is writelocked
removeThreads(size_t n,bool isJoin)254 void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
255 isJoin_ = isJoin;
256 stopThreads(n);
257 }
258
joinStoppedThreads(size_t n)259 void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
260 for (size_t i = 0; i < n; i++) {
261 auto thread = stoppedThreads_.take();
262 thread->handle.join();
263 }
264 }
265
stop()266 void ThreadPoolExecutor::stop() {
267 joinKeepAliveOnce();
268 size_t n = 0;
269 {
270 SharedMutex::WriteHolder w{&threadListLock_};
271 maxThreads_.store(0, std::memory_order_release);
272 activeThreads_.store(0, std::memory_order_release);
273 n = threadList_.get().size();
274 removeThreads(n, false);
275 n += threadsToJoin_.load(std::memory_order_relaxed);
276 threadsToJoin_.store(0, std::memory_order_relaxed);
277 }
278 joinStoppedThreads(n);
279 CHECK_EQ(0, threadList_.get().size());
280 CHECK_EQ(0, stoppedThreads_.size());
281 }
282
join()283 void ThreadPoolExecutor::join() {
284 joinKeepAliveOnce();
285 size_t n = 0;
286 {
287 SharedMutex::WriteHolder w{&threadListLock_};
288 maxThreads_.store(0, std::memory_order_release);
289 activeThreads_.store(0, std::memory_order_release);
290 n = threadList_.get().size();
291 removeThreads(n, true);
292 n += threadsToJoin_.load(std::memory_order_relaxed);
293 threadsToJoin_.store(0, std::memory_order_relaxed);
294 }
295 joinStoppedThreads(n);
296 CHECK_EQ(0, threadList_.get().size());
297 CHECK_EQ(0, stoppedThreads_.size());
298 }
299
withAll(FunctionRef<void (ThreadPoolExecutor &)> f)300 void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
301 getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
302 for (auto tpe : tpes) {
303 f(*tpe);
304 }
305 });
306 }
307
getPoolStats() const308 ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() const {
309 const auto now = std::chrono::steady_clock::now();
310 SharedMutex::ReadHolder r{&threadListLock_};
311 ThreadPoolExecutor::PoolStats stats;
312 size_t activeTasks = 0;
313 size_t idleAlive = 0;
314 for (const auto& thread : threadList_.get()) {
315 if (thread->idle.load(std::memory_order_relaxed)) {
316 const std::chrono::nanoseconds idleTime =
317 now - thread->lastActiveTime.load(std::memory_order_relaxed);
318 stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
319 idleAlive++;
320 } else {
321 activeTasks++;
322 }
323 }
324 stats.pendingTaskCount = getPendingTaskCountImpl();
325 stats.totalTaskCount = stats.pendingTaskCount + activeTasks;
326
327 stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
328 stats.activeThreadCount =
329 activeThreads_.load(std::memory_order_relaxed) - idleAlive;
330 stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
331 return stats;
332 }
333
getPendingTaskCount() const334 size_t ThreadPoolExecutor::getPendingTaskCount() const {
335 SharedMutex::ReadHolder r{&threadListLock_};
336 return getPendingTaskCountImpl();
337 }
338
getName() const339 const std::string& ThreadPoolExecutor::getName() const {
340 return namePrefix_;
341 }
342
getNameHelper() const343 std::string ThreadPoolExecutor::getNameHelper() const {
344 auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
345 if (ntf == nullptr) {
346 return folly::demangle(typeid(*this).name()).toStdString();
347 }
348 return ntf->getNamePrefix();
349 }
350
351 std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
352
usedCpuTime() const353 std::chrono::nanoseconds ThreadPoolExecutor::Thread::usedCpuTime() const {
354 using std::chrono::nanoseconds;
355 using std::chrono::seconds;
356 timespec tp{};
357 #ifdef __linux__
358 clockid_t clockid;
359 auto th = const_cast<std::thread&>(handle).native_handle();
360 if (!pthread_getcpuclockid(th, &clockid)) {
361 clock_gettime(clockid, &tp);
362 }
363 #endif
364 return nanoseconds(tp.tv_nsec) + seconds(tp.tv_sec);
365 }
366
subscribeToTaskStats(TaskStatsCallback cb)367 void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
368 if (*taskStatsCallbacks_->inCallback) {
369 throw std::runtime_error("cannot subscribe in task stats callback");
370 }
371 taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
372 }
373
add(ThreadPoolExecutor::ThreadPtr item)374 BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add(
375 ThreadPoolExecutor::ThreadPtr item) {
376 std::lock_guard<std::mutex> guard(mutex_);
377 queue_.push(std::move(item));
378 return sem_.post();
379 }
380
take()381 ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
382 while (true) {
383 {
384 std::lock_guard<std::mutex> guard(mutex_);
385 if (!queue_.empty()) {
386 auto item = std::move(queue_.front());
387 queue_.pop();
388 return item;
389 }
390 }
391 sem_.wait();
392 }
393 }
394
395 folly::Optional<ThreadPoolExecutor::ThreadPtr>
try_take_for(std::chrono::milliseconds time)396 ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
397 std::chrono::milliseconds time) {
398 while (true) {
399 {
400 std::lock_guard<std::mutex> guard(mutex_);
401 if (!queue_.empty()) {
402 auto item = std::move(queue_.front());
403 queue_.pop();
404 return item;
405 }
406 }
407 if (!sem_.try_wait_for(time)) {
408 return folly::none;
409 }
410 }
411 }
412
size()413 size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
414 std::lock_guard<std::mutex> guard(mutex_);
415 return queue_.size();
416 }
417
addObserver(std::shared_ptr<Observer> o)418 void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
419 {
420 SharedMutex::WriteHolder r{&threadListLock_};
421 observers_.push_back(o);
422 for (auto& thread : threadList_.get()) {
423 o->threadPreviouslyStarted(thread.get());
424 }
425 }
426 while (activeThreads_.load(std::memory_order_relaxed) <
427 maxThreads_.load(std::memory_order_relaxed)) {
428 ensureActiveThreads();
429 }
430 }
431
removeObserver(std::shared_ptr<Observer> o)432 void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
433 SharedMutex::WriteHolder r{&threadListLock_};
434 for (auto& thread : threadList_.get()) {
435 o->threadNotYetStopped(thread.get());
436 }
437
438 for (auto it = observers_.begin(); it != observers_.end(); it++) {
439 if (*it == o) {
440 observers_.erase(it);
441 return;
442 }
443 }
444 DCHECK(false);
445 }
446
447 // Idle threads may have destroyed themselves, attempt to join
448 // them here
ensureJoined()449 void ThreadPoolExecutor::ensureJoined() {
450 auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
451 if (tojoin) {
452 {
453 SharedMutex::WriteHolder w{&threadListLock_};
454 tojoin = threadsToJoin_.load(std::memory_order_relaxed);
455 threadsToJoin_.store(0, std::memory_order_relaxed);
456 }
457 joinStoppedThreads(tojoin);
458 }
459 }
460
461 // threadListLock_ must be write locked.
tryTimeoutThread()462 bool ThreadPoolExecutor::tryTimeoutThread() {
463 // Try to stop based on idle thread timeout (try_take_for),
464 // if there are at least minThreads running.
465 if (!minActive()) {
466 return false;
467 }
468
469 // Remove thread from active count
470 activeThreads_.store(
471 activeThreads_.load(std::memory_order_relaxed) - 1,
472 std::memory_order_relaxed);
473
474 // There is a memory ordering constraint w.r.t the queue
475 // implementation's add() and getPendingTaskCountImpl() - while many
476 // queues have seq_cst ordering, some do not, so add an explicit
477 // barrier. tryTimeoutThread is the slow path and only happens once
478 // every thread timeout; use asymmetric barrier to keep add() fast.
479 asymmetricHeavyBarrier();
480
481 // If this is based on idle thread timeout, then
482 // adjust vars appropriately (otherwise stop() or join()
483 // does this).
484 if (getPendingTaskCountImpl() > 0) {
485 // There are still pending tasks, we can't stop yet.
486 // re-up active threads and return.
487 activeThreads_.store(
488 activeThreads_.load(std::memory_order_relaxed) + 1,
489 std::memory_order_relaxed);
490 return false;
491 }
492
493 threadsToJoin_.store(
494 threadsToJoin_.load(std::memory_order_relaxed) + 1,
495 std::memory_order_relaxed);
496
497 return true;
498 }
499
500 // If we can't ensure that we were able to hand off a task to a thread,
501 // attempt to start a thread that handled the task, if we aren't already
502 // running the maximum number of threads.
ensureActiveThreads()503 void ThreadPoolExecutor::ensureActiveThreads() {
504 ensureJoined();
505
506 // Matches barrier in tryTimeoutThread(). Ensure task added
507 // is seen before loading activeThreads_ below.
508 asymmetricLightBarrier();
509
510 // Fast path assuming we are already at max threads.
511 auto active = activeThreads_.load(std::memory_order_relaxed);
512 auto total = maxThreads_.load(std::memory_order_relaxed);
513
514 if (active >= total) {
515 return;
516 }
517
518 SharedMutex::WriteHolder w{&threadListLock_};
519 // Double check behind lock.
520 active = activeThreads_.load(std::memory_order_relaxed);
521 total = maxThreads_.load(std::memory_order_relaxed);
522 if (active >= total) {
523 return;
524 }
525 ThreadPoolExecutor::addThreads(1);
526 activeThreads_.store(active + 1, std::memory_order_relaxed);
527 }
528
529 // If an idle thread times out, only join it if there are at least
530 // minThreads threads.
minActive()531 bool ThreadPoolExecutor::minActive() {
532 return activeThreads_.load(std::memory_order_relaxed) >
533 minThreads_.load(std::memory_order_relaxed);
534 }
535
536 } // namespace folly
537