1 /*M///////////////////////////////////////////////////////////////////////////////////////
2 //
3 // IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
4 //
5 // By downloading, copying, installing or using the software you agree to this license.
6 // If you do not agree to this license, do not download, install,
7 // copy or use the software.
8 //
9 //
10 // License Agreement
11 // For Open Source Computer Vision Library
12 //
13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
15 // Third party copyrights are property of their respective owners.
16 //
17 // Redistribution and use in source and binary forms, with or without modification,
18 // are permitted provided that the following conditions are met:
19 //
20 // * Redistribution's of source code must retain the above copyright notice,
21 // this list of conditions and the following disclaimer.
22 //
23 // * Redistribution's in binary form must reproduce the above copyright notice,
24 // this list of conditions and the following disclaimer in the documentation
25 // and/or other materials provided with the distribution.
26 //
27 // * The name of the copyright holders may not be used to endorse or promote products
28 // derived from this software without specific prior written permission.
29 //
30 // This software is provided by the copyright holders and contributors "as is" and
31 // any express or implied warranties, including, but not limited to, the implied
32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
33 // In no event shall the Intel Corporation or contributors be liable for any direct,
34 // indirect, incidental, special, exemplary, or consequential damages
35 // (including, but not limited to, procurement of substitute goods or services;
36 // loss of use, data, or profits; or business interruption) however caused
37 // and on any theory of liability, whether in contract, strict liability,
38 // or tort (including negligence or otherwise) arising in any way out of
39 // the use of this software, even if advised of the possibility of such damage.
40 //
41 //M*/
42
43 #include "precomp.hpp"
44
45 #include <opencv2/core/utils/configuration.private.hpp>
46 #include <opencv2/core/utils/trace.private.hpp>
47
48 #include "opencv2/core/parallel/parallel_backend.hpp"
49 #include "parallel/parallel.hpp"
50
51 #if defined _WIN32 || defined WINCE
52 #include <windows.h>
53 #undef small
54 #undef min
55 #undef max
56 #undef abs
57 #endif
58
59 #if defined __linux__ || defined __APPLE__ || defined __GLIBC__ \
60 || defined __HAIKU__ || defined __EMSCRIPTEN__ || defined __DragonFly__ \
61 || defined __OpenBSD__
62 #include <unistd.h>
63 #include <stdio.h>
64 #include <sys/types.h>
65 #include <fstream>
66 #if defined __ANDROID__
67 #include <sys/sysconf.h>
68 #include <sys/syscall.h>
69 #include <sched.h>
70 #elif defined __APPLE__
71 #include <sys/sysctl.h>
72 #endif
73 #endif
74
75 #if defined CV_CXX11
76 #include <thread>
77 #endif
78
79 #ifdef _OPENMP
80 #define HAVE_OPENMP
81 #endif
82
83 #ifdef __APPLE__
84 #define HAVE_GCD
85 #endif
86
87 #if defined _MSC_VER && _MSC_VER >= 1600
88 #define HAVE_CONCURRENCY
89 #endif
90
91 /* IMPORTANT: always use the same order of defines
92 - HAVE_TBB - 3rdparty library, should be explicitly enabled
93 - HAVE_HPX - 3rdparty library, should be explicitly enabled
   - HAVE_OPENMP      - integrated into the compiler, should be explicitly enabled
95 - HAVE_GCD - system wide, used automatically (APPLE only)
96 - WINRT - system wide, used automatically (Windows RT only)
97 - HAVE_CONCURRENCY - part of runtime, used automatically (Windows only - MSVS 10, MSVS 11)
98 - HAVE_PTHREADS_PF - pthreads if available
99 */
100
101 #if defined HAVE_TBB
#ifndef TBB_SUPPRESS_DEPRECATED_MESSAGES  // suppress warning
103 #define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
104 #endif
105 #include "tbb/tbb.h"
106 #include "tbb/task.h"
107 #if TBB_INTERFACE_VERSION >= 8000
108 #include "tbb/task_arena.h"
109 #endif
110 #undef min
111 #undef max
112 #elif defined HAVE_HPX
113 #include <hpx/parallel/algorithms/for_loop.hpp>
114 #include <hpx/parallel/execution.hpp>
115 //
116 #include <hpx/hpx_start.hpp>
117 #include <hpx/hpx_suspend.hpp>
118 #include <hpx/include/apply.hpp>
119 #include <hpx/util/yield_while.hpp>
120 #include <hpx/include/threadmanager.hpp>
121
122 #elif defined HAVE_OPENMP
123 #include <omp.h>
124 #elif defined HAVE_GCD
125 #include <dispatch/dispatch.h>
126 #include <pthread.h>
127 #elif defined WINRT && _MSC_VER < 1900
128 #include <ppltasks.h>
129 #elif defined HAVE_CONCURRENCY
130 #include <ppl.h>
131 #endif
132
133
134 #if defined HAVE_TBB
135 # define CV_PARALLEL_FRAMEWORK "tbb"
136 #elif defined HAVE_HPX
137 # define CV_PARALLEL_FRAMEWORK "hpx"
138 #elif defined HAVE_OPENMP
139 # define CV_PARALLEL_FRAMEWORK "openmp"
140 #elif defined HAVE_GCD
141 # define CV_PARALLEL_FRAMEWORK "gcd"
142 #elif defined WINRT
143 # define CV_PARALLEL_FRAMEWORK "winrt-concurrency"
144 #elif defined HAVE_CONCURRENCY
145 # define CV_PARALLEL_FRAMEWORK "ms-concurrency"
146 #elif defined HAVE_PTHREADS_PF
147 # define CV_PARALLEL_FRAMEWORK "pthreads"
148 #endif
149
150 #include <atomic>
151
152 #include "parallel_impl.hpp"
153
154 #include "opencv2/core/detail/exception_ptr.hpp" // CV__EXCEPTION_PTR = 1 if std::exception_ptr is available
155
156 using namespace cv;
157
158 namespace cv {
159
~ParallelLoopBody()160 ParallelLoopBody::~ParallelLoopBody() {}
161
162 using namespace cv::parallel;
163
164 namespace {
165
166 #ifdef ENABLE_INSTRUMENTATION
SyncNodes(cv::instr::InstrNode * pNode)167 static void SyncNodes(cv::instr::InstrNode *pNode)
168 {
169 std::vector<cv::instr::NodeDataTls*> data;
170 pNode->m_payload.m_tls.gather(data);
171
172 uint64 ticksMax = 0;
173 int threads = 0;
174 for(size_t i = 0; i < data.size(); i++)
175 {
176 if(data[i] && data[i]->m_ticksTotal)
177 {
178 ticksMax = MAX(ticksMax, data[i]->m_ticksTotal);
179 pNode->m_payload.m_ticksTotal -= data[i]->m_ticksTotal;
180 data[i]->m_ticksTotal = 0;
181 threads++;
182 }
183 }
184 pNode->m_payload.m_ticksTotal += ticksMax;
185 pNode->m_payload.m_threads = MAX(pNode->m_payload.m_threads, threads);
186
187 for(size_t i = 0; i < pNode->m_childs.size(); i++)
188 SyncNodes(pNode->m_childs[i]);
189 }
190 #endif
191
192 class ParallelLoopBodyWrapperContext
193 {
194 public:
ParallelLoopBodyWrapperContext(const cv::ParallelLoopBody & _body,const cv::Range & _r,double _nstripes)195 ParallelLoopBodyWrapperContext(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) :
196 is_rng_used(false), hasException(false)
197 {
198
199 body = &_body;
200 wholeRange = _r;
201 double len = wholeRange.end - wholeRange.start;
202 nstripes = cvRound(_nstripes <= 0 ? len : MIN(MAX(_nstripes, 1.), len));
203
204 // propagate main thread state
205 rng = cv::theRNG();
206
207 #ifdef OPENCV_TRACE
208 traceRootRegion = CV_TRACE_NS::details::getCurrentRegion();
209 traceRootContext = CV_TRACE_NS::details::getTraceManager().tls.get();
210 #endif
211
212 #ifdef ENABLE_INSTRUMENTATION
213 pThreadRoot = cv::instr::getInstrumentTLSStruct().pCurrentNode;
214 #endif
215 }
finalize()216 void finalize()
217 {
218 #ifdef ENABLE_INSTRUMENTATION
219 for(size_t i = 0; i < pThreadRoot->m_childs.size(); i++)
220 SyncNodes(pThreadRoot->m_childs[i]);
221 #endif
222 if (is_rng_used)
223 {
224 // Some parallel backends execute nested jobs in the main thread,
225 // so we need to restore initial RNG state here.
226 cv::theRNG() = rng;
227 // We can't properly update RNG state based on RNG usage in worker threads,
228 // so lets just change main thread RNG state to the next value.
229 // Note: this behaviour is not equal to single-threaded mode.
230 cv::theRNG().next();
231 }
232 #ifdef OPENCV_TRACE
233 if (traceRootRegion)
234 CV_TRACE_NS::details::parallelForFinalize(*traceRootRegion);
235 #endif
236
237 if (hasException)
238 {
239 #if CV__EXCEPTION_PTR
240 std::rethrow_exception(pException);
241 #else
242 CV_Error(Error::StsError, "Exception in parallel_for() body: " + exception_message);
243 #endif
244 }
245 }
~ParallelLoopBodyWrapperContext()246 ~ParallelLoopBodyWrapperContext() {}
247
248 const cv::ParallelLoopBody* body;
249 cv::Range wholeRange;
250 int nstripes;
251 cv::RNG rng;
252 mutable bool is_rng_used;
253 #ifdef OPENCV_TRACE
254 CV_TRACE_NS::details::Region* traceRootRegion;
255 CV_TRACE_NS::details::TraceManagerThreadLocal* traceRootContext;
256 #endif
257 #ifdef ENABLE_INSTRUMENTATION
258 cv::instr::InstrNode *pThreadRoot;
259 #endif
260 bool hasException;
261 #if CV__EXCEPTION_PTR
262 std::exception_ptr pException;
263 #else
264 cv::String exception_message;
265 #endif
266 #if CV__EXCEPTION_PTR
recordException()267 void recordException()
268 #else
269 void recordException(const cv::String& msg)
270 #endif
271 {
272 if (!hasException)
273 {
274 cv::AutoLock lock(cv::getInitializationMutex());
275 if (!hasException)
276 {
277 hasException = true;
278 #if CV__EXCEPTION_PTR
279 pException = std::current_exception();
280 #else
281 exception_message = msg;
282 #endif
283 }
284 }
285 }
286 private:
287 ParallelLoopBodyWrapperContext(const ParallelLoopBodyWrapperContext&); // disabled
288 ParallelLoopBodyWrapperContext& operator=(const ParallelLoopBodyWrapperContext&); // disabled
289 };
290
291 class ParallelLoopBodyWrapper : public cv::ParallelLoopBody
292 {
293 public:
ParallelLoopBodyWrapper(ParallelLoopBodyWrapperContext & ctx_)294 ParallelLoopBodyWrapper(ParallelLoopBodyWrapperContext& ctx_) :
295 ctx(ctx_)
296 {
297 }
~ParallelLoopBodyWrapper()298 ~ParallelLoopBodyWrapper()
299 {
300 }
operator ()(const cv::Range & sr) const301 void operator()(const cv::Range& sr) const CV_OVERRIDE
302 {
303 #ifdef OPENCV_TRACE
304 // TODO CV_TRACE_NS::details::setCurrentRegion(rootRegion);
305 if (ctx.traceRootRegion && ctx.traceRootContext)
306 CV_TRACE_NS::details::parallelForSetRootRegion(*ctx.traceRootRegion, *ctx.traceRootContext);
307 CV__TRACE_OPENCV_FUNCTION_NAME("parallel_for_body");
308 if (ctx.traceRootRegion)
309 CV_TRACE_NS::details::parallelForAttachNestedRegion(*ctx.traceRootRegion);
310 #endif
311
312 #ifdef ENABLE_INSTRUMENTATION
313 {
314 cv::instr::InstrTLSStruct *pInstrTLS = &cv::instr::getInstrumentTLSStruct();
315 pInstrTLS->pCurrentNode = ctx.pThreadRoot; // Initialize TLS node for thread
316 }
317 CV_INSTRUMENT_REGION();
318 #endif
319
320 // propagate main thread state
321 cv::theRNG() = ctx.rng;
322
323 cv::Range r;
324 cv::Range wholeRange = ctx.wholeRange;
325 int nstripes = ctx.nstripes;
326 r.start = (int)(wholeRange.start +
327 ((uint64)sr.start*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
328 r.end = sr.end >= nstripes ? wholeRange.end : (int)(wholeRange.start +
329 ((uint64)sr.end*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
330
331 #ifdef OPENCV_TRACE
332 CV_TRACE_ARG_VALUE(range_start, "range.start", (int64)r.start);
333 CV_TRACE_ARG_VALUE(range_end, "range.end", (int64)r.end);
334 #endif
335
336 try
337 {
338 (*ctx.body)(r);
339 }
340 #if CV__EXCEPTION_PTR
341 catch (...)
342 {
343 ctx.recordException();
344 }
345 #else
346 catch (const cv::Exception& e)
347 {
348 ctx.recordException(e.what());
349 }
350 catch (const std::exception& e)
351 {
352 ctx.recordException(e.what());
353 }
354 catch (...)
355 {
356 ctx.recordException("Unknown exception");
357 }
358 #endif
359
360 if (!ctx.is_rng_used && !(cv::theRNG() == ctx.rng))
361 ctx.is_rng_used = true;
362 }
stripeRange() const363 cv::Range stripeRange() const { return cv::Range(0, ctx.nstripes); }
364
365 protected:
366 ParallelLoopBodyWrapperContext& ctx;
367 };
368
369 #if defined HAVE_TBB
370 class ProxyLoopBody : public ParallelLoopBodyWrapper
371 {
372 public:
ProxyLoopBody(ParallelLoopBodyWrapperContext & ctx_)373 ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx_)
374 : ParallelLoopBodyWrapper(ctx_)
375 {}
376
operator ()(const tbb::blocked_range<int> & range) const377 void operator ()(const tbb::blocked_range<int>& range) const
378 {
379 this->ParallelLoopBodyWrapper::operator()(cv::Range(range.begin(), range.end()));
380 }
381
operator ()() const382 void operator ()() const // run parallel job
383 {
384 cv::Range range = this->stripeRange();
385 tbb::parallel_for(tbb::blocked_range<int>(range.start, range.end), *this);
386 }
387 };
388 #elif defined HAVE_HPX
389 class ProxyLoopBody : public ParallelLoopBodyWrapper
390 {
391 public:
ProxyLoopBody(ParallelLoopBodyWrapperContext & ctx_)392 ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx_)
393 : ParallelLoopBodyWrapper(ctx_)
394 {}
395
operator ()() const396 void operator ()() const // run parallel job
397 {
398 cv::Range stripeRange = this->stripeRange();
399 hpx::parallel::for_loop(
400 hpx::parallel::execution::par,
401 stripeRange.start, stripeRange.end,
402 [&](const int &i) { ;
403 this->ParallelLoopBodyWrapper::operator()(
404 cv::Range(i, i + 1));
405 });
406 }
407 };
408 #elif defined HAVE_OPENMP
409 typedef ParallelLoopBodyWrapper ProxyLoopBody;
410 #elif defined HAVE_GCD
411 typedef ParallelLoopBodyWrapper ProxyLoopBody;
block_function(void * context,size_t index)412 static void block_function(void* context, size_t index)
413 {
414 ProxyLoopBody* ptr_body = static_cast<ProxyLoopBody*>(context);
415 (*ptr_body)(cv::Range((int)index, (int)index + 1));
416 }
417 #elif defined WINRT || defined HAVE_CONCURRENCY
418 class ProxyLoopBody : public ParallelLoopBodyWrapper
419 {
420 public:
ProxyLoopBody(ParallelLoopBodyWrapperContext & ctx)421 ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx)
422 : ParallelLoopBodyWrapper(ctx)
423 {}
424
operator ()(int i) const425 void operator ()(int i) const
426 {
427 this->ParallelLoopBodyWrapper::operator()(cv::Range(i, i + 1));
428 }
429 };
430 #else
431 typedef ParallelLoopBodyWrapper ProxyLoopBody;
432 #endif
433
434 #if defined HAVE_TBB
435 #if TBB_INTERFACE_VERSION >= 8000
436 static tbb::task_arena tbbArena(tbb::task_arena::automatic);
437 #else
438 static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred);
439 #endif
440 #elif defined HAVE_HPX
441 // nothing for HPX
442 #elif defined HAVE_OPENMP
_initMaxThreads()443 static inline int _initMaxThreads()
444 {
445 int maxThreads = omp_get_max_threads();
446 if (!utils::getConfigurationParameterBool("OPENCV_FOR_OPENMP_DYNAMIC_DISABLE", false))
447 {
448 omp_set_dynamic(1);
449 }
450 return maxThreads;
451 }
452 static int numThreadsMax = _initMaxThreads();
453 #elif defined HAVE_GCD
454 // nothing for GCD
455 #elif defined WINRT
456 // nothing for WINRT
457 #elif defined HAVE_CONCURRENCY
458
459 class SchedPtr
460 {
461 Concurrency::Scheduler* sched_;
462 public:
operator ->()463 Concurrency::Scheduler* operator->() { return sched_; }
operator Concurrency::Scheduler*()464 operator Concurrency::Scheduler*() { return sched_; }
465
operator =(Concurrency::Scheduler * sched)466 void operator=(Concurrency::Scheduler* sched)
467 {
468 if (sched_) sched_->Release();
469 sched_ = sched;
470 }
471
SchedPtr()472 SchedPtr() : sched_(0) {}
~SchedPtr()473 ~SchedPtr() {}
474 };
475 static SchedPtr pplScheduler;
476
477 #endif
478
479 } // namespace anon
480
481 /* ================================ parallel_for_ ================================ */
482
483 static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); // forward declaration
484
parallel_for_(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)485 void parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
486 {
487 #ifdef OPENCV_TRACE
488 CV__TRACE_OPENCV_FUNCTION_NAME_("parallel_for", 0);
489 CV_TRACE_ARG_VALUE(range_start, "range.start", (int64)range.start);
490 CV_TRACE_ARG_VALUE(range_end, "range.end", (int64)range.end);
491 CV_TRACE_ARG_VALUE(nstripes, "nstripes", (int64)nstripes);
492 #endif
493
494 CV_INSTRUMENT_REGION_MT_FORK();
495 if (range.empty())
496 return;
497
498 static std::atomic<bool> flagNestedParallelFor(false);
499 bool isNotNestedRegion = !flagNestedParallelFor.load();
500 if (isNotNestedRegion)
501 isNotNestedRegion = !flagNestedParallelFor.exchange(true);
502 if (isNotNestedRegion)
503 {
504 try
505 {
506 parallel_for_impl(range, body, nstripes);
507 flagNestedParallelFor = false;
508 }
509 catch (...)
510 {
511 flagNestedParallelFor = false;
512 throw;
513 }
514 }
515 else // nested parallel_for_() calls are not parallelized
516 {
517 CV_UNUSED(nstripes);
518 body(range);
519 }
520 }
521
522 static
parallel_for_cb(int start,int end,void * data)523 void parallel_for_cb(int start, int end, void* data)
524 {
525 CV_DbgAssert(data);
526 const cv::ParallelLoopBody& body = *(const cv::ParallelLoopBody*)data;
527 body(Range(start, end));
528 }
529
// Executes `body` over `range`.
// The work is dispatched in parallel only when numThreads is negative or greater
// than 1 and the range holds more than one element; otherwise (and when no
// backend is compiled in and no ParallelForAPI is active) the body is simply
// called once with the whole range.
// A ParallelForAPI returned by getCurrentParallelForAPI() takes precedence over
// the backend selected at compile time (CV_PARALLEL_FRAMEWORK).
// After a parallel run ctx.finalize() re-throws an exception recorded by a stripe.
static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
    using namespace cv::parallel;
    if ((numThreads < 0 || numThreads > 1) && range.end - range.start > 1)
    {
        ParallelLoopBodyWrapperContext ctx(body, range, nstripes);
        ProxyLoopBody pbody(ctx);
        cv::Range stripeRange = pbody.stripeRange();
        // a single stripe: nothing to distribute, call the user body directly
        if( stripeRange.end - stripeRange.start == 1 )
        {
            body(range);
            return;
        }

        // runtime-selected backend, if any
        std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
        if (api)
        {
            CV_CheckEQ(stripeRange.start, 0, "");
            api->parallel_for(stripeRange.end, parallel_for_cb, (void*)&pbody);
            ctx.finalize();  // propagate exceptions if exists
            return;
        }

        // backend selected at compile time
#ifdef CV_PARALLEL_FRAMEWORK
#if defined HAVE_TBB

#if TBB_INTERFACE_VERSION >= 8000
        tbbArena.execute(pbody);
#else
        pbody();
#endif

#elif defined HAVE_HPX
        pbody();

#elif defined HAVE_OPENMP

        // one loop iteration per stripe
        #pragma omp parallel for schedule(dynamic) num_threads(numThreads > 0 ? numThreads : numThreadsMax)
        for (int i = stripeRange.start; i < stripeRange.end; ++i)
            pbody(Range(i, i + 1));

#elif defined HAVE_GCD

        dispatch_queue_t concurrent_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
        dispatch_apply_f(stripeRange.end - stripeRange.start, concurrent_queue, &pbody, block_function);

#elif defined WINRT

        Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);

