1 /********************************************************************************
2 * *
3 * T h r e a d P o o l *
4 * *
5 *********************************************************************************
6 * Copyright (C) 2006,2021 by Jeroen van der Zijp. All Rights Reserved. *
7 *********************************************************************************
8 * This library is free software; you can redistribute it and/or modify *
9 * it under the terms of the GNU Lesser General Public License as published by *
10 * the Free Software Foundation; either version 3 of the License, or *
11 * (at your option) any later version. *
12 * *
13 * This library is distributed in the hope that it will be useful, *
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16 * GNU Lesser General Public License for more details. *
17 * *
18 * You should have received a copy of the GNU Lesser General Public License *
19 * along with this program. If not, see <http://www.gnu.org/licenses/> *
20 ********************************************************************************/
21 #include "xincs.h"
22 #include "fxver.h"
23 #include "fxdefs.h"
24 #include "fxmath.h"
25 #include "FXException.h"
26 #include "FXElement.h"
27 #include "FXArray.h"
28 #include "FXPtrList.h"
29 #include "FXAtomic.h"
30 #include "FXSemaphore.h"
31 #include "FXCompletion.h"
32 #include "FXRunnable.h"
33 #include "FXAutoThreadStorageKey.h"
34 #include "FXThread.h"
35 #include "FXWorker.h"
36 #include "FXLFQueue.h"
37 #include "FXThreadPool.h"
38
39
40 /*
41 Notes:
42 - Process tasks from lock-free queue using multiple threads.
43
44 - A semaphore is used to block producing thread in the case the queue is full, for
45 arbitrary amounts of time ranging from 0 to forever.
46
47 - Producer ONLY ever blocks if queue is full, i.e. no free slot in the queue for an
48 additional task.
49
50 - Multiple producers may add tasks to the queue without lock-contention, if queue is
51 not full.
52
53 - Another semaphore is used to block consuming threads. Permanent worker threads
54 block indefinitely; temporary workers, if present, block only for short amount of
55 time and exit when no tasks appear within that time.
56
57 - Instead of waiting, a thread can become an additional consumer thread, except that
58 it will not block but return if the queue is empty, or count becomes zero.
59
60 - Task groups provide a mechanism to execute tasks which belong together, and allow
61 the producing thread to know when the entire group is complete.
62
63 - To this end, a task group maintains a completion object which records the number of
64 tasks started versus completed, and which sports a semaphore which can coordinate
65 the worker threads with the producer thread.
66
67 - For groups of tasks belonging together, a counter is used to record the completions
68 of all the tasks started in the group. If the producer also participates in the
69 processing of of tasks, it exits as soon as the completion count reaches zero or
70 the task queue becomes empty. In either case, it may have to block for a little
71 while on the completion semaphore.
72
73 - No new tasks can be posted when about to shut down; so when queue becomes empty, it
74 will stay empty.
75
76 */
77
78 using namespace FX;
79
80 /*******************************************************************************/
81
82 namespace FX {
83
84
85 // Locate thread pool to which worker thread belongs
86 FXAutoThreadStorageKey FXThreadPool::reference;
87
88
89 // Create thread pool
FXThreadPool(FXuint sz)90 FXThreadPool::FXThreadPool(FXuint sz):queue(sz),freeslots(sz),usedslots(0),stacksize(0),expiration(forever),maximum(FXThread::processors()),minimum(1),workers(0),running(0){
91 FXTRACE((100,"FXThreadPool::FXThreadPool(%d)\n",sz));
92 }
93
94
95 // Change task queue size, return true if success
setSize(FXuint sz)96 FXbool FXThreadPool::setSize(FXuint sz){
97 if((sz<8) || (sz&(sz-1))){ fxerror("FXThreadPool::setSize: bad argument: %u.\n",sz); }
98 if(atomicBoolCas(&running,0,2)){
99 FXuint osz=queue.getSize();
100 if(setSize(sz)){
101 while(osz<sz){ ++osz; freeslots.post(); }
102 while(osz>sz){ --osz; freeslots.wait(); }
103 running=0;
104 return true;
105 }
106 running=0;
107 }
108 return false;
109 }
110
111
112 // Change minimum number of worker threads
setMinimumThreads(FXuint n)113 FXbool FXThreadPool::setMinimumThreads(FXuint n){
114 if(atomicBoolCas(&running,0,2)){
115 minimum=n;
116 running=0;
117 return true;
118 }
119 return false;
120 }
121
122
123 // Change maximum number of worker threads
setMaximumThreads(FXuint n)124 FXbool FXThreadPool::setMaximumThreads(FXuint n){
125 if(atomicBoolCas(&running,0,2)){
126 maximum=FXMAX(n,1);
127 running=0;
128 return true;
129 }
130 return false;
131 }
132
133
134 // Change expiration time
setExpiration(FXTime ns)135 FXbool FXThreadPool::setExpiration(FXTime ns){
136 if(atomicBoolCas(&running,0,2)){
137 expiration=ns;
138 running=0;
139 return true;
140 }
141 return false;
142 }
143
144
145 // Change stack size
setStackSize(FXuval sz)146 FXbool FXThreadPool::setStackSize(FXuval sz){
147 if(atomicBoolCas(&running,0,2)){
148 stacksize=sz;
149 running=0;
150 return true;
151 }
152 return false;
153 }
154
155
156 // Return calling thread's thread pool
instance()157 FXThreadPool* FXThreadPool::instance(){
158 return (FXThreadPool*)reference.get();
159 }
160
161
162 // Change calling thread's thread pool
instance(FXThreadPool * pool)163 void FXThreadPool::instance(FXThreadPool *pool){
164 reference.set(pool);
165 }
166
167
168 // Start a worker and reset semaphore
startWorker()169 FXbool FXThreadPool::startWorker(){
170 threads.increment();
171 if(FXWorker::execute(this,stacksize)){
172 return true;
173 }
174 threads.decrement();
175 return false;
176 }
177
178
179 // Start thread pool
start(FXuint count)180 FXuint FXThreadPool::start(FXuint count){
181 FXuint result=0;
182 FXTRACE((150,"FXThreadPool::start(%u)\n",count));
183 if(atomicBoolCas(&running,0,2)){
184
185 // Start number of workers
186 while(result<count && startWorker()){
187 result++;
188 }
189
190 // Set context reference if not set yet
191 if(instance()==NULL) instance(this);
192
193 // Start running
194 running=1;
195 }
196 return result;
197 }
198
199
200 // Wait until counter becomes zero, return if no new tasks posted within timeout
runWhile(FXCompletion & comp,FXTime timeout)201 void FXThreadPool::runWhile(FXCompletion& comp,FXTime timeout){
202 FXRunnable* task;
203 while(!comp.done() && usedslots.wait(timeout) && queue.pop(task)){
204 freeslots.post();
205 try{
206 task->run();
207 }
208 catch(...){
209 tasks.decrement();
210 throw;
211 }
212 tasks.decrement();
213 }
214 }
215
216
217 // Process tasks from the queue using multiple worker threads.
218 // When queue becomes empty, extra workers will exit if no work arrives within
219 // a set amount of time; the last worker to terminate will signal the semaphore.
220 // Any exceptions raised during task processing will be rethrown after adjusting
221 // the current count of workers.
run()222 FXint FXThreadPool::run(){
223 FXuint w=atomicAdd(&workers,1);
224 instance(this);
225 try{
226 runWhile(threads,(w<minimum)?forever:expiration);
227 }
228 catch(...){
229 instance(NULL);
230 atomicAdd(&workers,-1);
231 threads.decrement();
232 throw;
233 }
234 instance(NULL);
235 atomicAdd(&workers,-1);
236 threads.decrement();
237 return 0;
238 }
239
240
241 // Try to add new task to the queue, waiting for space if necessary
execute(FXRunnable * task,FXTime blocking)242 FXbool FXThreadPool::execute(FXRunnable* task,FXTime blocking){
243 if(__likely(running==1 && task)){
244 if(tasks.count()<threads.count() || maximum<=threads.count() || startWorker()){
245 if(freeslots.wait(blocking)){
246 tasks.increment();
247 queue.push(task);
248 usedslots.post();
249 return true;
250 }
251 }
252 }
253 return false;
254 }
255
256
257 // Execute task, and wait for all completion of all tasks
executeAndWait(FXRunnable * task,FXTime blocking)258 FXbool FXThreadPool::executeAndWait(FXRunnable* task,FXTime blocking){
259 if(execute(task,blocking)){
260 runWhile(tasks,0);
261 tasks.wait();
262 return true;
263 }
264 return false;
265 }
266
267
268 // Execute task, and wait for completion
executeAndWaitFor(FXRunnable * task,FXCompletion & comp,FXTime blocking)269 FXbool FXThreadPool::executeAndWaitFor(FXRunnable* task,FXCompletion& comp,FXTime blocking){
270 if(execute(task,blocking)){
271 runWhile(comp,0);
272 comp.wait();
273 return true;
274 }
275 return false;
276 }
277
278
279 // Wait for completion of all tasks
wait()280 FXbool FXThreadPool::wait(){
281 if(__likely(running)){
282 runWhile(tasks,0);
283 tasks.wait();
284 return true;
285 }
286 return false;
287 }
288
289
290 // Wait for completion
waitFor(FXCompletion & comp)291 FXbool FXThreadPool::waitFor(FXCompletion& comp){
292 if(__likely(running)){
293 runWhile(comp,0);
294 comp.wait();
295 return true;
296 }
297 return false;
298 }
299
300
301 // Stop thread pool
stop()302 FXbool FXThreadPool::stop(){
303 FXTRACE((150,"FXThreadPool::stop()\n"));
304 if(atomicBoolCas(&running,1,2)){
305 FXint w=threads.count();
306
307 // Help out processing tasks while waiting
308 wait();
309
310 // Queue empty now
311 FXASSERT(queue.isEmpty());
312
313 // Force all workers to stop
314 while(w){ usedslots.post(); --w; }
315
316 // Wait till last worker is done
317 threads.wait();
318
319 // Reset usedslots semaphore to zero
320 while(usedslots.trywait()){ }
321
322 // Unset context reference if set to this context
323 if(instance()==this) instance(NULL);
324
325 // Stop running
326 running=0;
327 return true;
328 }
329 return false;
330 }
331
332
333 // Delete thread pool
~FXThreadPool()334 FXThreadPool::~FXThreadPool(){
335 FXTRACE((100,"FXThreadPool::~FXThreadPool()\n"));
336 stop();
337 }
338
339 }
340