1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <algorithm>
20 #include <mutex>
21 #include <queue>
22 
23 #include <glog/logging.h>
24 
25 #include <folly/DefaultKeepAliveExecutor.h>
26 #include <folly/Memory.h>
27 #include <folly/SharedMutex.h>
28 #include <folly/executors/GlobalThreadPoolList.h>
29 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
30 #include <folly/executors/thread_factory/NamedThreadFactory.h>
31 #include <folly/io/async/Request.h>
32 #include <folly/portability/GFlags.h>
33 #include <folly/synchronization/AtomicStruct.h>
34 #include <folly/synchronization/Baton.h>
35 
36 namespace folly {
37 
38 /* Base class for implementing threadpool based executors.
39  *
40  * Dynamic thread behavior:
41  *
42  * ThreadPoolExecutors may vary their actual running number of threads
43  * between minThreads_ and maxThreads_, tracked by activeThreads_.
44  * The actual implementation of joining an idle thread is left to the
45  * ThreadPoolExecutors' subclass (typically by LifoSem try_take_for
46  * timing out).  Idle threads should be removed from threadList_, and
 * threadsToJoin_ incremented, and activeThreads_ decremented.
48  *
 * On task add(), if an executor can guarantee there is an active
50  * thread that will handle the task, then nothing needs to be done.
51  * If not, then ensureActiveThreads() should be called to possibly
52  * start another pool thread, up to maxThreads_.
53  *
54  * ensureJoined() is called on add(), such that we can join idle
55  * threads that were destroyed (which can't be joined from
56  * themselves).
57  *
58  * Thread pool stats accounting:
59  *
60  * Derived classes must register instances to keep stats on all thread
 * pools by calling registerThreadPoolExecutor(this) on construction
62  * and deregisterThreadPoolExecutor(this) on destruction.
63  *
64  * Registration must be done wherever getPendingTaskCountImpl is implemented
65  * and getPendingTaskCountImpl should be marked 'final' to avoid data races.
66  */
67 class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
68  public:
69   explicit ThreadPoolExecutor(
70       size_t maxThreads,
71       size_t minThreads,
72       std::shared_ptr<ThreadFactory> threadFactory);
73 
74   ~ThreadPoolExecutor() override;
75 
76   void add(Func func) override = 0;
77   virtual void add(
78       Func func, std::chrono::milliseconds expiration, Func expireCallback);
79 
  /**
   * Replace the factory used to create pool threads.
   *
   * CHECK-fails (aborting the process) unless numThreads() is 0 at the time
   * of the call. After the factory is swapped, namePrefix_ is recomputed via
   * getNameHelper().
   */
  void setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory) {
    CHECK(numThreads() == 0);
    threadFactory_ = std::move(threadFactory);
    // Refresh the cached name after the factory changes; getNameHelper() is
    // defined elsewhere -- presumably it derives the name from threadFactory_.
    namePrefix_ = getNameHelper();
  }
85 
  /**
   * Return a shared_ptr copy of the currently installed thread factory.
   * No lock is taken here.
   */
  std::shared_ptr<ThreadFactory> getThreadFactory() const {
    return threadFactory_;
  }
89 
90   size_t numThreads() const;
91   void setNumThreads(size_t numThreads);
92 
93   // Return actual number of active threads -- this could be different from
94   // numThreads() due to ThreadPoolExecutor's dynamic behavior.
95   size_t numActiveThreads() const;
96 
97   /*
98    * stop() is best effort - there is no guarantee that unexecuted tasks won't
99    * be executed before it returns. Specifically, IOThreadPoolExecutor's stop()
100    * behaves like join().
101    */
102   void stop();
103   void join();
104 
105   /**
106    * Execute f against all ThreadPoolExecutors, primarily for retrieving and
107    * exporting stats.
108    */
109   static void withAll(FunctionRef<void(ThreadPoolExecutor&)> f);
110 
111   struct PoolStats {
PoolStatsPoolStats112     PoolStats()
113         : threadCount(0),
114           idleThreadCount(0),
115           activeThreadCount(0),
116           pendingTaskCount(0),
117           totalTaskCount(0),
118           maxIdleTime(0) {}
119     size_t threadCount, idleThreadCount, activeThreadCount;
120     uint64_t pendingTaskCount, totalTaskCount;
121     std::chrono::nanoseconds maxIdleTime;
122   };
123 
124   PoolStats getPoolStats() const;
125   size_t getPendingTaskCount() const;
126   const std::string& getName() const;
127 
128   /**
129    * Return the cumulative CPU time used by all threads in the pool, including
130    * those that are no longer alive. Requires system support for per-thread CPU
131    * clocks. If not available, the function returns 0. This operation can be
132    * expensive.
133    */
getUsedCpuTime()134   std::chrono::nanoseconds getUsedCpuTime() const {
135     SharedMutex::ReadHolder r{&threadListLock_};
136     return threadList_.getUsedCpuTime();
137   }
138 
139   struct TaskStats {
TaskStatsTaskStats140     TaskStats() : expired(false), waitTime(0), runTime(0), requestId(0) {}
141     bool expired;
142     std::chrono::nanoseconds waitTime;
143     std::chrono::nanoseconds runTime;
144     std::chrono::steady_clock::time_point enqueueTime;
145     uint64_t requestId;
146   };
147 
148   using TaskStatsCallback = std::function<void(TaskStats)>;
149   void subscribeToTaskStats(TaskStatsCallback cb);
150 
  /**
   * Base class for threads created with ThreadPoolExecutor.
   * Some subclasses have methods that operate on these
   * handles.
   *
   * It carries no state of its own; Observer callbacks receive pool threads
   * as ThreadHandle pointers.
   */
  class ThreadHandle {
   public:
    // Virtual so derived thread records can be destroyed through a
    // ThreadHandle pointer.
    virtual ~ThreadHandle() = default;
  };
