1 // This file is part of OpenCV project.
2 // It is subject to the license terms in the LICENSE file found in the top-level directory
3 // of this distribution and at http://opencv.org/license.html.
4 
5 #include "precomp.hpp"
6 
7 #include "parallel_impl.hpp"
8 
9 #ifdef HAVE_PTHREADS_PF
10 #include <pthread.h>
11 
12 #include <opencv2/core/utils/configuration.private.hpp>
13 
14 #include <opencv2/core/utils/logger.defines.hpp>
15 //#undef CV_LOG_STRIP_LEVEL
16 //#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_VERBOSE + 1
17 #include <opencv2/core/utils/logger.hpp>
18 
19 #include <opencv2/core/utils/trace.private.hpp>
20 
21 //#define CV_PROFILE_THREADS 64
22 //#define getTickCount getCPUTickCount  // use this if getTickCount() calls are expensive (and getCPUTickCount() is accurate)
23 
24 //#define CV_USE_GLOBAL_WORKERS_COND_VAR  // not effective on many-core systems (10+)
25 
26 #include <atomic>
27 
// Spin lock's OS-level yield.
// If the build defines DECLARE_CV_YIELD, it is expanded verbatim here (presumably it
// provides CV_YIELD itself -- confirm against the build configuration). When CV_YIELD
// is still undefined afterwards, std::this_thread::yield() is used.
#ifdef DECLARE_CV_YIELD
DECLARE_CV_YIELD
#endif
#ifndef CV_YIELD
# include <thread>
# define CV_YIELD() std::this_thread::yield()
#endif // CV_YIELD

// Spin lock's CPU-level yield (required for Hyper-Threading).
// CV_PAUSE(v) executes the platform's spin-wait hint `v` times. As with CV_YIELD,
// DECLARE_CV_PAUSE is expanded verbatim first and may supply a custom CV_PAUSE.
#ifdef DECLARE_CV_PAUSE
DECLARE_CV_PAUSE
#endif
#ifndef CV_PAUSE
# if defined __GNUC__ && (defined __i386__ || defined __x86_64__)
#   if !defined(__SSE2__)
      // Without SSE2 the _mm_pause name is mapped onto a local "rep; nop" helper.
      static inline void cv_non_sse_mm_pause() { __asm__ __volatile__ ("rep; nop"); }
#     define _mm_pause cv_non_sse_mm_pause
#   endif
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { _mm_pause(); } } while (0)
# elif defined __GNUC__ && defined __aarch64__
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("yield" ::: "memory"); } } while (0)
# elif defined __GNUC__ && defined __arm__
    // No instruction is emitted here: the empty asm with a "memory" clobber only acts as a compiler barrier.
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("" ::: "memory"); } } while (0)
# elif defined __GNUC__ && defined __mips__ && __mips_isa_rev >= 2
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("pause" ::: "memory"); } } while (0)
# elif defined __GNUC__ && defined __PPC64__
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("or 27,27,27" ::: "memory"); } } while (0)
# elif defined __GNUC__ && defined __riscv
// PAUSE HINT is not part of RISC-V ISA yet, but is under discussion now. For details see:
// https://github.com/riscv/riscv-isa-manual/pull/398
// https://github.com/riscv/riscv-isa-manual/issues/43
// #   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("pause"); } } while (0)
// Until then a plain "nop" is used as the spin-wait body.
#   define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("nop"); } } while (0)
# else
    // Unknown target: warn at compile time and fall back to a macro that does nothing.