#elif defined HAVE_CONCURRENCY

        // no custom scheduler, or it is already the current one: run as is
        if(!pplScheduler || pplScheduler->Id() == Concurrency::CurrentScheduler::Id())
        {
            Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
        }
        else
        {
            // attach the custom scheduler only for the duration of the loop
            pplScheduler->Attach();
            Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
            Concurrency::CurrentScheduler::Detach();
        }

#elif defined HAVE_PTHREADS_PF

        parallel_for_pthreads(pbody.stripeRange(), pbody, pbody.stripeRange().size());

#else

#error You have hacked and compiling with unsupported parallel framework

#endif

        ctx.finalize();  // propagate exceptions if exists
        return;
#endif // CV_PARALLEL_FRAMEWORK
    }

    // sequential fallback
    body(range);
}
610
611
getNumThreads(void)612 int getNumThreads(void)
613 {
614 std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
615 if (api)
616 {
617 return api->getNumThreads();
618 }
619
620 if (numThreads == 0)
621 return 1;
622
623 #if defined HAVE_TBB
624
625 #if TBB_INTERFACE_VERSION >= 9100
626 return tbbArena.max_concurrency();
627 #elif TBB_INTERFACE_VERSION >= 8000
628 return numThreads > 0
629 ? numThreads
630 : tbb::task_scheduler_init::default_num_threads();
631 #else
632 return tbbScheduler.is_active()
633 ? numThreads
634 : tbb::task_scheduler_init::default_num_threads();
635 #endif
636
637 #elif defined HAVE_HPX
638 return numThreads;
639
640 #elif defined HAVE_OPENMP
641
642 return numThreads > 0
643 ? numThreads
644 : numThreadsMax;
645
646
647 #elif defined HAVE_GCD
648
649 return cv::getNumberOfCPUs(); // the GCD thread pool limit
650
651 #elif defined WINRT
652
653 return 0;
654
655 #elif defined HAVE_CONCURRENCY
656
657 return (pplScheduler == 0)
658 ? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors()
659 : (1 + pplScheduler->GetNumberOfVirtualProcessors());
660
661 #elif defined HAVE_PTHREADS_PF
662
663 return parallel_pthreads_get_threads_num();
664
665 #else
666
667 return 1;
668
669 #endif
670 }
671
defaultNumberOfThreads()672 unsigned defaultNumberOfThreads()
673 {
674 #ifdef __ANDROID__
675 // many modern phones/tables have 4-core CPUs. Let's use no more
676 // than 2 threads by default not to overheat the devices
677 const unsigned int default_number_of_threads = 2;
678 #else
679 const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs());
680 #endif
681
682 unsigned result = default_number_of_threads;
683
684 static int config_num_threads = (int)utils::getConfigurationParameterSizeT("OPENCV_FOR_THREADS_NUM", 0);
685
686 if (config_num_threads)
687 {
688 result = (unsigned)std::max(1, config_num_threads);
689 //do we need upper limit of threads number?
690 }
691 return result;
692 }
693
setNumThreads(int threads_)694 void setNumThreads( int threads_ )
695 {
696 CV_UNUSED(threads_);
697
698 int threads = (threads_ < 0) ? defaultNumberOfThreads() : (unsigned)threads_;
699 numThreads = threads;
700
701 std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
702 if (api)
703 {
704 api->setNumThreads(numThreads);
705 }
706
707 #ifdef HAVE_TBB
708
709 #if TBB_INTERFACE_VERSION >= 8000
710 if(tbbArena.is_active()) tbbArena.terminate();
711 if(threads > 0) tbbArena.initialize(threads);
712 #else
713 if(tbbScheduler.is_active()) tbbScheduler.terminate();
714 if(threads > 0) tbbScheduler.initialize(threads);
715 #endif
716
717 #elif defined HAVE_HPX
718 return; // nothing needed as numThreads is used
719
720 #elif defined HAVE_OPENMP
721
722 return; // nothing needed as num_threads clause is used in #pragma omp parallel for
723
724 #elif defined HAVE_GCD
725
726 // unsupported
727 // there is only private dispatch_queue_set_width() and only for desktop
728
729 #elif defined WINRT
730
731 return;
732
733 #elif defined HAVE_CONCURRENCY
734
735 if (threads <= 0)
736 {
737 pplScheduler = 0;
738 }
739 else if (threads == 1)
740 {
741 // Concurrency always uses >=2 threads, so we just disable it if 1 thread is requested
742 numThreads = 0;
743 }
744 else if (pplScheduler == 0 || 1 + pplScheduler->GetNumberOfVirtualProcessors() != (unsigned int)threads)
745 {
746 pplScheduler = Concurrency::Scheduler::Create(Concurrency::SchedulerPolicy(2,
747 Concurrency::MinConcurrency, threads-1,
748 Concurrency::MaxConcurrency, threads-1));
749 }
750
751 #elif defined HAVE_PTHREADS_PF
752
753 parallel_pthreads_set_threads_num(threads);
754
755 #endif
756 }
757
758
getThreadNum()759 int getThreadNum()
760 {
761 std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
762 if (api)
763 {
764 return api->getThreadNum();
765 }
766
767 #if defined HAVE_TBB
768 #if TBB_INTERFACE_VERSION >= 9100
769 return tbb::this_task_arena::current_thread_index();
770 #elif TBB_INTERFACE_VERSION >= 8000
771 return tbb::task_arena::current_thread_index();
772 #else
773 return 0;
774 #endif
775 #elif defined HAVE_HPX
776 return (int)(hpx::get_num_worker_threads());
777 #elif defined HAVE_OPENMP
778 return omp_get_thread_num();
779 #elif defined HAVE_GCD
780 return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
781 #elif defined WINRT
782 return 0;
783 #elif defined HAVE_CONCURRENCY
784 return std::max(0, (int)Concurrency::Context::VirtualProcessorId()); // zero for master thread, unique number for others but not necessary 1,2,3,...
785 #elif defined HAVE_PTHREADS_PF
786 return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
787 #else
788 return 0;
789 #endif
790 }
791
792
793 #if defined __linux__ || defined __GLIBC__ || defined __HAIKU__ || defined __ANDROID__
794 #define CV_CPU_GROUPS_1
795 #endif
796
797 #if defined __linux__ || defined __ANDROID__
798 #define CV_HAVE_CGROUPS 1
799 #endif
800
801 #if defined CV_CPU_GROUPS_1
// Reads the whole file into a string.
// Returns an empty string when the file cannot be opened or the stream
// reports a failure after reading.
static inline
std::string getFileContents(const char *filename)
{
    std::ifstream ifs(filename);
    if (!ifs.is_open())
        return std::string();

    std::string content( (std::istreambuf_iterator<char>(ifs) ),
                         (std::istreambuf_iterator<char>()    ) );

    if (ifs.fail())
        return std::string();

    return content;
}

