1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <sys/types.h>
20
21 #include <array>
22 #include <chrono>
23 #include <functional>
24 #include <memory>
25 #include <optional>
26 #include <string>
27
28 #include <folly/DefaultKeepAliveExecutor.h>
29 #include <folly/Executor.h>
30 #include <folly/Portability.h>
31 #include <folly/executors/Codel.h>
32 #include <folly/executors/QueueObserver.h>
33 #include <folly/io/async/Request.h>
34 #include <folly/portability/GFlags.h>
35 #include <folly/portability/Unistd.h>
36 #include <folly/synchronization/LifoSem.h>
37
38 #include <thrift/lib/cpp/concurrency/FunctionRunner.h>
39 #include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>
40 #include <thrift/lib/cpp/concurrency/Thread.h>
41 #include <thrift/lib/cpp/concurrency/Util.h>
42
43 DECLARE_bool(codel_enabled);
44
45 namespace apache {
46 namespace thrift {
47 namespace concurrency {
48
49 class Runnable;
50 class ThreadFactory;
51 class ThreadManagerObserver;
52
53 /**
54 * ThreadManager class
55 *
56 * This class manages a pool of threads. It uses a ThreadFactory to create
57 * threads. It never actually creates or destroys worker threads, rather
58 * It maintains statistics on number of idle threads, number of active threads,
59 * task backlog, and average wait and service times and informs the PoolPolicy
60 * object bound to instances of this manager of interesting transitions. It is
61 * then up the PoolPolicy object to decide if the thread pool size needs to be
62 * adjusted and call this object addWorker and removeWorker methods to make
63 * changes.
64 *
65 * This design allows different policy implementations to used this code to
66 * handle basic worker thread management and worker task execution and focus on
67 * policy issues. The simplest policy, StaticPolicy, does nothing other than
68 * create a fixed number of threads.
69 */
70 class ThreadManager : public virtual folly::Executor {
71 protected:
ThreadManager()72 ThreadManager() {}
73
74 public:
75 using PRIORITY = apache::thrift::concurrency::PRIORITY;
76
77 static const size_t DEFAULT_MAX_QUEUE_SIZE = 1 << 16; // should be power of 2
78
79 class Task;
80 typedef std::function<void(std::shared_ptr<Runnable>)> ExpireCallback;
81 typedef std::function<void()> InitCallback;
82
~ThreadManager()83 ~ThreadManager() override {}
84
85 /**
86 * Starts the thread manager. Verifies all attributes have been properly
87 * initialized, then allocates necessary resources to begin operation
88 */
89 virtual void start() = 0;
90
91 /**
92 * Stops the thread manager. Aborts all remaining unprocessed task, shuts
93 * down all created worker threads, and releases all allocated resources.
94 * This method blocks for all worker threads to complete, thus it can
95 * potentially block forever if a worker thread is running a task that
96 * won't terminate.
97 */
98 virtual void stop() = 0;
99
100 /**
101 * Joins the thread manager. This is the same as stop, except that it will
102 * wait until all the tasks have finished, rather than aborting the tasks.
103 */
104 virtual void join() = 0;
105
106 enum STATE {
107 UNINITIALIZED,
108 STARTING,
109 STARTED,
110 JOINING,
111 STOPPING,
112 STOPPED,
113 };
114
115 virtual STATE state() const = 0;
116
117 virtual std::shared_ptr<ThreadFactory> threadFactory() const = 0;
118
119 virtual void threadFactory(std::shared_ptr<ThreadFactory> value) = 0;
120
121 virtual std::string getNamePrefix() const = 0;
122
123 virtual void setNamePrefix(const std::string& name) = 0;
124
125 virtual void addWorker(size_t value = 1) = 0;
126
127 virtual void removeWorker(size_t value = 1) = 0;
128
129 /**
130 * Gets the current number of idle worker threads
131 */
132 virtual size_t idleWorkerCount() const = 0;
133
134 /**
135 * Gets the current number of total worker threads
136 */
137 virtual size_t workerCount() const = 0;
138
139 /**
140 * Gets the current number of pending tasks
141 */
142 virtual size_t pendingTaskCount() const = 0;
143
144 /**
145 * Gets the current number of pending tasks
146 */
147 virtual size_t pendingUpstreamTaskCount() const = 0;
148
149 /**
150 * Gets the current number of pending and executing tasks
151 */
152 virtual size_t totalTaskCount() const = 0;
153
154 /**
155 * Gets the number of tasks which have been expired without being run.
156 */
157 virtual size_t expiredTaskCount() = 0;
158
159 enum class Source {
160 INTERNAL = 0,
161 EXISTING_INTERACTION = 1,
162 UPSTREAM = 2,
163 LAST = UPSTREAM,
164 };
165 static constexpr int N_SOURCES = static_cast<int>(Source::LAST) + 1;
166
167 /**
168 * Adds a task to be executed at some time in the future by a worker thread.
169 *
170 * @param task The task to queue for execution
171 *
172 * @param timeout, this argument is deprecated, add() will always be
173 * non-blocking
174 *
175 * @param expiration when nonzero, the number of milliseconds the task is
176 * valid to be run; if exceeded, the task will be dropped off the queue and
177 * not run.
178 * @param upstream hint whether the task is come from an upstream.
179 * Implementations may prioritize those tasks different than other tasks of
180 * the same priority.
181 */
182 virtual void add(
183 std::shared_ptr<Runnable> task,
184 int64_t timeout,
185 int64_t expiration,
186 Source source) noexcept = 0;
187 void add(
188 std::shared_ptr<Runnable> task,
189 int64_t timeout = 0,
190 int64_t expiration = 0,
191 bool upstream = false) noexcept {
192 add(std::move(task),
193 timeout,
194 expiration,
195 upstream ? Source::UPSTREAM : Source::INTERNAL);
196 }
197
198 /**
199 * Implements folly::Executor::add()
200 */
201 void add(folly::Func f) override = 0;
202
203 /**
204 * Removes a pending task
205 */
206 virtual void remove(std::shared_ptr<Runnable> task) = 0;
207
208 /**
209 * Remove the next pending task which would be run.
210 *
211 * @return the task removed.
212 */
213 virtual std::shared_ptr<Runnable> removeNextPending() = 0;
214
215 /**
216 * Removes all pending tasks.
217 */
218 virtual void clearPending() = 0;
219
220 /**
221 * Set a callback to be called when a task is expired and not run.
222 *
223 * @param expireCallback a function called with the shared_ptr<Runnable> for
224 * the expired task.
225 */
226 virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
227 virtual void setCodelCallback(ExpireCallback expireCallback) = 0;
228
229 /**
230 * Set a callback to be called when a worker thread is created.
231 */
232 virtual void setThreadInitCallback(InitCallback initCallback) = 0;
233
234 /**
235 * Creates a simple thread manager that uses count number of worker threads
236 */
237 static std::shared_ptr<ThreadManager> newSimpleThreadManager(
238 size_t count = 4);
239
240 /**
241 * Creates a simple thread manager that uses count number of worker threads
242 * and sets the name prefix
243 */
244 static std::shared_ptr<ThreadManager> newSimpleThreadManager(
245 const std::string& name, size_t count = 4);
246
247 /**
248 * Creates a thread manager with support for priorities. Unlike
249 * PriorityThreadManager, requests are still served from a single
250 * thread pool.
251 */
252 static std::shared_ptr<ThreadManager> newPriorityQueueThreadManager(
253 size_t numThreads);
254
255 struct RunStats {
256 const std::string& threadPoolName;
257 std::chrono::steady_clock::time_point queueBegin;
258 std::chrono::steady_clock::time_point workBegin;
259 std::chrono::steady_clock::time_point workEnd;
260 };
261
262 class Observer {
263 public:
~Observer()264 virtual ~Observer() {}
265
266 virtual void preRun(folly::RequestContext*) = 0;
267 virtual void postRun(folly::RequestContext*, const RunStats&) = 0;
268 };
269
270 static void setGlobalObserver(std::shared_ptr<Observer> observer);
271
addTaskObserver(std::shared_ptr<Observer>)272 virtual void addTaskObserver(std::shared_ptr<Observer>) {
273 LOG(FATAL) << "Method not implemented";
274 }
275
276 /**
277 * Returns the cpu time used by this thread pool. Requires support for
278 * thread-specific clocks.
279 */
getUsedCpuTime()280 virtual std::chrono::nanoseconds getUsedCpuTime() const {
281 return std::chrono::nanoseconds(0);
282 }
283
284 virtual void enableCodel(bool) = 0;
285
286 virtual folly::Codel* getCodel() = 0;
287
288 /**
289 * This class may be used by getKeepAlive() to decide which executor to
290 * return.
291 */
292 class ExecutionScope {
293 public:
ExecutionScope(PRIORITY priority)294 ExecutionScope(PRIORITY priority) : pri_(priority) {}
295
getPriority()296 PRIORITY getPriority() const { return pri_; }
297
setTenantId(std::optional<uint32_t> tenant)298 void setTenantId(std::optional<uint32_t> tenant) { tenant_id_ = tenant; }
299
getTenantId()300 std::optional<uint32_t> getTenantId() const { return tenant_id_; }
301
302 private:
303 PRIORITY pri_;
304 std::optional<uint32_t> tenant_id_;
305 };
306
307 [[nodiscard]] virtual KeepAlive<> getKeepAlive(
308 ExecutionScope es, Source level) const = 0;
309
310 class Impl;
311 };
312
313 /**
314 * PriorityThreadManager class
315 *
316 * This class extends ThreadManager by adding priorities to tasks.
317 * It is up to the specific implementation to define how worker threads
318 * are assigned to run tasks of various priorities. The current
319 * implementation, PriorityImpl, runs a task on a thread with exactly
320 * the same priority.
321 *
322 * The interface of this class was kept as close to ThreadManager's
323 * as possible, diverging only where the original interface doesn't
324 * make sense in the priority world.
325 */
326 class PriorityThreadManager : public ThreadManager {
327 public:
328 using ThreadManager::addWorker;
329 virtual void addWorker(PRIORITY priority, size_t value) = 0;
330
331 using ThreadManager::removeWorker;
332 virtual void removeWorker(PRIORITY priority, size_t value) = 0;
333
334 using ThreadManager::workerCount;
335 virtual size_t workerCount(PRIORITY priority) = 0;
336
337 using ThreadManager::pendingTaskCount;
338 virtual size_t pendingTaskCount(PRIORITY priority) const = 0;
339
340 using ThreadManager::idleWorkerCount;
341 virtual size_t idleWorkerCount(PRIORITY priority) const = 0;
342
343 using ThreadManager::add;
344 virtual void add(
345 PRIORITY priority,
346 std::shared_ptr<Runnable> task,
347 int64_t timeout,
348 int64_t expiration,
349 ThreadManager::Source source) noexcept = 0;
350 void add(
351 PRIORITY priority,
352 std::shared_ptr<Runnable> task,
353 int64_t timeout = 0,
354 int64_t expiration = 0,
355 bool upstream = false) noexcept {
356 add(priority,
357 std::move(task),
358 timeout,
359 expiration,
360 upstream ? Source::UPSTREAM : Source::INTERNAL);
361 }
362
getNumPriorities()363 uint8_t getNumPriorities() const override { return N_PRIORITIES; }
364
365 using ThreadManager::getCodel;
366 virtual folly::Codel* getCodel(PRIORITY priority) = 0;
367
368 /**
369 * Creates a priority-aware thread manager given thread factory and size for
370 * each priority.
371 *
372 * At least NORMAL_PRIORITY_MINIMUM_THREADS threads are created for
373 * priority NORMAL.
374 */
375 static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
376 const std::array<
377 std::pair<std::shared_ptr<ThreadFactory>, size_t>,
378 N_PRIORITIES>& counts);
379
380 /**
381 * Creates a priority-aware thread manager that uses counts[X]
382 * worker threads for priority X.
383 */
384 static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
385 const std::array<size_t, N_PRIORITIES>& counts);
386
387 /**
388 * Creates a priority-aware thread manager that uses normalThreadsCount
389 * threads of NORMAL priority, and an implementation-defined number of threads
390 * for other priorities. Useful when the vast majority of tasks are NORMAL,
391 * but occasionally a non-NORMAL task may arrive.
392 *
393 * @param normalThreadsCount - number of threads of NORMAL priority, defaults
394 * to the number of CPUs on the system
395 */
396 static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
397 size_t normalThreadsCount = sysconf(_SC_NPROCESSORS_ONLN));
398
399 class PriorityImpl;
400 };
401
402 FOLLY_PUSH_WARNING
403 FOLLY_MSVC_DISABLE_WARNING(4250) // inherits keepAliveAcq/Rel via dominance
404 // Adapter class that converts a folly::Executor to a ThreadManager interface
405 class ThreadManagerExecutorAdapter : public ThreadManager,
406 public folly::DefaultKeepAliveExecutor {
407 public:
408 struct Options {
409 Options() = default;
OptionsOptions410 explicit Options(std::string name) : wrappedExecutorName(std::move(name)) {}
411 std::string wrappedExecutorName;
412 };
413
414 /* implicit */
415 explicit ThreadManagerExecutorAdapter(
416 std::shared_ptr<folly::Executor> exe, Options opts = Options());
417 explicit ThreadManagerExecutorAdapter(
418 folly::Executor::KeepAlive<> ka, Options opts = Options());
419 explicit ThreadManagerExecutorAdapter(
420 std::array<std::shared_ptr<Executor>, N_PRIORITIES>,
421 Options opts = Options());
422
423 ~ThreadManagerExecutorAdapter() override;
424
425 void join() override;
426 void start() override;
427 void stop() override;
state()428 STATE state() const override { return STARTED; }
429
430 std::shared_ptr<ThreadFactory> threadFactory() const override;
431 void threadFactory(std::shared_ptr<ThreadFactory> value) override;
432 std::string getNamePrefix() const override;
433 void setNamePrefix(const std::string& name) override;
434 void addWorker(size_t value = 1) override;
435 void removeWorker(size_t value = 1) override;
436
437 size_t idleWorkerCount() const override;
438 size_t workerCount() const override;
439 size_t pendingUpstreamTaskCount() const override;
440 size_t pendingTaskCount() const override;
441 size_t totalTaskCount() const override;
442 size_t expiredTaskCount() override;
443
444 void add(
445 std::shared_ptr<Runnable> task,
446 int64_t /*timeout*/,
447 int64_t /*expiration*/,
448 ThreadManager::Source source) noexcept override;
449 void add(folly::Func f) override;
450
remove(std::shared_ptr<Runnable>)451 void remove(std::shared_ptr<Runnable> /*task*/) override {}
removeNextPending()452 std::shared_ptr<Runnable> removeNextPending() override { return nullptr; }
clearPending()453 void clearPending() override {}
454
setExpireCallback(ExpireCallback)455 void setExpireCallback(ExpireCallback /*expireCallback*/) override {}
setCodelCallback(ExpireCallback)456 void setCodelCallback(ExpireCallback /*expireCallback*/) override {}
setThreadInitCallback(InitCallback)457 void setThreadInitCallback(InitCallback /*initCallback*/) override {}
enableCodel(bool)458 void enableCodel(bool) override {}
getCodel()459 folly::Codel* getCodel() override { return nullptr; }
460
461 [[nodiscard]] KeepAlive<> getKeepAlive(
462 ExecutionScope es, Source source) const override;
463
464 private:
465 explicit ThreadManagerExecutorAdapter(
466 std::array<folly::Executor*, N_PRIORITIES * N_SOURCES>);
467
468 std::vector<std::shared_ptr<void>> owning_;
469 std::array<folly::Executor*, N_PRIORITIES * N_SOURCES> executors_;
joinKeepAliveOnce()470 void joinKeepAliveOnce() {
471 if (!std::exchange(keepAliveJoined_, true)) {
472 joinKeepAlive();
473 }
474 }
475
476 bool keepAliveJoined_{false};
477 Options opts_;
478 };
479 FOLLY_POP_WARNING
480
481 FOLLY_PUSH_WARNING
482 FOLLY_MSVC_DISABLE_WARNING(4250) // inherits keepAliveAcq/Rel via dominance
483 class SimpleThreadManager : public ThreadManager,
484 public folly::DefaultKeepAliveExecutor {
485 public:
486 explicit SimpleThreadManager(size_t workerCount = 4);
487 ~SimpleThreadManager() override;
488
489 void start() override;
490 void stop() override;
491 void join() override;
492 STATE state() const override;
493 std::shared_ptr<ThreadFactory> threadFactory() const override;
494 void threadFactory(std::shared_ptr<ThreadFactory> value) override;
495 std::string getNamePrefix() const override;
496 void setNamePrefix(const std::string& name) override;
497 void addWorker(size_t value = 1) override;
498 void removeWorker(size_t value = 1) override;
499 size_t idleWorkerCount() const override;
500 size_t workerCount() const override;
501 size_t pendingTaskCount() const override;
502 size_t pendingUpstreamTaskCount() const override;
503 size_t totalTaskCount() const override;
504 size_t expiredTaskCount() override;
505 void add(
506 std::shared_ptr<Runnable> task,
507 int64_t timeout,
508 int64_t expiration,
509 Source source) noexcept override;
510 void add(folly::Func f) override;
511 void remove(std::shared_ptr<Runnable> task) override;
512 std::shared_ptr<Runnable> removeNextPending() override;
513 void clearPending() override;
514 void enableCodel(bool) override;
515 folly::Codel* getCodel() override;
516 void setExpireCallback(ExpireCallback expireCallback) override;
517 void setCodelCallback(ExpireCallback expireCallback) override;
518 void setThreadInitCallback(InitCallback initCallback) override;
519 void addTaskObserver(std::shared_ptr<Observer> observer) override;
520 std::chrono::nanoseconds getUsedCpuTime() const override;
521
522 [[nodiscard]] KeepAlive<> getKeepAlive(
523 ExecutionScope es, Source source) const override;
524
525 private:
joinKeepAliveOnce()526 void joinKeepAliveOnce() {
527 if (!std::exchange(keepAliveJoined_, true)) {
528 joinKeepAlive();
529 }
530 }
531
532 std::unique_ptr<Impl> impl_;
533 bool keepAliveJoined_{false};
534 };
535 FOLLY_POP_WARNING
536
Factory(PosixThreadFactory::THREAD_PRIORITY prio)537 inline std::shared_ptr<ThreadFactory> Factory(
538 PosixThreadFactory::THREAD_PRIORITY prio) {
539 return std::make_shared<PosixThreadFactory>(PosixThreadFactory::OTHER, prio);
540 }
541
542 } // namespace concurrency
543 } // namespace thrift
544 } // namespace apache
545