1 /*========================================================================= 2 * 3 * Copyright Insight Software Consortium 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0.txt 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 *=========================================================================*/ 18 #ifndef itkThreadPool_h 19 #define itkThreadPool_h 20 21 #include "itkConfigure.h" 22 #include "itkIntTypes.h" 23 24 #include <deque> 25 #include <functional> 26 #include <future> 27 #include <condition_variable> 28 #include <thread> 29 30 #include "itkObject.h" 31 #include "itkObjectFactory.h" 32 #include "itkSingletonMacro.h" 33 34 35 namespace itk 36 { 37 38 /** 39 * \class ThreadPool 40 * \brief Thread pool maintains a constant number of threads. 41 * 42 * Thread pool is called and initialized from within the PoolMultiThreader. 43 * Initially the thread pool is started with GlobalDefaultNumberOfThreads. 44 * The jobs are submitted via AddWork method. 45 * 46 * This implementation heavily borrows from: 47 * https://github.com/progschj/ThreadPool 48 * 49 * \ingroup OSSystemObjects 50 * \ingroup ITKCommon 51 */ 52 53 struct ThreadPoolGlobals; 54 55 class ITKCommon_EXPORT ThreadPool : public Object 56 { 57 public: 58 ITK_DISALLOW_COPY_AND_ASSIGN(ThreadPool); 59 60 /** Standard class type aliases. */ 61 using Self = ThreadPool; 62 using Superclass = Object; 63 using Pointer = SmartPointer< Self >; 64 using ConstPointer = SmartPointer<const Self>; 65 66 /** Run-time type information (and related methods). */ 67 itkTypeMacro(ThreadPool, Object); 68 69 /** Returns the global instance */ 70 static Pointer New(); 71 72 /** Returns the global singleton instance of the ThreadPool */ 73 static Pointer GetInstance(); 74 75 /** Add this job to the thread pool queue. 76 * 77 * This method returns an std::future, and calling get() 78 * will block until the result is ready. Example usage: 79 * auto result = pool.AddWork([](int param) { return param; }, 7); 80 * std::cout << result.get() << std::endl; */ 81 template< class Function, class... Arguments > 82 auto 83 AddWork( Function&& function, Arguments&&... arguments ) 84 -> std::future< typename std::result_of< Function( Arguments... ) >::type > 85 { 86 using return_type = typename std::result_of< Function( Arguments... ) >::type; 87 88 auto task = std::make_shared< std::packaged_task< return_type() > >( 89 std::bind( std::forward< Function >( function ), std::forward< Arguments >( arguments )... ) ); 90 91 std::future< return_type > res = task->get_future(); 92 { 93 std::unique_lock< std::mutex > lock( this->GetMutex() ); 94 m_WorkQueue.emplace_back( [task]() { ( *task )(); } ); 95 } 96 m_Condition.notify_one(); 97 return res; 98 } 99 100 /** Can call this method if we want to add extra threads to the pool. */ 101 void AddThreads(ThreadIdType count); 102 GetMaximumNumberOfThreads()103 ThreadIdType GetMaximumNumberOfThreads() const 104 { 105 return static_cast< ThreadIdType >( m_Threads.size() ); 106 } 107 108 /** The approximate number of idle threads. */ 109 int GetNumberOfCurrentlyIdleThreads() const; 110 111 /** Set/Get wait for threads. 112 This function should be used carefully, probably only during static 113 initialization phase to disable waiting for threads when ITK is built as a 114 static library and linked into a shared library (Windows only).*/ 115 static bool GetDoNotWaitForThreads(); 116 static void SetDoNotWaitForThreads(bool doNotWaitForThreads); 117 118 protected: 119 120 /* We need access to the mutex in AddWork, and the variable is only 121 * visible in .cxx file, so this method returns it. */ 122 std::mutex& GetMutex(); 123 124 ThreadPool(); 125 ~ThreadPool() override; 126 127 private: 128 129 /** Only used to synchronize the global variable across static libraries.*/ 130 itkGetGlobalDeclarationMacro(ThreadPoolGlobals, PimplGlobals); 131 132 /** This is a list of jobs submitted to the thread pool. 133 * This is the only place where the jobs are submitted. 134 * Filled by AddWork, emptied by ThreadExecute. */ 135 std::deque< std::function< void() > > m_WorkQueue; 136 137 /** When a thread is idle, it is waiting on m_Condition. 138 * AddWork signals it to resume a (random) thread. */ 139 std::condition_variable m_Condition; 140 141 /** Vector to hold all thread handles. 142 * Thread handles are used to delete (join) the threads. */ 143 std::vector< std::thread > m_Threads; 144 145 /* Has destruction started? */ 146 bool m_Stopping{ false }; 147 148 /** To lock on the internal variables */ 149 static ThreadPoolGlobals * m_PimplGlobals; 150 151 /** The continuously running thread function */ 152 static void ThreadExecute(); 153 }; 154 155 } 156 #endif 157