1 /*
2   ==============================================================================
3 
4    This file is part of the JUCE library.
5    Copyright (c) 2017 - ROLI Ltd.
6 
7    JUCE is an open source library subject to commercial or open-source
8    licensing.
9 
10    The code included in this file is provided under the terms of the ISC license
11    http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12    To use, copy, modify, and/or distribute this software for any purpose with or
13    without fee is hereby granted provided that the above copyright notice and
14    this permission notice appear in all copies.
15 
16    JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17    EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18    DISCLAIMED.
19 
20   ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
26 struct ThreadPool::ThreadPoolThread  : public Thread
27 {
ThreadPoolThreadjuce::ThreadPool::ThreadPoolThread28     ThreadPoolThread (ThreadPool& p, size_t stackSize)
29        : Thread ("Pool", stackSize), pool (p)
30     {
31     }
32 
runjuce::ThreadPool::ThreadPoolThread33     void run() override
34     {
35         while (! threadShouldExit())
36             if (! pool.runNextJob (*this))
37                 wait (500);
38     }
39 
40     std::atomic<ThreadPoolJob*> currentJob { nullptr };
41     ThreadPool& pool;
42 
43     JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
44 };
45 
46 //==============================================================================
ThreadPoolJob(const String & name)47 ThreadPoolJob::ThreadPoolJob (const String& name)  : jobName (name)
48 {
49 }
50 
~ThreadPoolJob()51 ThreadPoolJob::~ThreadPoolJob()
52 {
53     // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54     // to remove it first!
55     jassert (pool == nullptr || ! pool->contains (this));
56 }
57 
getJobName() const58 String ThreadPoolJob::getJobName() const
59 {
60     return jobName;
61 }
62 
setJobName(const String & newName)63 void ThreadPoolJob::setJobName (const String& newName)
64 {
65     jobName = newName;
66 }
67 
signalJobShouldExit()68 void ThreadPoolJob::signalJobShouldExit()
69 {
70     shouldStop = true;
71     listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72 }
73 
addListener(Thread::Listener * listener)74 void ThreadPoolJob::addListener (Thread::Listener* listener)
75 {
76     listeners.add (listener);
77 }
78 
removeListener(Thread::Listener * listener)79 void ThreadPoolJob::removeListener (Thread::Listener* listener)
80 {
81     listeners.remove (listener);
82 }
83 
getCurrentThreadPoolJob()84 ThreadPoolJob* ThreadPoolJob::getCurrentThreadPoolJob()
85 {
86     if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87         return t->currentJob.load();
88 
89     return nullptr;
90 }
91 
92 //==============================================================================
ThreadPool(int numThreads,size_t threadStackSize)93 ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94 {
95     jassert (numThreads > 0); // not much point having a pool without any threads!
96 
97     createThreads (numThreads, threadStackSize);
98 }
99 
ThreadPool()100 ThreadPool::ThreadPool()
101 {
102     createThreads (SystemStats::getNumCpus());
103 }
104 
~ThreadPool()105 ThreadPool::~ThreadPool()
106 {
107     removeAllJobs (true, 5000);
108     stopThreads();
109 }
110 
createThreads(int numThreads,size_t threadStackSize)111 void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112 {
113     for (int i = jmax (1, numThreads); --i >= 0;)
114         threads.add (new ThreadPoolThread (*this, threadStackSize));
115 
116     for (auto* t : threads)
117         t->startThread();
118 }
119 
stopThreads()120 void ThreadPool::stopThreads()
121 {
122     for (auto* t : threads)
123         t->signalThreadShouldExit();
124 
125     for (auto* t : threads)
126         t->stopThread (500);
127 }
128 
addJob(ThreadPoolJob * job,bool deleteJobWhenFinished)129 void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
130 {
131     jassert (job != nullptr);
132     jassert (job->pool == nullptr);
133 
134     if (job->pool == nullptr)
135     {
136         job->pool = this;
137         job->shouldStop = false;
138         job->isActive = false;
139         job->shouldBeDeleted = deleteJobWhenFinished;
140 
141         {
142             const ScopedLock sl (lock);
143             jobs.add (job);
144         }
145 
146         for (auto* t : threads)
147             t->notify();
148     }
149 }
150 
addJob(std::function<ThreadPoolJob::JobStatus ()> jobToRun)151 void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
152 {
153     struct LambdaJobWrapper  : public ThreadPoolJob
154     {
155         LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156         JobStatus runJob() override      { return job(); }
157 
158         std::function<ThreadPoolJob::JobStatus()> job;
159     };
160 
161     addJob (new LambdaJobWrapper (jobToRun), true);
162 }
163 
addJob(std::function<void ()> jobToRun)164 void ThreadPool::addJob (std::function<void()> jobToRun)
165 {
166     struct LambdaJobWrapper  : public ThreadPoolJob
167     {
168         LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169         JobStatus runJob() override      { job(); return ThreadPoolJob::jobHasFinished; }
170 
171         std::function<void()> job;
172     };
173 
174     addJob (new LambdaJobWrapper (jobToRun), true);
175 }
176 
getNumJobs() const177 int ThreadPool::getNumJobs() const noexcept
178 {
179     const ScopedLock sl (lock);
180     return jobs.size();
181 }
182 
getNumThreads() const183 int ThreadPool::getNumThreads() const noexcept
184 {
185     return threads.size();
186 }
187 
getJob(int index) const188 ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
189 {
190     const ScopedLock sl (lock);
191     return jobs [index];
192 }
193 
contains(const ThreadPoolJob * job) const194 bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
195 {
196     const ScopedLock sl (lock);
197     return jobs.contains (const_cast<ThreadPoolJob*> (job));
198 }
199 
isJobRunning(const ThreadPoolJob * job) const200 bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
201 {
202     const ScopedLock sl (lock);
203     return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
204 }
205 
moveJobToFront(const ThreadPoolJob * job)206 void ThreadPool::moveJobToFront (const ThreadPoolJob* job) noexcept
207 {
208     const ScopedLock sl (lock);
209 
210     auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
211 
212     if (index > 0 && ! job->isActive)
213         jobs.move (index, 0);
214 }
215 
waitForJobToFinish(const ThreadPoolJob * job,int timeOutMs) const216 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
217 {
218     if (job != nullptr)
219     {
220         auto start = Time::getMillisecondCounter();
221 
222         while (contains (job))
223         {
224             if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
225                 return false;
226 
227             jobFinishedSignal.wait (2);
228         }
229     }
230 
231     return true;
232 }
233 
removeJob(ThreadPoolJob * job,bool interruptIfRunning,int timeOutMs)234 bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
235 {
236     bool dontWait = true;
237     OwnedArray<ThreadPoolJob> deletionList;
238 
239     if (job != nullptr)
240     {
241         const ScopedLock sl (lock);
242 
243         if (jobs.contains (job))
244         {
245             if (job->isActive)
246             {
247                 if (interruptIfRunning)
248                     job->signalJobShouldExit();
249 
250                 dontWait = false;
251             }
252             else
253             {
254                 jobs.removeFirstMatchingValue (job);
255                 addToDeleteList (deletionList, job);
256             }
257         }
258     }
259 
260     return dontWait || waitForJobToFinish (job, timeOutMs);
261 }
262 
removeAllJobs(bool interruptRunningJobs,int timeOutMs,ThreadPool::JobSelector * selectedJobsToRemove)263 bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
264                                 ThreadPool::JobSelector* selectedJobsToRemove)
265 {
266     Array<ThreadPoolJob*> jobsToWaitFor;
267 
268     {
269         OwnedArray<ThreadPoolJob> deletionList;
270 
271         {
272             const ScopedLock sl (lock);
273 
274             for (int i = jobs.size(); --i >= 0;)
275             {
276                 auto* job = jobs.getUnchecked(i);
277 
278                 if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
279                 {
280                     if (job->isActive)
281                     {
282                         jobsToWaitFor.add (job);
283 
284                         if (interruptRunningJobs)
285                             job->signalJobShouldExit();
286                     }
287                     else
288                     {
289                         jobs.remove (i);
290                         addToDeleteList (deletionList, job);
291                     }
292                 }
293             }
294         }
295     }
296 
297     auto start = Time::getMillisecondCounter();
298 
299     for (;;)
300     {
301         for (int i = jobsToWaitFor.size(); --i >= 0;)
302         {
303             auto* job = jobsToWaitFor.getUnchecked (i);
304 
305             if (! isJobRunning (job))
306                 jobsToWaitFor.remove (i);
307         }
308 
309         if (jobsToWaitFor.size() == 0)
310             break;
311 
312         if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
313             return false;
314 
315         jobFinishedSignal.wait (20);
316     }
317 
318     return true;
319 }
320 
getNamesOfAllJobs(bool onlyReturnActiveJobs) const321 StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
322 {
323     StringArray s;
324     const ScopedLock sl (lock);
325 
326     for (auto* job : jobs)
327         if (job->isActive || ! onlyReturnActiveJobs)
328             s.add (job->getJobName());
329 
330     return s;
331 }
332 
setThreadPriorities(int newPriority)333 bool ThreadPool::setThreadPriorities (int newPriority)
334 {
335     bool ok = true;
336 
337     for (auto* t : threads)
338         if (! t->setPriority (newPriority))
339             ok = false;
340 
341     return ok;
342 }
343 
pickNextJobToRun()344 ThreadPoolJob* ThreadPool::pickNextJobToRun()
345 {
346     OwnedArray<ThreadPoolJob> deletionList;
347 
348     {
349         const ScopedLock sl (lock);
350 
351         for (int i = 0; i < jobs.size(); ++i)
352         {
353             if (auto* job = jobs[i])
354             {
355                 if (! job->isActive)
356                 {
357                     if (job->shouldStop)
358                     {
359                         jobs.remove (i);
360                         addToDeleteList (deletionList, job);
361                         --i;
362                         continue;
363                     }
364 
365                     job->isActive = true;
366                     return job;
367                 }
368             }
369         }
370     }
371 
372     return nullptr;
373 }
374 
runNextJob(ThreadPoolThread & thread)375 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
376 {
377     if (auto* job = pickNextJobToRun())
378     {
379         auto result = ThreadPoolJob::jobHasFinished;
380         thread.currentJob = job;
381 
382         try
383         {
384             result = job->runJob();
385         }
386         catch (...)
387         {
388             jassertfalse; // Your runJob() method mustn't throw any exceptions!
389         }
390 
391         thread.currentJob = nullptr;
392 
393         OwnedArray<ThreadPoolJob> deletionList;
394 
395         {
396             const ScopedLock sl (lock);
397 
398             if (jobs.contains (job))
399             {
400                 job->isActive = false;
401 
402                 if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
403                 {
404                     jobs.removeFirstMatchingValue (job);
405                     addToDeleteList (deletionList, job);
406 
407                     jobFinishedSignal.signal();
408                 }
409                 else
410                 {
411                     // move the job to the end of the queue if it wants another go
412                     jobs.move (jobs.indexOf (job), -1);
413                 }
414             }
415         }
416 
417         return true;
418     }
419 
420     return false;
421 }
422 
addToDeleteList(OwnedArray<ThreadPoolJob> & deletionList,ThreadPoolJob * job) const423 void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
424 {
425     job->shouldStop = true;
426     job->pool = nullptr;
427 
428     if (job->shouldBeDeleted)
429         deletionList.add (job);
430 }
431 
432 } // namespace juce
433