1 /********************************************************************************
2 *                                                                               *
3 *                             T h r e a d   P o o l                             *
4 *                                                                               *
5 *********************************************************************************
6 * Copyright (C) 2006,2021 by Jeroen van der Zijp.   All Rights Reserved.        *
7 *********************************************************************************
8 * This library is free software; you can redistribute it and/or modify          *
9 * it under the terms of the GNU Lesser General Public License as published by   *
10 * the Free Software Foundation; either version 3 of the License, or             *
11 * (at your option) any later version.                                           *
12 *                                                                               *
13 * This library is distributed in the hope that it will be useful,               *
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of                *
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                 *
16 * GNU Lesser General Public License for more details.                           *
17 *                                                                               *
18 * You should have received a copy of the GNU Lesser General Public License      *
19 * along with this program.  If not, see <http://www.gnu.org/licenses/>          *
20 ********************************************************************************/
21 #include "xincs.h"
22 #include "fxver.h"
23 #include "fxdefs.h"
24 #include "fxmath.h"
25 #include "FXException.h"
26 #include "FXElement.h"
27 #include "FXArray.h"
28 #include "FXPtrList.h"
29 #include "FXAtomic.h"
30 #include "FXSemaphore.h"
31 #include "FXCompletion.h"
32 #include "FXRunnable.h"
33 #include "FXAutoThreadStorageKey.h"
34 #include "FXThread.h"
35 #include "FXWorker.h"
36 #include "FXLFQueue.h"
37 #include "FXThreadPool.h"
38 
39 
40 /*
41   Notes:
42   - Process tasks from lock-free queue using multiple threads.
43 
44   - A semaphore is used to block producing thread in the case the queue is full, for
45     arbitrary amounts of time ranging from 0 to forever.
46 
47   - Producer ONLY ever blocks if queue is full, i.e. no free slot in the queue for an
48     additional task.
49 
50   - Multiple producers may add tasks to the queue without lock-contention, if queue is
51     not full.
52 
53   - Another semaphore is used to block consuming threads.  Permanent worker threads
54     block indefinitely; temporary workers, if present, block only for short amount of
55     time and exit when no tasks appear within that time.
56 
57   - Instead of waiting, a thread can become an additional consumer thread, except that
58     it will not block but return if the queue is empty, or count becomes zero.
59 
60   - Task groups provide a mechanism to execute tasks which belong together, and allow
61     the producing thread to know when the entire group is complete.
62 
63   - To this end, a task group maintains a completion object which records the number of
64     tasks started versus completed, and which sports a semaphore which can coordinate
65     the worker threads with the producer thread.
66 
  - For groups of tasks belonging together, a counter is used to record the completions
    of all the tasks started in the group.  If the producer also participates in the
    processing of tasks, it exits as soon as the completion count reaches zero or
    the task queue becomes empty.  In either case, it may have to block for a little
    while on the completion semaphore.
72 
73   - No new tasks can be posted when about to shut down; so when queue becomes empty, it
74     will stay empty.
75 
76 */
77 
78 using namespace FX;
79 
80 /*******************************************************************************/
81 
82 namespace FX {
83 
84 
// Storage key used to locate the thread pool associated with the calling thread.
// It is read by FXThreadPool::instance() and written by FXThreadPool::instance(pool).
FXAutoThreadStorageKey FXThreadPool::reference;
87 
88 
89 // Create thread pool
FXThreadPool(FXuint sz)90 FXThreadPool::FXThreadPool(FXuint sz):queue(sz),freeslots(sz),usedslots(0),stacksize(0),expiration(forever),maximum(FXThread::processors()),minimum(1),workers(0),running(0){
91   FXTRACE((100,"FXThreadPool::FXThreadPool(%d)\n",sz));
92   }
93 
94 
95 // Change task queue size, return true if success
setSize(FXuint sz)96 FXbool FXThreadPool::setSize(FXuint sz){
97   if((sz<8) || (sz&(sz-1))){ fxerror("FXThreadPool::setSize: bad argument: %u.\n",sz); }
98   if(atomicBoolCas(&running,0,2)){
99     FXuint osz=queue.getSize();
100     if(setSize(sz)){
101       while(osz<sz){ ++osz; freeslots.post(); }
102       while(osz>sz){ --osz; freeslots.wait(); }
103       running=0;
104       return true;
105       }
106     running=0;
107     }
108   return false;
109   }
110 
111 
112 // Change minimum number of worker threads
setMinimumThreads(FXuint n)113 FXbool FXThreadPool::setMinimumThreads(FXuint n){
114   if(atomicBoolCas(&running,0,2)){
115     minimum=n;
116     running=0;
117     return true;
118     }
119   return false;
120   }
121 
122 
123 // Change maximum number of worker threads
setMaximumThreads(FXuint n)124 FXbool FXThreadPool::setMaximumThreads(FXuint n){
125   if(atomicBoolCas(&running,0,2)){
126     maximum=FXMAX(n,1);
127     running=0;
128     return true;
129     }
130   return false;
131   }
132 
133 
134 // Change expiration time
setExpiration(FXTime ns)135 FXbool FXThreadPool::setExpiration(FXTime ns){
136   if(atomicBoolCas(&running,0,2)){
137     expiration=ns;
138     running=0;
139     return true;
140     }
141   return false;
142   }
143 
144 
145 // Change stack size
setStackSize(FXuval sz)146 FXbool FXThreadPool::setStackSize(FXuval sz){
147   if(atomicBoolCas(&running,0,2)){
148     stacksize=sz;
149     running=0;
150     return true;
151     }
152   return false;
153   }
154 
155 
156 // Return calling thread's thread pool
instance()157 FXThreadPool* FXThreadPool::instance(){
158   return (FXThreadPool*)reference.get();
159   }
160 
161 
162 // Change calling thread's thread pool
instance(FXThreadPool * pool)163 void FXThreadPool::instance(FXThreadPool *pool){
164   reference.set(pool);
165   }
166 
167 
168 // Start a worker and reset semaphore
startWorker()169 FXbool FXThreadPool::startWorker(){
170   threads.increment();
171   if(FXWorker::execute(this,stacksize)){
172     return true;
173     }
174   threads.decrement();
175   return false;
176   }
177 
178 
179 // Start thread pool
start(FXuint count)180 FXuint FXThreadPool::start(FXuint count){
181   FXuint result=0;
182   FXTRACE((150,"FXThreadPool::start(%u)\n",count));
183   if(atomicBoolCas(&running,0,2)){
184 
185     // Start number of workers
186     while(result<count && startWorker()){
187       result++;
188       }
189 
190     // Set context reference if not set yet
191     if(instance()==NULL) instance(this);
192 
193     // Start running
194     running=1;
195     }
196   return result;
197   }
198 
199 
200 // Wait until counter becomes zero, return if no new tasks posted within timeout
runWhile(FXCompletion & comp,FXTime timeout)201 void FXThreadPool::runWhile(FXCompletion& comp,FXTime timeout){
202   FXRunnable* task;
203   while(!comp.done() && usedslots.wait(timeout) && queue.pop(task)){
204     freeslots.post();
205     try{
206       task->run();
207       }
208     catch(...){
209       tasks.decrement();
210       throw;
211       }
212     tasks.decrement();
213     }
214   }
215 
216 
217 // Process tasks from the queue using multiple worker threads.
218 // When queue becomes empty, extra workers will exit if no work arrives within
219 // a set amount of time; the last worker to terminate will signal the semaphore.
220 // Any exceptions raised during task processing will be rethrown after adjusting
221 // the current count of workers.
run()222 FXint FXThreadPool::run(){
223   FXuint w=atomicAdd(&workers,1);
224   instance(this);
225   try{
226     runWhile(threads,(w<minimum)?forever:expiration);
227     }
228   catch(...){
229     instance(NULL);
230     atomicAdd(&workers,-1);
231     threads.decrement();
232     throw;
233     }
234   instance(NULL);
235   atomicAdd(&workers,-1);
236   threads.decrement();
237   return 0;
238   }
239 
240 
241 // Try to add new task to the queue, waiting for space if necessary
execute(FXRunnable * task,FXTime blocking)242 FXbool FXThreadPool::execute(FXRunnable* task,FXTime blocking){
243   if(__likely(running==1 && task)){
244     if(tasks.count()<threads.count() || maximum<=threads.count() || startWorker()){
245       if(freeslots.wait(blocking)){
246         tasks.increment();
247         queue.push(task);
248         usedslots.post();
249         return true;
250         }
251       }
252     }
253   return false;
254   }
255 
256 
257 // Execute task, and wait for all completion of all tasks
executeAndWait(FXRunnable * task,FXTime blocking)258 FXbool FXThreadPool::executeAndWait(FXRunnable* task,FXTime blocking){
259   if(execute(task,blocking)){
260     runWhile(tasks,0);
261     tasks.wait();
262     return true;
263     }
264   return false;
265   }
266 
267 
268 // Execute task, and wait for completion
executeAndWaitFor(FXRunnable * task,FXCompletion & comp,FXTime blocking)269 FXbool FXThreadPool::executeAndWaitFor(FXRunnable* task,FXCompletion& comp,FXTime blocking){
270   if(execute(task,blocking)){
271     runWhile(comp,0);
272     comp.wait();
273     return true;
274     }
275   return false;
276   }
277 
278 
279 // Wait for completion of all tasks
wait()280 FXbool FXThreadPool::wait(){
281   if(__likely(running)){
282     runWhile(tasks,0);
283     tasks.wait();
284     return true;
285     }
286   return false;
287   }
288 
289 
290 // Wait for completion
waitFor(FXCompletion & comp)291 FXbool FXThreadPool::waitFor(FXCompletion& comp){
292   if(__likely(running)){
293     runWhile(comp,0);
294     comp.wait();
295     return true;
296     }
297   return false;
298   }
299 
300 
301 // Stop thread pool
stop()302 FXbool FXThreadPool::stop(){
303   FXTRACE((150,"FXThreadPool::stop()\n"));
304   if(atomicBoolCas(&running,1,2)){
305     FXint w=threads.count();
306 
307     // Help out processing tasks while waiting
308     wait();
309 
310     // Queue empty now
311     FXASSERT(queue.isEmpty());
312 
313     // Force all workers to stop
314     while(w){ usedslots.post(); --w; }
315 
316     // Wait till last worker is done
317     threads.wait();
318 
319     // Reset usedslots semaphore to zero
320     while(usedslots.trywait()){ }
321 
322     // Unset context reference if set to this context
323     if(instance()==this) instance(NULL);
324 
325     // Stop running
326     running=0;
327     return true;
328     }
329   return false;
330   }
331 
332 
333 // Delete thread pool
~FXThreadPool()334 FXThreadPool::~FXThreadPool(){
335   FXTRACE((100,"FXThreadPool::~FXThreadPool()\n"));
336   stop();
337   }
338 
339 }
340