1 /*M///////////////////////////////////////////////////////////////////////////////////////
2 //
3 //  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
4 //
5 //  By downloading, copying, installing or using the software you agree to this license.
6 //  If you do not agree to this license, do not download, install,
7 //  copy or use the software.
8 //
9 //
10 //                           License Agreement
11 //                For Open Source Computer Vision Library
12 //
13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
15 // Third party copyrights are property of their respective owners.
16 //
17 // Redistribution and use in source and binary forms, with or without modification,
18 // are permitted provided that the following conditions are met:
19 //
20 //   * Redistribution's of source code must retain the above copyright notice,
21 //     this list of conditions and the following disclaimer.
22 //
23 //   * Redistribution's in binary form must reproduce the above copyright notice,
24 //     this list of conditions and the following disclaimer in the documentation
25 //     and/or other materials provided with the distribution.
26 //
27 //   * The name of the copyright holders may not be used to endorse or promote products
28 //     derived from this software without specific prior written permission.
29 //
30 // This software is provided by the copyright holders and contributors "as is" and
31 // any express or implied warranties, including, but not limited to, the implied
32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
33 // In no event shall the Intel Corporation or contributors be liable for any direct,
34 // indirect, incidental, special, exemplary, or consequential damages
35 // (including, but not limited to, procurement of substitute goods or services;
36 // loss of use, data, or profits; or business interruption) however caused
37 // and on any theory of liability, whether in contract, strict liability,
38 // or tort (including negligence or otherwise) arising in any way out of
39 // the use of this software, even if advised of the possibility of such damage.
40 //
41 //M*/
42 
43 #include "precomp.hpp"
44 
45 #include <opencv2/core/utils/configuration.private.hpp>
46 #include <opencv2/core/utils/trace.private.hpp>
47 
48 #include "opencv2/core/parallel/parallel_backend.hpp"
49 #include "parallel/parallel.hpp"
50 
51 #if defined _WIN32 || defined WINCE
52     #include <windows.h>
53     #undef small
54     #undef min
55     #undef max
56     #undef abs
57 #endif
58 
59 #if defined __linux__ || defined __APPLE__ || defined __GLIBC__ \
60     || defined __HAIKU__ || defined __EMSCRIPTEN__ || defined __DragonFly__ \
61     || defined __OpenBSD__
62     #include <unistd.h>
63     #include <stdio.h>
64     #include <sys/types.h>
65     #include <fstream>
66     #if defined __ANDROID__
67         #include <sys/sysconf.h>
68         #include <sys/syscall.h>
69         #include <sched.h>
70     #elif defined __APPLE__
71         #include <sys/sysctl.h>
72     #endif
73 #endif
74 
75 #if defined CV_CXX11
76     #include <thread>
77 #endif
78 
79 #ifdef _OPENMP
80     #define HAVE_OPENMP
81 #endif
82 
83 #ifdef __APPLE__
84     #define HAVE_GCD
85 #endif
86 
87 #if defined _MSC_VER && _MSC_VER >= 1600
88     #define HAVE_CONCURRENCY
89 #endif
90 
91 /* IMPORTANT: always use the same order of defines
92    - HAVE_TBB         - 3rdparty library, should be explicitly enabled
93    - HAVE_HPX         - 3rdparty library, should be explicitly enabled
94    - HAVE_OPENMP      - integrated to compiler, should be explicitly enabled
95    - HAVE_GCD         - system wide, used automatically        (APPLE only)
96    - WINRT            - system wide, used automatically        (Windows RT only)
97    - HAVE_CONCURRENCY - part of runtime, used automatically    (Windows only - MSVS 10, MSVS 11)
98    - HAVE_PTHREADS_PF - pthreads if available
99 */
100 
101 #if defined HAVE_TBB
    #ifndef TBB_SUPPRESS_DEPRECATED_MESSAGES  // suppress TBB deprecation warnings
