1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /***********************************************************************
19 *   $Id: $
20 *
21 *
22 ***********************************************************************/
23 /** @file */
24 
25 #ifndef WEIGHTEDTHREADPOOL_H
26 #define WEIGHTEDTHREADPOOL_H
27 
#include <stddef.h>
#include <stdint.h>

#include <cstdlib>
#include <iostream>
#include <list>
#include <sstream>
#include <stdexcept>
#include <string>

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
39 
40 namespace threadpool
41 {
42 /** @brief ThreadPool is a component for working with pools of threads and asynchronously
43   * executing tasks. It is responsible for creating threads and tracking which threads are "busy"
44   * and which are idle. Idle threads are utilized as "work" is added to the system.
45   */
46 
47 class WeightedThreadPool
48 {
49 public:
50     typedef boost::function0<int> Functor_T;
51 
52     /*********************************************
53      *  ctor/dtor
54      *
55      *********************************************/
56 
57     /** @brief ctor
58       */
59     WeightedThreadPool();
60 
61     /** @brief ctor
62       *
63       * @param maxThreads the maximum number of threads in this pool. This is the maximum number
64       *        of simultaneuous operations that can go on.
65       * @param queueSize  the maximum number of work tasks in the queue. This is the maximum
66       *        number of jobs that can queue up in the work list before invoke() blocks.
67       */
68     explicit WeightedThreadPool( size_t maxThreadWeight, size_t maxThreads, size_t queueSize );
69 
70     /** @brief dtor
71       */
72     ~WeightedThreadPool() throw();
73 
74 
75     /*********************************************
76      *  accessors/mutators
77      *
78      *********************************************/
79     /** @brief set the work queue size
80       *
81       * @param queueSize the size of the work queue
82       */
83     void setQueueSize( size_t queueSize );
84 
85     /** @brief fet the work queue size
86       */
getQueueSize()87     inline size_t getQueueSize() const
88     {
89         return fQueueSize;
90     }
91 
92     /** @brief set the maximum number of threads to be used to process
93       * the work queue
94       *
95       * @param maxThreads the maximum number of threads
96       */
97     void  setMaxThreads( size_t maxThreads );
98 
99     /** @brief get the maximum number of threads
100       */
getMaxThreads()101     inline size_t getMaxThreads() const
102     {
103         return fMaxThreads;
104     }
105 
106     /** @brief set the maximum processing weight of a thread to be
107       * submitted for execution from the existing jobs
108       * scheduled in the work queue
109       *
110       * @param maxWeight for execution
111       */
112     void  setMaxThreadWeight( size_t maxWeight );
113 
114     /** @brief get the maximum number of threads
115       */
getMaxThreadWeight()116     inline uint32_t getMaxThreadWeight() const
117     {
118         return fMaxThreadWeight;
119     }
120 
121     /** @brief register a functor to be called when a new thread
122       *  is created
123       */
124     void setThreadCreatedListener(const Functor_T& f) ;
125 
126     /** @brief queue size accessor
127       *
128       */
getWaiting()129     inline uint32_t getWaiting() const
130     {
131         return fWaitingFunctorsSize;
132     }
133 
getWeight()134     inline uint32_t getWeight() const
135     {
136         return fWaitingFunctorsWeight;
137     }
138 
139     void removeJobs(uint32_t id);
140 
141     /*********************************************
142      *  operations
143      *
144      *********************************************/
145 
146     /** @brief invoke a functor in a separate thread managed by the pool
147       *
148       * If all maxThreads are busy, threadfunc will be added to a work list and
149       * will run when a thread comes free. If all threads are busy and there are
150       * queueSize tasks already waiting, invoke() will block until a slot in the
151       * queue comes free.
152       */
153     void invoke(const Functor_T& threadfunc, uint32_t functor_weight, uint32_t id);
154 
155     /** @brief stop the threads
156       */
157     void stop();
158 
159     /** @brief wait on all the threads to complete
160       */
161     void wait();
162 
163     /** @brief for use in debugging
164       */
165     void dump();
166 
167 protected:
168 
169 private:
170     /** @brief initialize data memebers
171       */
172     void init();
173 
174     /** @brief add a functor to the list
175       */
176     void addFunctor(const Functor_T& func, uint32_t functor_weight, uint32_t id);
177 
178     /** @brief thread entry point
179       */
180     void beginThread() throw();
181 
182 
183     WeightedThreadPool(const WeightedThreadPool&);
184     WeightedThreadPool& operator = (const WeightedThreadPool&);
185 
186     friend struct beginThreadFunc;
187 
188     struct beginThreadFunc
189     {
beginThreadFuncbeginThreadFunc190         beginThreadFunc(WeightedThreadPool& impl)
191             : fImpl(impl)
192         {}
193 
operatorbeginThreadFunc194         void operator() ()
195         {
196             fImpl.beginThread();
197         }
198 
199         WeightedThreadPool& fImpl;
200     };
201 
202     struct NoOp
203     {
operatorNoOp204         void operator () () const
205         {}
206     };
207 
208     size_t fThreadCount;
209     size_t fMaxThreadWeight;
210     size_t fMaxThreads;
211     size_t fQueueSize;
212 
213     //typedef std::list<Functor_T> Container_T;
214     struct FunctorListItemStruct
215     {
216         Functor_T functor;
217         uint32_t functorWeight;
218         uint32_t id;
219     };
220 
221     typedef FunctorListItemStruct FunctorListItem;
222     typedef std::list<FunctorListItem> Container_T;
223     Container_T fWaitingFunctors;
224     Container_T::iterator fNextFunctor;
225 
226     uint32_t issued;
227     boost::mutex fMutex;
228     boost::condition fThreadAvailable; // triggered when a thread is available
229     boost::condition fNeedThread;      // triggered when a thread is needed
230     boost::thread_group fThreads;
231 
232     bool 	fStop;
233     long 	fGeneralErrors;
234     long	fFunctorErrors;
235     uint16_t 	fWaitingFunctorsSize;
236     uint16_t 	fWaitingFunctorsWeight;
237 
238 };
239 
240 } // namespace threadpool
241 
242 #endif //WEIGHTEDTHREADPOOL_H
243