#   warning "Can't detect 'pause' (CPU-yield) instruction on the target platform. Specify CV_PAUSE() definition via compiler flags."
#   define CV_PAUSE(...) do { /* no-op: works, but not effective */ } while (0)
# endif
#endif // CV_PAUSE
67 
68 
69 namespace cv
70 {
71 
72 static int CV_ACTIVE_WAIT_PAUSE_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_PAUSE_LIMIT", 16);  // iterations
73 static int CV_WORKER_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_WORKER", 2000);  // iterations
74 static int CV_MAIN_THREAD_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_MAIN", 10000); // iterations
75 
76 static int CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_THREADS_LIMIT", 0); // number of real cores
77 
78 class WorkerThread;
79 class ParallelJob;
80 
81 class ThreadPool
82 {
83 public:
instance()84     static ThreadPool& instance()
85     {
86         CV_SINGLETON_LAZY_INIT_REF(ThreadPool, new ThreadPool())
87     }
88 
stop()89     static void stop()
90     {
91         ThreadPool& manager = instance();
92         manager.reconfigure(0);
93     }
94 
reconfigure(unsigned new_threads_count)95     void reconfigure(unsigned new_threads_count)
96     {
97         if (new_threads_count == threads.size())
98             return;
99         pthread_mutex_lock(&mutex);
100         reconfigure_(new_threads_count);
101         pthread_mutex_unlock(&mutex);
102     }
103     bool reconfigure_(unsigned new_threads_count); // internal implementation
104 
105     void run(const Range& range, const ParallelLoopBody& body, double nstripes);
106 
107     size_t getNumOfThreads();
108 
109     void setNumOfThreads(unsigned n);
110 
111     ThreadPool();
112 
113     ~ThreadPool();
114 
115     unsigned num_threads;
116 
117     pthread_mutex_t mutex;  // guards fields (job/threads) from non-worker threads (concurrent parallel_for calls)
118 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
119     pthread_cond_t cond_thread_wake;
120 #endif
121 
122     pthread_mutex_t mutex_notify;
123     pthread_cond_t cond_thread_task_complete;
124 
125     std::vector< Ptr<WorkerThread> > threads;
126 
127     Ptr<ParallelJob> job;
128 
129 #ifdef CV_PROFILE_THREADS
130     double tickFreq;
131     int64 jobSubmitTime;
132     struct ThreadStatistics
133     {
ThreadStatisticscv::ThreadPool::ThreadStatistics134         ThreadStatistics() : threadWait(0)
135         {
136             reset();
137         }
resetcv::ThreadPool::ThreadStatistics138         void reset()
139         {
140             threadWake = 0;
141             threadExecuteStart = 0;
142             threadExecuteStop = 0;
143             executedTasks = 0;
144             keepActive = false;
145             threadPing = getTickCount();
146         }
147         int64 threadWait; // don't reset by default
148         int64 threadPing; // don't reset by default
149         int64 threadWake;
150         int64 threadExecuteStart;
151         int64 threadExecuteStop;
152         int64 threadFree;
153         unsigned executedTasks;
154         bool keepActive;
155 
156         int64 dummy_[8]; // separate cache lines
157 
dumpcv::ThreadPool::ThreadStatistics158         void dump(int id, int64 baseTime, double tickFreq)
159         {
160             if (id < 0)
161                 std::cout << "Main: ";
162             else
163                 printf("T%03d: ", id + 2);
164             printf("wait=% 10.1f   ping=% 6.1f",
165                     threadWait > 0 ? (threadWait - baseTime) / tickFreq * 1e6 : -0.0,
166                     threadPing > 0 ? (threadPing - baseTime) / tickFreq * 1e6 : -0.0);
167             if (threadWake > 0)
168                 printf("   wake=% 6.1f",
169                     (threadWake > 0 ? (threadWake - baseTime) / tickFreq * 1e6 : -0.0));
170             if (threadExecuteStart > 0)
171             {
172                 printf("   exec=% 6.1f - % 6.1f   tasksDone=%5u   free=% 6.1f",
173                     (threadExecuteStart > 0 ? (threadExecuteStart - baseTime) / tickFreq * 1e6 : -0.0),
174                     (threadExecuteStop > 0 ? (threadExecuteStop - baseTime) / tickFreq * 1e6 : -0.0),
175                     executedTasks,
176                     (threadFree > 0 ? (threadFree - baseTime) / tickFreq * 1e6 : -0.0));
177                 if (id >= 0)
178                     printf(" active=%s\n", keepActive ? "true" : "false");
179                 else
180                     printf("\n");
181             }
182             else
183                 printf("   ------------------------------------------------------------------------------\n");
184         }
185     };
186     ThreadStatistics threads_stat[CV_PROFILE_THREADS]; // 0 - main thread, 1..N - worker threads
187 #endif
188 
189 };
190 
191 class WorkerThread
192 {
193 public:
194     ThreadPool& thread_pool;
195     const unsigned id;
196     pthread_t posix_thread;
197     bool is_created;
198 
199     std::atomic<bool> stop_thread;
200 
201     std::atomic<bool> has_wake_signal;
202 
203     Ptr<ParallelJob> job;
204 
205     pthread_mutex_t mutex;
206 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
207     volatile bool isActive;
208     pthread_cond_t cond_thread_wake;
209 #endif
210 
WorkerThread(ThreadPool & thread_pool_,unsigned id_)211     WorkerThread(ThreadPool& thread_pool_, unsigned id_) :
212         thread_pool(thread_pool_),
213         id(id_),
214         posix_thread(0),
215         is_created(false),
216         stop_thread(false),
217         has_wake_signal(false)
218 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
219         , isActive(true)
220 #endif
221     {
222         CV_LOG_VERBOSE(NULL, 1, "MainThread: initializing new worker: " << id);
223         int res = pthread_mutex_init(&mutex, NULL);
224         if (res != 0)
225         {
226             CV_LOG_ERROR(NULL, id << ": Can't create thread mutex: res = " << res);
227             return;
228         }
229 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
230         res = pthread_cond_init(&cond_thread_wake, NULL);
231         if (res != 0)
232         {
233             CV_LOG_ERROR(NULL, id << ": Can't create thread condition variable: res = " << res);
234             return;
235         }
236 #endif
237         res = pthread_create(&posix_thread, NULL, thread_loop_wrapper, (void*)this);
238         if (res != 0)
239         {
240             CV_LOG_ERROR(NULL, id << ": Can't spawn new thread: res = " << res);
241         }
242         else
243         {
244             is_created = true;
245         }
246     }
247 
~WorkerThread()248     ~WorkerThread()
249     {
250         CV_LOG_VERBOSE(NULL, 1, "MainThread: destroy worker thread: " << id);
251         if (is_created)
252         {
253             if (!stop_thread)
254             {
255                 pthread_mutex_lock(&mutex);  // to avoid signal miss due pre-check
256                 stop_thread = true;
257                 pthread_mutex_unlock(&mutex);
258 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
259                 pthread_cond_broadcast(&thread_pool.cond_thread_wake);
260 #else
261                 pthread_cond_signal(&cond_thread_wake);
262 #endif
263             }
264             pthread_join(posix_thread, NULL);
265         }
266 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
267         pthread_cond_destroy(&cond_thread_wake);
268 #endif
269         pthread_mutex_destroy(&mutex);
270     }
271 
272     void thread_body();
thread_loop_wrapper(void * thread_object)273     static void* thread_loop_wrapper(void* thread_object)
274     {
275 #ifdef OPENCV_WITH_ITT
276         __itt_thread_set_name(cv::format("OpenCVThread-%03d", cv::utils::getThreadID()).c_str());
277 #endif
278         ((WorkerThread*)thread_object)->thread_body();
279         return 0;
280     }
281 };
282 
283 class ParallelJob
284 {
285 public:
ParallelJob(const ThreadPool & thread_pool_,const Range & range_,const ParallelLoopBody & body_,int nstripes_)286     ParallelJob(const ThreadPool& thread_pool_, const Range& range_, const ParallelLoopBody& body_, int nstripes_) :
287         thread_pool(thread_pool_),
288         body(body_),
289         range(range_),
290         nstripes((unsigned)nstripes_),
291         is_completed(false)
292     {
293         CV_LOG_VERBOSE(NULL, 5, "ParallelJob::ParallelJob(" << (void*)this << ")");
294         current_task.store(0, std::memory_order_relaxed);
295         active_thread_count.store(0, std::memory_order_relaxed);
296         completed_thread_count.store(0, std::memory_order_relaxed);
297         dummy0_[0] = 0, dummy1_[0] = 0, dummy2_[0] = 0; // compiler warning
298     }
299 
~ParallelJob()300     ~ParallelJob()
301     {
302         CV_LOG_VERBOSE(NULL, 5, "ParallelJob::~ParallelJob(" << (void*)this << ")");
303     }
304 
execute(bool is_worker_thread)305     unsigned execute(bool is_worker_thread)
306     {
307         unsigned executed_tasks = 0;
308         const int task_count = range.size();
309         const int remaining_multiplier = std::min(nstripes,
310                 std::max(
311                         std::min(100u, thread_pool.num_threads * 4),
312                         thread_pool.num_threads * 2
313                 ));  // experimental value
314         for (;;)
315         {
316             int chunk_size = std::max(1, (task_count - current_task) / remaining_multiplier);
317             int id = current_task.fetch_add(chunk_size, std::memory_order_seq_cst);
318             if (id >= task_count)
319                 break; // no more free tasks
320 
321             executed_tasks += chunk_size;
322             int start_id = id;
323             int end_id = std::min(task_count, id + chunk_size);
324             CV_LOG_VERBOSE(NULL, 9, "Thread: job " << start_id << "-" << end_id);
325 
326             //TODO: if (not pending exception)
327             {
328                 body.operator()(Range(range.start + start_id, range.start + end_id));
329             }
330             if (is_worker_thread && is_completed)
331             {
332                 CV_LOG_ERROR(NULL, "\t\t\t\tBUG! Job: " << (void*)this << " " << id << " " << active_thread_count << " " << completed_thread_count);
333                 CV_Assert(!is_completed); // TODO Dbg this
334             }
335         }
336         return executed_tasks;
337     }
338 
339     const ThreadPool& thread_pool;
340     const ParallelLoopBody& body;
341     const Range range;
342     const unsigned nstripes;
343 
344     std::atomic<int> current_task;  // next free part of job
345     int64 dummy0_[8];  // avoid cache-line reusing for the same atomics
346 
347     std::atomic<int> active_thread_count;  // number of threads worked on this job
348     int64 dummy1_[8];  // avoid cache-line reusing for the same atomics
349 
350     std::atomic<int> completed_thread_count;  // number of threads completed any activities on this job
351     int64 dummy2_[8];  // avoid cache-line reusing for the same atomics
352 
353     std::atomic<bool> is_completed;
354 
355     // TODO exception handling
356 };
357 
358 
359 // Disable thread sanitization check when CV_USE_GLOBAL_WORKERS_COND_VAR is not
360 // set because it triggers as the main thread reads isActive while the children
361 // thread writes it (but it all works out because a mutex is locked in the main
362 // thread and isActive re-checked).
363 // This is to solve issue #19463.
364 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) && defined(__clang__) && defined(__has_feature)
365 #if __has_feature(thread_sanitizer)
366 __attribute__((no_sanitize("thread")))
367 #endif
368 #endif
thread_body()369 void WorkerThread::thread_body()
370 {
371     (void)cv::utils::getThreadID(); // notify OpenCV about new thread
372     CV_LOG_VERBOSE(NULL, 5, "Thread: new thread: " << id);
373 
374     bool allow_active_wait = true;
375 
376 #ifdef CV_PROFILE_THREADS
377     ThreadPool::ThreadStatistics& stat = thread_pool.threads_stat[id + 1];
378 #endif
379 
380     while (!stop_thread)
381     {
382         CV_LOG_VERBOSE(NULL, 5, "Thread: ... loop iteration: allow_active_wait=" << allow_active_wait << "   has_wake_signal=" << has_wake_signal);
383         if (allow_active_wait && CV_WORKER_ACTIVE_WAIT > 0)
384         {
385             allow_active_wait = false;
386             for (int i = 0; i < CV_WORKER_ACTIVE_WAIT; i++)
387             {
388                 if (has_wake_signal)
389                     break;
390                 if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
391                     CV_PAUSE(16);
392                 else
393                     CV_YIELD();
394             }
395         }
396         pthread_mutex_lock(&mutex);
397 #ifdef CV_PROFILE_THREADS
398         stat.threadWait = getTickCount();
399 #endif
400         while (!has_wake_signal) // to handle spurious wakeups
401         {
402             //CV_LOG_VERBOSE(NULL, 5, "Thread: wait (sleep) ...");
403 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
404             pthread_cond_wait(&thread_pool.cond_thread_wake, &mutex);
405 #else
406             isActive = false;
407             pthread_cond_wait(&cond_thread_wake, &mutex);
408             isActive = true;
409 #endif
410             CV_LOG_VERBOSE(NULL, 5, "Thread: wake ... (has_wake_signal=" << has_wake_signal << " stop_thread=" << stop_thread << ")")
411         }
412 #ifdef CV_PROFILE_THREADS
413         stat.threadWake = getTickCount();
414 #endif
415 
416         CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job");
417         if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0)
418             allow_active_wait = true;
419         Ptr<ParallelJob> j_ptr; swap(j_ptr, job);
420         has_wake_signal = false;
421         pthread_mutex_unlock(&mutex);
422 
423         if (!stop_thread)
424         {
425             ParallelJob* j = j_ptr;
426             if (j)
427             {
428                 CV_LOG_VERBOSE(NULL, 5, "Thread: job size=" << j->range.size() << " done=" << j->current_task);
429                 if (j->current_task < j->range.size())
430                 {
431                     int other = j->active_thread_count.fetch_add(1, std::memory_order_seq_cst);
432                     CV_LOG_VERBOSE(NULL, 5, "Thread: processing new job (with " << other << " other threads)"); CV_UNUSED(other);
433 #ifdef CV_PROFILE_THREADS
434                     stat.threadExecuteStart = getTickCount();
435                     stat.executedTasks = j->execute(true);
436                     stat.threadExecuteStop = getTickCount();
437 #else
438                     j->execute(true);
439 #endif
440                     int completed = j->completed_thread_count.fetch_add(1, std::memory_order_seq_cst) + 1;
441                     int active = j->active_thread_count.load(std::memory_order_acquire);
442                     if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT > 0)
443                     {
444                         allow_active_wait = true;
445                         if (active >= CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT && (id & 1) == 0) // turn off a half of threads
446                             allow_active_wait = false;
447                     }
448                     CV_LOG_VERBOSE(NULL, 5, "Thread: completed job processing: " << active << " " << completed);
449                     if (active == completed)
450                     {
451                         bool need_signal = !j->is_completed;
452                         j->is_completed = true;
453                         j = NULL; j_ptr.release();
454                         if (need_signal)
455                         {
456                             CV_LOG_VERBOSE(NULL, 5, "Thread: job finished => notifying the main thread");
457                             pthread_mutex_lock(&thread_pool.mutex_notify);  // to avoid signal miss due pre-check condition
458                             // empty
459                             pthread_mutex_unlock(&thread_pool.mutex_notify);
460                             pthread_cond_broadcast/*pthread_cond_signal*/(&thread_pool.cond_thread_task_complete);
461                         }
462                     }
463                 }
464                 else
465                 {
466                     CV_LOG_VERBOSE(NULL, 5, "Thread: no free job tasks");
467                 }
468             }
469         }
470 #ifdef CV_PROFILE_THREADS
471         stat.threadFree = getTickCount();
472         stat.keepActive = allow_active_wait;
473 #endif
474     }
475 }
476 
ThreadPool()477 ThreadPool::ThreadPool()
478 {
479 #ifdef CV_PROFILE_THREADS
480     tickFreq = getTickFrequency();
481 #endif
482 
483     int res = 0;
484     res |= pthread_mutex_init(&mutex, NULL);
485     res |= pthread_mutex_init(&mutex_notify, NULL);
486 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
487     res |= pthread_cond_init(&cond_thread_wake, NULL);
488 #endif
489     res |= pthread_cond_init(&cond_thread_task_complete, NULL);
490 
491     if (0 != res)
492     {
493         CV_LOG_FATAL(NULL, "Failed to initialize ThreadPool (pthreads)");
494     }
495     num_threads = defaultNumberOfThreads();
496 }
497 
reconfigure_(unsigned new_threads_count)498 bool ThreadPool::reconfigure_(unsigned new_threads_count)
499 {
500     if (new_threads_count == threads.size())
501         return false;
502 
503     if (new_threads_count < threads.size())
504     {
505         CV_LOG_VERBOSE(NULL, 1, "MainThread: reduce worker pool: " << threads.size() << " => " << new_threads_count);
506         std::vector< Ptr<WorkerThread> > release_threads(threads.size() - new_threads_count);
507         for (size_t i = new_threads_count; i < threads.size(); ++i)
508         {
509             pthread_mutex_lock(&threads[i]->mutex);  // to avoid signal miss due pre-check
510             threads[i]->stop_thread = true;
511             threads[i]->has_wake_signal = true;
512 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
513             pthread_mutex_unlock(&threads[i]->mutex);
514             pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread
515 #else
516             pthread_mutex_unlock(&threads[i]->mutex);
517 #endif
518             std::swap(threads[i], release_threads[i - new_threads_count]);
519         }
520 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
521         CV_LOG_VERBOSE(NULL, 1, "MainThread: notify worker threads about termination...");
522         pthread_cond_broadcast(&cond_thread_wake); // wake all threads
523 #endif
524         threads.resize(new_threads_count);
525         release_threads.clear();  // calls thread_join which want to lock mutex
526         return false;
527     }
528     else
529     {
530         CV_LOG_VERBOSE(NULL, 1, "MainThread: upgrade worker pool: " << threads.size() << " => " << new_threads_count);
531         for (size_t i = threads.size(); i < new_threads_count; ++i)
532         {
533             threads.push_back(Ptr<WorkerThread>(new WorkerThread(*this, (unsigned)i))); // spawn more threads
534         }
535     }
536     return false;
537 }
538 
~ThreadPool()539 ThreadPool::~ThreadPool()
540 {
541     reconfigure(0);
542     pthread_cond_destroy(&cond_thread_task_complete);
543 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
544     pthread_cond_destroy(&cond_thread_wake);
545 #endif
546     pthread_mutex_destroy(&mutex);
547     pthread_mutex_destroy(&mutex_notify);
548 }
549 
run(const Range & range,const ParallelLoopBody & body,double nstripes)550 void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double nstripes)
551 {
552     CV_LOG_VERBOSE(NULL, 1, "MainThread: new parallel job: num_threads=" << num_threads << "   range=" << range.size() << "   nstripes=" << nstripes << "   job=" << (void*)job);
553 #ifdef CV_PROFILE_THREADS
554     jobSubmitTime = getTickCount();
555     threads_stat[0].reset();
556     threads_stat[0].threadWait = jobSubmitTime;
557     threads_stat[0].threadWake = jobSubmitTime;
558 #endif
559     if (getNumOfThreads() > 1 &&
560         job == NULL &&
561         (range.size() * nstripes >= 2 || (range.size() > 1 && nstripes <= 0))
562     )
563     {
564         pthread_mutex_lock(&mutex);
565         if (job != NULL)
566         {
567             pthread_mutex_unlock(&mutex);
568             body(range);
569             return;
570         }
571         reconfigure_(num_threads - 1);
572 
573         {
574             CV_LOG_VERBOSE(NULL, 1, "MainThread: initialize parallel job: " << range.size());
575             job = Ptr<ParallelJob>(new ParallelJob(*this, range, body, nstripes));
576             pthread_mutex_unlock(&mutex);
577 
578             CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads...");
579             for (size_t i = 0; i < threads.size(); ++i)
580             {
581                 WorkerThread& thread = *(threads[i].get());
582                 if (
583 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
584                         thread.isActive ||
585 #endif
586                         thread.has_wake_signal
587                         || !thread.job.empty()  // #10881
588                 )
589                 {
590                     pthread_mutex_lock(&thread.mutex);
591                     thread.job = job;
592 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
593                     bool isActive = thread.isActive;
594 #endif
595                     thread.has_wake_signal = true;
596 #ifdef CV_PROFILE_THREADS
597                     threads_stat[i + 1].reset();
598 #endif
599                     pthread_mutex_unlock(&thread.mutex);
600 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
601                     if (!isActive)
602                     {
603                         pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
604                     }
605 #endif
606                 }
607                 else
608                 {
609                     CV_Assert(thread.job.empty());
610                     thread.job = job;
611                     thread.has_wake_signal = true;
612 #ifdef CV_PROFILE_THREADS
613                     threads_stat[i + 1].reset();
614 #endif
615 #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
616                     pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
617 #endif
618                 }
619             }
620 #ifdef CV_PROFILE_THREADS
621             threads_stat[0].threadPing = getTickCount();
622 #endif
623 #if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
624             pthread_cond_broadcast(&cond_thread_wake); // wake all threads
625 #endif
626 #ifdef CV_PROFILE_THREADS
627             threads_stat[0].threadWake = getTickCount();
628 #endif
629             CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads... (done)");
630 
631             {
632                 ParallelJob& j = *(this->job);
633 #ifdef CV_PROFILE_THREADS
634                 threads_stat[0].threadExecuteStart = getTickCount();
635                 threads_stat[0].executedTasks = j.execute(false);
636                 threads_stat[0].threadExecuteStop = getTickCount();
637 #else
638                 j.execute(false);
639 #endif
640                 CV_Assert(j.current_task >= j.range.size());
641                 CV_LOG_VERBOSE(NULL, 5, "MainThread: complete self-tasks: " << j.active_thread_count << " " << j.completed_thread_count);
642                 if (job->is_completed || j.active_thread_count == 0)
643                 {
644                     job->is_completed = true;
645                     CV_LOG_VERBOSE(NULL, 5, "MainThread: no WIP worker threads");
646                 }
647                 else
648                 {
649                     if (CV_MAIN_THREAD_ACTIVE_WAIT > 0)
650                     {
651                         for (int i = 0; i < CV_MAIN_THREAD_ACTIVE_WAIT; i++)  // don't spin too much in any case (inaccurate getTickCount())
652                         {
653                             if (job->is_completed)
654                             {
655                                 CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (active wait) " << j.active_thread_count << " " << j.completed_thread_count);
656                                 break;
657                             }
658                             if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
659                                 CV_PAUSE(16);
660                             else
661                                 CV_YIELD();
662                         }
663                     }
664                     if (!job->is_completed)
665                     {
666                         CV_LOG_VERBOSE(NULL, 5, "MainThread: prepare wait " << j.active_thread_count << " " << j.completed_thread_count);
667                         pthread_mutex_lock(&mutex_notify);
668                         for (;;)
669                         {
670                             if (job->is_completed)
671                             {
672                                 CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (wait) " << j.active_thread_count << " " << j.completed_thread_count);
673                                 break;
674                             }
675                             CV_LOG_VERBOSE(NULL, 5, "MainThread: wait completion (sleep) ...");
676                             pthread_cond_wait(&cond_thread_task_complete, &mutex_notify);
677                             CV_LOG_VERBOSE(NULL, 5, "MainThread: wake");
678                         }
679                         pthread_mutex_unlock(&mutex_notify);
680                     }
681                 }
682             }
683 #ifdef CV_PROFILE_THREADS
684             threads_stat[0].threadFree = getTickCount();
685             std::cout << "Job: sz=" << range.size() << " nstripes=" << nstripes << "    Time: " << (threads_stat[0].threadFree - jobSubmitTime) / tickFreq * 1e6 << " usec" << std::endl;
686             for (int i = 0; i < (int)threads.size() + 1; i++)
687             {
688                 threads_stat[i].dump(i - 1, jobSubmitTime, tickFreq);
689             }
690 #endif
691             if (job)
692             {
693                 pthread_mutex_lock(&mutex);
694                 CV_LOG_VERBOSE(NULL, 5, "MainThread: job release");
695                 CV_Assert(job->is_completed);
696                 job.release();
697                 pthread_mutex_unlock(&mutex);
698             }
699         }
700     }
701     else
702     {
703         body(range);
704     }
705 }
706 
getNumOfThreads()707 size_t ThreadPool::getNumOfThreads()
708 {
709     return num_threads;
710 }
711 
setNumOfThreads(unsigned n)712 void ThreadPool::setNumOfThreads(unsigned n)
713 {
714     if (n != num_threads)
715     {
716         num_threads = n;
717         if (n == 1)
718            if (job == NULL) reconfigure(0);  // stop worker threads immediately
719     }
720 }
721 
parallel_pthreads_get_threads_num()722 size_t parallel_pthreads_get_threads_num()
723 {
724     return ThreadPool::instance().getNumOfThreads();
725 }
726 
parallel_pthreads_set_threads_num(int num)727 void parallel_pthreads_set_threads_num(int num)
728 {
729     if(num < 0)
730     {
731         ThreadPool::instance().setNumOfThreads(0);
732     }
733     else
734     {
735         ThreadPool::instance().setNumOfThreads(unsigned(num));
736     }
737 }
738 
parallel_for_pthreads(const Range & range,const ParallelLoopBody & body,double nstripes)739 void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes)
740 {
741     ThreadPool::instance().run(range, body, nstripes);
742 }
743 
744 }
745 
746 #endif
747