1 // Copyright 2008-present Contributors to the OpenImageIO project.
2 // SPDX-License-Identifier: BSD-3-Clause
3 // https://github.com/OpenImageIO/oiio/blob/master/LICENSE.md
4
5
6 // This implementation of thread_pool is based on CTPL.
// We've made a variety of changes (we hope improvements) ourselves to tailor
// it to our needs.
9 //
10 // The original CTPL is:
11 // https://github.com/vit-vit/CTPL
12 // Copyright (C) 2014 by Vitaliy Vitsentiy
13 // Licensed with Apache 2.0
14 // (see https://github.com/vit-vit/CTPL/blob/master/LICENSE)
15
16
17 #if defined(_MSC_VER)
18 # define _ENABLE_ATOMIC_ALIGNMENT_FIX /* Avoid MSVS error, ugh */
19 #endif
20
21 #include <exception>
22 #include <functional>
23 #include <future>
24 #include <memory>
25
26 #include <OpenImageIO/parallel.h>
27 #include <OpenImageIO/strutil.h>
28 #include <OpenImageIO/sysutil.h>
29 #include <OpenImageIO/thread.h>
30
31 #include <boost/container/flat_map.hpp>
32
33 #if 0
34
35 // Use boost::lockfree::queue for the task queue
36 # include <boost/lockfree/queue.hpp>
37 template<typename T> using Queue = boost::lockfree::queue<T>;
38
39 #else
40
41 # include <queue>
42
43 namespace {
44
45 template<typename T> class Queue {
46 public:
Queue(int)47 Queue(int /*size*/) {}
push(T const & value)48 bool push(T const& value)
49 {
50 std::unique_lock<Mutex> lock(this->mutex);
51 this->q.push(value);
52 return true;
53 }
54 // deletes the retrieved element, do not use for non integral types
pop(T & v)55 bool pop(T& v)
56 {
57 std::unique_lock<Mutex> lock(this->mutex);
58 if (this->q.empty())
59 return false;
60 v = this->q.front();
61 this->q.pop();
62 return true;
63 }
empty()64 bool empty()
65 {
66 std::unique_lock<Mutex> lock(this->mutex);
67 return this->q.empty();
68 }
size()69 size_t size()
70 {
71 std::unique_lock<Mutex> lock(this->mutex);
72 return q.size();
73 }
74
75 private:
76 typedef OIIO::spin_mutex Mutex;
77 std::queue<T> q;
78 Mutex mutex;
79 };
80
81 } // namespace
82
83 #endif
84
85
86 OIIO_NAMESPACE_BEGIN
87
88 static int
threads_default()89 threads_default()
90 {
91 int n = Strutil::from_string<int>(Sysutil::getenv("OPENIMAGEIO_THREADS"));
92 if (n < 1)
93 n = Sysutil::hardware_concurrency();
94 return n;
95 }
96
97
98
99 class thread_pool::Impl {
100 public:
Impl(int nThreads=0,int queueSize=1024)101 Impl(int nThreads = 0, int queueSize = 1024)
102 : q(queueSize)
103 {
104 this->init();
105 this->resize(nThreads);
106 }
107
108 // the destructor waits for all the functions in the queue to be finished
~Impl()109 ~Impl() { this->stop(true); }
110
111 // get the number of running threads in the pool
size() const112 int size() const
113 {
114 OIIO_DASSERT(m_size == static_cast<int>(this->threads.size()));
115 return m_size;
116 }
117
118 // number of idle threads
n_idle() const119 int n_idle() const { return this->nWaiting; }
120
get_thread(int i)121 std::thread& get_thread(int i) { return *this->threads[i]; }
122
123 // change the number of threads in the pool
124 // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
125 // nThreads must be >= 0
resize(int nThreads)126 void resize(int nThreads)
127 {
128 if (nThreads < 0)
129 nThreads = std::max(1, int(threads_default()) - 1);
130 if (!this->isStop && !this->isDone) {
131 int oldNThreads = size();
132 if (oldNThreads
133 <= nThreads) { // if the number of threads is increased
134 this->threads.resize(nThreads);
135 this->flags.resize(nThreads);
136 for (int i = oldNThreads; i < nThreads; ++i) {
137 this->flags[i] = std::make_shared<std::atomic<bool>>(false);
138 this->set_thread(i);
139 }
140 } else { // the number of threads is decreased
141 for (int i = oldNThreads - 1; i >= nThreads; --i) {
142 *this->flags[i] = true; // this thread will finish
143 this->terminating_threads.push_back(
144 std::move(this->threads[i]));
145 this->threads.erase(this->threads.begin() + i);
146 }
147 {
148 // stop the detached threads that were waiting
149 std::unique_lock<std::mutex> lock(this->mutex);
150 this->cv.notify_all();
151 }
152 this->threads.resize(
153 nThreads); // safe to delete because the threads are detached
154 this->flags.resize(
155 nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
156 }
157 }
158 m_size = nThreads;
159 }
160
161 // empty the queue
clear_queue()162 void clear_queue()
163 {
164 std::function<void(int id)>* _f;
165 while (this->q.pop(_f))
166 delete _f; // empty the queue
167 }
168
169 // pops a functional wraper to the original function
pop()170 std::function<void(int)> pop()
171 {
172 std::function<void(int id)>* _f = nullptr;
173 this->q.pop(_f);
174 std::unique_ptr<std::function<void(int id)>> func(
175 _f); // at return, delete the function even if an exception occurred
176 std::function<void(int)> f;
177 if (_f)
178 f = *_f;
179 return f;
180 }
181
182
183 // wait for all computing threads to finish and stop all threads
184 // may be called asyncronously to not pause the calling thread while waiting
185 // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
stop(bool isWait=false)186 void stop(bool isWait = false)
187 {
188 if (!isWait) {
189 if (this->isStop)
190 return;
191 this->isStop = true;
192 for (int i = 0, n = this->size(); i < n; ++i) {
193 *this->flags[i] = true; // command the threads to stop
194 }
195 this->clear_queue(); // empty the queue
196 } else {
197 if (this->isDone || this->isStop)
198 return;
199 this->isDone = true; // give the waiting threads a command to finish
200 }
201
202 #if defined(_WIN32)
203 // When the static variable in default_thread_pool() is destroyed during DLL unloading,
204 // the thread_pool destructor is called but the threads are already terminated.
205 // So it is illegal to communicate with those other threads at this point.
206 // Checking Windows native thread status allows to detect this specific scenario and avoid an unnecessary call
207 // to this->cv.notify_all() which creates a deadlock (noticed only on Windows 7 but still unsafe in other versions).
208 bool hasTerminatedThread
209 = std::any_of(this->threads.begin(), this->threads.end(),
210 [](std::unique_ptr<std::thread>& t) {
211 DWORD rcode;
212 GetExitCodeThread((HANDLE)t->native_handle(),
213 &rcode);
214 return rcode != STILL_ACTIVE;
215 });
216
217 if (!hasTerminatedThread)
218 #endif
219 {
220 std::unique_lock<std::mutex> lock(this->mutex);
221 this->cv.notify_all(); // stop all waiting threads
222 }
223 // wait for the computing threads to finish
224 for (auto& thread : this->threads) {
225 if (thread->joinable())
226 thread->join();
227 }
228 // wait for the terminated threads to finish
229 for (auto& thread : this->terminating_threads) {
230 if (thread->joinable())
231 thread->join();
232 }
233 // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
234 // therefore delete them here
235 this->clear_queue();
236 this->threads.clear();
237 this->terminating_threads.clear();
238 this->flags.clear();
239 }
240
push_queue_and_notify(std::function<void (int id)> * f)241 void push_queue_and_notify(std::function<void(int id)>* f)
242 {
243 this->q.push(f);
244 std::unique_lock<std::mutex> lock(this->mutex);
245 this->cv.notify_one();
246 }
247
248 // If any tasks are on the queue, pop and run one with the calling
249 // thread.
run_one_task(std::thread::id id)250 bool run_one_task(std::thread::id id)
251 {
252 std::function<void(int)>* f = nullptr;
253 bool isPop = this->q.pop(f);
254 if (isPop) {
255 OIIO_DASSERT(f);
256 std::unique_ptr<std::function<void(int id)>> func(
257 f); // at return, delete the function even if an exception occurred
258 register_worker(id);
259 (*f)(-1);
260 deregister_worker(id);
261 } else {
262 OIIO_DASSERT(f == nullptr);
263 }
264 return isPop;
265 }
266
register_worker(std::thread::id id)267 void register_worker(std::thread::id id)
268 {
269 spin_lock lock(m_worker_threadids_mutex);
270 m_worker_threadids[id] += 1;
271 }
deregister_worker(std::thread::id id)272 void deregister_worker(std::thread::id id)
273 {
274 spin_lock lock(m_worker_threadids_mutex);
275 m_worker_threadids[id] -= 1;
276 }
is_worker(std::thread::id id) const277 bool is_worker(std::thread::id id) const
278 {
279 spin_lock lock(m_worker_threadids_mutex);
280 return m_worker_threadids[id] != 0;
281 }
282
jobs_in_queue() const283 size_t jobs_in_queue() const { return q.size(); }
284
very_busy() const285 bool very_busy() const { return jobs_in_queue() > size_t(4 * m_size); }
286
287 private:
288 Impl(const Impl&) = delete;
289 Impl(Impl&&) = delete;
290 Impl& operator=(const Impl&) = delete;
291 Impl& operator=(Impl&&) = delete;
292
set_thread(int i)293 void set_thread(int i)
294 {
295 std::shared_ptr<std::atomic<bool>> flag(
296 this->flags[i]); // a copy of the shared ptr to the flag
297 auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
298 register_worker(std::this_thread::get_id());
299 std::atomic<bool>& _flag = *flag;
300 std::function<void(int id)>* _f;
301 bool isPop = this->q.pop(_f);
302 while (true) {
303 while (isPop) { // if there is anything in the queue
304 std::unique_ptr<std::function<void(int id)>> func(
305 _f); // at return, delete the function even if an exception occurred
306 (*_f)(i);
307 if (_flag) {
308 // the thread is wanted to stop, return even if the queue is not empty yet
309 return;
310 } else {
311 isPop = this->q.pop(_f);
312 }
313 }
314 // the queue is empty here, wait for the next command
315 std::unique_lock<std::mutex> lock(this->mutex);
316 ++this->nWaiting;
317 this->cv.wait(lock, [this, &_f, &isPop, &_flag]() {
318 isPop = this->q.pop(_f);
319 return isPop || this->isDone || _flag;
320 });
321 --this->nWaiting;
322 if (!isPop)
323 break; // if the queue is empty and this->isDone == true or *flag then return
324 }
325 deregister_worker(std::this_thread::get_id());
326 };
327 this->threads[i].reset(
328 new std::thread(f)); // compiler may not support std::make_unique()
329 }
330
init()331 void init()
332 {
333 this->nWaiting = 0;
334 this->isStop = false;
335 this->isDone = false;
336 }
337
338 std::vector<std::unique_ptr<std::thread>> threads;
339 std::vector<std::unique_ptr<std::thread>> terminating_threads;
340 std::vector<std::shared_ptr<std::atomic<bool>>> flags;
341 mutable Queue<std::function<void(int id)>*> q;
342 std::atomic<bool> isDone;
343 std::atomic<bool> isStop;
344 std::atomic<int> nWaiting; // how many threads are waiting
345 int m_size { 0 }; // Number of threads in the queue
346 std::mutex mutex;
347 std::condition_variable cv;
348 mutable boost::container::flat_map<std::thread::id, int> m_worker_threadids;
349 mutable spin_mutex m_worker_threadids_mutex;
350 };
351
352
353
thread_pool(int nthreads)354 thread_pool::thread_pool(int nthreads)
355 : m_impl(new Impl(nthreads))
356 {
357 resize(nthreads);
358 }
359
360
361
~thread_pool()362 thread_pool::~thread_pool()
363 {
364 // Will implicitly delete the impl
365 }
366
367
368
369 int
size() const370 thread_pool::size() const
371 {
372 return m_impl->size();
373 }
374
375
376
377 void
resize(int nthreads)378 thread_pool::resize(int nthreads)
379 {
380 m_impl->resize(nthreads);
381 }
382
383
384
385 int
idle() const386 thread_pool::idle() const
387 {
388 return m_impl->n_idle();
389 }
390
391
392
393 size_t
jobs_in_queue() const394 thread_pool::jobs_in_queue() const
395 {
396 return m_impl->jobs_in_queue();
397 }
398
399
400
401 bool
run_one_task(std::thread::id id)402 thread_pool::run_one_task(std::thread::id id)
403 {
404 return m_impl->run_one_task(id);
405 }
406
407
408
409 void
push_queue_and_notify(std::function<void (int id)> * f)410 thread_pool::push_queue_and_notify(std::function<void(int id)>* f)
411 {
412 m_impl->push_queue_and_notify(f);
413 }
414
415
416
417 /// DEPRECATED(2.1) -- use is_worker() instead.
418 bool
this_thread_is_in_pool() const419 thread_pool::this_thread_is_in_pool() const
420 {
421 return is_worker();
422 }
423
424
425
426 void
register_worker(std::thread::id id)427 thread_pool::register_worker(std::thread::id id)
428 {
429 m_impl->register_worker(id);
430 }
431
432 void
deregister_worker(std::thread::id id)433 thread_pool::deregister_worker(std::thread::id id)
434 {
435 m_impl->deregister_worker(id);
436 }
437
438 bool
is_worker(std::thread::id id) const439 thread_pool::is_worker(std::thread::id id) const
440 {
441 return m_impl->is_worker(id);
442 }
443
444
445 // DEPRECATED(2.1)
446 bool
is_worker(std::thread::id id)447 thread_pool::is_worker(std::thread::id id)
448 {
449 return m_impl->is_worker(id);
450 }
451
452
453 bool
very_busy() const454 thread_pool::very_busy() const
455 {
456 return m_impl->very_busy();
457 }
458
459
460
461 thread_pool*
default_thread_pool()462 default_thread_pool()
463 {
464 static std::unique_ptr<thread_pool> shared_pool(new thread_pool);
465 return shared_pool.get();
466 }
467
468
469
470 void
wait_for_task(size_t taskindex,bool block)471 task_set::wait_for_task(size_t taskindex, bool block)
472 {
473 OIIO_DASSERT(submitter() == std::this_thread::get_id());
474 if (taskindex >= m_futures.size())
475 return; // nothing to wait for
476 auto& f(m_futures[taskindex]);
477 if (block || m_pool->is_worker(m_submitter_thread)) {
478 // Block on completion of all the task and don't try to do any
479 // of the work with the calling thread.
480 f.wait();
481 return;
482 }
483 // If we made it here, we want to allow the calling thread to help
484 // do pool work if it's waiting around for a while.
485 const std::chrono::milliseconds wait_time(0);
486 int tries = 0;
487 while (1) {
488 // Asking future.wait_for for 0 time just checks the status.
489 if (f.wait_for(wait_time) == std::future_status::ready)
490 return; // task has completed
491 // We're still waiting for the task to complete. What next?
492 if (++tries < 4) { // First few times,
493 pause(4); // just busy-wait, check status again
494 continue;
495 }
496 // Since we're waiting, try to run a task ourselves to help
497 // with the load. If none is available, just yield schedule.
498 if (!m_pool->run_one_task(m_submitter_thread)) {
499 // We tried to do a task ourselves, but there weren't any
500 // left, so just wait for the rest to finish.
501 yield();
502 }
503 }
504 }
505
506
507
508 void
wait(bool block)509 task_set::wait(bool block)
510 {
511 OIIO_DASSERT(submitter() == std::this_thread::get_id());
512 const std::chrono::milliseconds wait_time(0);
513 if (m_pool->is_worker(m_submitter_thread))
514 block = true; // don't get into recursive work stealing
515 if (block == false) {
516 int tries = 0;
517 while (1) {
518 bool all_finished = true;
519 int nfutures = 0, finished = 0;
520 for (auto&& f : m_futures) {
521 // Asking future.wait_for for 0 time just checks the status.
522 ++nfutures;
523 auto status = f.wait_for(wait_time);
524 if (status != std::future_status::ready)
525 all_finished = false;
526 else
527 ++finished;
528 }
529 if (all_finished) // All futures are ready? We're done.
530 break;
531 // We're still waiting on some tasks to complete. What next?
532 if (++tries < 4) { // First few times,
533 pause(4); // just busy-wait, check status again
534 continue;
535 }
536 // Since we're waiting, try to run a task ourselves to help
537 // with the load. If none is available, just yield schedule.
538 if (!m_pool->run_one_task(m_submitter_thread)) {
539 // We tried to do a task ourselves, but there weren't any
540 // left, so just wait for the rest to finish.
541 #if 1
542 yield();
543 #else
544 // FIXME -- as currently written, if we see an empty queue
545 // but we're still waiting for the tasks in our set to end,
546 // we will keep looping and potentially ourselves do work
547 // that was part of another task set. If there a benefit to,
548 // once we see an empty queue, only waiting for the existing
549 // tasks to finish and not altruistically executing any more
550 // tasks? This is how we would take the exit now:
551 for (auto&& f : m_futures)
552 f.wait();
553 break;
554 #endif
555 }
556 }
557 } else {
558 // If block is true, just block on completion of all the tasks
559 // and don't try to do any of the work with the calling thread.
560 for (auto&& f : m_futures)
561 f.wait();
562 }
563 #ifndef NDEBUG
564 check_done();
565 #endif
566 }
567
568
569
// Per-thread counter of how deeply the current thread is nested inside our
// parallel loops. Pass +1 on entry and -1 on exit; the adjusted depth is
// returned. Pass nothing (0) to read the current depth without changing it.
// Callers use the result to allow only one level of parallel work.
static int
parallel_recursive_depth(int change = 0)
{
    thread_local int nesting_level = 0;
    nesting_level += change;
    return nesting_level;
}
581
582
583
584 void
parallel_for_chunked(int64_t start,int64_t end,int64_t chunksize,std::function<void (int id,int64_t b,int64_t e)> && task,parallel_options opt)585 parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
586 std::function<void(int id, int64_t b, int64_t e)>&& task,
587 parallel_options opt)
588 {
589 if (parallel_recursive_depth(1) > 1)
590 opt.maxthreads = 1;
591 opt.resolve();
592 chunksize = std::min(chunksize, end - start);
593 if (chunksize < 1) { // If caller left chunk size to us...
594 if (opt.singlethread()) { // Single thread: do it all in one shot
595 chunksize = end - start;
596 } else { // Multithread: choose a good chunk size
597 int p = std::max(1, 2 * opt.maxthreads);
598 chunksize = std::max(int64_t(opt.minitems), (end - start) / p);
599 }
600 }
601 // N.B. If chunksize was specified, honor it, even for the single
602 // threaded case.
603 for (task_set ts(opt.pool); start < end; start += chunksize) {
604 int64_t e = std::min(end, start + chunksize);
605 if (e == end || opt.singlethread() || opt.pool->very_busy()) {
606 // For the last (or only) subtask, or if we are using just one
607 // thread, or if the pool is already oversubscribed, do it
608 // ourselves and avoid messing with the queue or handing off
609 // between threads.
610 task(-1, start, e);
611 } else {
612 ts.push(opt.pool->push(task, start, e));
613 }
614 }
615 parallel_recursive_depth(-1);
616 }
617
618
619
620 void
parallel_for_chunked_2D(int64_t xstart,int64_t xend,int64_t xchunksize,int64_t ystart,int64_t yend,int64_t ychunksize,std::function<void (int id,int64_t,int64_t,int64_t,int64_t)> && task,parallel_options opt)621 parallel_for_chunked_2D(
622 int64_t xstart, int64_t xend, int64_t xchunksize, int64_t ystart,
623 int64_t yend, int64_t ychunksize,
624 std::function<void(int id, int64_t, int64_t, int64_t, int64_t)>&& task,
625 parallel_options opt)
626 {
627 if (parallel_recursive_depth(1) > 1)
628 opt.maxthreads = 1;
629 opt.resolve();
630 if (opt.singlethread()
631 || (xchunksize >= (xend - xstart) && ychunksize >= (yend - ystart))
632 || opt.pool->very_busy()) {
633 task(-1, xstart, xend, ystart, yend);
634 parallel_recursive_depth(-1);
635 return;
636 }
637 if (ychunksize < 1)
638 ychunksize = std::max(int64_t(1),
639 (yend - ystart) / (2 * opt.maxthreads));
640 if (xchunksize < 1) {
641 int64_t ny = std::max(int64_t(1), (yend - ystart) / ychunksize);
642 int64_t nx = std::max(int64_t(1), opt.maxthreads / ny);
643 xchunksize = std::max(int64_t(1), (xend - xstart) / nx);
644 }
645 task_set ts(opt.pool);
646 for (auto y = ystart; y < yend; y += ychunksize) {
647 int64_t ychunkend = std::min(yend, y + ychunksize);
648 for (auto x = xstart; x < xend; x += xchunksize) {
649 int64_t xchunkend = std::min(xend, x + xchunksize);
650 ts.push(opt.pool->push(task, x, xchunkend, y, ychunkend));
651 }
652 }
653 parallel_recursive_depth(-1);
654 }
655
656
657 OIIO_NAMESPACE_END
658