// Copyright 2008-present Contributors to the OpenImageIO project.
// SPDX-License-Identifier: BSD-3-Clause
// https://github.com/OpenImageIO/oiio/blob/master/LICENSE.md


// This implementation of thread_pool is based on CTPL.
// We've made a variety of changes (we hope improvements) ourselves to
// tailor it to our needs.
//
// The original CTPL is:
// https://github.com/vit-vit/CTPL
// Copyright (C) 2014 by Vitaliy Vitsentiy
// Licensed with Apache 2.0
// (see https://github.com/vit-vit/CTPL/blob/master/LICENSE)


#if defined(_MSC_VER)
#    define _ENABLE_ATOMIC_ALIGNMENT_FIX /* Avoid MSVS error, ugh */
#endif

#include <algorithm>
#include <exception>
#include <functional>
#include <future>
#include <memory>

#include <OpenImageIO/parallel.h>
#include <OpenImageIO/strutil.h>
#include <OpenImageIO/sysutil.h>
#include <OpenImageIO/thread.h>

#include <boost/container/flat_map.hpp>

#if 0

// Use boost::lockfree::queue for the task queue
#    include <boost/lockfree/queue.hpp>
template<typename T> using Queue = boost::lockfree::queue<T>;

#else

#    include <queue>

namespace {

// Simple thread-safe task queue: a std::queue guarded by a spin mutex.
template<typename T> class Queue {
public:
    Queue(int /*size*/) {}
    bool push(T const& value)
    {
        std::unique_lock<Mutex> lock(this->mutex);
        this->q.push(value);
        return true;
    }
    // Pop the front element into v (by copy) and remove it from the queue.
    // Return false if the queue was empty.
    bool pop(T& v)
    {
        std::unique_lock<Mutex> lock(this->mutex);
        if (this->q.empty())
            return false;
        v = this->q.front();
        this->q.pop();
        return true;
    }
    bool empty()
    {
        std::unique_lock<Mutex> lock(this->mutex);
        return this->q.empty();
    }
    size_t size()
    {
        std::unique_lock<Mutex> lock(this->mutex);
        return q.size();
    }

private:
    typedef OIIO::spin_mutex Mutex;
    std::queue<T> q;
    Mutex mutex;
};

}  // namespace

#endif
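
// A minimal sketch of how the pool uses this queue (illustrative only):
// tasks are heap-allocated std::function pointers, popped and later deleted
// by whichever thread runs them.
//
//     Queue<std::function<void(int)>*> q(1024);
//     q.push(new std::function<void(int)>([](int id) { /* work */ }));
//     std::function<void(int)>* f = nullptr;
//     if (q.pop(f)) {
//         (*f)(0);   // run, passing a caller-chosen thread id
//         delete f;
//     }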


OIIO_NAMESPACE_BEGIN

static int
threads_default()
{
    int n = Strutil::from_string<int>(Sysutil::getenv("OPENIMAGEIO_THREADS"));
    if (n < 1)
        n = Sysutil::hardware_concurrency();
    return n;
}
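
// Example (illustrative): with OPENIMAGEIO_THREADS=8 set in the environment,
// threads_default() returns 8; if the variable is unset or non-positive, it
// falls back to Sysutil::hardware_concurrency().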



class thread_pool::Impl {
public:
    Impl(int nThreads = 0, int queueSize = 1024)
        : q(queueSize)
    {
        this->init();
        this->resize(nThreads);
    }

    // the destructor waits for all the functions in the queue to be finished
    ~Impl() { this->stop(true); }

    // get the number of running threads in the pool
    int size() const
    {
        OIIO_DASSERT(m_size == static_cast<int>(this->threads.size()));
        return m_size;
    }

    // number of idle threads
    int n_idle() const { return this->nWaiting; }

    std::thread& get_thread(int i) { return *this->threads[i]; }

    // Change the number of threads in the pool. This should be called from
    // one thread only; otherwise take care not to interleave calls, including
    // with this->stop(). Passing nThreads < 0 selects the default pool size.
    void resize(int nThreads)
    {
        if (nThreads < 0)
            nThreads = std::max(1, int(threads_default()) - 1);
        if (!this->isStop && !this->isDone) {
            int oldNThreads = size();
            if (oldNThreads <= nThreads) {
                // The number of threads is increased
                this->threads.resize(nThreads);
                this->flags.resize(nThreads);
                for (int i = oldNThreads; i < nThreads; ++i) {
                    this->flags[i] = std::make_shared<std::atomic<bool>>(false);
                    this->set_thread(i);
                }
            } else {
                // The number of threads is decreased
                for (int i = oldNThreads - 1; i >= nThreads; --i) {
                    *this->flags[i] = true;  // this thread will finish
                    this->terminating_threads.push_back(
                        std::move(this->threads[i]));
                    this->threads.erase(this->threads.begin() + i);
                }
                {
                    // Wake any waiting threads so they notice their stop flags
                    std::unique_lock<std::mutex> lock(this->mutex);
                    this->cv.notify_all();
                }
                // Safe to shrink: the outgoing thread objects were moved into
                // terminating_threads (joined later in stop()), and the
                // threads hold copies of the shared_ptr flags, not the
                // originals.
                this->threads.resize(nThreads);
                this->flags.resize(nThreads);
            }
        }
        m_size = nThreads;
    }

    // empty the queue
    void clear_queue()
    {
        std::function<void(int id)>* _f;
        while (this->q.pop(_f))
            delete _f;  // empty the queue
    }

    // Pop a task off the queue, returning a std::function wrapper of the
    // original function (empty if the queue was empty).
    std::function<void(int)> pop()
    {
        std::function<void(int id)>* _f = nullptr;
        this->q.pop(_f);
        std::unique_ptr<std::function<void(int id)>> func(
            _f);  // at return, delete the function even if an exception occurred
        std::function<void(int)> f;
        if (_f)
            f = *_f;
        return f;
    }


    // Wait for all computing threads to finish and stop all threads.
    // May be called asynchronously so as not to pause the calling thread
    // while waiting. If isWait == true, all the functions in the queue are
    // run; otherwise the queue is cleared without running the functions.
    void stop(bool isWait = false)
    {
        if (!isWait) {
            if (this->isStop)
                return;
            this->isStop = true;
            for (int i = 0, n = this->size(); i < n; ++i) {
                *this->flags[i] = true;  // command the threads to stop
            }
            this->clear_queue();  // empty the queue
        } else {
            if (this->isDone || this->isStop)
                return;
            this->isDone = true;  // give the waiting threads a command to finish
        }

#if defined(_WIN32)
        // When the static variable in default_thread_pool() is destroyed
        // during DLL unloading, the thread_pool destructor is called but the
        // threads are already terminated, so it is illegal to communicate
        // with them at this point. Checking the Windows native thread status
        // lets us detect this specific scenario and avoid an unnecessary call
        // to this->cv.notify_all(), which would deadlock (noticed only on
        // Windows 7, but still unsafe on other versions).
        bool hasTerminatedThread
            = std::any_of(this->threads.begin(), this->threads.end(),
                          [](std::unique_ptr<std::thread>& t) {
                              DWORD rcode;
                              GetExitCodeThread((HANDLE)t->native_handle(),
                                                &rcode);
                              return rcode != STILL_ACTIVE;
                          });

        if (!hasTerminatedThread)
#endif
        {
            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_all();  // stop all waiting threads
        }
        // Wait for the computing threads to finish
        for (auto& thread : this->threads) {
            if (thread->joinable())
                thread->join();
        }
        // Wait for the terminated threads to finish
        for (auto& thread : this->terminating_threads) {
            if (thread->joinable())
                thread->join();
        }
        // If there were no threads in the pool but some functors in the
        // queue, the functors were not deleted by the threads, so delete
        // them here.
        this->clear_queue();
        this->threads.clear();
        this->terminating_threads.clear();
        this->flags.clear();
    }

    void push_queue_and_notify(std::function<void(int id)>* f)
    {
        this->q.push(f);
        std::unique_lock<std::mutex> lock(this->mutex);
        this->cv.notify_one();
    }

    // If any tasks are on the queue, pop and run one with the calling
    // thread.
    bool run_one_task(std::thread::id id)
    {
        std::function<void(int)>* f = nullptr;
        bool isPop                  = this->q.pop(f);
        if (isPop) {
            OIIO_DASSERT(f);
            std::unique_ptr<std::function<void(int id)>> func(
                f);  // at return, delete the function even if an exception occurred
            register_worker(id);
            (*f)(-1);
            deregister_worker(id);
        } else {
            OIIO_DASSERT(f == nullptr);
        }
        return isPop;
    }

    void register_worker(std::thread::id id)
    {
        spin_lock lock(m_worker_threadids_mutex);
        m_worker_threadids[id] += 1;
    }
    void deregister_worker(std::thread::id id)
    {
        spin_lock lock(m_worker_threadids_mutex);
        m_worker_threadids[id] -= 1;
    }
    bool is_worker(std::thread::id id) const
    {
        spin_lock lock(m_worker_threadids_mutex);
        return m_worker_threadids[id] != 0;
    }

    size_t jobs_in_queue() const { return q.size(); }

    bool very_busy() const { return jobs_in_queue() > size_t(4 * m_size); }

private:
    Impl(const Impl&) = delete;
    Impl(Impl&&)      = delete;
    Impl& operator=(const Impl&) = delete;
    Impl& operator=(Impl&&) = delete;

    void set_thread(int i)
    {
        std::shared_ptr<std::atomic<bool>> flag(
            this->flags[i]);  // a copy of the shared ptr to the flag
        auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
            register_worker(std::this_thread::get_id());
            std::atomic<bool>& _flag = *flag;
            std::function<void(int id)>* _f;
            bool isPop = this->q.pop(_f);
            while (true) {
                while (isPop) {  // if there is anything in the queue
                    std::unique_ptr<std::function<void(int id)>> func(
                        _f);  // at return, delete the function even if an exception occurred
                    (*_f)(i);
                    if (_flag) {
                        // this thread was asked to stop, so return even if
                        // the queue is not empty yet
                        return;
                    } else {
                        isPop = this->q.pop(_f);
                    }
                }
                // the queue is empty here, wait for the next command
                std::unique_lock<std::mutex> lock(this->mutex);
                ++this->nWaiting;
                this->cv.wait(lock, [this, &_f, &isPop, &_flag]() {
                    isPop = this->q.pop(_f);
                    return isPop || this->isDone || _flag;
                });
                --this->nWaiting;
                if (!isPop)
                    break;  // if the queue is empty and this->isDone == true or *flag then return
            }
            deregister_worker(std::this_thread::get_id());
        };
        this->threads[i].reset(
            new std::thread(f));  // compiler may not support std::make_unique()
    }

    void init()
    {
        this->nWaiting = 0;
        this->isStop   = false;
        this->isDone   = false;
    }

    std::vector<std::unique_ptr<std::thread>> threads;
    std::vector<std::unique_ptr<std::thread>> terminating_threads;
    std::vector<std::shared_ptr<std::atomic<bool>>> flags;
    mutable Queue<std::function<void(int id)>*> q;
    std::atomic<bool> isDone;
    std::atomic<bool> isStop;
    std::atomic<int> nWaiting;  // how many threads are waiting
    int m_size { 0 };           // Number of threads in the pool
    std::mutex mutex;
    std::condition_variable cv;
    mutable boost::container::flat_map<std::thread::id, int> m_worker_threadids;
    mutable spin_mutex m_worker_threadids_mutex;
};



thread_pool::thread_pool(int nthreads)
    : m_impl(new Impl(nthreads))
{
    resize(nthreads);
}



thread_pool::~thread_pool()
{
    // Will implicitly delete the impl
}



int
thread_pool::size() const
{
    return m_impl->size();
}



void
thread_pool::resize(int nthreads)
{
    m_impl->resize(nthreads);
}



int
thread_pool::idle() const
{
    return m_impl->n_idle();
}



size_t
thread_pool::jobs_in_queue() const
{
    return m_impl->jobs_in_queue();
}



bool
thread_pool::run_one_task(std::thread::id id)
{
    return m_impl->run_one_task(id);
}



void
thread_pool::push_queue_and_notify(std::function<void(int id)>* f)
{
    m_impl->push_queue_and_notify(f);
}


/// DEPRECATED(2.1) -- use is_worker() instead.
bool
thread_pool::this_thread_is_in_pool() const
{
    return is_worker();
}



void
thread_pool::register_worker(std::thread::id id)
{
    m_impl->register_worker(id);
}

void
thread_pool::deregister_worker(std::thread::id id)
{
    m_impl->deregister_worker(id);
}

bool
thread_pool::is_worker(std::thread::id id) const
{
    return m_impl->is_worker(id);
}


// DEPRECATED(2.1)
bool
thread_pool::is_worker(std::thread::id id)
{
    return m_impl->is_worker(id);
}


bool
thread_pool::very_busy() const
{
    return m_impl->very_busy();
}



thread_pool*
default_thread_pool()
{
    static std::unique_ptr<thread_pool> shared_pool(new thread_pool);
    return shared_pool.get();
}
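
// Typical usage sketch (illustrative; relies on the thread_pool::push and
// task_set declarations in OpenImageIO/thread.h):
//
//     thread_pool* pool = default_thread_pool();
//     task_set ts(pool);
//     for (int i = 0; i < 16; ++i)
//         ts.push(pool->push([=](int thread_id) { /* work for item i */ }));
//     ts.wait();  // block until every task in the set has completed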



void
task_set::wait_for_task(size_t taskindex, bool block)
{
    OIIO_DASSERT(submitter() == std::this_thread::get_id());
    if (taskindex >= m_futures.size())
        return;  // nothing to wait for
    auto& f(m_futures[taskindex]);
    if (block || m_pool->is_worker(m_submitter_thread)) {
        // Block on completion of the task and don't try to do any of
        // the work with the calling thread.
        f.wait();
        return;
    }
    // If we made it here, we want to allow the calling thread to help
    // do pool work if it's waiting around for a while.
    const std::chrono::milliseconds wait_time(0);
    int tries = 0;
    while (1) {
        // Asking future.wait_for for 0 time just checks the status.
        if (f.wait_for(wait_time) == std::future_status::ready)
            return;  // task has completed
        // We're still waiting for the task to complete. What next?
        if (++tries < 4) {  // First few times,
            pause(4);       //   just busy-wait, check status again
            continue;
        }
        // Since we're waiting, try to run a task ourselves to help
        // with the load. If none is available, just yield to the scheduler.
        if (!m_pool->run_one_task(m_submitter_thread)) {
            // We tried to do a task ourselves, but there weren't any
            // left, so just wait for the rest to finish.
            yield();
        }
    }
}



void
task_set::wait(bool block)
{
    OIIO_DASSERT(submitter() == std::this_thread::get_id());
    const std::chrono::milliseconds wait_time(0);
    if (m_pool->is_worker(m_submitter_thread))
        block = true;  // don't get into recursive work stealing
    if (block == false) {
        int tries = 0;
        while (1) {
            bool all_finished = true;
            int nfutures = 0, finished = 0;
            for (auto&& f : m_futures) {
                // Asking future.wait_for for 0 time just checks the status.
                ++nfutures;
                auto status = f.wait_for(wait_time);
                if (status != std::future_status::ready)
                    all_finished = false;
                else
                    ++finished;
            }
            if (all_finished)  // All futures are ready? We're done.
                break;
            // We're still waiting on some tasks to complete. What next?
            if (++tries < 4) {  // First few times,
                pause(4);       //   just busy-wait, check status again
                continue;
            }
            // Since we're waiting, try to run a task ourselves to help
            // with the load. If none is available, just yield to the
            // scheduler.
            if (!m_pool->run_one_task(m_submitter_thread)) {
                // We tried to do a task ourselves, but there weren't any
                // left, so just wait for the rest to finish.
#if 1
                yield();
#else
                // FIXME -- as currently written, if we see an empty queue
                // but we're still waiting for the tasks in our set to end,
                // we will keep looping and potentially ourselves do work
                // that was part of another task set. Is there a benefit to,
                // once we see an empty queue, only waiting for the existing
                // tasks to finish and not altruistically executing any more
                // tasks? This is how we would take the exit now:
                for (auto&& f : m_futures)
                    f.wait();
                break;
#endif
            }
        }
    } else {
        // If block is true, just block on completion of all the tasks
        // and don't try to do any of the work with the calling thread.
        for (auto&& f : m_futures)
            f.wait();
    }
#ifndef NDEBUG
    check_done();
#endif
}



// Helper function to keep track of the recursive depth of our use of the
// thread pool. Call with the adjustment (i.e., parallel_recursive_depth(1)
// to enter, parallel_recursive_depth(-1) to exit), and it will return the
// new value. Call with default args (0) to just return the current depth.
static int
parallel_recursive_depth(int change = 0)
{
    thread_local int depth = 0;  // let's only allow one level of parallel work
    depth += change;
    return depth;
}
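
// The guard pattern used by the parallel_for variants below (sketch): bump
// the depth on entry, force single-threaded operation if we are already
// inside a parallel region, and restore the depth on the way out.
//
//     if (parallel_recursive_depth(1) > 1)
//         opt.maxthreads = 1;
//     ... do the (possibly parallel) work ...
//     parallel_recursive_depth(-1);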



void
parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
                     std::function<void(int id, int64_t b, int64_t e)>&& task,
                     parallel_options opt)
{
    if (parallel_recursive_depth(1) > 1)
        opt.maxthreads = 1;
    opt.resolve();
    chunksize = std::min(chunksize, end - start);
    if (chunksize < 1) {           // If caller left chunk size to us...
        if (opt.singlethread()) {  // Single thread: do it all in one shot
            chunksize = end - start;
        } else {  // Multithread: choose a good chunk size
            int p     = std::max(1, 2 * opt.maxthreads);
            chunksize = std::max(int64_t(opt.minitems), (end - start) / p);
        }
    }
    // N.B. If chunksize was specified, honor it, even for the single
    // threaded case.
    for (task_set ts(opt.pool); start < end; start += chunksize) {
        int64_t e = std::min(end, start + chunksize);
        if (e == end || opt.singlethread() || opt.pool->very_busy()) {
            // For the last (or only) subtask, or if we are using just one
            // thread, or if the pool is already oversubscribed, do it
            // ourselves and avoid messing with the queue or handing off
            // between threads.
            task(-1, start, e);
        } else {
            ts.push(opt.pool->push(task, start, e));
        }
    }
    parallel_recursive_depth(-1);
}
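
// Usage sketch (illustrative; process() is a hypothetical per-item function,
// and we assume the default parallel_options):
//
//     parallel_for_chunked(0, n, 0 /*auto chunk size*/,
//                          [&](int id, int64_t b, int64_t e) {
//                              for (int64_t i = b; i < e; ++i)
//                                  process(i);
//                          });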



void
parallel_for_chunked_2D(
    int64_t xstart, int64_t xend, int64_t xchunksize, int64_t ystart,
    int64_t yend, int64_t ychunksize,
    std::function<void(int id, int64_t, int64_t, int64_t, int64_t)>&& task,
    parallel_options opt)
{
    if (parallel_recursive_depth(1) > 1)
        opt.maxthreads = 1;
    opt.resolve();
    if (opt.singlethread()
        || (xchunksize >= (xend - xstart) && ychunksize >= (yend - ystart))
        || opt.pool->very_busy()) {
        task(-1, xstart, xend, ystart, yend);
        parallel_recursive_depth(-1);
        return;
    }
    if (ychunksize < 1)
        ychunksize = std::max(int64_t(1),
                              (yend - ystart) / (2 * opt.maxthreads));
    if (xchunksize < 1) {
        int64_t ny = std::max(int64_t(1), (yend - ystart) / ychunksize);
        int64_t nx = std::max(int64_t(1), opt.maxthreads / ny);
        xchunksize = std::max(int64_t(1), (xend - xstart) / nx);
    }
    task_set ts(opt.pool);
    for (auto y = ystart; y < yend; y += ychunksize) {
        int64_t ychunkend = std::min(yend, y + ychunksize);
        for (auto x = xstart; x < xend; x += xchunksize) {
            int64_t xchunkend = std::min(xend, x + xchunksize);
            ts.push(opt.pool->push(task, x, xchunkend, y, ychunkend));
        }
    }
    parallel_recursive_depth(-1);
}
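
// Usage sketch for the 2D variant (illustrative; process_block() is a
// hypothetical function operating on an x/y range):
//
//     parallel_for_chunked_2D(0, xres, 0, 0, yres, 0,
//                             [&](int id, int64_t xb, int64_t xe,
//                                 int64_t yb, int64_t ye) {
//                                 process_block(xb, xe, yb, ye);
//                             });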


OIIO_NAMESPACE_END