160 
161   /**
162    * Observer interface for thread start/stop.
163    * Provides hooks so actions can be taken when
164    * threads are created
165    */
166   class Observer {
167    public:
168     virtual void threadStarted(ThreadHandle*) = 0;
169     virtual void threadStopped(ThreadHandle*) = 0;
threadPreviouslyStarted(ThreadHandle * h)170     virtual void threadPreviouslyStarted(ThreadHandle* h) { threadStarted(h); }
threadNotYetStopped(ThreadHandle * h)171     virtual void threadNotYetStopped(ThreadHandle* h) { threadStopped(h); }
172     virtual ~Observer() = default;
173   };
174 
175   void addObserver(std::shared_ptr<Observer>);
176   void removeObserver(std::shared_ptr<Observer>);
177 
  /**
   * Store a new value in threadTimeout_.
   *
   * This is a plain assignment: no lock is taken here and threadTimeout_ is
   * not an atomic. Presumably this is how long an idle thread waits before
   * being timed out -- confirm against tryTimeoutThread().
   * NOTE(review): confirm callers do not race with readers of threadTimeout_.
   */
  void setThreadDeathTimeout(std::chrono::milliseconds timeout) {
    threadTimeout_ = timeout;
  }
181 
182  protected:
183   // Prerequisite: threadListLock_ writelocked
184   void addThreads(size_t n);
185   // Prerequisite: threadListLock_ writelocked
186   void removeThreads(size_t n, bool isJoin);
187 
188   struct TaskStatsCallbackRegistry;
189 
190   struct //
alignas(folly::cacheline_align_v)191       alignas(folly::cacheline_align_v) //
192       alignas(folly::AtomicStruct<std::chrono::steady_clock::time_point>) //
193       Thread : public ThreadHandle {
194     explicit Thread(ThreadPoolExecutor* pool)
195         : id(nextId++),
196           handle(),
197           idle(true),
198           lastActiveTime(std::chrono::steady_clock::now()),
199           taskStatsCallbacks(pool->taskStatsCallbacks_) {}
200 
201     ~Thread() override = default;
202 
203     std::chrono::nanoseconds usedCpuTime() const;
204 
205     static std::atomic<uint64_t> nextId;
206     uint64_t id;
207     std::thread handle;
208     std::atomic<bool> idle;
209     folly::AtomicStruct<std::chrono::steady_clock::time_point> lastActiveTime;
210     folly::Baton<> startupBaton;
211     std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks;
212   };
213 
214   typedef std::shared_ptr<Thread> ThreadPtr;
215 
  /**
   * A queued unit of work plus the metadata carried alongside it.
   *
   * The constructor is defined out of line, so how it populates
   * enqueueTime_ and context_ is not visible in this header.
   */
  struct Task {
    explicit Task(
        Func&& func,
        std::chrono::milliseconds expiration,
        Func&& expireCallback);
    // The work itself.
    Func func_;
    // Presumably the time the task was queued -- confirm in the constructor.
    std::chrono::steady_clock::time_point enqueueTime_;
    // Expiration interval supplied by the caller.
    std::chrono::milliseconds expiration_;
    // Callback supplied for the expired case; presumably run instead of
    // func_ when the task expires -- confirm in runTask().
    Func expireCallback_;
    // Request context associated with the task.
    std::shared_ptr<folly::RequestContext> context_;
  };
227 
228   void runTask(const ThreadPtr& thread, Task&& task);
229 
230   // The function that will be bound to pool threads. It must call
231   // thread->startupBaton.post() when it's ready to consume work.
232   virtual void threadRun(ThreadPtr thread) = 0;
233 
  // Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue
  // and remove them from threadList_, either synchronously or asynchronously
236   // Prerequisite: threadListLock_ writelocked
237   virtual void stopThreads(size_t n) = 0;
238 
  // Join n stopped threads and remove them from the stoppedThreads_ queue.
