1 ///////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
4 // Digital Ltd. LLC
5 //
6 // All rights reserved.
7 //
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are
10 // met:
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 // * Neither the name of Industrial Light & Magic nor the names of
18 // its contributors may be used to endorse or promote products derived
19 // from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 ///////////////////////////////////////////////////////////////////////////
34
35 //-----------------------------------------------------------------------------
36 //
37 // class Task, class ThreadPool, class TaskGroup
38 //
39 //-----------------------------------------------------------------------------
40
41 #include "IlmThread.h"
42 #include "IlmThreadMutex.h"
43 #include "IlmThreadSemaphore.h"
44 #include "IlmThreadPool.h"
45 #include "Iex.h"
46 #include <list>
47
48 using namespace std;
49
50 ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
namespace {

//
// A worker thread owned by a ThreadPool.  The constructor launches the
// thread immediately; run() loops pulling tasks off the pool's FIFO
// until the pool is stopped and its queue has drained.
//

class WorkerThread: public Thread
{
  public:

    WorkerThread (ThreadPool::Data* data);

    virtual void        run ();

  private:

    // Non-owning pointer to the pool's shared state (queue, semaphores,
    // mutexes).  The ThreadPool::Data outlives all worker threads.
    ThreadPool::Data *  _data;
};

} //namespace
67
68
//
// Shared state of a TaskGroup.  Acts as an "inverted" semaphore:
// while any tasks in the group are pending, waiting on isEmpty blocks.
//

struct TaskGroup::Data
{
     Data ();
    ~Data ();                   // blocks until the group has no pending tasks
    
    void    addTask () ;        // called (under the pool's task mutex) when a task joins the group
    void    removeTask ();      // called when a task in the group has finished executing

    Semaphore   isEmpty;        // used to signal that the taskgroup is empty
    int         numPending;     // number of pending tasks to still execute
    Mutex       dtorMutex;      // used to work around the glibc bug:
                                // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
};
82
83
//
// Shared state of a ThreadPool: the task FIFO, the worker-thread list,
// and the synchronization primitives tying them together.
//

struct ThreadPool::Data
{
     Data ();
    ~Data();

    void    finish ();          // stop, wake, and join all worker threads
    bool    stopped () const;   // has stop() been called?
    void    stop ();            // raise the stopping flag

    Semaphore taskSemaphore;        // threads wait on this for ready tasks
    Mutex taskMutex;                // mutual exclusion for the tasks list
    list<Task*> tasks;              // the list of tasks to execute
    size_t numTasks;                // fast access to list size
                                    // (list::size() can be O(n))

    Semaphore threadSemaphore;      // signaled when a thread starts executing
    Mutex threadMutex;              // mutual exclusion for threads list
    list<WorkerThread*> threads;    // the list of all threads
    size_t numThreads;              // fast access to list size
    
    bool stopping;                  // flag indicating whether to stop threads
    Mutex stopMutex;                // mutual exclusion for stopping flag
};
107
108
109
110 //
111 // class WorkerThread
112 //
113
//
// Construct a worker bound to the pool's shared state and start it
// immediately; the new thread enters run().
//

WorkerThread::WorkerThread (ThreadPool::Data* data):
    _data (data)
{
    start();
}
119
120
//
// Main loop of a pool thread: wait on the task semaphore, dequeue and
// execute tasks, and exit once the pool is stopping and the queue is
// empty.  finish() posts the task semaphore once per thread to make
// sure every worker wakes up and observes the stopping flag.
//

