1 ///////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
4 // Digital Ltd. LLC
5 //
6 // All rights reserved.
7 //
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are
10 // met:
11 // *       Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // *       Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 // *       Neither the name of Industrial Light & Magic nor the names of
18 // its contributors may be used to endorse or promote products derived
19 // from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 ///////////////////////////////////////////////////////////////////////////
34 
35 //-----------------------------------------------------------------------------
36 //
37 //	class Task, class ThreadPool, class TaskGroup
38 //
39 //-----------------------------------------------------------------------------
40 
41 #include "IlmThread.h"
42 #include "IlmThreadMutex.h"
43 #include "IlmThreadSemaphore.h"
44 #include "IlmThreadPool.h"
45 #include "Iex.h"
46 #include <list>
47 
48 using namespace std;
49 
50 ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
51 namespace {
52 
53 class WorkerThread: public Thread
54 {
55   public:
56 
57     WorkerThread (ThreadPool::Data* data);
58 
59     virtual void	run ();
60 
61   private:
62 
63     ThreadPool::Data *	_data;
64 };
65 
66 } //namespace
67 
68 
69 struct TaskGroup::Data
70 {
71      Data ();
72     ~Data ();
73 
74     void	addTask () ;
75     void	removeTask ();
76 
77     Semaphore	isEmpty;        // used to signal that the taskgroup is empty
78     int         numPending;     // number of pending tasks to still execute
79     Mutex       dtorMutex;      // used to work around the glibc bug:
80                                 // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
81 };
82 
83 
84 struct ThreadPool::Data
85 {
86      Data ();
87     ~Data();
88 
89     void	finish ();
90     bool	stopped () const;
91     void	stop ();
92 
93     Semaphore taskSemaphore;        // threads wait on this for ready tasks
94     Mutex taskMutex;                // mutual exclusion for the tasks list
95     list<Task*> tasks;              // the list of tasks to execute
96     size_t numTasks;                // fast access to list size
97                                     //   (list::size() can be O(n))
98 
99     Semaphore threadSemaphore;      // signaled when a thread starts executing
100     Mutex threadMutex;              // mutual exclusion for threads list
101     list<WorkerThread*> threads;    // the list of all threads
102     size_t numThreads;              // fast access to list size
103 
104     bool stopping;                  // flag indicating whether to stop threads
105     Mutex stopMutex;                // mutual exclusion for stopping flag
106 };
107 
108 
109 
110 //
111 // class WorkerThread
112 //
113 
WorkerThread(ThreadPool::Data * data)114 WorkerThread::WorkerThread (ThreadPool::Data* data):
115     _data (data)
116 {
117     start();
118 }
119 
120 
121 void
run()122 WorkerThread::run ()
123 {
124     //
125     // Signal that the thread has started executing
126     //
127 
128     _data->threadSemaphore.post();
129 
130     while (true)
131     {
132 	//
133         // Wait for a task to become available
134 	//
135 
136         _data->taskSemaphore.wait();
137 
138         {
139             Lock taskLock (_data->taskMutex);
140 
141 	    //
142             // If there is a task pending, pop off the next task in the FIFO
143 	    //
144 
145             if (_data->numTasks > 0)
146             {
147                 Task* task = _data->tasks.front();
148 		TaskGroup* taskGroup = task->group();
149                 _data->tasks.pop_front();
150                 _data->numTasks--;
151 
152                 taskLock.release();
153                 task->execute();
154                 taskLock.acquire();
155 
156                 delete task;
157                 taskGroup->_data->removeTask();
158             }
159             else if (_data->stopped())
160 	    {
161                 break;
162 	    }
163         }
164     }
165 }
166 
167 
168 //
169 // struct TaskGroup::Data
170 //
171 
Data()172 TaskGroup::Data::Data (): isEmpty (1), numPending (0)
173 {
174     // empty
175 }
176 
177 
~Data()178 TaskGroup::Data::~Data ()
179 {
180     //
181     // A TaskGroup acts like an "inverted" semaphore: if the count
182     // is above 0 then waiting on the taskgroup will block.  This
183     // destructor waits until the taskgroup is empty before returning.
184     //
185 
186     isEmpty.wait ();
187 
188     // Alas, given the current bug in glibc we need a secondary
189     // syncronisation primitive here to account for the fact that
190     // destructing the isEmpty Semaphore in this thread can cause
191     // an error for a separate thread that is issuing the post() call.
192     // We are entitled to destruct the semaphore at this point, however,
193     // that post() call attempts to access data out of the associated
194     // memory *after* it has woken the waiting threads, including this one,
195     // potentially leading to invalid memory reads.
196     // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
197 
198     Lock lock (dtorMutex);
199 }
200 
201 
202 void
addTask()203 TaskGroup::Data::addTask ()
204 {
205     //
206     // Any access to the taskgroup is protected by a mutex that is
207     // held by the threadpool.  Therefore it is safe to access
208     // numPending before we wait on the semaphore.
209     //
210 
211     if (numPending++ == 0)
212 	isEmpty.wait ();
213 }
214 
215 
216 void
removeTask()217 TaskGroup::Data::removeTask ()
218 {
219     // Alas, given the current bug in glibc we need a secondary
220     // syncronisation primitive here to account for the fact that
221     // destructing the isEmpty Semaphore in a separate thread can
222     // cause an error. Issuing the post call here the current libc
223     // implementation attempts to access memory *after* it has woken
224     // waiting threads.
225     // Since other threads are entitled to delete the semaphore the
226     // access to the memory location can be invalid.
227     // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
228 
229     if (--numPending == 0)
230     {
231         Lock lock (dtorMutex);
232         isEmpty.post ();
233     }
234 }
235 
236 
237 //
238 // struct ThreadPool::Data
239 //
240 
Data()241 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
242 {
243     // empty
244 }
245 
246 
~Data()247 ThreadPool::Data::~Data()
248 {
249     Lock lock (threadMutex);
250     finish ();
251 }
252 
253 
254 void
finish()255 ThreadPool::Data::finish ()
256 {
257     stop();
258 
259     //
260     // Signal enough times to allow all threads to stop.
261     //
262     // Wait until all threads have started their run functions.
263     // If we do not wait before we destroy the threads then it's
264     // possible that the threads have not yet called their run
265     // functions.
266     // If this happens then the run function will be called off
267     // of an invalid object and we will crash, most likely with
268     // an error like: "pure virtual method called"
269     //
270 
271     for (size_t i = 0; i < numThreads; i++)
272     {
273 	taskSemaphore.post();
274 	threadSemaphore.wait();
275     }
276 
277     //
278     // Join all the threads
279     //
280 
281     for (list<WorkerThread*>::iterator i = threads.begin();
282 	 i != threads.end();
283 	 ++i)
284     {
285 	delete (*i);
286     }
287 
288     Lock lock1 (taskMutex);
289     Lock lock2 (stopMutex);
290     threads.clear();
291     tasks.clear();
292     numThreads = 0;
293     numTasks = 0;
294     stopping = false;
295 }
296 
297 
298 bool
stopped() const299 ThreadPool::Data::stopped () const
300 {
301     Lock lock (stopMutex);
302     return stopping;
303 }
304 
305 
306 void
stop()307 ThreadPool::Data::stop ()
308 {
309     Lock lock (stopMutex);
310     stopping = true;
311 }
312 
313 
314 //
315 // class Task
316 //
317 
Task(TaskGroup * g)318 Task::Task (TaskGroup* g): _group(g)
319 {
320     // empty
321 }
322 
323 
~Task()324 Task::~Task()
325 {
326     // empty
327 }
328 
329 
330 TaskGroup*
group()331 Task::group ()
332 {
333     return _group;
334 }
335 
336 
TaskGroup()337 TaskGroup::TaskGroup ():
338     _data (new Data())
339 {
340     // empty
341 }
342 
343 
~TaskGroup()344 TaskGroup::~TaskGroup ()
345 {
346     delete _data;
347 }
348 
349 
350 //
351 // class ThreadPool
352 //
353 
ThreadPool(unsigned nthreads)354 ThreadPool::ThreadPool (unsigned nthreads):
355     _data (new Data())
356 {
357     setNumThreads (nthreads);
358 }
359 
360 
~ThreadPool()361 ThreadPool::~ThreadPool ()
362 {
363     delete _data;
364 }
365 
366 
367 int
numThreads() const368 ThreadPool::numThreads () const
369 {
370     Lock lock (_data->threadMutex);
371     return _data->numThreads;
372 }
373 
374 
375 void
setNumThreads(int count)376 ThreadPool::setNumThreads (int count)
377 {
378     if (count < 0)
379         throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
380 			   "in a thread pool to a negative value.");
381 
382     //
383     // Lock access to thread list and size
384     //
385 
386     Lock lock (_data->threadMutex);
387 
388     if ((size_t)count > _data->numThreads)
389     {
390 	//
391         // Add more threads
392 	//
393 
394         while (_data->numThreads < (size_t)count)
395         {
396             _data->threads.push_back (new WorkerThread (_data));
397             _data->numThreads++;
398         }
399     }
400     else if ((size_t)count < _data->numThreads)
401     {
402 	//
403 	// Wait until all existing threads are finished processing,
404 	// then delete all threads.
405 	//
406 
407         _data->finish ();
408 
409 	//
410         // Add in new threads
411 	//
412 
413         while (_data->numThreads < (size_t)count)
414         {
415             _data->threads.push_back (new WorkerThread (_data));
416             _data->numThreads++;
417         }
418     }
419 }
420 
421 
422 void
addTask(Task * task)423 ThreadPool::addTask (Task* task)
424 {
425     //
426     // Lock the threads, needed to access numThreads
427     //
428 
429     Lock lock (_data->threadMutex);
430 
431     if (_data->numThreads == 0)
432     {
433         task->execute ();
434         delete task;
435     }
436     else
437     {
438 	//
439         // Get exclusive access to the tasks queue
440 	//
441 
442         {
443             Lock taskLock (_data->taskMutex);
444 
445 	    //
446             // Push the new task into the FIFO
447 	    //
448 
449             _data->tasks.push_back (task);
450             _data->numTasks++;
451             task->group()->_data->addTask();
452         }
453 
454 	//
455         // Signal that we have a new task to process
456 	//
457 
458         _data->taskSemaphore.post ();
459     }
460 }
461 
462 
463 ThreadPool&
globalThreadPool()464 ThreadPool::globalThreadPool ()
465 {
466     //
467     // The global thread pool
468     //
469 
470     static ThreadPool gThreadPool (0);
471 
472     return gThreadPool;
473 }
474 
475 
476 void
addGlobalTask(Task * task)477 ThreadPool::addGlobalTask (Task* task)
478 {
479     globalThreadPool().addTask (task);
480 }
481 
482 
483 ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT
484