1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 /*********************************************************************** 19 * $Id: $ 20 * 21 * 22 ***********************************************************************/ 23 /** @file */ 24 25 #ifndef WEIGHTEDTHREADPOOL_H 26 #define WEIGHTEDTHREADPOOL_H 27 28 #include <string> 29 #include <iostream> 30 #include <cstdlib> 31 #include <sstream> 32 #include <stdexcept> 33 #include <boost/thread/thread.hpp> 34 #include <boost/thread/mutex.hpp> 35 #include <boost/thread/condition.hpp> 36 #include <boost/bind.hpp> 37 #include <boost/shared_ptr.hpp> 38 #include <boost/function.hpp> 39 40 namespace threadpool 41 { 42 /** @brief ThreadPool is a component for working with pools of threads and asynchronously 43 * executing tasks. It is responsible for creating threads and tracking which threads are "busy" 44 * and which are idle. Idle threads are utilized as "work" is added to the system. 45 */ 46 47 class WeightedThreadPool 48 { 49 public: 50 typedef boost::function0<int> Functor_T; 51 52 /********************************************* 53 * ctor/dtor 54 * 55 *********************************************/ 56 57 /** @brief ctor 58 */ 59 WeightedThreadPool(); 60 61 /** @brief ctor 62 * 63 * @param maxThreads the maximum number of threads in this pool. This is the maximum number 64 * of simultaneuous operations that can go on. 65 * @param queueSize the maximum number of work tasks in the queue. This is the maximum 66 * number of jobs that can queue up in the work list before invoke() blocks. 67 */ 68 explicit WeightedThreadPool( size_t maxThreadWeight, size_t maxThreads, size_t queueSize ); 69 70 /** @brief dtor 71 */ 72 ~WeightedThreadPool() throw(); 73 74 75 /********************************************* 76 * accessors/mutators 77 * 78 *********************************************/ 79 /** @brief set the work queue size 80 * 81 * @param queueSize the size of the work queue 82 */ 83 void setQueueSize( size_t queueSize ); 84 85 /** @brief fet the work queue size 86 */ getQueueSize()87 inline size_t getQueueSize() const 88 { 89 return fQueueSize; 90 } 91 92 /** @brief set the maximum number of threads to be used to process 93 * the work queue 94 * 95 * @param maxThreads the maximum number of threads 96 */ 97 void setMaxThreads( size_t maxThreads ); 98 99 /** @brief get the maximum number of threads 100 */ getMaxThreads()101 inline size_t getMaxThreads() const 102 { 103 return fMaxThreads; 104 } 105 106 /** @brief set the maximum processing weight of a thread to be 107 * submitted for execution from the existing jobs 108 * scheduled in the work queue 109 * 110 * @param maxWeight for execution 111 */ 112 void setMaxThreadWeight( size_t maxWeight ); 113 114 /** @brief get the maximum number of threads 115 */ getMaxThreadWeight()116 inline uint32_t getMaxThreadWeight() const 117 { 118 return fMaxThreadWeight; 119 } 120 121 /** @brief register a functor to be called when a new thread 122 * is created 123 */ 124 void setThreadCreatedListener(const Functor_T& f) ; 125 126 /** @brief queue size accessor 127 * 128 */ getWaiting()129 inline uint32_t getWaiting() const 130 { 131 return fWaitingFunctorsSize; 132 } 133 getWeight()134 inline uint32_t getWeight() const 135 { 136 return fWaitingFunctorsWeight; 137 } 138 139 void removeJobs(uint32_t id); 140 141 /********************************************* 142 * operations 143 * 144 *********************************************/ 145 146 /** @brief invoke a functor in a separate thread managed by the pool 147 * 148 * If all maxThreads are busy, threadfunc will be added to a work list and 149 * will run when a thread comes free. If all threads are busy and there are 150 * queueSize tasks already waiting, invoke() will block until a slot in the 151 * queue comes free. 152 */ 153 void invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id); 154 155 /** @brief stop the threads 156 */ 157 void stop(); 158 159 /** @brief wait on all the threads to complete 160 */ 161 void wait(); 162 163 /** @brief for use in debugging 164 */ 165 void dump(); 166 167 protected: 168 169 private: 170 /** @brief initialize data memebers 171 */ 172 void init(); 173 174 /** @brief add a functor to the list 175 */ 176 void addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id); 177 178 /** @brief thread entry point 179 */ 180 void beginThread() throw(); 181 182 183 WeightedThreadPool(const WeightedThreadPool&); 184 WeightedThreadPool& operator = (const WeightedThreadPool&); 185 186 friend struct beginThreadFunc; 187 188 struct beginThreadFunc 189 { beginThreadFuncbeginThreadFunc190 beginThreadFunc(WeightedThreadPool& impl) 191 : fImpl(impl) 192 {} 193 operatorbeginThreadFunc194 void operator() () 195 { 196 fImpl.beginThread(); 197 } 198 199 WeightedThreadPool& fImpl; 200 }; 201 202 struct NoOp 203 { operatorNoOp204 void operator () () const 205 {} 206 }; 207 208 size_t fThreadCount; 209 size_t fMaxThreadWeight; 210 size_t fMaxThreads; 211 size_t fQueueSize; 212 213 //typedef std::list<Functor_T> Container_T; 214 struct FunctorListItemStruct 215 { 216 Functor_T functor; 217 uint32_t functorWeight; 218 uint32_t id; 219 }; 220 221 typedef FunctorListItemStruct FunctorListItem; 222 typedef std::list<FunctorListItem> Container_T; 223 Container_T fWaitingFunctors; 224 Container_T::iterator fNextFunctor; 225 226 uint32_t issued; 227 boost::mutex fMutex; 228 boost::condition fThreadAvailable; // triggered when a thread is available 229 boost::condition fNeedThread; // triggered when a thread is needed 230 boost::thread_group fThreads; 231 232 bool fStop; 233 long fGeneralErrors; 234 long fFunctorErrors; 235 uint16_t fWaitingFunctorsSize; 236 uint16_t fWaitingFunctorsWeight; 237 238 }; 239 240 } // namespace threadpool 241 242 #endif //WEIGHTEDTHREADPOOL_H 243