103     #define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
104     #endif
105     #include "tbb/tbb.h"
106     #include "tbb/task.h"
107     #if TBB_INTERFACE_VERSION >= 8000
108         #include "tbb/task_arena.h"
109     #endif
110     #undef min
111     #undef max
112 #elif defined HAVE_HPX
113     #include <hpx/parallel/algorithms/for_loop.hpp>
114     #include <hpx/parallel/execution.hpp>
115     //
116     #include <hpx/hpx_start.hpp>
117     #include <hpx/hpx_suspend.hpp>
118     #include <hpx/include/apply.hpp>
119     #include <hpx/util/yield_while.hpp>
120     #include <hpx/include/threadmanager.hpp>
121 
122 #elif defined HAVE_OPENMP
123     #include <omp.h>
124 #elif defined HAVE_GCD
125     #include <dispatch/dispatch.h>
126     #include <pthread.h>
127 #elif defined WINRT && _MSC_VER < 1900
128     #include <ppltasks.h>
129 #elif defined HAVE_CONCURRENCY
130     #include <ppl.h>
131 #endif
132 
133 
134 #if defined HAVE_TBB
135 #  define CV_PARALLEL_FRAMEWORK "tbb"
136 #elif defined HAVE_HPX
137 #  define CV_PARALLEL_FRAMEWORK "hpx"
138 #elif defined HAVE_OPENMP
139 #  define CV_PARALLEL_FRAMEWORK "openmp"
140 #elif defined HAVE_GCD
141 #  define CV_PARALLEL_FRAMEWORK "gcd"
142 #elif defined WINRT
143 #  define CV_PARALLEL_FRAMEWORK "winrt-concurrency"
144 #elif defined HAVE_CONCURRENCY
145 #  define CV_PARALLEL_FRAMEWORK "ms-concurrency"
146 #elif defined HAVE_PTHREADS_PF
147 #  define CV_PARALLEL_FRAMEWORK "pthreads"
148 #endif
149 
150 #include <atomic>
151 
152 #include "parallel_impl.hpp"
153 
154 #include "opencv2/core/detail/exception_ptr.hpp"  // CV__EXCEPTION_PTR = 1 if std::exception_ptr is available
155 
156 using namespace cv;
157 
158 namespace cv {
159 
~ParallelLoopBody()160 ParallelLoopBody::~ParallelLoopBody() {}
161 
162 using namespace cv::parallel;
163 
164 namespace {
165 
166 #ifdef ENABLE_INSTRUMENTATION
SyncNodes(cv::instr::InstrNode * pNode)167     static void SyncNodes(cv::instr::InstrNode *pNode)
168     {
169         std::vector<cv::instr::NodeDataTls*> data;
170         pNode->m_payload.m_tls.gather(data);
171 
172         uint64 ticksMax = 0;
173         int    threads  = 0;
174         for(size_t i = 0; i < data.size(); i++)
175         {
176             if(data[i] && data[i]->m_ticksTotal)
177             {
178                 ticksMax = MAX(ticksMax, data[i]->m_ticksTotal);
179                 pNode->m_payload.m_ticksTotal -= data[i]->m_ticksTotal;
180                 data[i]->m_ticksTotal = 0;
181                 threads++;
182             }
183         }
184         pNode->m_payload.m_ticksTotal += ticksMax;
185         pNode->m_payload.m_threads = MAX(pNode->m_payload.m_threads, threads);
186 
187         for(size_t i = 0; i < pNode->m_childs.size(); i++)
188             SyncNodes(pNode->m_childs[i]);
189     }
190 #endif
191 
192     class ParallelLoopBodyWrapperContext
193     {
194     public:
ParallelLoopBodyWrapperContext(const cv::ParallelLoopBody & _body,const cv::Range & _r,double _nstripes)195         ParallelLoopBodyWrapperContext(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) :
196             is_rng_used(false), hasException(false)
197         {
198 
199             body = &_body;
200             wholeRange = _r;
201             double len = wholeRange.end - wholeRange.start;
202             nstripes = cvRound(_nstripes <= 0 ? len : MIN(MAX(_nstripes, 1.), len));
203 
204             // propagate main thread state
205             rng = cv::theRNG();
206 
207 #ifdef OPENCV_TRACE
208             traceRootRegion = CV_TRACE_NS::details::getCurrentRegion();
209             traceRootContext = CV_TRACE_NS::details::getTraceManager().tls.get();
210 #endif
211 
212 #ifdef ENABLE_INSTRUMENTATION
213             pThreadRoot = cv::instr::getInstrumentTLSStruct().pCurrentNode;
214 #endif
215         }
finalize()216         void finalize()
217         {
218 #ifdef ENABLE_INSTRUMENTATION
219             for(size_t i = 0; i < pThreadRoot->m_childs.size(); i++)
220                 SyncNodes(pThreadRoot->m_childs[i]);
221 #endif
222             if (is_rng_used)
223             {
224                 // Some parallel backends execute nested jobs in the main thread,
225                 // so we need to restore initial RNG state here.
226                 cv::theRNG() = rng;
227                 // We can't properly update RNG state based on RNG usage in worker threads,
228                 // so lets just change main thread RNG state to the next value.
229                 // Note: this behaviour is not equal to single-threaded mode.
230                 cv::theRNG().next();
231             }
232 #ifdef OPENCV_TRACE
233             if (traceRootRegion)
234                 CV_TRACE_NS::details::parallelForFinalize(*traceRootRegion);
235 #endif
236 
237             if (hasException)
238             {
239 #if CV__EXCEPTION_PTR
240                 std::rethrow_exception(pException);
241 #else
242                 CV_Error(Error::StsError, "Exception in parallel_for() body: " + exception_message);
243 #endif
244             }
245         }
~ParallelLoopBodyWrapperContext()246         ~ParallelLoopBodyWrapperContext() {}
247 
248         const cv::ParallelLoopBody* body;
249         cv::Range wholeRange;
250         int nstripes;
251         cv::RNG rng;
252         mutable bool is_rng_used;
253 #ifdef OPENCV_TRACE
254         CV_TRACE_NS::details::Region* traceRootRegion;
255         CV_TRACE_NS::details::TraceManagerThreadLocal* traceRootContext;
256 #endif
257 #ifdef ENABLE_INSTRUMENTATION
258         cv::instr::InstrNode *pThreadRoot;
259 #endif
260         bool hasException;
261 #if CV__EXCEPTION_PTR
262         std::exception_ptr pException;
263 #else
264         cv::String exception_message;
265 #endif
266 #if CV__EXCEPTION_PTR
recordException()267         void recordException()
268 #else
269         void recordException(const cv::String& msg)
270 #endif
271         {
272             if (!hasException)
273             {
274                 cv::AutoLock lock(cv::getInitializationMutex());
275                 if (!hasException)
276                 {
277                     hasException = true;
278 #if CV__EXCEPTION_PTR
279                     pException = std::current_exception();
280 #else
281                     exception_message = msg;
282 #endif
283                 }
284             }
285         }
286     private:
287         ParallelLoopBodyWrapperContext(const ParallelLoopBodyWrapperContext&); // disabled
288         ParallelLoopBodyWrapperContext& operator=(const ParallelLoopBodyWrapperContext&); // disabled
289     };
290 
    // Loop body handed to the parallel backends.
    //
    // It receives a range of *stripe* indices, maps it onto the matching sub-range of the
    // user's range and invokes the user body on it. Before the call it re-installs the
    // calling thread's state captured in the shared context (trace region,
    // instrumentation node, RNG). Any exception thrown by the user body is caught and
    // recorded in the context instead of escaping into the backend.
    class ParallelLoopBodyWrapper : public cv::ParallelLoopBody
    {
    public:
        // ctx_ is held by reference; it is shared by all stripes of one parallel_for_().
        ParallelLoopBodyWrapper(ParallelLoopBodyWrapperContext& ctx_) :
            ctx(ctx_)
        {
        }
        ~ParallelLoopBodyWrapper()
        {
        }
        // sr is a range of stripe indices within [0, ctx.nstripes).
        void operator()(const cv::Range& sr) const CV_OVERRIDE
        {
#ifdef OPENCV_TRACE
            // TODO CV_TRACE_NS::details::setCurrentRegion(rootRegion);
            if (ctx.traceRootRegion && ctx.traceRootContext)
                CV_TRACE_NS::details::parallelForSetRootRegion(*ctx.traceRootRegion, *ctx.traceRootContext);
            CV__TRACE_OPENCV_FUNCTION_NAME("parallel_for_body");
            if (ctx.traceRootRegion)
                CV_TRACE_NS::details::parallelForAttachNestedRegion(*ctx.traceRootRegion);
#endif

#ifdef ENABLE_INSTRUMENTATION
            {
                cv::instr::InstrTLSStruct *pInstrTLS = &cv::instr::getInstrumentTLSStruct();
                pInstrTLS->pCurrentNode = ctx.pThreadRoot; // Initialize TLS node for thread
            }
            CV_INSTRUMENT_REGION();
#endif

            // propagate main thread state
            cv::theRNG() = ctx.rng;

            // Map stripe indices to element indices: stripe k starts at
            // wholeRange.start + round(k * len / nstripes) (integer math in uint64).
            // A stripe range that reaches nstripes always ends exactly at wholeRange.end.
            cv::Range r;
            cv::Range wholeRange = ctx.wholeRange;
            int nstripes = ctx.nstripes;
            r.start = (int)(wholeRange.start +
                            ((uint64)sr.start*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
            r.end = sr.end >= nstripes ? wholeRange.end : (int)(wholeRange.start +
                            ((uint64)sr.end*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);

#ifdef OPENCV_TRACE
            CV_TRACE_ARG_VALUE(range_start, "range.start", (int64)r.start);
            CV_TRACE_ARG_VALUE(range_end, "range.end", (int64)r.end);
#endif

            // Exceptions must not escape into the backend; they are stored in the context
            // and re-thrown on the calling thread by ctx.finalize().
            try
            {
                (*ctx.body)(r);
            }
#if CV__EXCEPTION_PTR
            catch (...)
            {
                ctx.recordException();
            }
#else
            catch (const cv::Exception& e)
            {
                ctx.recordException(e.what());
            }
            catch (const std::exception& e)
            {
                ctx.recordException(e.what());
            }
            catch (...)
            {
                ctx.recordException("Unknown exception");
            }
#endif

            // Report RNG usage so that finalize() advances the caller's RNG.
            if (!ctx.is_rng_used && !(cv::theRNG() == ctx.rng))
                ctx.is_rng_used = true;
        }
        // Full range of stripe indices to be scheduled by a backend.
        cv::Range stripeRange() const { return cv::Range(0, ctx.nstripes); }

    protected:
        ParallelLoopBodyWrapperContext& ctx;
    };
368 
#if defined HAVE_TBB
    // TBB adapter: tbb::parallel_for hands out tbb::blocked_range<int> chunks of stripe
    // indices, which are forwarded to the wrapper as cv::Range.
    class ProxyLoopBody : public ParallelLoopBodyWrapper
    {
    public:
        ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx_)
        : ParallelLoopBodyWrapper(ctx_)
        {}

        // Processes the stripes [range.begin(), range.end()).
        void operator ()(const tbb::blocked_range<int>& range) const
        {
            this->ParallelLoopBodyWrapper::operator()(cv::Range(range.begin(), range.end()));
        }

        // Invoked by parallel_for_impl(), either directly or through tbbArena.execute().
        void operator ()() const  // run parallel job
        {
            cv::Range range = this->stripeRange();
            tbb::parallel_for(tbb::blocked_range<int>(range.start, range.end), *this);
        }
    };
#elif defined HAVE_HPX
    // HPX adapter: one loop iteration (one stripe) per index, run with the `par` policy.
    class ProxyLoopBody : public ParallelLoopBodyWrapper
    {
    public:
        ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx_)
                : ParallelLoopBodyWrapper(ctx_)
        {}

        void operator ()() const  // run parallel job
        {
            cv::Range stripeRange = this->stripeRange();
            hpx::parallel::for_loop(
                    hpx::parallel::execution::par,
                    stripeRange.start, stripeRange.end,
                    [&](const int &i) { ;
                        this->ParallelLoopBodyWrapper::operator()(
                                cv::Range(i, i + 1));
                    });
        }
    };
#elif defined HAVE_OPENMP
    // OpenMP: parallel_for_impl() calls the wrapper directly from an `omp parallel for`.
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
#elif defined HAVE_GCD
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
    // dispatch_apply_f() callback: `context` is the ProxyLoopBody passed by
    // parallel_for_impl(), `index` is the stripe index to run.
    static void block_function(void* context, size_t index)
    {
        ProxyLoopBody* ptr_body = static_cast<ProxyLoopBody*>(context);
        (*ptr_body)(cv::Range((int)index, (int)index + 1));
    }
#elif defined WINRT || defined HAVE_CONCURRENCY
    // Concurrency::parallel_for() adapter: called once per stripe index.
    class ProxyLoopBody : public ParallelLoopBodyWrapper
    {
    public:
        ProxyLoopBody(ParallelLoopBodyWrapperContext& ctx)
        : ParallelLoopBodyWrapper(ctx)
        {}

        void operator ()(int i) const
        {
            this->ParallelLoopBodyWrapper::operator()(cv::Range(i, i + 1));
        }
    };
#else
    // pthreads or no framework: the wrapper is used as-is.
    typedef ParallelLoopBodyWrapper ProxyLoopBody;
#endif
433 
// Per-backend global state used by parallel_for_impl(), getNumThreads() and setNumThreads().
#if defined HAVE_TBB
    #if TBB_INTERFACE_VERSION >= 8000
        // Arena used for all OpenCV parallel loops; re-initialized by setNumThreads().
        static tbb::task_arena tbbArena(tbb::task_arena::automatic);
    #else
        // Deferred scheduler: only initialized when setNumThreads() requests a thread count.
        static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred);
    #endif
#elif defined HAVE_HPX
// nothing for HPX
#elif defined HAVE_OPENMP
// Captures omp_get_max_threads() once at static-initialization time. Also enables OpenMP
// dynamic thread adjustment unless OPENCV_FOR_OPENMP_DYNAMIC_DISABLE is set to true.
static inline int _initMaxThreads()
{
    int maxThreads = omp_get_max_threads();
    if (!utils::getConfigurationParameterBool("OPENCV_FOR_OPENMP_DYNAMIC_DISABLE", false))
    {
        omp_set_dynamic(1);
    }
    return maxThreads;
}
static int numThreadsMax = _initMaxThreads();
#elif defined HAVE_GCD
// nothing for GCD
#elif defined WINRT
// nothing for WINRT
#elif defined HAVE_CONCURRENCY

// Minimal holder for the Concurrency::Scheduler created by setNumThreads().
// Assigning a new pointer (or 0) calls Release() on the previously held scheduler.
// NOTE(review): the destructor does not Release() the scheduler — presumably to avoid
// touching the ConcRT runtime during static destruction; confirm. Re-assigning the
// pointer that is already held would Release() it and keep a dangling pointer.
class SchedPtr
{
    Concurrency::Scheduler* sched_;
public:
    Concurrency::Scheduler* operator->() { return sched_; }
    operator Concurrency::Scheduler*() { return sched_; }

    void operator=(Concurrency::Scheduler* sched)
    {
        if (sched_) sched_->Release();
        sched_ = sched;
    }

    SchedPtr() : sched_(0) {}
    ~SchedPtr() {}
};
// Null until setNumThreads() requests more than one thread.
static SchedPtr pplScheduler;

#endif
478 
479 } // namespace anon
480 
481 /* ================================   parallel_for_  ================================ */
482 
483 static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); // forward declaration
484 
parallel_for_(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)485 void parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
486 {
487 #ifdef OPENCV_TRACE
488     CV__TRACE_OPENCV_FUNCTION_NAME_("parallel_for", 0);
489     CV_TRACE_ARG_VALUE(range_start, "range.start", (int64)range.start);
490     CV_TRACE_ARG_VALUE(range_end, "range.end", (int64)range.end);
491     CV_TRACE_ARG_VALUE(nstripes, "nstripes", (int64)nstripes);
492 #endif
493 
494     CV_INSTRUMENT_REGION_MT_FORK();
495     if (range.empty())
496         return;
497 
498     static std::atomic<bool> flagNestedParallelFor(false);
499     bool isNotNestedRegion = !flagNestedParallelFor.load();
500     if (isNotNestedRegion)
501       isNotNestedRegion = !flagNestedParallelFor.exchange(true);
502     if (isNotNestedRegion)
503     {
504         try
505         {
506             parallel_for_impl(range, body, nstripes);
507             flagNestedParallelFor = false;
508         }
509         catch (...)
510         {
511             flagNestedParallelFor = false;
512             throw;
513         }
514     }
515     else // nested parallel_for_() calls are not parallelized
516     {
517         CV_UNUSED(nstripes);
518         body(range);
519     }
520 }
521 
522 static
parallel_for_cb(int start,int end,void * data)523 void parallel_for_cb(int start, int end, void* data)
524 {
525     CV_DbgAssert(data);
526     const cv::ParallelLoopBody& body = *(const cv::ParallelLoopBody*)data;
527     body(Range(start, end));
528 }
529 
parallel_for_impl(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)530 static void parallel_for_impl(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
531 {
532     using namespace cv::parallel;
533     if ((numThreads < 0 || numThreads > 1) && range.end - range.start > 1)
534     {
535         ParallelLoopBodyWrapperContext ctx(body, range, nstripes);
536         ProxyLoopBody pbody(ctx);
537         cv::Range stripeRange = pbody.stripeRange();
538         if( stripeRange.end - stripeRange.start == 1 )
539         {
540             body(range);
541             return;
542         }
543 
544         std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
545         if (api)
546         {
547             CV_CheckEQ(stripeRange.start, 0, "");
548             api->parallel_for(stripeRange.end, parallel_for_cb, (void*)&pbody);
549             ctx.finalize();  // propagate exceptions if exists
550             return;
551         }
552 
553 #ifdef CV_PARALLEL_FRAMEWORK
554 #if defined HAVE_TBB
555 
556 #if TBB_INTERFACE_VERSION >= 8000
557         tbbArena.execute(pbody);
558 #else
559         pbody();
560 #endif
561 
562 #elif defined HAVE_HPX
563         pbody();
564 
565 #elif defined HAVE_OPENMP
566 
567         #pragma omp parallel for schedule(dynamic) num_threads(numThreads > 0 ? numThreads : numThreadsMax)
568         for (int i = stripeRange.start; i < stripeRange.end; ++i)
569             pbody(Range(i, i + 1));
570 
571 #elif defined HAVE_GCD
572 
573         dispatch_queue_t concurrent_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
574         dispatch_apply_f(stripeRange.end - stripeRange.start, concurrent_queue, &pbody, block_function);
575 
576 #elif defined WINRT
577 
578         Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
579 
580 #elif defined HAVE_CONCURRENCY
581 
582         if(!pplScheduler || pplScheduler->Id() == Concurrency::CurrentScheduler::Id())
583         {
584             Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
585         }
586         else
587         {
588             pplScheduler->Attach();
589             Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
590             Concurrency::CurrentScheduler::Detach();
591         }
592 
593 #elif defined HAVE_PTHREADS_PF
594 
595         parallel_for_pthreads(pbody.stripeRange(), pbody, pbody.stripeRange().size());
596 
597 #else
598 
599 #error You have hacked and compiling with unsupported parallel framework
600 
601 #endif
602 
603         ctx.finalize();  // propagate exceptions if exists
604         return;
605 #endif // CV_PARALLEL_FRAMEWORK
606     }
607 
608     body(range);
609 }
610 
611 
getNumThreads(void)612 int getNumThreads(void)
613 {
614     std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
615     if (api)
616     {
617         return api->getNumThreads();
618     }
619 
620     if (numThreads == 0)
621         return 1;
622 
623 #if defined HAVE_TBB
624 
625 #if TBB_INTERFACE_VERSION >= 9100
626     return tbbArena.max_concurrency();
627 #elif TBB_INTERFACE_VERSION >= 8000
628     return numThreads > 0
629         ? numThreads
630         : tbb::task_scheduler_init::default_num_threads();
631 #else
632     return tbbScheduler.is_active()
633            ? numThreads
634            : tbb::task_scheduler_init::default_num_threads();
635 #endif
636 
637 #elif defined HAVE_HPX
638     return numThreads;
639 
640 #elif defined HAVE_OPENMP
641 
642     return numThreads > 0
643            ? numThreads
644            : numThreadsMax;
645 
646 
647 #elif defined HAVE_GCD
648 
649     return cv::getNumberOfCPUs(); // the GCD thread pool limit
650 
651 #elif defined WINRT
652 
653     return 0;
654 
655 #elif defined HAVE_CONCURRENCY
656 
657     return (pplScheduler == 0)
658         ? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors()
659         : (1 + pplScheduler->GetNumberOfVirtualProcessors());
660 
661 #elif defined HAVE_PTHREADS_PF
662 
663         return parallel_pthreads_get_threads_num();
664 
665 #else
666 
667     return 1;
668 
669 #endif
670 }
671 
defaultNumberOfThreads()672 unsigned defaultNumberOfThreads()
673 {
674 #ifdef __ANDROID__
675     // many modern phones/tables have 4-core CPUs. Let's use no more
676     // than 2 threads by default not to overheat the devices
677     const unsigned int default_number_of_threads = 2;
678 #else
679     const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs());
680 #endif
681 
682     unsigned result = default_number_of_threads;
683 
684     static int config_num_threads = (int)utils::getConfigurationParameterSizeT("OPENCV_FOR_THREADS_NUM", 0);
685 
686     if (config_num_threads)
687     {
688         result = (unsigned)std::max(1, config_num_threads);
689         //do we need upper limit of threads number?
690     }
691     return result;
692 }
693 
setNumThreads(int threads_)694 void setNumThreads( int threads_ )
695 {
696     CV_UNUSED(threads_);
697 
698     int threads = (threads_ < 0) ? defaultNumberOfThreads() : (unsigned)threads_;
699     numThreads = threads;
700 
701     std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
702     if (api)
703     {
704         api->setNumThreads(numThreads);
705     }
706 
707 #ifdef HAVE_TBB
708 
709 #if TBB_INTERFACE_VERSION >= 8000
710     if(tbbArena.is_active()) tbbArena.terminate();
711     if(threads > 0) tbbArena.initialize(threads);
712 #else
713     if(tbbScheduler.is_active()) tbbScheduler.terminate();
714     if(threads > 0) tbbScheduler.initialize(threads);
715 #endif
716 
717 #elif defined HAVE_HPX
718     return; // nothing needed as numThreads is used
719 
720 #elif defined HAVE_OPENMP
721 
722     return; // nothing needed as num_threads clause is used in #pragma omp parallel for
723 
724 #elif defined HAVE_GCD
725 
726     // unsupported
727     // there is only private dispatch_queue_set_width() and only for desktop
728 
729 #elif defined WINRT
730 
731     return;
732 
733 #elif defined HAVE_CONCURRENCY
734 
735     if (threads <= 0)
736     {
737         pplScheduler = 0;
738     }
739     else if (threads == 1)
740     {
741         // Concurrency always uses >=2 threads, so we just disable it if 1 thread is requested
742         numThreads = 0;
743     }
744     else if (pplScheduler == 0 || 1 + pplScheduler->GetNumberOfVirtualProcessors() != (unsigned int)threads)
745     {
746         pplScheduler = Concurrency::Scheduler::Create(Concurrency::SchedulerPolicy(2,
747                        Concurrency::MinConcurrency, threads-1,
748                        Concurrency::MaxConcurrency, threads-1));
749     }
750 
751 #elif defined HAVE_PTHREADS_PF
752 
753     parallel_pthreads_set_threads_num(threads);
754 
755 #endif
756 }
757 
758 
getThreadNum()759 int getThreadNum()
760 {
761     std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
762     if (api)
763     {
764         return api->getThreadNum();
765     }
766 
767 #if defined HAVE_TBB
768     #if TBB_INTERFACE_VERSION >= 9100
769         return tbb::this_task_arena::current_thread_index();
770     #elif TBB_INTERFACE_VERSION >= 8000
771         return tbb::task_arena::current_thread_index();
772     #else
773         return 0;
774     #endif
775 #elif defined HAVE_HPX
776         return (int)(hpx::get_num_worker_threads());
777 #elif defined HAVE_OPENMP
778     return omp_get_thread_num();
779 #elif defined HAVE_GCD
780     return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
781 #elif defined WINRT
782     return 0;
783 #elif defined HAVE_CONCURRENCY
784     return std::max(0, (int)Concurrency::Context::VirtualProcessorId()); // zero for master thread, unique number for others but not necessary 1,2,3,...
785 #elif defined HAVE_PTHREADS_PF
786     return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
787 #else
788     return 0;
789 #endif
790 }
791 
792 
793 #if defined __linux__ || defined __GLIBC__ || defined __HAIKU__ || defined __ANDROID__
794   #define CV_CPU_GROUPS_1
795 #endif
796 
797 #if defined __linux__ || defined __ANDROID__
798   #define CV_HAVE_CGROUPS 1
799 #endif
800 
801 #if defined CV_CPU_GROUPS_1
// Reads the whole file into a string. Returns "" if the file cannot be opened or read.
static inline
std::string getFileContents(const char *filename)
{
    std::ifstream ifs(filename);
    if (!ifs.is_open())
        return std::string();

    std::string content( (std::istreambuf_iterator<char>(ifs) ),
                         (std::istreambuf_iterator<char>()    ) );

    if (ifs.fail())
        return std::string();

    return content;
}