void
WorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post();

    while (true)
    {
	//
	// Wait for a task to become available
	//

	_data->taskSemaphore.wait();

	{
	    Lock taskLock (_data->taskMutex);
	
	    //
	    // If there is a task pending, pop off the next task in the FIFO
	    //

	    if (_data->numTasks > 0)
	    {
		Task* task = _data->tasks.front();
		TaskGroup* taskGroup = task->group();
		_data->tasks.pop_front();
		_data->numTasks--;

		// Drop the queue lock while the task runs so other
		// workers can dequeue concurrently; re-acquire it
		// afterwards because removeTask() relies on the task
		// mutex to serialize access to the group's counter.
		taskLock.release();
		task->execute();
		taskLock.acquire();

		delete task;
		taskGroup->_data->removeTask();
	    }
	    else if (_data->stopped())
	    {
		// Woken by finish() with nothing to do: shut down.
		break;
	    }
	}
    }
}
166
167
168 //
169 // struct TaskGroup::Data
170 //
171
Data()172 TaskGroup::Data::Data (): isEmpty (1), numPending (0)
173 {
174 // empty
175 }
176
177
TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block.  This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

    // Alas, given the current bug in glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in this thread can cause
    // an error for a separate thread that is issuing the post() call.
    // We are entitled to destruct the semaphore at this point, however,
    // that post() call attempts to access data out of the associated
    // memory *after* it has woken the waiting threads, including this one,
    // potentially leading to invalid memory reads.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    // Taking (and immediately releasing) dtorMutex here cannot return
    // until the poster in removeTask() has released it, so the
    // semaphore is not destroyed while post() is still touching it.
    Lock lock (dtorMutex);
}
200
201
202 void
addTask()203 TaskGroup::Data::addTask ()
204 {
205 //
206 // Any access to the taskgroup is protected by a mutex that is
207 // held by the threadpool. Therefore it is safe to access
208 // numPending before we wait on the semaphore.
209 //
210
211 if (numPending++ == 0)
212 isEmpty.wait ();
213 }
214
215
216 void
removeTask()217 TaskGroup::Data::removeTask ()
218 {
219 // Alas, given the current bug in glibc we need a secondary
220 // syncronisation primitive here to account for the fact that
221 // destructing the isEmpty Semaphore in a separate thread can
222 // cause an error. Issuing the post call here the current libc
223 // implementation attempts to access memory *after* it has woken
224 // waiting threads.
225 // Since other threads are entitled to delete the semaphore the
226 // access to the memory location can be invalid.
227 // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
228
229 if (--numPending == 0)
230 {
231 Lock lock (dtorMutex);
232 isEmpty.post ();
233 }
234 }
235
236
237 //
238 // struct ThreadPool::Data
239 //
240
Data()241 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
242 {
243 // empty
244 }
245
246
ThreadPool::Data::~Data()
{
    // Stop and join all worker threads before the members they use
    // (semaphores, mutexes, lists) are destroyed.  finish() expects
    // the thread mutex to be held by its caller.
    Lock lock (threadMutex);
    finish ();
}
252
253
//
// Shut down all worker threads and reset the pool's state.  Must be
// called with threadMutex held.  Any tasks still queued are discarded
// (workers drain the queue before they observe the stopping flag).
//

