1 /*=========================================================================
2  *
3  *  Copyright Insight Software Consortium
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *         http://www.apache.org/licenses/LICENSE-2.0.txt
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *=========================================================================*/
18 #ifndef itkThreadPool_h
19 #define itkThreadPool_h
20 
21 #include "itkConfigure.h"
22 #include "itkIntTypes.h"
23 
24 #include <deque>
25 #include <functional>
26 #include <future>
27 #include <condition_variable>
28 #include <thread>
29 
30 #include "itkObject.h"
31 #include "itkObjectFactory.h"
32 #include "itkSingletonMacro.h"
33 
34 
35 namespace itk
36 {
37 
38 /**
39  * \class ThreadPool
40  * \brief Thread pool maintains a constant number of threads.
41  *
42  * Thread pool is called and initialized from within the PoolMultiThreader.
43  * Initially the thread pool is started with GlobalDefaultNumberOfThreads.
44  * The jobs are submitted via AddWork method.
45  *
46  * This implementation heavily borrows from:
47  * https://github.com/progschj/ThreadPool
48  *
49  * \ingroup OSSystemObjects
50  * \ingroup ITKCommon
51  */
52 
53 struct ThreadPoolGlobals;
54 
55 class ITKCommon_EXPORT ThreadPool : public Object
56 {
57 public:
58   ITK_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
59 
60   /** Standard class type aliases. */
61   using Self = ThreadPool;
62   using Superclass = Object;
63   using Pointer = SmartPointer< Self >;
64   using ConstPointer = SmartPointer<const Self>;
65 
66   /** Run-time type information (and related methods). */
67   itkTypeMacro(ThreadPool, Object);
68 
69   /** Returns the global instance */
70   static Pointer New();
71 
72   /** Returns the global singleton instance of the ThreadPool */
73   static Pointer GetInstance();
74 
75   /** Add this job to the thread pool queue.
76    *
77    * This method returns an std::future, and calling get()
78    * will block until the result is ready. Example usage:
79    * auto result = pool.AddWork([](int param) { return param; }, 7);
80    * std::cout << result.get() << std::endl; */
81   template< class Function, class... Arguments >
82   auto
83   AddWork( Function&& function, Arguments&&... arguments )
84     -> std::future< typename std::result_of< Function( Arguments... ) >::type >
85   {
86     using return_type = typename std::result_of< Function( Arguments... ) >::type;
87 
88     auto task = std::make_shared< std::packaged_task< return_type() > >(
89       std::bind( std::forward< Function >( function ), std::forward< Arguments >( arguments )... ) );
90 
91     std::future< return_type > res = task->get_future();
92     {
93       std::unique_lock< std::mutex > lock( this->GetMutex() );
94       m_WorkQueue.emplace_back( [task]() { ( *task )(); } );
95     }
96     m_Condition.notify_one();
97     return res;
98   }
99 
100   /** Can call this method if we want to add extra threads to the pool. */
101   void AddThreads(ThreadIdType count);
102 
GetMaximumNumberOfThreads()103   ThreadIdType GetMaximumNumberOfThreads() const
104   {
105     return static_cast< ThreadIdType >( m_Threads.size() );
106   }
107 
108   /** The approximate number of idle threads. */
109   int GetNumberOfCurrentlyIdleThreads() const;
110 
111   /** Set/Get wait for threads.
112   This function should be used carefully, probably only during static
113   initialization phase to disable waiting for threads when ITK is built as a
114   static library and linked into a shared library (Windows only).*/
115   static bool GetDoNotWaitForThreads();
116   static void SetDoNotWaitForThreads(bool doNotWaitForThreads);
117 
118 protected:
119 
120   /* We need access to the mutex in AddWork, and the variable is only
121    * visible in .cxx file, so this method returns it. */
122   std::mutex& GetMutex();
123 
124   ThreadPool();
125   ~ThreadPool() override;
126 
127 private:
128 
129   /** Only used to synchronize the global variable across static libraries.*/
130   itkGetGlobalDeclarationMacro(ThreadPoolGlobals, PimplGlobals);
131 
132   /** This is a list of jobs submitted to the thread pool.
133    * This is the only place where the jobs are submitted.
134    * Filled by AddWork, emptied by ThreadExecute. */
135   std::deque< std::function< void() > > m_WorkQueue;
136 
137   /** When a thread is idle, it is waiting on m_Condition.
138    * AddWork signals it to resume a (random) thread. */
139   std::condition_variable m_Condition;
140 
141   /** Vector to hold all thread handles.
142    * Thread handles are used to delete (join) the threads. */
143   std::vector< std::thread > m_Threads;
144 
145   /* Has destruction started? */
146   bool m_Stopping{ false };
147 
148   /** To lock on the internal variables */
149   static ThreadPoolGlobals * m_PimplGlobals;
150 
151   /** The continuously running thread function */
152   static void ThreadExecute();
153 };
154 
155 }
156 #endif
157