// Counts the CPUs listed in a Linux CPU-list file such as
// /sys/devices/system/cpu/online or cpuset.cpus. The format is comma-separated
// single ids and inclusive ranges, e.g. "0-1,3,5-7,10,13-15\n".
// Returns 0 when the file is missing, empty or lists nothing parseable.
// Whitespace-only tokens (such as a lone trailing "\n") and malformed items are ignored
// rather than counted, so an empty cpuset does not collapse the CPU count to 1.
static inline
int getNumberOfCPUsImpl(const char *filename)
{
    const std::string file_contents = getFileContents(filename);
    int cpusAvailable = 0;

    size_t pos = 0;
    while (pos < file_contents.size())
    {
        size_t sep = file_contents.find(',', pos);
        if (sep == std::string::npos)
            sep = file_contents.size();
        const std::string token = file_contents.substr(pos, sep - pos);
        pos = sep + 1;

        int rstart = 0, rend = 0;
        // 2 => "a-b" range, 1 => single id, EOF/0 => blank or garbage token
        const int parsed = sscanf(token.c_str(), "%d-%d", &rstart, &rend);
        if (parsed == 2)
        {
            if (rend >= rstart)
                cpusAvailable += rend - rstart + 1;
        }
        else if (parsed == 1)
        {
            ++cpusAvailable;
        }
    }
    return cpusAvailable;
}
851 #endif
852 
853 #if defined CV_HAVE_CGROUPS
// cgroup v1 CFS bandwidth limit: quota / period (integer division, at least 1).
// Returns 0 when either file is unreadable or holds a non-positive value (the quota is
// -1 when no limit is configured).
static inline
unsigned getNumberOfCPUsCFSv1(const char* quota_path, const char* period_path)
{
    int cfs_quota = 0;
    {
        std::ifstream ss_quota(quota_path, std::ios::in | std::ios::binary);
        ss_quota >> cfs_quota;

        if (ss_quota.fail() || cfs_quota < 1) /* cfs_quota must not be 0 or negative */
            return 0;
    }

    int cfs_period = 0;
    {
        std::ifstream ss_period(period_path, std::ios::in | std::ios::binary);
        ss_period >> cfs_period;

        if (ss_period.fail() || cfs_period < 1)
            return 0;
    }

    return (unsigned)std::max(1, cfs_quota / cfs_period);
}