void
ThreadPool::Data::finish ()
{
    stop();

    //
    // Signal enough times to allow all threads to stop.
    //
    // Wait until all threads have started their run functions.
    // If we do not wait before we destroy the threads then it's
    // possible that the threads have not yet called their run
    // functions.
    // If this happens then the run function will be called off
    // of an invalid object and we will crash, most likely with
    // an error like: "pure virtual method called"
    //

    for (size_t i = 0; i < numThreads; i++)
    {
	taskSemaphore.post();
	threadSemaphore.wait();
    }

    //
    // Join all the threads
    //

    for (list<WorkerThread*>::iterator i = threads.begin();
	 i != threads.end();
	 ++i)
    {
	delete (*i);
    }

    // Reset bookkeeping under both locks so a subsequent
    // setNumThreads() / addTask() sees a consistent, empty pool.
    Lock lock1 (taskMutex);
    Lock lock2 (stopMutex);
    threads.clear();
    tasks.clear();
    numThreads = 0;
    numTasks = 0;
    stopping = false;
}
296
297
298 bool
stopped() const299 ThreadPool::Data::stopped () const
300 {
301 Lock lock (stopMutex);
302 return stopping;
303 }
304
305
void
ThreadPool::Data::stop ()
{
    // Raise the stopping flag under its mutex; worker threads poll it
    // via stopped() and exit once the task queue is empty.
    Lock lock (stopMutex);
    stopping = true;
}
312
313
314 //
315 // class Task
316 //
317
Task(TaskGroup * g)318 Task::Task (TaskGroup* g): _group(g)
319 {
320 // empty
321 }
322
323
// Virtual-dispatch anchor; derived tasks clean up in their own destructors.
Task::~Task()
{
    // empty
}
328
329
330 TaskGroup*
group()331 Task::group ()
332 {
333 return _group;
334 }
335
336
TaskGroup()337 TaskGroup::TaskGroup ():
338 _data (new Data())
339 {
340 // empty
341 }
342
343
TaskGroup::~TaskGroup ()
{
    // Data's destructor blocks until every task in this group has
    // finished executing.
    delete _data;
}
348
349
350 //
351 // class ThreadPool
352 //
353
//
// Construct a pool with nthreads worker threads.  With zero threads,
// addTask() executes tasks immediately in the caller's thread.
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data())
{
    setNumThreads (nthreads);
}
359
360
ThreadPool::~ThreadPool ()
{
    // Data's destructor stops and joins all worker threads.
    delete _data;
}
365
366
367 int
numThreads() const368 ThreadPool::numThreads () const
369 {
370 Lock lock (_data->threadMutex);
371 return _data->numThreads;
372 }
373
374
375 void
setNumThreads(int count)376 ThreadPool::setNumThreads (int count)
377 {
378 if (count < 0)
379 throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
380 "in a thread pool to a negative value.");
381
382 //
383 // Lock access to thread list and size
384 //
385
386 Lock lock (_data->threadMutex);
387
388 if ((size_t)count > _data->numThreads)
389 {
390 //
391 // Add more threads
392 //
393
394 while (_data->numThreads < (size_t)count)
395 {
396 _data->threads.push_back (new WorkerThread (_data));
397 _data->numThreads++;
398 }
399 }
400 else if ((size_t)count < _data->numThreads)
401 {
402 //
403 // Wait until all existing threads are finished processing,
404 // then delete all threads.
405 //
406
407 _data->finish ();
408
409 //
410 // Add in new threads
411 //
412
413 while (_data->numThreads < (size_t)count)
414 {
415 _data->threads.push_back (new WorkerThread (_data));
416 _data->numThreads++;
417 }
418 }
419 }
420
421
//
// Submit a task to the pool.  The pool takes ownership of the task
// and deletes it after execution.  If the pool has no threads, the
// task runs synchronously in the calling thread.
//

void
ThreadPool::addTask (Task* task) 
{
    //
    // Lock the threads, needed to access numThreads
    //

    Lock lock (_data->threadMutex);

    if (_data->numThreads == 0)
    {
        task->execute ();
        delete task;
    }
    else
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data->taskMutex);

            //
            // Push the new task into the FIFO
            //

            _data->tasks.push_back (task);
            _data->numTasks++;
            // Registering with the group under taskMutex keeps the
            // group's pending count in sync with the queue.
            task->group()->_data->addTask();
        }
        
        //
        // Signal that we have a new task to process
        //

        _data->taskSemaphore.post ();
    }
}
461
462
//
// Return the process-wide shared thread pool.  Constructed lazily on
// first use with zero threads (tasks run in the caller's thread until
// setNumThreads() is called on it).
//

ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool
    //
    
    static ThreadPool gThreadPool (0);

    return gThreadPool;
}
474
475
476 void
addGlobalTask(Task * task)477 ThreadPool::addGlobalTask (Task* task)
478 {
479 globalThreadPool().addTask (task);
480 }
481
482
483 ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT
484