240   // Should not hold a lock because joining thread operation may invoke some
241   // cleanup operations on the thread, and those cleanup operations may
242   // require a lock on ThreadPoolExecutor.
243   void joinStoppedThreads(size_t n);
244 
245   // Create a suitable Thread struct
makeThread()246   virtual ThreadPtr makeThread() { return std::make_shared<Thread>(this); }
247 
248   static void registerThreadPoolExecutor(ThreadPoolExecutor* tpe);
249   static void deregisterThreadPoolExecutor(ThreadPoolExecutor* tpe);
250 
251   // Prerequisite: threadListLock_ readlocked or writelocked
252   virtual size_t getPendingTaskCountImpl() const = 0;
253 
254   class ThreadList {
255    public:
add(const ThreadPtr & state)256     void add(const ThreadPtr& state) {
257       auto it = std::lower_bound(vec_.begin(), vec_.end(), state, Compare{});
258       vec_.insert(it, state);
259     }
260 
remove(const ThreadPtr & state)261     void remove(const ThreadPtr& state) {
262       auto itPair =
263           std::equal_range(vec_.begin(), vec_.end(), state, Compare{});
264       CHECK(itPair.first != vec_.end());
265       CHECK(std::next(itPair.first) == itPair.second);
266       vec_.erase(itPair.first);
267       pastCpuUsed_ += state->usedCpuTime();
268     }
269 
contains(const ThreadPtr & ts)270     bool contains(const ThreadPtr& ts) const {
271       return std::binary_search(vec_.cbegin(), vec_.cend(), ts, Compare{});
272     }
273 
get()274     const std::vector<ThreadPtr>& get() const { return vec_; }
275 
getUsedCpuTime()276     std::chrono::nanoseconds getUsedCpuTime() const {
277       auto acc{pastCpuUsed_};
278       for (const auto& thread : vec_) {
279         acc += thread->usedCpuTime();
280       }
281       return acc;
282     }
283 
284    private:
285     struct Compare {
operatorCompare286       bool operator()(const ThreadPtr& ts1, const ThreadPtr& ts2) const {
287         return ts1->id < ts2->id;
288       }
289     };
290     std::vector<ThreadPtr> vec_;
291     // cpu time used by threads that are no longer alive
292     std::chrono::nanoseconds pastCpuUsed_{0};
293   };
294 
  /**
   * BlockingQueue<ThreadPtr> implementation used for stoppedThreads_, the
   * queue of threads that have been stopped and are waiting to be joined.
   *
   * All four overrides are defined out of line. The private members suggest
   * a std::queue protected by mutex_, with sem_ used to block and wake
   * consumers -- confirm against the implementation.
   */
  class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
   public:
    BlockingQueueAddResult add(ThreadPtr item) override;
    ThreadPtr take() override;
    size_t size() override;
    // The timeout parameter is left unnamed in this declaration.
    folly::Optional<ThreadPtr> try_take_for(
        std::chrono::milliseconds /*timeout */) override;

   private:
    folly::LifoSem sem_;
    std::mutex mutex_;
    std::queue<ThreadPtr> queue_;
  };
308 
309   std::string getNameHelper() const;
310 
311   std::shared_ptr<ThreadFactory> threadFactory_;
312   std::string namePrefix_;
313 
314   ThreadList threadList_;
315   SharedMutex threadListLock_;
316   StoppedThreadQueue stoppedThreads_;
317   std::atomic<bool> isJoin_{false}; // whether the current downsizing is a join
318 
  /**
   * Holder for the task-stats callbacks. The pool keeps it in a shared_ptr
   * (taskStatsCallbacks_) and every Thread record copies that pointer at
   * construction, so the registry is shared between the pool and its
   * threads.
   */
  struct TaskStatsCallbackRegistry {
    // Per-thread flag. Presumably set while callbacks are being invoked, to
    // detect re-entrant use -- confirm against runTask() and
    // subscribeToTaskStats().
    folly::ThreadLocal<bool> inCallback;
    // The registered callbacks, wrapped in folly::Synchronized for locked
    // access.
    folly::Synchronized<std::vector<TaskStatsCallback>> callbackList;
  };
323   std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks_;
324   std::vector<std::shared_ptr<Observer>> observers_;
325   folly::ThreadPoolListHook threadPoolHook_;
326 
327   // Dynamic thread sizing functions and variables
328   void ensureActiveThreads();
329   void ensureJoined();
330   bool minActive();
331   bool tryTimeoutThread();
332 
333   // These are only modified while holding threadListLock_, but
334   // are read without holding the lock.
335   std::atomic<size_t> maxThreads_{0};
336   std::atomic<size_t> minThreads_{0};
337   std::atomic<size_t> activeThreads_{0};
338 
339   std::atomic<size_t> threadsToJoin_{0};
340   std::chrono::milliseconds threadTimeout_{0};
341 
joinKeepAliveOnce()342   void joinKeepAliveOnce() {
343     if (!std::exchange(keepAliveJoined_, true)) {
344       joinKeepAlive();
345     }
346   }
347 
348   bool keepAliveJoined_{false};
349 };
350 
351 } // namespace folly
352