1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <algorithm> 20 #include <mutex> 21 #include <queue> 22 23 #include <glog/logging.h> 24 25 #include <folly/DefaultKeepAliveExecutor.h> 26 #include <folly/Memory.h> 27 #include <folly/SharedMutex.h> 28 #include <folly/executors/GlobalThreadPoolList.h> 29 #include <folly/executors/task_queue/LifoSemMPMCQueue.h> 30 #include <folly/executors/thread_factory/NamedThreadFactory.h> 31 #include <folly/io/async/Request.h> 32 #include <folly/portability/GFlags.h> 33 #include <folly/synchronization/AtomicStruct.h> 34 #include <folly/synchronization/Baton.h> 35 36 namespace folly { 37 38 /* Base class for implementing threadpool based executors. 39 * 40 * Dynamic thread behavior: 41 * 42 * ThreadPoolExecutors may vary their actual running number of threads 43 * between minThreads_ and maxThreads_, tracked by activeThreads_. 44 * The actual implementation of joining an idle thread is left to the 45 * ThreadPoolExecutors' subclass (typically by LifoSem try_take_for 46 * timing out). Idle threads should be removed from threadList_, and 47 * threadsToJoin incremented, and activeThreads_ decremented. 48 * 49 * On task add(), if an executor can garantee there is an active 50 * thread that will handle the task, then nothing needs to be done. 51 * If not, then ensureActiveThreads() should be called to possibly 52 * start another pool thread, up to maxThreads_. 53 * 54 * ensureJoined() is called on add(), such that we can join idle 55 * threads that were destroyed (which can't be joined from 56 * themselves). 57 * 58 * Thread pool stats accounting: 59 * 60 * Derived classes must register instances to keep stats on all thread 61 * pools by calling registerThreadPoolExecutor(this) on constructions 62 * and deregisterThreadPoolExecutor(this) on destruction. 63 * 64 * Registration must be done wherever getPendingTaskCountImpl is implemented 65 * and getPendingTaskCountImpl should be marked 'final' to avoid data races. 66 */ 67 class ThreadPoolExecutor : public DefaultKeepAliveExecutor { 68 public: 69 explicit ThreadPoolExecutor( 70 size_t maxThreads, 71 size_t minThreads, 72 std::shared_ptr<ThreadFactory> threadFactory); 73 74 ~ThreadPoolExecutor() override; 75 76 void add(Func func) override = 0; 77 virtual void add( 78 Func func, std::chrono::milliseconds expiration, Func expireCallback); 79 setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory)80 void setThreadFactory(std::shared_ptr<ThreadFactory> threadFactory) { 81 CHECK(numThreads() == 0); 82 threadFactory_ = std::move(threadFactory); 83 namePrefix_ = getNameHelper(); 84 } 85 getThreadFactory()86 std::shared_ptr<ThreadFactory> getThreadFactory() const { 87 return threadFactory_; 88 } 89 90 size_t numThreads() const; 91 void setNumThreads(size_t numThreads); 92 93 // Return actual number of active threads -- this could be different from 94 // numThreads() due to ThreadPoolExecutor's dynamic behavior. 95 size_t numActiveThreads() const; 96 97 /* 98 * stop() is best effort - there is no guarantee that unexecuted tasks won't 99 * be executed before it returns. Specifically, IOThreadPoolExecutor's stop() 100 * behaves like join(). 101 */ 102 void stop(); 103 void join(); 104 105 /** 106 * Execute f against all ThreadPoolExecutors, primarily for retrieving and 107 * exporting stats. 108 */ 109 static void withAll(FunctionRef<void(ThreadPoolExecutor&)> f); 110 111 struct PoolStats { PoolStatsPoolStats112 PoolStats() 113 : threadCount(0), 114 idleThreadCount(0), 115 activeThreadCount(0), 116 pendingTaskCount(0), 117 totalTaskCount(0), 118 maxIdleTime(0) {} 119 size_t threadCount, idleThreadCount, activeThreadCount; 120 uint64_t pendingTaskCount, totalTaskCount; 121 std::chrono::nanoseconds maxIdleTime; 122 }; 123 124 PoolStats getPoolStats() const; 125 size_t getPendingTaskCount() const; 126 const std::string& getName() const; 127 128 /** 129 * Return the cumulative CPU time used by all threads in the pool, including 130 * those that are no longer alive. Requires system support for per-thread CPU 131 * clocks. If not available, the function returns 0. This operation can be 132 * expensive. 133 */ getUsedCpuTime()134 std::chrono::nanoseconds getUsedCpuTime() const { 135 SharedMutex::ReadHolder r{&threadListLock_}; 136 return threadList_.getUsedCpuTime(); 137 } 138 139 struct TaskStats { TaskStatsTaskStats140 TaskStats() : expired(false), waitTime(0), runTime(0), requestId(0) {} 141 bool expired; 142 std::chrono::nanoseconds waitTime; 143 std::chrono::nanoseconds runTime; 144 std::chrono::steady_clock::time_point enqueueTime; 145 uint64_t requestId; 146 }; 147 148 using TaskStatsCallback = std::function<void(TaskStats)>; 149 void subscribeToTaskStats(TaskStatsCallback cb); 150 151 /** 152 * Base class for threads created with ThreadPoolExecutor. 153 * Some subclasses have methods that operate on these 154 * handles. 155 */ 156 class ThreadHandle { 157 public: 158 virtual ~ThreadHandle() = default; 159 }; 160 161 /** 162 * Observer interface for thread start/stop. 163 * Provides hooks so actions can be taken when 164 * threads are created 165 */ 166 class Observer { 167 public: 168 virtual void threadStarted(ThreadHandle*) = 0; 169 virtual void threadStopped(ThreadHandle*) = 0; threadPreviouslyStarted(ThreadHandle * h)170 virtual void threadPreviouslyStarted(ThreadHandle* h) { threadStarted(h); } threadNotYetStopped(ThreadHandle * h)171 virtual void threadNotYetStopped(ThreadHandle* h) { threadStopped(h); } 172 virtual ~Observer() = default; 173 }; 174 175 void addObserver(std::shared_ptr<Observer>); 176 void removeObserver(std::shared_ptr<Observer>); 177 setThreadDeathTimeout(std::chrono::milliseconds timeout)178 void setThreadDeathTimeout(std::chrono::milliseconds timeout) { 179 threadTimeout_ = timeout; 180 } 181 182 protected: 183 // Prerequisite: threadListLock_ writelocked 184 void addThreads(size_t n); 185 // Prerequisite: threadListLock_ writelocked 186 void removeThreads(size_t n, bool isJoin); 187 188 struct TaskStatsCallbackRegistry; 189 190 struct // alignas(folly::cacheline_align_v)191 alignas(folly::cacheline_align_v) // 192 alignas(folly::AtomicStruct<std::chrono::steady_clock::time_point>) // 193 Thread : public ThreadHandle { 194 explicit Thread(ThreadPoolExecutor* pool) 195 : id(nextId++), 196 handle(), 197 idle(true), 198 lastActiveTime(std::chrono::steady_clock::now()), 199 taskStatsCallbacks(pool->taskStatsCallbacks_) {} 200 201 ~Thread() override = default; 202 203 std::chrono::nanoseconds usedCpuTime() const; 204 205 static std::atomic<uint64_t> nextId; 206 uint64_t id; 207 std::thread handle; 208 std::atomic<bool> idle; 209 folly::AtomicStruct<std::chrono::steady_clock::time_point> lastActiveTime; 210 folly::Baton<> startupBaton; 211 std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks; 212 }; 213 214 typedef std::shared_ptr<Thread> ThreadPtr; 215 216 struct Task { 217 explicit Task( 218 Func&& func, 219 std::chrono::milliseconds expiration, 220 Func&& expireCallback); 221 Func func_; 222 std::chrono::steady_clock::time_point enqueueTime_; 223 std::chrono::milliseconds expiration_; 224 Func expireCallback_; 225 std::shared_ptr<folly::RequestContext> context_; 226 }; 227 228 void runTask(const ThreadPtr& thread, Task&& task); 229 230 // The function that will be bound to pool threads. It must call 231 // thread->startupBaton.post() when it's ready to consume work. 232 virtual void threadRun(ThreadPtr thread) = 0; 233 234 // Stop n threads and put their ThreadPtrs in the stoppedThreads_ queue 235 // and remove them from threadList_, either synchronize or asynchronize 236 // Prerequisite: threadListLock_ writelocked 237 virtual void stopThreads(size_t n) = 0; 238 239 // Join n stopped threads and remove them from waitingForJoinThreads_ queue. 240 // Should not hold a lock because joining thread operation may invoke some 241 // cleanup operations on the thread, and those cleanup operations may 242 // require a lock on ThreadPoolExecutor. 243 void joinStoppedThreads(size_t n); 244 245 // Create a suitable Thread struct makeThread()246 virtual ThreadPtr makeThread() { return std::make_shared<Thread>(this); } 247 248 static void registerThreadPoolExecutor(ThreadPoolExecutor* tpe); 249 static void deregisterThreadPoolExecutor(ThreadPoolExecutor* tpe); 250 251 // Prerequisite: threadListLock_ readlocked or writelocked 252 virtual size_t getPendingTaskCountImpl() const = 0; 253 254 class ThreadList { 255 public: add(const ThreadPtr & state)256 void add(const ThreadPtr& state) { 257 auto it = std::lower_bound(vec_.begin(), vec_.end(), state, Compare{}); 258 vec_.insert(it, state); 259 } 260 remove(const ThreadPtr & state)261 void remove(const ThreadPtr& state) { 262 auto itPair = 263 std::equal_range(vec_.begin(), vec_.end(), state, Compare{}); 264 CHECK(itPair.first != vec_.end()); 265 CHECK(std::next(itPair.first) == itPair.second); 266 vec_.erase(itPair.first); 267 pastCpuUsed_ += state->usedCpuTime(); 268 } 269 contains(const ThreadPtr & ts)270 bool contains(const ThreadPtr& ts) const { 271 return std::binary_search(vec_.cbegin(), vec_.cend(), ts, Compare{}); 272 } 273 get()274 const std::vector<ThreadPtr>& get() const { return vec_; } 275 getUsedCpuTime()276 std::chrono::nanoseconds getUsedCpuTime() const { 277 auto acc{pastCpuUsed_}; 278 for (const auto& thread : vec_) { 279 acc += thread->usedCpuTime(); 280 } 281 return acc; 282 } 283 284 private: 285 struct Compare { operatorCompare286 bool operator()(const ThreadPtr& ts1, const ThreadPtr& ts2) const { 287 return ts1->id < ts2->id; 288 } 289 }; 290 std::vector<ThreadPtr> vec_; 291 // cpu time used by threads that are no longer alive 292 std::chrono::nanoseconds pastCpuUsed_{0}; 293 }; 294 295 class StoppedThreadQueue : public BlockingQueue<ThreadPtr> { 296 public: 297 BlockingQueueAddResult add(ThreadPtr item) override; 298 ThreadPtr take() override; 299 size_t size() override; 300 folly::Optional<ThreadPtr> try_take_for( 301 std::chrono::milliseconds /*timeout */) override; 302 303 private: 304 folly::LifoSem sem_; 305 std::mutex mutex_; 306 std::queue<ThreadPtr> queue_; 307 }; 308 309 std::string getNameHelper() const; 310 311 std::shared_ptr<ThreadFactory> threadFactory_; 312 std::string namePrefix_; 313 314 ThreadList threadList_; 315 SharedMutex threadListLock_; 316 StoppedThreadQueue stoppedThreads_; 317 std::atomic<bool> isJoin_{false}; // whether the current downsizing is a join 318 319 struct TaskStatsCallbackRegistry { 320 folly::ThreadLocal<bool> inCallback; 321 folly::Synchronized<std::vector<TaskStatsCallback>> callbackList; 322 }; 323 std::shared_ptr<TaskStatsCallbackRegistry> taskStatsCallbacks_; 324 std::vector<std::shared_ptr<Observer>> observers_; 325 folly::ThreadPoolListHook threadPoolHook_; 326 327 // Dynamic thread sizing functions and variables 328 void ensureActiveThreads(); 329 void ensureJoined(); 330 bool minActive(); 331 bool tryTimeoutThread(); 332 333 // These are only modified while holding threadListLock_, but 334 // are read without holding the lock. 335 std::atomic<size_t> maxThreads_{0}; 336 std::atomic<size_t> minThreads_{0}; 337 std::atomic<size_t> activeThreads_{0}; 338 339 std::atomic<size_t> threadsToJoin_{0}; 340 std::chrono::milliseconds threadTimeout_{0}; 341 joinKeepAliveOnce()342 void joinKeepAliveOnce() { 343 if (!std::exchange(keepAliveJoined_, true)) { 344 joinKeepAlive(); 345 } 346 } 347 348 bool keepAliveJoined_{false}; 349 }; 350 351 } // namespace folly 352