1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <sys/types.h>

#include <algorithm>
#include <array>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Executor.h>
#include <folly/Portability.h>
#include <folly/executors/Codel.h>
#include <folly/executors/QueueObserver.h>
#include <folly/io/async/Request.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/Unistd.h>
#include <folly/synchronization/LifoSem.h>

#include <thrift/lib/cpp/concurrency/FunctionRunner.h>
#include <thrift/lib/cpp/concurrency/PosixThreadFactory.h>
#include <thrift/lib/cpp/concurrency/Thread.h>
#include <thrift/lib/cpp/concurrency/Util.h>
42 
// Declares the `codel_enabled` gflag. DECLARE_bool only declares the flag;
// its definition (and default value) lives in another translation unit.
// Presumably consulted by ThreadManager implementations when deciding whether
// to enable Codel — confirm against the defining .cpp.
DECLARE_bool(codel_enabled);
44 
45 namespace apache {
46 namespace thrift {
47 namespace concurrency {
48 
// Forward declarations of types named by the interfaces below.
// NOTE(review): ThreadManagerObserver is not referenced anywhere in this
// header; it is kept in case includers rely on this declaration.
class Runnable;
class ThreadFactory;
class ThreadManagerObserver;
52 
53 /**
54  * ThreadManager class
55  *
56  * This class manages a pool of threads. It uses a ThreadFactory to create
57  * threads. It never actually creates or destroys worker threads, rather
58  * It maintains statistics on number of idle threads, number of active threads,
59  * task backlog, and average wait and service times and informs the PoolPolicy
60  * object bound to instances of this manager of interesting transitions. It is
61  * then up the PoolPolicy object to decide if the thread pool size needs to be
62  * adjusted and call this object addWorker and removeWorker methods to make
63  * changes.
64  *
65  * This design allows different policy implementations to used this code to
66  * handle basic worker thread management and worker task execution and focus on
67  * policy issues. The simplest policy, StaticPolicy, does nothing other than
68  * create a fixed number of threads.
69  */
70 class ThreadManager : public virtual folly::Executor {
71  protected:
ThreadManager()72   ThreadManager() {}
73 
74  public:
75   using PRIORITY = apache::thrift::concurrency::PRIORITY;
76 
77   static const size_t DEFAULT_MAX_QUEUE_SIZE = 1 << 16; // should be power of 2
78 
79   class Task;
80   typedef std::function<void(std::shared_ptr<Runnable>)> ExpireCallback;
81   typedef std::function<void()> InitCallback;
82 
~ThreadManager()83   ~ThreadManager() override {}
84 
85   /**
86    * Starts the thread manager. Verifies all attributes have been properly
87    * initialized, then allocates necessary resources to begin operation
88    */
89   virtual void start() = 0;
90 
91   /**
92    * Stops the thread manager. Aborts all remaining unprocessed task, shuts
93    * down all created worker threads, and releases all allocated resources.
94    * This method blocks for all worker threads to complete, thus it can
95    * potentially block forever if a worker thread is running a task that
96    * won't terminate.
97    */
98   virtual void stop() = 0;
99 
100   /**
101    * Joins the thread manager. This is the same as stop, except that it will
102    * wait until all the tasks have finished, rather than aborting the tasks.
103    */
104   virtual void join() = 0;
105 
106   enum STATE {
107     UNINITIALIZED,
108     STARTING,
109     STARTED,
110     JOINING,
111     STOPPING,
112     STOPPED,
113   };
114 
115   virtual STATE state() const = 0;
116 
117   virtual std::shared_ptr<ThreadFactory> threadFactory() const = 0;
118 
119   virtual void threadFactory(std::shared_ptr<ThreadFactory> value) = 0;
120 
121   virtual std::string getNamePrefix() const = 0;
122 
123   virtual void setNamePrefix(const std::string& name) = 0;
124 
125   virtual void addWorker(size_t value = 1) = 0;
126 
127   virtual void removeWorker(size_t value = 1) = 0;
128 
129   /**
130    * Gets the current number of idle worker threads
131    */
132   virtual size_t idleWorkerCount() const = 0;
133 
134   /**
135    * Gets the current number of total worker threads
136    */
137   virtual size_t workerCount() const = 0;
138 
139   /**
140    * Gets the current number of pending tasks
141    */
142   virtual size_t pendingTaskCount() const = 0;
143 
144   /**
145    * Gets the current number of pending tasks
146    */
147   virtual size_t pendingUpstreamTaskCount() const = 0;
148 
149   /**
150    * Gets the current number of pending and executing tasks
151    */
152   virtual size_t totalTaskCount() const = 0;
153 
154   /**
155    * Gets the number of tasks which have been expired without being run.
156    */
157   virtual size_t expiredTaskCount() = 0;
158 
159   enum class Source {
160     INTERNAL = 0,
161     EXISTING_INTERACTION = 1,
162     UPSTREAM = 2,
163     LAST = UPSTREAM,
164   };
165   static constexpr int N_SOURCES = static_cast<int>(Source::LAST) + 1;
166 
167   /**
168    * Adds a task to be executed at some time in the future by a worker thread.
169    *
170    * @param task  The task to queue for execution
171    *
172    * @param timeout, this argument is deprecated, add() will always be
173    * non-blocking
174    *
175    * @param expiration when nonzero, the number of milliseconds the task is
176    * valid to be run; if exceeded, the task will be dropped off the queue and
177    * not run.
178    * @param upstream hint whether the task is come from an upstream.
179    * Implementations may prioritize those tasks different than other tasks of
180    * the same priority.
181    */
182   virtual void add(
183       std::shared_ptr<Runnable> task,
184       int64_t timeout,
185       int64_t expiration,
186       Source source) noexcept = 0;
187   void add(
188       std::shared_ptr<Runnable> task,
189       int64_t timeout = 0,
190       int64_t expiration = 0,
191       bool upstream = false) noexcept {
192     add(std::move(task),
193         timeout,
194         expiration,
195         upstream ? Source::UPSTREAM : Source::INTERNAL);
196   }
197 
198   /**
199    * Implements folly::Executor::add()
200    */
201   void add(folly::Func f) override = 0;
202 
203   /**
204    * Removes a pending task
205    */
206   virtual void remove(std::shared_ptr<Runnable> task) = 0;
207 
208   /**
209    * Remove the next pending task which would be run.
210    *
211    * @return the task removed.
212    */
213   virtual std::shared_ptr<Runnable> removeNextPending() = 0;
214 
215   /**
216    * Removes all pending tasks.
217    */
218   virtual void clearPending() = 0;
219 
220   /**
221    * Set a callback to be called when a task is expired and not run.
222    *
223    * @param expireCallback a function called with the shared_ptr<Runnable> for
224    * the expired task.
225    */
226   virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
227   virtual void setCodelCallback(ExpireCallback expireCallback) = 0;
228 
229   /**
230    * Set a callback to be called when a worker thread is created.
231    */
232   virtual void setThreadInitCallback(InitCallback initCallback) = 0;
233 
234   /**
235    * Creates a simple thread manager that uses count number of worker threads
236    */
237   static std::shared_ptr<ThreadManager> newSimpleThreadManager(
238       size_t count = 4);
239 
240   /**
241    * Creates a simple thread manager that uses count number of worker threads
242    * and sets the name prefix
243    */
244   static std::shared_ptr<ThreadManager> newSimpleThreadManager(
245       const std::string& name, size_t count = 4);
246 
247   /**
248    * Creates a thread manager with support for priorities. Unlike
249    * PriorityThreadManager, requests are still served from a single
250    * thread pool.
251    */
252   static std::shared_ptr<ThreadManager> newPriorityQueueThreadManager(
253       size_t numThreads);
254 
255   struct RunStats {
256     const std::string& threadPoolName;
257     std::chrono::steady_clock::time_point queueBegin;
258     std::chrono::steady_clock::time_point workBegin;
259     std::chrono::steady_clock::time_point workEnd;
260   };
261 
262   class Observer {
263    public:
~Observer()264     virtual ~Observer() {}
265 
266     virtual void preRun(folly::RequestContext*) = 0;
267     virtual void postRun(folly::RequestContext*, const RunStats&) = 0;
268   };
269 
270   static void setGlobalObserver(std::shared_ptr<Observer> observer);
271 
addTaskObserver(std::shared_ptr<Observer>)272   virtual void addTaskObserver(std::shared_ptr<Observer>) {
273     LOG(FATAL) << "Method not implemented";
274   }
275 
276   /**
277    * Returns the cpu time used by this thread pool. Requires support for
278    * thread-specific clocks.
279    */
getUsedCpuTime()280   virtual std::chrono::nanoseconds getUsedCpuTime() const {
281     return std::chrono::nanoseconds(0);
282   }
283 
284   virtual void enableCodel(bool) = 0;
285 
286   virtual folly::Codel* getCodel() = 0;
287 
288   /**
289    * This class may be used by getKeepAlive() to decide which executor to
290    * return.
291    */
292   class ExecutionScope {
293    public:
ExecutionScope(PRIORITY priority)294     ExecutionScope(PRIORITY priority) : pri_(priority) {}
295 
getPriority()296     PRIORITY getPriority() const { return pri_; }
297 
setTenantId(std::optional<uint32_t> tenant)298     void setTenantId(std::optional<uint32_t> tenant) { tenant_id_ = tenant; }
299 
getTenantId()300     std::optional<uint32_t> getTenantId() const { return tenant_id_; }
301 
302    private:
303     PRIORITY pri_;
304     std::optional<uint32_t> tenant_id_;
305   };
306 
307   [[nodiscard]] virtual KeepAlive<> getKeepAlive(
308       ExecutionScope es, Source level) const = 0;
309 
310   class Impl;
311 };
312 
313 /**
314  * PriorityThreadManager class
315  *
316  * This class extends ThreadManager by adding priorities to tasks.
317  * It is up to the specific implementation to define how worker threads
318  * are assigned to run tasks of various priorities. The current
319  * implementation, PriorityImpl, runs a task on a thread with exactly
320  * the same priority.
321  *
322  * The interface of this class was kept as close to ThreadManager's
323  * as possible, diverging only where the original interface doesn't
324  * make sense in the priority world.
325  */
326 class PriorityThreadManager : public ThreadManager {
327  public:
328   using ThreadManager::addWorker;
329   virtual void addWorker(PRIORITY priority, size_t value) = 0;
330 
331   using ThreadManager::removeWorker;
332   virtual void removeWorker(PRIORITY priority, size_t value) = 0;
333 
334   using ThreadManager::workerCount;
335   virtual size_t workerCount(PRIORITY priority) = 0;
336 
337   using ThreadManager::pendingTaskCount;
338   virtual size_t pendingTaskCount(PRIORITY priority) const = 0;
339 
340   using ThreadManager::idleWorkerCount;
341   virtual size_t idleWorkerCount(PRIORITY priority) const = 0;
342 
343   using ThreadManager::add;
344   virtual void add(
345       PRIORITY priority,
346       std::shared_ptr<Runnable> task,
347       int64_t timeout,
348       int64_t expiration,
349       ThreadManager::Source source) noexcept = 0;
350   void add(
351       PRIORITY priority,
352       std::shared_ptr<Runnable> task,
353       int64_t timeout = 0,
354       int64_t expiration = 0,
355       bool upstream = false) noexcept {
356     add(priority,
357         std::move(task),
358         timeout,
359         expiration,
360         upstream ? Source::UPSTREAM : Source::INTERNAL);
361   }
362 
getNumPriorities()363   uint8_t getNumPriorities() const override { return N_PRIORITIES; }
364 
365   using ThreadManager::getCodel;
366   virtual folly::Codel* getCodel(PRIORITY priority) = 0;
367 
368   /**
369    * Creates a priority-aware thread manager given thread factory and size for
370    * each priority.
371    *
372    * At least NORMAL_PRIORITY_MINIMUM_THREADS threads are created for
373    * priority NORMAL.
374    */
375   static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
376       const std::array<
377           std::pair<std::shared_ptr<ThreadFactory>, size_t>,
378           N_PRIORITIES>& counts);
379 
380   /**
381    * Creates a priority-aware thread manager that uses counts[X]
382    * worker threads for priority X.
383    */
384   static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
385       const std::array<size_t, N_PRIORITIES>& counts);
386 
387   /**
388    * Creates a priority-aware thread manager that uses normalThreadsCount
389    * threads of NORMAL priority, and an implementation-defined number of threads
390    * for other priorities. Useful when the vast majority of tasks are NORMAL,
391    * but occasionally a non-NORMAL task may arrive.
392    *
393    * @param normalThreadsCount - number of threads of NORMAL priority, defaults
394    *          to the number of CPUs on the system
395    */
396   static std::shared_ptr<PriorityThreadManager> newPriorityThreadManager(
397       size_t normalThreadsCount = sysconf(_SC_NPROCESSORS_ONLN));
398 
399   class PriorityImpl;
400 };
401 
402 FOLLY_PUSH_WARNING
403 FOLLY_MSVC_DISABLE_WARNING(4250) // inherits keepAliveAcq/Rel via dominance
404 // Adapter class that converts a folly::Executor to a ThreadManager interface
405 class ThreadManagerExecutorAdapter : public ThreadManager,
406                                      public folly::DefaultKeepAliveExecutor {
407  public:
408   struct Options {
409     Options() = default;
OptionsOptions410     explicit Options(std::string name) : wrappedExecutorName(std::move(name)) {}
411     std::string wrappedExecutorName;
412   };
413 
414   /* implicit */
415   explicit ThreadManagerExecutorAdapter(
416       std::shared_ptr<folly::Executor> exe, Options opts = Options());
417   explicit ThreadManagerExecutorAdapter(
418       folly::Executor::KeepAlive<> ka, Options opts = Options());
419   explicit ThreadManagerExecutorAdapter(
420       std::array<std::shared_ptr<Executor>, N_PRIORITIES>,
421       Options opts = Options());
422 
423   ~ThreadManagerExecutorAdapter() override;
424 
425   void join() override;
426   void start() override;
427   void stop() override;
state()428   STATE state() const override { return STARTED; }
429 
430   std::shared_ptr<ThreadFactory> threadFactory() const override;
431   void threadFactory(std::shared_ptr<ThreadFactory> value) override;
432   std::string getNamePrefix() const override;
433   void setNamePrefix(const std::string& name) override;
434   void addWorker(size_t value = 1) override;
435   void removeWorker(size_t value = 1) override;
436 
437   size_t idleWorkerCount() const override;
438   size_t workerCount() const override;
439   size_t pendingUpstreamTaskCount() const override;
440   size_t pendingTaskCount() const override;
441   size_t totalTaskCount() const override;
442   size_t expiredTaskCount() override;
443 
444   void add(
445       std::shared_ptr<Runnable> task,
446       int64_t /*timeout*/,
447       int64_t /*expiration*/,
448       ThreadManager::Source source) noexcept override;
449   void add(folly::Func f) override;
450 
remove(std::shared_ptr<Runnable>)451   void remove(std::shared_ptr<Runnable> /*task*/) override {}
removeNextPending()452   std::shared_ptr<Runnable> removeNextPending() override { return nullptr; }
clearPending()453   void clearPending() override {}
454 
setExpireCallback(ExpireCallback)455   void setExpireCallback(ExpireCallback /*expireCallback*/) override {}
setCodelCallback(ExpireCallback)456   void setCodelCallback(ExpireCallback /*expireCallback*/) override {}
setThreadInitCallback(InitCallback)457   void setThreadInitCallback(InitCallback /*initCallback*/) override {}
enableCodel(bool)458   void enableCodel(bool) override {}
getCodel()459   folly::Codel* getCodel() override { return nullptr; }
460 
461   [[nodiscard]] KeepAlive<> getKeepAlive(
462       ExecutionScope es, Source source) const override;
463 
464  private:
465   explicit ThreadManagerExecutorAdapter(
466       std::array<folly::Executor*, N_PRIORITIES * N_SOURCES>);
467 
468   std::vector<std::shared_ptr<void>> owning_;
469   std::array<folly::Executor*, N_PRIORITIES * N_SOURCES> executors_;
joinKeepAliveOnce()470   void joinKeepAliveOnce() {
471     if (!std::exchange(keepAliveJoined_, true)) {
472       joinKeepAlive();
473     }
474   }
475 
476   bool keepAliveJoined_{false};
477   Options opts_;
478 };
479 FOLLY_POP_WARNING
480 
481 FOLLY_PUSH_WARNING
482 FOLLY_MSVC_DISABLE_WARNING(4250) // inherits keepAliveAcq/Rel via dominance
483 class SimpleThreadManager : public ThreadManager,
484                             public folly::DefaultKeepAliveExecutor {
485  public:
486   explicit SimpleThreadManager(size_t workerCount = 4);
487   ~SimpleThreadManager() override;
488 
489   void start() override;
490   void stop() override;
491   void join() override;
492   STATE state() const override;
493   std::shared_ptr<ThreadFactory> threadFactory() const override;
494   void threadFactory(std::shared_ptr<ThreadFactory> value) override;
495   std::string getNamePrefix() const override;
496   void setNamePrefix(const std::string& name) override;
497   void addWorker(size_t value = 1) override;
498   void removeWorker(size_t value = 1) override;
499   size_t idleWorkerCount() const override;
500   size_t workerCount() const override;
501   size_t pendingTaskCount() const override;
502   size_t pendingUpstreamTaskCount() const override;
503   size_t totalTaskCount() const override;
504   size_t expiredTaskCount() override;
505   void add(
506       std::shared_ptr<Runnable> task,
507       int64_t timeout,
508       int64_t expiration,
509       Source source) noexcept override;
510   void add(folly::Func f) override;
511   void remove(std::shared_ptr<Runnable> task) override;
512   std::shared_ptr<Runnable> removeNextPending() override;
513   void clearPending() override;
514   void enableCodel(bool) override;
515   folly::Codel* getCodel() override;
516   void setExpireCallback(ExpireCallback expireCallback) override;
517   void setCodelCallback(ExpireCallback expireCallback) override;
518   void setThreadInitCallback(InitCallback initCallback) override;
519   void addTaskObserver(std::shared_ptr<Observer> observer) override;
520   std::chrono::nanoseconds getUsedCpuTime() const override;
521 
522   [[nodiscard]] KeepAlive<> getKeepAlive(
523       ExecutionScope es, Source source) const override;
524 
525  private:
joinKeepAliveOnce()526   void joinKeepAliveOnce() {
527     if (!std::exchange(keepAliveJoined_, true)) {
528       joinKeepAlive();
529     }
530   }
531 
532   std::unique_ptr<Impl> impl_;
533   bool keepAliveJoined_{false};
534 };
535 FOLLY_POP_WARNING
536 
Factory(PosixThreadFactory::THREAD_PRIORITY prio)537 inline std::shared_ptr<ThreadFactory> Factory(
538     PosixThreadFactory::THREAD_PRIORITY prio) {
539   return std::make_shared<PosixThreadFactory>(PosixThreadFactory::OTHER, prio);
540 }
541 
542 } // namespace concurrency
543 } // namespace thrift
544 } // namespace apache
545