// Counts the CPUs described by a comma-separated list of indices and
// inclusive ranges, e.g. "0-1,3,5-7,10,13-15" -> 10.
// Entries that do not start with a non-negative number (empty entries, a lone
// trailing newline, garbage) and ranges whose end is below their start are
// skipped instead of being counted.
static inline
int countCPUsInList(const std::string& cpuList)
{
    int cpusAvailable = 0;
    size_t pos = 0;
    while (pos < cpuList.size())
    {
        int rstart = 0, rend = 0;
        // sscanf() cannot read past the ',' that ends the entry, so the string
        // does not have to be split (or modified) beforehand
        const int parsed = sscanf(cpuList.c_str() + pos, "%d-%d", &rstart, &rend);
        if (parsed == 2)
        {
            if (rstart >= 0 && rend >= rstart)
                cpusAvailable += rend - rstart + 1;
        }
        else if (parsed == 1)
        {
            if (rstart >= 0)
                ++cpusAvailable;
        }

        const size_t comma = cpuList.find(',', pos);
        if (comma == std::string::npos)
            break;
        pos = comma + 1;
    }
    return cpusAvailable;
}

// Returns the number of CPUs listed in `filename`, a file holding a list of
// the form "0-1,3,5-7,10,13-15".
// Returns 0 when the file is missing, unreadable, empty or contains no valid entry.
static inline
int getNumberOfCPUsImpl(const char *filename)
{
    const std::string file_contents = getFileContents(filename);
    if (file_contents.empty())
        return 0;

    return countCPUsInList(file_contents);
}
851 #endif
852
853 #if defined CV_HAVE_CGROUPS
854 static inline
getNumberOfCPUsCFS()855 unsigned getNumberOfCPUsCFS()
856 {
857 int cfs_quota = 0;
858 {
859 std::ifstream ss_period("/sys/fs/cgroup/cpu/cpu.cfs_quota_us", std::ios::in | std::ios::binary);
860 ss_period >> cfs_quota;
861
862 if (ss_period.fail() || cfs_quota < 1) /* cfs_quota must not be 0 or negative */
863 return 0;
864 }
865
866 int cfs_period = 0;
867 {
868 std::ifstream ss_quota("/sys/fs/cgroup/cpu/cpu.cfs_period_us", std::ios::in | std::ios::binary);
869 ss_quota >> cfs_period;
870
871 if (ss_quota.fail() || cfs_period < 1)
872 return 0;
873 }
874
875 return (unsigned)max(1, cfs_quota/cfs_period);
876 }
877 #endif
878
// Minimum of two values where 0 means "unknown":
// a zero argument is ignored, and 0 is returned only when both are zero.
template <typename T> static inline
T minNonZero(const T& val_1, const T& val_2)
{
    if (val_1 != 0)
        return (val_2 != 0) ? std::min(val_1, val_2) : val_1;
    return val_2;
}
886
887 static
getNumberOfCPUs_()888 int getNumberOfCPUs_()
889 {
890 /*
891 * Logic here is to try different methods of getting CPU counts and return
892 * the minimum most value as it has high probablity of being right and safe.
893 * Return 1 if we get 0 or not found on all methods.
894 */
895 #if defined CV_CXX11 \
896 && !defined(__MINGW32__) /* not implemented (2020-03) */ \
897
898 /*
899 * Check for this standard C++11 way, we do not return directly because
900 * running in a docker or K8s environment will mean this is the host
901 * machines config not the containers or pods and as per docs this value
902 * must be "considered only a hint".
903 */
904 unsigned ncpus = std::thread::hardware_concurrency(); /* If the value is not well defined or not computable, returns 0 */
905 #else
906 unsigned ncpus = 0; /* 0 means we have to find out some other way */
907 #endif
908
909 #if defined _WIN32
910
911 SYSTEM_INFO sysinfo = {};
912 #if (defined(_M_ARM) || defined(_M_ARM64) || defined(_M_X64) || defined(WINRT)) && _WIN32_WINNT >= 0x501
913 GetNativeSystemInfo( &sysinfo );
914 #else
915 GetSystemInfo( &sysinfo );
916 #endif
917 unsigned ncpus_sysinfo = sysinfo.dwNumberOfProcessors;
918 ncpus = minNonZero(ncpus, ncpus_sysinfo);
919
920 #elif defined __APPLE__
921
922 int numCPU=0;
923 int mib[4];
924 size_t len = sizeof(numCPU);
925
926 /* set the mib for hw.ncpu */
927 mib[0] = CTL_HW;
928 mib[1] = HW_AVAILCPU; // alternatively, try HW_NCPU;
929
930 /* get the number of CPUs from the system */
931 sysctl(mib, 2, &numCPU, &len, NULL, 0);
932
933 if( numCPU < 1 )
934 {
935 mib[1] = HW_NCPU;
936 sysctl( mib, 2, &numCPU, &len, NULL, 0 );
937
938 if( numCPU < 1 )
939 numCPU = 1;
940 }
941
942 ncpus = minNonZero(ncpus, (unsigned)numCPU);
943
944 #elif defined CV_CPU_GROUPS_1
945
946 #if defined CV_HAVE_CGROUPS
947 static unsigned ncpus_impl_cpuset = (unsigned)getNumberOfCPUsImpl("/sys/fs/cgroup/cpuset/cpuset.cpus");
948 ncpus = minNonZero(ncpus, ncpus_impl_cpuset);
949
950 static unsigned ncpus_impl_cfs = getNumberOfCPUsCFS();
951 ncpus = minNonZero(ncpus, ncpus_impl_cfs);
952 #endif
953
954 static unsigned ncpus_impl_devices = (unsigned)getNumberOfCPUsImpl("/sys/devices/system/cpu/online");
955 ncpus = minNonZero(ncpus, ncpus_impl_devices);
956
957 #endif
958
959 #if defined _GNU_SOURCE \
960 && !defined(__MINGW32__) /* not implemented (2020-03) */ \
961 && !defined(__EMSCRIPTEN__) \
962 && !defined(__ANDROID__) // TODO: add check for modern Android NDK
963
964 cpu_set_t cpu_set;
965 if (0 == sched_getaffinity(0, sizeof(cpu_set), &cpu_set))
966 {
967 unsigned cpu_count_cpu_set = CPU_COUNT(&cpu_set);
968 ncpus = minNonZero(ncpus, cpu_count_cpu_set);
969 }
970
971 #endif
972
973 #if !defined(_WIN32) && !defined(__APPLE__)
974
975 static unsigned cpu_count_sysconf = (unsigned)sysconf( _SC_NPROCESSORS_ONLN );
976 ncpus = minNonZero(ncpus, cpu_count_sysconf);
977
978 #endif
979
980 return ncpus != 0 ? ncpus : 1;
981 }
982
getNumberOfCPUs()983 int getNumberOfCPUs()
984 {
985 static int nCPUs = getNumberOfCPUs_();
986 return nCPUs; // cached value
987 }
988
currentParallelFramework()989 const char* currentParallelFramework()
990 {
991 std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
992 if (api)
993 {
994 return api->getName();
995 }
996 #ifdef CV_PARALLEL_FRAMEWORK
997 return CV_PARALLEL_FRAMEWORK;
998 #else
999 return NULL;
1000 #endif
1001 }
1002
1003 } // namespace cv::
1004
// C API wrapper: forwards `nt` to cv::setNumThreads().
CV_IMPL void cvSetNumThreads(int nt)
{
    cv::setNumThreads(nt);
}
1009
// C API wrapper: returns the result of cv::getNumThreads().
CV_IMPL int cvGetNumThreads()
{
    return cv::getNumThreads();
}
1014
// C API wrapper: returns the result of cv::getThreadNum().
CV_IMPL int cvGetThreadNum()
{
    return cv::getThreadNum();
}
1019