/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/executors/ThreadPoolExecutor.h>

#include <ctime>

#include <folly/executors/GlobalThreadPoolList.h>
#include <folly/portability/PThread.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>
#include <folly/tracing/StaticTracepoint.h>

namespace folly {

using SyncVecThreadPoolExecutors =
    folly::Synchronized<std::vector<ThreadPoolExecutor*>>;

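// Global registry of every live ThreadPoolExecutor, used by withAll() below
// to visit all pools. Indestructible sidesteps static destruction-order
// issues at shutdown.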
SyncVecThreadPoolExecutors& getSyncVecThreadPoolExecutors() {
  static Indestructible<SyncVecThreadPoolExecutors> storage;
  return *storage;
}

void ThreadPoolExecutor::registerThreadPoolExecutor(ThreadPoolExecutor* tpe) {
  getSyncVecThreadPoolExecutors().wlock()->push_back(tpe);
}

void ThreadPoolExecutor::deregisterThreadPoolExecutor(ThreadPoolExecutor* tpe) {
  getSyncVecThreadPoolExecutors().withWLock([tpe](auto& tpes) {
    tpes.erase(std::remove(tpes.begin(), tpes.end(), tpe), tpes.end());
  });
}

DEFINE_int64(
    threadtimeout_ms,
    60000,
    "Idle time before ThreadPoolExecutor threads are joined");

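// Note: the maxThreads argument is deliberately unused here; subclasses are
// expected to size the pool after construction (e.g. via setNumThreads).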
ThreadPoolExecutor::ThreadPoolExecutor(
    size_t /* maxThreads */,
    size_t minThreads,
    std::shared_ptr<ThreadFactory> threadFactory)
    : threadFactory_(std::move(threadFactory)),
      taskStatsCallbacks_(std::make_shared<TaskStatsCallbackRegistry>()),
      threadPoolHook_("folly::ThreadPoolExecutor"),
      minThreads_(minThreads),
      threadTimeout_(FLAGS_threadtimeout_ms) {
  namePrefix_ = getNameHelper();
}

ThreadPoolExecutor::~ThreadPoolExecutor() {
  joinKeepAliveOnce();
  CHECK_EQ(0, threadList_.get().size());
}

ThreadPoolExecutor::Task::Task(
    Func&& func, std::chrono::milliseconds expiration, Func&& expireCallback)
    : func_(std::move(func)),
      expiration_(expiration),
      expireCallback_(std::move(expireCallback)),
      context_(folly::RequestContext::saveContext()) {
  // Assume that the task is enqueued on creation
  enqueueTime_ = std::chrono::steady_clock::now();
}

namespace {

template <class F>
void nothrow(const char* name, F&& f) {
  try {
    f();
  } catch (const std::exception& e) {
    LOG(ERROR) << "ThreadPoolExecutor: " << name << " threw unhandled "
               << typeid(e).name() << " exception: " << e.what();
  } catch (...) {
    LOG(ERROR) << "ThreadPoolExecutor: " << name
               << " threw unhandled non-exception object";
  }
}

} // namespace

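// Runs a single task on the given thread: marks the thread busy, restores the
// task's saved RequestContext, then either runs the task or (if it sat in the
// queue past its expiration) the expire callback, and finally records stats,
// emits the USDT probe, and invokes any registered task-stats callbacks.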
void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
  thread->idle.store(false, std::memory_order_relaxed);
  auto startTime = std::chrono::steady_clock::now();
  TaskStats stats;
  stats.enqueueTime = task.enqueueTime_;
  if (task.context_) {
    stats.requestId = task.context_->getRootId();
  }
  stats.waitTime = startTime - task.enqueueTime_;

  {
    folly::RequestContextScopeGuard rctx(task.context_);
    if (task.expiration_ > std::chrono::milliseconds(0) &&
        stats.waitTime >= task.expiration_) {
      task.func_ = nullptr;
      stats.expired = true;
      if (task.expireCallback_ != nullptr) {
        invokeCatchingExns(
            "ThreadPoolExecutor: expireCallback",
            std::exchange(task.expireCallback_, {}));
      }
    } else {
      invokeCatchingExns(
          "ThreadPoolExecutor: func", std::exchange(task.func_, {}));
      task.expireCallback_ = nullptr;
    }
  }
  if (!stats.expired) {
    stats.runTime = std::chrono::steady_clock::now() - startTime;
  }

  // Times in this USDT use granularity of std::chrono::steady_clock::duration,
  // which is platform dependent. On Facebook servers, the granularity is
  // nanoseconds. We explicitly do not perform any unit conversions to avoid
  // unnecessary costs and leave it to consumers of this data to know what
  // effective clock resolution is.
  FOLLY_SDT(
      folly,
      thread_pool_executor_task_stats,
      namePrefix_.c_str(),
      stats.requestId,
      stats.enqueueTime.time_since_epoch().count(),
      stats.waitTime.count(),
      stats.runTime.count());

  thread->idle.store(true, std::memory_order_relaxed);
  thread->lastActiveTime.store(
      std::chrono::steady_clock::now(), std::memory_order_relaxed);
  thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
    *thread->taskStatsCallbacks->inCallback = true;
    SCOPE_EXIT { *thread->taskStatsCallbacks->inCallback = false; };
    try {
      for (auto& callback : callbacks) {
        callback(stats);
      }
    } catch (const std::exception& e) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled "
                 << typeid(e).name() << " exception: " << e.what();
    } catch (...) {
      LOG(ERROR) << "ThreadPoolExecutor: task stats callback threw "
                    "unhandled non-exception object";
    }
  });
}

void ThreadPoolExecutor::add(Func, std::chrono::milliseconds, Func) {
  throw std::runtime_error(
      "add() with expiration is not implemented for this Executor");
}

size_t ThreadPoolExecutor::numThreads() const {
  return maxThreads_.load(std::memory_order_relaxed);
}

size_t ThreadPoolExecutor::numActiveThreads() const {
  return activeThreads_.load(std::memory_order_relaxed);
}

// Set the maximum number of running threads.
void ThreadPoolExecutor::setNumThreads(size_t numThreads) {
  /* Since ThreadPoolExecutor may be dynamically adjusting the number of
     threads, we adjust the relevant variables instead of changing
     the number of threads directly. Roughly:

     If numThreads < minThreads, reset minThreads to numThreads.

     If numThreads < active threads, reduce the number of running threads.

     If the number of pending tasks is > 0, then increase the currently
     active number of threads such that we can run all the tasks, or reach
     numThreads.

     Note that if there are observers, we actually have to create all
     the threads, because some observer implementations need to 'observe'
     all thread creation (see tests for an example of this).
  */

  size_t numThreadsToJoin = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    auto pending = getPendingTaskCountImpl();
    maxThreads_.store(numThreads, std::memory_order_relaxed);
    auto active = activeThreads_.load(std::memory_order_relaxed);
    auto minthreads = minThreads_.load(std::memory_order_relaxed);
    if (numThreads < minthreads) {
      minthreads = numThreads;
      minThreads_.store(numThreads, std::memory_order_relaxed);
    }
    if (active > numThreads) {
      numThreadsToJoin = active - numThreads;
      if (numThreadsToJoin > active - minthreads) {
        numThreadsToJoin = active - minthreads;
      }
      ThreadPoolExecutor::removeThreads(numThreadsToJoin, false);
      activeThreads_.store(
          active - numThreadsToJoin, std::memory_order_relaxed);
    } else if (pending > 0 || !observers_.empty() || active < minthreads) {
      size_t numToAdd = std::min(pending, numThreads - active);
      if (!observers_.empty()) {
        numToAdd = numThreads - active;
      }
      if (active + numToAdd < minthreads) {
        numToAdd = minthreads - active;
      }
      ThreadPoolExecutor::addThreads(numToAdd);
      activeThreads_.store(active + numToAdd, std::memory_order_relaxed);
    }
  }

  /* We may have removed some threads; attempt to join them. */
  joinStoppedThreads(numThreadsToJoin);
}

// threadListLock_ is writelocked
void ThreadPoolExecutor::addThreads(size_t n) {
  std::vector<ThreadPtr> newThreads;
  for (size_t i = 0; i < n; i++) {
    newThreads.push_back(makeThread());
  }
  for (auto& thread : newThreads) {
    // TODO need a notion of failing to create the thread
    // and then handling for that case
    thread->handle = threadFactory_->newThread(
        std::bind(&ThreadPoolExecutor::threadRun, this, thread));
    threadList_.add(thread);
  }
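  // Wait for every new thread to signal its startup baton, so that observers
  // (notified below) never see a thread that has not fully started.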
  for (auto& thread : newThreads) {
    thread->startupBaton.wait(
        folly::Baton<>::wait_options().logging_enabled(false));
  }
  for (auto& o : observers_) {
    for (auto& thread : newThreads) {
      o->threadStarted(thread.get());
    }
  }
}

// threadListLock_ is writelocked
void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
  isJoin_ = isJoin;
  stopThreads(n);
}

void ThreadPoolExecutor::joinStoppedThreads(size_t n) {
  for (size_t i = 0; i < n; i++) {
    auto thread = stoppedThreads_.take();
    thread->handle.join();
  }
}

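// Stop all threads and join them. Passes isJoin = false to removeThreads(),
// so subclass stopThreads() implementations are not required to drain
// pending tasks first (contrast with join() below).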
void ThreadPoolExecutor::stop() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, false);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

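// Identical to stop() except that isJoin = true, signaling subclass
// stopThreads() implementations to let threads drain pending work before
// stopping.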
void ThreadPoolExecutor::join() {
  joinKeepAliveOnce();
  size_t n = 0;
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    maxThreads_.store(0, std::memory_order_release);
    activeThreads_.store(0, std::memory_order_release);
    n = threadList_.get().size();
    removeThreads(n, true);
    n += threadsToJoin_.load(std::memory_order_relaxed);
    threadsToJoin_.store(0, std::memory_order_relaxed);
  }
  joinStoppedThreads(n);
  CHECK_EQ(0, threadList_.get().size());
  CHECK_EQ(0, stoppedThreads_.size());
}

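// Illustrative usage sketch (not part of this file): visit every live pool,
// e.g. to log a quick summary of each one.
//
//   ThreadPoolExecutor::withAll([](ThreadPoolExecutor& ex) {
//     LOG(INFO) << ex.getName() << ": "
//               << ex.getPoolStats().pendingTaskCount << " pending";
//   });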
void ThreadPoolExecutor::withAll(FunctionRef<void(ThreadPoolExecutor&)> f) {
  getSyncVecThreadPoolExecutors().withRLock([f](auto& tpes) {
    for (auto tpe : tpes) {
      f(*tpe);
    }
  });
}

ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() const {
  const auto now = std::chrono::steady_clock::now();
  SharedMutex::ReadHolder r{&threadListLock_};
  ThreadPoolExecutor::PoolStats stats;
  size_t activeTasks = 0;
  size_t idleAlive = 0;
  for (const auto& thread : threadList_.get()) {
    if (thread->idle.load(std::memory_order_relaxed)) {
      const std::chrono::nanoseconds idleTime =
          now - thread->lastActiveTime.load(std::memory_order_relaxed);
      stats.maxIdleTime = std::max(stats.maxIdleTime, idleTime);
      idleAlive++;
    } else {
      activeTasks++;
    }
  }
  stats.pendingTaskCount = getPendingTaskCountImpl();
  stats.totalTaskCount = stats.pendingTaskCount + activeTasks;

  stats.threadCount = maxThreads_.load(std::memory_order_relaxed);
  stats.activeThreadCount =
      activeThreads_.load(std::memory_order_relaxed) - idleAlive;
  stats.idleThreadCount = stats.threadCount - stats.activeThreadCount;
  return stats;
}

size_t ThreadPoolExecutor::getPendingTaskCount() const {
  SharedMutex::ReadHolder r{&threadListLock_};
  return getPendingTaskCountImpl();
}

const std::string& ThreadPoolExecutor::getName() const {
  return namePrefix_;
}

std::string ThreadPoolExecutor::getNameHelper() const {
  auto ntf = dynamic_cast<NamedThreadFactory*>(threadFactory_.get());
  if (ntf == nullptr) {
    return folly::demangle(typeid(*this).name()).toStdString();
  }
  return ntf->getNamePrefix();
}

std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);

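// Returns the CPU time consumed by this thread. Only implemented on Linux
// (via pthread_getcpuclockid + clock_gettime); on other platforms tp stays
// zero-initialized and this returns 0ns.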
std::chrono::nanoseconds ThreadPoolExecutor::Thread::usedCpuTime() const {
  using std::chrono::nanoseconds;
  using std::chrono::seconds;
  timespec tp{};
#ifdef __linux__
  clockid_t clockid;
  auto th = const_cast<std::thread&>(handle).native_handle();
  if (!pthread_getcpuclockid(th, &clockid)) {
    clock_gettime(clockid, &tp);
  }
#endif
  return nanoseconds(tp.tv_nsec) + seconds(tp.tv_sec);
}

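// Illustrative usage sketch (not part of this file): watch for tasks that
// waited too long in the queue.
//
//   pool.subscribeToTaskStats([](const ThreadPoolExecutor::TaskStats& stats) {
//     if (stats.waitTime > std::chrono::seconds(1)) {
//       LOG(WARNING) << "slow dequeue, requestId=" << stats.requestId;
//     }
//   });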
void ThreadPoolExecutor::subscribeToTaskStats(TaskStatsCallback cb) {
  if (*taskStatsCallbacks_->inCallback) {
    throw std::runtime_error("cannot subscribe in task stats callback");
  }
  taskStatsCallbacks_->callbackList.wlock()->push_back(std::move(cb));
}

BlockingQueueAddResult ThreadPoolExecutor::StoppedThreadQueue::add(
    ThreadPoolExecutor::ThreadPtr item) {
  std::lock_guard<std::mutex> guard(mutex_);
  queue_.push(std::move(item));
  return sem_.post();
}

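// Blocks until a stopped thread is available. The loop re-checks the queue
// because a concurrent taker may have consumed the item corresponding to our
// semaphore wakeup.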
ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (!queue_.empty()) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    sem_.wait();
  }
}

folly::Optional<ThreadPoolExecutor::ThreadPtr>
ThreadPoolExecutor::StoppedThreadQueue::try_take_for(
    std::chrono::milliseconds time) {
  while (true) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (!queue_.empty()) {
        auto item = std::move(queue_.front());
        queue_.pop();
        return item;
      }
    }
    if (!sem_.try_wait_for(time)) {
      return folly::none;
    }
  }
}

size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
  std::lock_guard<std::mutex> guard(mutex_);
  return queue_.size();
}

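// Registers an observer, replaying threadPreviouslyStarted() for threads that
// already exist. Observers must see every thread, so the pool is then eagerly
// grown to maxThreads (see the note in setNumThreads()).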
void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
  {
    SharedMutex::WriteHolder w{&threadListLock_};
    observers_.push_back(o);
    for (auto& thread : threadList_.get()) {
      o->threadPreviouslyStarted(thread.get());
    }
  }
  while (activeThreads_.load(std::memory_order_relaxed) <
         maxThreads_.load(std::memory_order_relaxed)) {
    ensureActiveThreads();
  }
}

void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
  SharedMutex::WriteHolder w{&threadListLock_};
  for (auto& thread : threadList_.get()) {
    o->threadNotYetStopped(thread.get());
  }

  for (auto it = observers_.begin(); it != observers_.end(); it++) {
    if (*it == o) {
      observers_.erase(it);
      return;
    }
  }
  DCHECK(false);
}

// Idle threads may have destroyed themselves, attempt to join
// them here
void ThreadPoolExecutor::ensureJoined() {
  auto tojoin = threadsToJoin_.load(std::memory_order_relaxed);
  if (tojoin) {
    {
      SharedMutex::WriteHolder w{&threadListLock_};
      tojoin = threadsToJoin_.load(std::memory_order_relaxed);
      threadsToJoin_.store(0, std::memory_order_relaxed);
    }
    joinStoppedThreads(tojoin);
  }
}

// threadListLock_ must be write locked.
bool ThreadPoolExecutor::tryTimeoutThread() {
  // Try to stop based on idle thread timeout (try_take_for),
  // if there are at least minThreads running.
  if (!minActive()) {
    return false;
  }

  // Remove thread from active count
  activeThreads_.store(
      activeThreads_.load(std::memory_order_relaxed) - 1,
      std::memory_order_relaxed);

  // There is a memory ordering constraint w.r.t the queue
  // implementation's add() and getPendingTaskCountImpl() - while many
  // queues have seq_cst ordering, some do not, so add an explicit
  // barrier.  tryTimeoutThread is the slow path and only happens once
  // every thread timeout; use asymmetric barrier to keep add() fast.
  asymmetricHeavyBarrier();

  // If this is based on idle thread timeout, then
  // adjust vars appropriately (otherwise stop() or join()
  // does this).
  if (getPendingTaskCountImpl() > 0) {
    // There are still pending tasks, we can't stop yet.
    // re-up active threads and return.
    activeThreads_.store(
        activeThreads_.load(std::memory_order_relaxed) + 1,
        std::memory_order_relaxed);
    return false;
  }

  threadsToJoin_.store(
      threadsToJoin_.load(std::memory_order_relaxed) + 1,
      std::memory_order_relaxed);

  return true;
}

// If we couldn't ensure that a task was handed off to an existing thread,
// attempt to start a new thread to handle it, unless we are already running
// the maximum number of threads.
void ThreadPoolExecutor::ensureActiveThreads() {
  ensureJoined();

  // Matches barrier in tryTimeoutThread().  Ensure task added
  // is seen before loading activeThreads_ below.
  asymmetricLightBarrier();

  // Fast path assuming we are already at max threads.
  auto active = activeThreads_.load(std::memory_order_relaxed);
  auto total = maxThreads_.load(std::memory_order_relaxed);

  if (active >= total) {
    return;
  }

  SharedMutex::WriteHolder w{&threadListLock_};
  // Double check behind lock.
  active = activeThreads_.load(std::memory_order_relaxed);
  total = maxThreads_.load(std::memory_order_relaxed);
  if (active >= total) {
    return;
  }
  ThreadPoolExecutor::addThreads(1);
  activeThreads_.store(active + 1, std::memory_order_relaxed);
}

// If an idle thread times out, only join it if more than minThreads
// threads are running.
bool ThreadPoolExecutor::minActive() {
  return activeThreads_.load(std::memory_order_relaxed) >
      minThreads_.load(std::memory_order_relaxed);
}

} // namespace folly