1 /* Copyright 2017-present Facebook, Inc.
2  * Licensed under the Apache License, Version 2.0 */
3 #pragma once
#include "watchman_system.h" // to avoid system header ordering issue on win32
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#include "Future.h"
11 
12 namespace watchman {
13 
14 // Almost the dumbest possible thread pool implementation.
15 // This allows us to set an upper bound on the number of concurrent
16 // tasks that are executed in the thread pool.  Contrast with
17 // std::async which leaves it to the implementation to decide
18 // whether each async invocation spawns a thread or uses a
19 // thread pool with an unspecified number of threads.
20 // Constraining the concurrency is important for watchman so
21 // that we can limit the amount of I/O that we might induce.
22 
23 class ThreadPool : public Executor {
24  public:
25   ThreadPool() = default;
26   ~ThreadPool() override;
27 
28   // Start a thread pool with the specified number of worker threads
29   // and the specified upper bound on the number of queued jobs.
30   // The queue limit is intended as a brake in case the system
31   // is under a heavy backlog, and can also help surface issues
32   // where there a task executing in the pool is blocking on
33   // the results of some other task also running in the thread
34   // pool.
35   void start(size_t numWorkers, size_t maxItems);
36 
37   // Request that the worker threads terminate.
38   // If `join` is true, wait for the worker threads to terminate.
39   void stop(bool join = true);
40 
41   // Run a function in the thread pool.
42   // This queues up the function for asynchronous execution and
43   // may return before func has been executed.
44   // If the thread pool has been stopped, throws a runtime_error.
45   void run(std::function<void()>&& func) override;
46 
47  private:
48   std::vector<std::thread> workers_;
49   std::deque<std::function<void()>> tasks_;
50 
51   std::mutex mutex_;
52   std::condition_variable condition_;
53   bool stopping_{false};
54   size_t maxItems_;
55 
56   void runWorker();
57 };
58 
// Return a reference to the shared thread pool for the watchman process.
// The function is only declared here; how and when the pool is created
// and started is determined by its out-of-line definition.
// NOTE(review): presumably the returned pool outlives all callers --
// confirm against the definition before holding on to the reference.
ThreadPool& getThreadPool();
61 }
62