// cgroup v2 CPU bandwidth limit from a "cpu.max" file: "<quota> <period>".
// Returns 0 when the file is unreadable or the quota is "max" (no limit) or non-positive.
static inline
unsigned getNumberOfCPUsCFSv2(const char* cpu_max_path)
{
    int cfs_quota = 0;
    int cfs_period = 0;
    std::ifstream ss_cpu_max(cpu_max_path, std::ios::in | std::ios::binary);
    ss_cpu_max >> cfs_quota >> cfs_period;  // "max ..." fails the integer extraction

    if (ss_cpu_max.fail() || cfs_quota < 1 || cfs_period < 1)
        return 0;

    return (unsigned)std::max(1, cfs_quota / cfs_period);
}

// CPU limit imposed by the CFS bandwidth controller, or 0 if none was found.
// The cgroup v1 files are tried first, then the cgroup v2 unified "cpu.max" file.
static inline
unsigned getNumberOfCPUsCFS()
{
    const unsigned ncpus_v1 = getNumberOfCPUsCFSv1("/sys/fs/cgroup/cpu/cpu.cfs_quota_us",
                                                   "/sys/fs/cgroup/cpu/cpu.cfs_period_us");
    if (ncpus_v1 != 0)
        return ncpus_v1;
    return getNumberOfCPUsCFSv2("/sys/fs/cgroup/cpu.max");
}
877 #endif
878 
// Smaller of two values where 0 means "unknown": a zero argument is ignored.
// Returns 0 only when both arguments are 0.
template <typename T> static inline
T minNonZero(const T& val_1, const T& val_2)
{
    if (val_1 == 0)
        return val_2;
    if (val_2 == 0)
        return val_1;
    return std::min(val_1, val_2);
}
886 
static
int getNumberOfCPUs_()
{
    /*
     * Logic here is to try different methods of getting CPU counts and return
     * the minimum most value as it has high probability of being right and safe.
     * Methods that report 0 ("unknown") are ignored via minNonZero().
     * Return 1 if we get 0 or not found on all methods.
    */
#if defined CV_CXX11 \
    && !defined(__MINGW32__) /* not implemented (2020-03) */ \

    /*
     * Check for this standard C++11 way, we do not return directly because
     * running in a docker or K8s environment will mean this is the host
     * machines config not the containers or pods and as per docs this value
     * must be "considered only a hint".
    */
    unsigned ncpus = std::thread::hardware_concurrency(); /* If the value is not well defined or not computable, returns 0 */
#else
    unsigned ncpus = 0; /* 0 means we have to find out some other way */
#endif

#if defined _WIN32

    SYSTEM_INFO sysinfo = {};
#if (defined(_M_ARM) || defined(_M_ARM64) || defined(_M_X64) || defined(WINRT)) && _WIN32_WINNT >= 0x501
    GetNativeSystemInfo( &sysinfo );
#else
    GetSystemInfo( &sysinfo );
#endif
    unsigned ncpus_sysinfo = sysinfo.dwNumberOfProcessors;
    ncpus = minNonZero(ncpus, ncpus_sysinfo);

#elif defined __APPLE__

    int numCPU=0;
    int mib[4];
    size_t len = sizeof(numCPU);

    /* set the mib for hw.ncpu */
    mib[0] = CTL_HW;
    mib[1] = HW_AVAILCPU;  // alternatively, try HW_NCPU;

    /* get the number of CPUs from the system */
    sysctl(mib, 2, &numCPU, &len, NULL, 0);

    /* fall back to HW_NCPU, and finally to 1 */
    if( numCPU < 1 )
    {
        mib[1] = HW_NCPU;
        sysctl( mib, 2, &numCPU, &len, NULL, 0 );

        if( numCPU < 1 )
            numCPU = 1;
    }

    ncpus = minNonZero(ncpus, (unsigned)numCPU);

#elif defined CV_CPU_GROUPS_1

    /* Container limits (cpuset list, CFS quota), read once and cached in statics */
#if defined CV_HAVE_CGROUPS
    static unsigned ncpus_impl_cpuset = (unsigned)getNumberOfCPUsImpl("/sys/fs/cgroup/cpuset/cpuset.cpus");
    ncpus = minNonZero(ncpus, ncpus_impl_cpuset);

    static unsigned ncpus_impl_cfs = getNumberOfCPUsCFS();
    ncpus = minNonZero(ncpus, ncpus_impl_cfs);
#endif

    /* CPUs currently online on the machine */
    static unsigned ncpus_impl_devices = (unsigned)getNumberOfCPUsImpl("/sys/devices/system/cpu/online");
    ncpus = minNonZero(ncpus, ncpus_impl_devices);

#endif

    /* CPUs in the process affinity mask */
#if defined _GNU_SOURCE \
    && !defined(__MINGW32__) /* not implemented (2020-03) */ \
    && !defined(__EMSCRIPTEN__) \
    && !defined(__ANDROID__)  // TODO: add check for modern Android NDK

    cpu_set_t cpu_set;
    if (0 == sched_getaffinity(0, sizeof(cpu_set), &cpu_set))
    {
        unsigned cpu_count_cpu_set = CPU_COUNT(&cpu_set);
        ncpus = minNonZero(ncpus, cpu_count_cpu_set);
    }

#endif

    /* POSIX online-processor count (cached); a -1 error result casts to a large unsigned
       value, which minNonZero() then loses against any smaller non-zero count */
#if !defined(_WIN32) && !defined(__APPLE__)

    static unsigned cpu_count_sysconf = (unsigned)sysconf( _SC_NPROCESSORS_ONLN );
    ncpus = minNonZero(ncpus, cpu_count_sysconf);

#endif

    return ncpus != 0 ? ncpus : 1;
}
982 
getNumberOfCPUs()983 int getNumberOfCPUs()
984 {
985     static int nCPUs = getNumberOfCPUs_();
986     return nCPUs;  // cached value
987 }
988 
currentParallelFramework()989 const char* currentParallelFramework()
990 {
991     std::shared_ptr<ParallelForAPI>& api = getCurrentParallelForAPI();
992     if (api)
993     {
994         return api->getName();
995     }
996 #ifdef CV_PARALLEL_FRAMEWORK
997     return CV_PARALLEL_FRAMEWORK;
998 #else
999     return NULL;
1000 #endif
1001 }
1002 
1003 }  // namespace cv::
1004 
cvSetNumThreads(int nt)1005 CV_IMPL void cvSetNumThreads(int nt)
1006 {
1007     cv::setNumThreads(nt);
1008 }
1009 
cvGetNumThreads()1010 CV_IMPL int cvGetNumThreads()
1011 {
1012     return cv::getNumThreads();
1013 }
1014 
cvGetThreadNum()1015 CV_IMPL int cvGetThreadNum()
1016 {
1017     return cv::getThreadNum();
1018 }
1019