1 /* 2 * Copyright (c) 2012-2014, Bruno Levy 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * * Neither the name of the ALICE Project-Team nor the names of its 14 * contributors may be used to endorse or promote products derived from this 15 * software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 * 29 * If you modify this software, you should include a notice giving the 30 * name of the person performing the modification, the date of modification, 31 * and the reason for such modification. 32 * 33 * Contact: Bruno Levy 34 * 35 * Bruno.Levy@inria.fr 36 * http://www.loria.fr/~levy 37 * 38 * ALICE Project 39 * LORIA, INRIA Lorraine, 40 * Campus Scientifique, BP 239 41 * 54506 VANDOEUVRE LES NANCY CEDEX 42 * FRANCE 43 * 44 */ 45 46 #include <geogram/basic/process.h> 47 #include <geogram/basic/logger.h> 48 #include <geogram/basic/environment.h> 49 #include <geogram/basic/string.h> 50 #include <geogram/basic/command_line.h> 51 #include <geogram/basic/stopwatch.h> 52 #include <thread> 53 #include <chrono> 54 55 #ifdef GEO_OPENMP 56 #include <omp.h> 57 #endif 58 59 namespace { 60 using namespace GEO; 61 62 ThreadManager_var thread_manager_; 63 int running_threads_invocations_ = 0; 64 65 bool multithreading_initialized_ = false; 66 bool multithreading_enabled_ = true; 67 68 index_t max_threads_initialized_ = false; 69 index_t max_threads_ = 0; 70 71 bool fpe_initialized_ = false; 72 bool fpe_enabled_ = false; 73 74 bool cancel_initialized_ = false; 75 bool cancel_enabled_ = false; 76 77 double start_time_ = 0.0; 78 79 /************************************************************************/ 80 81 /** 82 * \brief Process Environment 83 * \details This environment exposes and controls the configuration of the 84 * Process module. 85 */ 86 class ProcessEnvironment : public Environment { 87 protected: 88 /** 89 * \brief Gets a Process property 90 * \details Retrieves the value of the property \p name and stores it 91 * in \p value. The property must be a valid Process property (see 92 * sys:xxx properties in Vorpaline's help). 93 * \param[in] name name of the property 94 * \param[out] value receives the value of the property 95 * \retval true if the property is a valid Process property 96 * \retval false otherwise 97 * \see Environment::get_value() 98 */ get_local_value(const std::string & name,std::string & value) const99 virtual bool get_local_value( 100 const std::string& name, std::string& value 101 ) const { 102 if(name == "sys:nb_cores") { 103 value = String::to_string(Process::number_of_cores()); 104 return true; 105 } 106 if(name == "sys:multithread") { 107 value = String::to_string(multithreading_enabled_); 108 return true; 109 } 110 if(name == "sys:max_threads") { 111 value = String::to_string( 112 Process::maximum_concurrent_threads() 113 ); 114 return true; 115 } 116 if(name == "sys:FPE") { 117 value = String::to_string(fpe_enabled_); 118 return true; 119 } 120 if(name == "sys:cancel") { 121 value = String::to_string(cancel_enabled_); 122 return true; 123 } 124 if(name == "sys:assert") { 125 value = assert_mode() == ASSERT_THROW ? "throw" : "abort"; 126 return true; 127 } 128 return false; 129 } 130 131 /** 132 * \brief Sets a Process property 133 * \details Sets the property \p name with value \p value in the 134 * Process. The property must be a valid Process property (see sys:xxx 135 * properties in Vorpaline's help) and \p value must be a legal value 136 * for the property. 137 * \param[in] name name of the property 138 * \param[in] value value of the property 139 * \retval true if the property was successfully set 140 * \retval false otherwise 141 * \see Environment::set_value() 142 */ set_local_value(const std::string & name,const std::string & value)143 virtual bool set_local_value( 144 const std::string& name, const std::string& value 145 ) { 146 if(name == "sys:multithread") { 147 Process::enable_multithreading(String::to_bool(value)); 148 return true; 149 } 150 if(name == "sys:max_threads") { 151 Process::set_max_threads(String::to_uint(value)); 152 return true; 153 } 154 if(name == "sys:FPE") { 155 Process::enable_FPE(String::to_bool(value)); 156 return true; 157 } 158 if(name == "sys:cancel") { 159 Process::enable_cancel(String::to_bool(value)); 160 return true; 161 } 162 if(name == "sys:assert") { 163 if(value == "throw") { 164 set_assert_mode(ASSERT_THROW); 165 return true; 166 } 167 if(value == "abort") { 168 set_assert_mode(ASSERT_ABORT); 169 return true; 170 } 171 if(value == "breakpoint") { 172 set_assert_mode(ASSERT_BREAKPOINT); 173 return true; 174 } 175 Logger::err("Process") 176 << "Invalid value for property sys:abort: " 177 << value 178 << std::endl; 179 return false; 180 } 181 return false; 182 } 183 184 /** ProcessEnvironment destructor */ ~ProcessEnvironment()185 virtual ~ProcessEnvironment() { 186 } 187 }; 188 189 /************************************************************************/ 190 191 #ifdef GEO_OPENMP 192 193 /** 194 * \brief OpenMP Thread Manager 195 * \details 196 * OMPThreadManager is an implementation of ThreadManager that uses OpenMP 197 * for running concurrent threads and control critical sections. 198 */ 199 class GEOGRAM_API OMPThreadManager : public ThreadManager { 200 public: 201 /** 202 * \brief Creates and initializes the OpenMP ThreadManager 203 */ OMPThreadManager()204 OMPThreadManager() { 205 omp_init_lock(&lock_); 206 } 207 208 /** \copydoc GEO::ThreadManager::maximum_concurrent_threads() */ maximum_concurrent_threads()209 virtual index_t maximum_concurrent_threads() { 210 return Process::number_of_cores(); 211 } 212 213 /** \copydoc GEO::ThreadManager::enter_critical_section() */ enter_critical_section()214 virtual void enter_critical_section() { 215 omp_set_lock(&lock_); 216 } 217 218 /** \copydoc GEO::ThreadManager::leave_critical_section() */ leave_critical_section()219 virtual void leave_critical_section() { 220 omp_unset_lock(&lock_); 221 } 222 223 protected: 224 /** \brief OMPThreadManager destructor */ ~OMPThreadManager()225 virtual ~OMPThreadManager() { 226 omp_destroy_lock(&lock_); 227 } 228 229 /** \copydoc GEO::ThreadManager::run_concurrent_threads() */ run_concurrent_threads(ThreadGroup & threads,index_t max_threads)230 virtual void run_concurrent_threads( 231 ThreadGroup& threads, index_t max_threads 232 ) { 233 // TODO: take max_threads_ into account 234 geo_argused(max_threads); 235 236 #pragma omp parallel for schedule(dynamic) 237 for(int i = 0; i < int(threads.size()); i++) { 238 index_t ii = index_t(i); 239 set_thread_id(threads[ii],ii); 240 set_current_thread(threads[ii]); 241 threads[ii]->run(); 242 } 243 } 244 245 private: 246 omp_lock_t lock_; 247 }; 248 249 #endif 250 } 251 252 253 namespace { 254 /** 255 * \brief The (thread-local) variable that stores a 256 * pointer to the current thread. 257 * \details It cannot be a static member of class 258 * Thread, because Visual C++ does not accept 259 * to export thread local storage variables in 260 * DLLs. 261 */ 262 thread_local Thread* geo_current_thread_ = nullptr; 263 } 264 265 namespace GEO { 266 set_current(Thread * thread)267 void Thread::set_current(Thread* thread) { 268 geo_current_thread_ = thread; 269 } 270 current()271 Thread* Thread::current() { 272 return geo_current_thread_; 273 } 274 ~Thread()275 Thread::~Thread() { 276 } 277 278 /************************************************************************/ 279 ~ThreadManager()280 ThreadManager::~ThreadManager() { 281 } 282 run_threads(ThreadGroup & threads)283 void ThreadManager::run_threads(ThreadGroup& threads) { 284 index_t max_threads = maximum_concurrent_threads(); 285 if(Process::multithreading_enabled() && max_threads > 1) { 286 run_concurrent_threads(threads, max_threads); 287 } else { 288 for(index_t i = 0; i < threads.size(); i++) { 289 threads[i]->run(); 290 } 291 } 292 } 293 294 /************************************************************************/ 295 ~MonoThreadingThreadManager()296 MonoThreadingThreadManager::~MonoThreadingThreadManager() { 297 } 298 run_concurrent_threads(ThreadGroup & threads,index_t max_threads)299 void MonoThreadingThreadManager::run_concurrent_threads( 300 ThreadGroup& threads, index_t max_threads 301 ) { 302 geo_argused(threads); 303 geo_argused(max_threads); 304 geo_assert_not_reached; 305 } 306 maximum_concurrent_threads()307 index_t MonoThreadingThreadManager::maximum_concurrent_threads() { 308 return 1; 309 } 310 enter_critical_section()311 void MonoThreadingThreadManager::enter_critical_section() { 312 } 313 leave_critical_section()314 void MonoThreadingThreadManager::leave_critical_section() { 315 } 316 317 /************************************************************************/ 318 319 namespace Process { 320 321 // OS dependent functions implemented in process_unix.cpp and 322 // process_win.cpp 323 324 bool os_init_threads(); 325 void os_brute_force_kill(); 326 bool os_enable_FPE(bool flag); 327 bool os_enable_cancel(bool flag); 328 void os_install_signal_handlers(); 329 index_t os_number_of_cores(); 330 size_t os_used_memory(); 331 size_t os_max_used_memory(); 332 std::string os_executable_filename(); 333 initialize(int flags)334 void initialize(int flags) { 335 336 Environment* env = Environment::instance(); 337 env->add_environment(new ProcessEnvironment); 338 339 if(!os_init_threads()) { 340 #ifdef GEO_OPENMP 341 Logger::out("Process") 342 << "Using OpenMP threads" 343 << std::endl; 344 set_thread_manager(new OMPThreadManager); 345 #else 346 Logger::out("Process") 347 << "Multithreading not supported, going monothread" 348 << std::endl; 349 set_thread_manager(new MonoThreadingThreadManager); 350 #endif 351 } 352 353 if( 354 (::getenv("GEO_NO_SIGNAL_HANDLER") == nullptr) && 355 (flags & GEOGRAM_INSTALL_HANDLERS) != 0 356 ) { 357 os_install_signal_handlers(); 358 } 359 360 // Initialize Process default values 361 enable_multithreading(multithreading_enabled_); 362 set_max_threads(number_of_cores()); 363 enable_FPE(fpe_enabled_); 364 enable_cancel(cancel_enabled_); 365 366 start_time_ = SystemStopwatch::now(); 367 } 368 show_stats()369 void show_stats() { 370 371 Logger::out("Process") << "Total elapsed time: " 372 << SystemStopwatch::now() - start_time_ 373 << "s" << std::endl; 374 375 const size_t K=size_t(1024); 376 const size_t M=K*K; 377 const size_t G=K*M; 378 379 size_t max_mem = Process::max_used_memory() ; 380 size_t r = max_mem; 381 382 size_t mem_G = r / G; 383 r = r % G; 384 size_t mem_M = r / M; 385 r = r % M; 386 size_t mem_K = r / K; 387 r = r % K; 388 389 std::string s; 390 if(mem_G != 0) { 391 s += String::to_string(mem_G)+"G "; 392 } 393 if(mem_M != 0) { 394 s += String::to_string(mem_M)+"M "; 395 } 396 if(mem_K != 0) { 397 s += String::to_string(mem_K)+"K "; 398 } 399 if(r != 0) { 400 s += String::to_string(r); 401 } 402 403 Logger::out("Process") << "Maximum used memory: " 404 << max_mem << " (" << s << ")" 405 << std::endl; 406 } 407 terminate()408 void terminate() { 409 thread_manager_.reset(); 410 } 411 brute_force_kill()412 void brute_force_kill() { 413 os_brute_force_kill(); 414 } 415 number_of_cores()416 index_t number_of_cores() { 417 static index_t result = 0; 418 if(result == 0) { 419 #ifdef GEO_NO_THREAD_LOCAL 420 // Deactivate multithreading if thread_local is 421 // not supported (e.g. with old OS-X). 422 result = 1; 423 #else 424 result = os_number_of_cores(); 425 #endif 426 } 427 return result; 428 } 429 used_memory()430 size_t used_memory() { 431 return os_used_memory(); 432 } 433 max_used_memory()434 size_t max_used_memory() { 435 return os_max_used_memory(); 436 } 437 executable_filename()438 std::string executable_filename() { 439 return os_executable_filename(); 440 } 441 set_thread_manager(ThreadManager * thread_manager)442 void set_thread_manager(ThreadManager* thread_manager) { 443 thread_manager_ = thread_manager; 444 } 445 run_threads(ThreadGroup & threads)446 void run_threads(ThreadGroup& threads) { 447 running_threads_invocations_++; 448 thread_manager_->run_threads(threads); 449 running_threads_invocations_--; 450 } 451 enter_critical_section()452 void enter_critical_section() { 453 thread_manager_->enter_critical_section(); 454 } 455 leave_critical_section()456 void leave_critical_section() { 457 thread_manager_->leave_critical_section(); 458 } 459 is_running_threads()460 bool is_running_threads() { 461 #ifdef GEO_OPENMP 462 return ( 463 omp_in_parallel() || 464 (running_threads_invocations_ > 0) 465 ); 466 #else 467 return running_threads_invocations_ > 0; 468 #endif 469 } 470 multithreading_enabled()471 bool multithreading_enabled() { 472 return multithreading_enabled_; 473 } 474 enable_multithreading(bool flag)475 void enable_multithreading(bool flag) { 476 if( 477 multithreading_initialized_ && 478 multithreading_enabled_ == flag 479 ) { 480 return; 481 } 482 multithreading_initialized_ = true; 483 multithreading_enabled_ = flag; 484 if(multithreading_enabled_) { 485 Logger::out("Process") 486 << "Multithreading enabled" << std::endl 487 << "Available cores = " << number_of_cores() 488 << std::endl; 489 // Logger::out("Process") 490 // << "Max. concurrent threads = " 491 // << maximum_concurrent_threads() << std::endl ; 492 if(number_of_cores() == 1) { 493 Logger::warn("Process") 494 << "Processor is not a multicore" 495 << "(or multithread is not supported)" 496 << std::endl; 497 } 498 if(thread_manager_ == nullptr) { 499 Logger::warn("Process") 500 << "Missing multithreading manager" 501 << std::endl; 502 } 503 } else { 504 Logger::out("Process") 505 << "Multithreading disabled" << std::endl; 506 } 507 } 508 max_threads()509 index_t max_threads() { 510 return max_threads_initialized_ 511 ? max_threads_ 512 : number_of_cores(); 513 } 514 set_max_threads(index_t num_threads)515 void set_max_threads(index_t num_threads) { 516 if( 517 max_threads_initialized_ && 518 max_threads_ == num_threads 519 ) { 520 return; 521 } 522 max_threads_initialized_ = true; 523 if(num_threads == 0) { 524 num_threads = 1; 525 } else if(num_threads > number_of_cores()) { 526 Logger::warn("Process") 527 << "Cannot allocate " << num_threads 528 << " for multithreading" 529 << std::endl; 530 num_threads = number_of_cores(); 531 } 532 max_threads_ = num_threads; 533 Logger::out("Process") 534 << "Max used threads = " << max_threads_ 535 << std::endl; 536 } 537 maximum_concurrent_threads()538 index_t maximum_concurrent_threads() { 539 if(!multithreading_enabled_ || thread_manager_ == nullptr) { 540 return 1; 541 } 542 return max_threads_; 543 /* 544 // commented out for now, since under Windows, 545 // it seems that maximum_concurrent_threads() does not 546 // report the number of hyperthreaded cores. 547 return 548 geo_min( 549 thread_manager_->maximum_concurrent_threads(), 550 max_threads_ 551 ) ; 552 */ 553 } 554 FPE_enabled()555 bool FPE_enabled() { 556 return fpe_enabled_; 557 } 558 enable_FPE(bool flag)559 void enable_FPE(bool flag) { 560 if(fpe_initialized_ && fpe_enabled_ == flag) { 561 return; 562 } 563 fpe_initialized_ = true; 564 fpe_enabled_ = flag; 565 os_enable_FPE(flag); 566 } 567 cancel_enabled()568 bool cancel_enabled() { 569 return cancel_enabled_; 570 } 571 enable_cancel(bool flag)572 void enable_cancel(bool flag) { 573 if(cancel_initialized_ && cancel_enabled_ == flag) { 574 return; 575 } 576 cancel_initialized_ = true; 577 cancel_enabled_ = flag; 578 579 if(os_enable_cancel(flag)) { 580 Logger::out("Process") 581 << (flag ? "Cancel mode enabled" : "Cancel mode disabled") 582 << std::endl; 583 } else { 584 Logger::warn("Process") 585 << "Cancel mode not implemented" << std::endl; 586 } 587 } 588 } 589 } 590 591 592 namespace { 593 using namespace GEO; 594 595 /** 596 * \brief Used by the implementation of GEO::parallel() 597 * \see GEO::parallel() 598 */ 599 class ParallelThread : public Thread { 600 public: 601 /** 602 * \brief ParallelThread constructor. 603 * \param[in] func a void function with no parameter. 604 */ ParallelThread(std::function<void (void)> func)605 ParallelThread( 606 std::function<void(void)> func 607 ) : func_(func) { 608 } 609 610 /** 611 * \copydoc Thread::run() 612 */ run()613 void run() override { 614 func_(); 615 } 616 private: 617 std::function<void()> func_; 618 }; 619 620 621 /** 622 * \brief Used by the implementation of GEO::parallel_for() 623 * \see GEO::parallel_for() 624 */ 625 class ParallelForThread : public Thread { 626 public: 627 628 /** 629 * \param[in] func a void function that takes an index_t 630 * \param[in] from the first iteration index 631 * \param[in] to one position past the last interation index 632 * \param[in] step iteration step 633 */ ParallelForThread(std::function<void (index_t)> func,index_t from,index_t to,index_t step=1)634 ParallelForThread( 635 std::function<void(index_t)> func, 636 index_t from, index_t to, index_t step=1 637 ) : func_(func), from_(from), to_(to), step_(step) { 638 } 639 640 /** 641 * \copydoc Thread::run() 642 */ run()643 void run() override { 644 for(index_t i = from_; i < to_; i += step_) { 645 func_(i); 646 } 647 } 648 private: 649 std::function<void(index_t)> func_; 650 index_t from_; 651 index_t to_; 652 index_t step_; 653 }; 654 655 /** 656 * \brief Used by the implementation of GEO::parallel_for_slice() 657 * \see GEO::parallel_for_slice() 658 */ 659 class ParallelForSliceThread : public Thread { 660 public: 661 662 /** 663 * \param[in] func a void function that takes two index_t arguments 664 * \param[in] from the first iteration index 665 * \param[in] to one position past the last interation index 666 */ ParallelForSliceThread(std::function<void (index_t,index_t)> func,index_t from,index_t to)667 ParallelForSliceThread( 668 std::function<void(index_t,index_t)> func, 669 index_t from, index_t to 670 ) : func_(func), from_(from), to_(to) { 671 } 672 673 /** 674 * \copydoc Thread::run() 675 */ run()676 void run() override { 677 func_(from_, to_); 678 } 679 private: 680 std::function<void(index_t,index_t)> func_; 681 index_t from_; 682 index_t to_; 683 }; 684 685 } 686 687 namespace GEO { 688 parallel_for(index_t from,index_t to,std::function<void (index_t)> func,index_t threads_per_core,bool interleaved)689 void parallel_for( 690 index_t from, index_t to, std::function<void(index_t)> func, 691 index_t threads_per_core, bool interleaved 692 ) { 693 #ifdef GEO_OS_WINDOWS 694 // TODO: This is a limitation of WindowsThreadManager, to be fixed. 695 threads_per_core = 1; 696 #endif 697 698 index_t nb_threads = std::min( 699 to - from, 700 Process::maximum_concurrent_threads() * threads_per_core 701 ); 702 703 nb_threads = std::max(index_t(1), nb_threads); 704 705 index_t batch_size = (to - from) / nb_threads; 706 if(Process::is_running_threads() || nb_threads == 1) { 707 for(index_t i = from; i < to; i++) { 708 func(i); 709 } 710 } else { 711 ThreadGroup threads; 712 if(interleaved) { 713 for(index_t i = 0; i < nb_threads; i++) { 714 threads.push_back( 715 new ParallelForThread( 716 func, from + i, to, nb_threads 717 ) 718 ); 719 } 720 } else { 721 index_t cur = from; 722 for(index_t i = 0; i < nb_threads; i++) { 723 if(i == nb_threads - 1) { 724 threads.push_back( 725 new ParallelForThread( 726 func, cur, to 727 ) 728 ); 729 } else { 730 threads.push_back( 731 new ParallelForThread( 732 func, cur, cur + batch_size 733 ) 734 ); 735 } 736 cur += batch_size; 737 } 738 } 739 Process::run_threads(threads); 740 } 741 } 742 743 parallel_for_slice(index_t from,index_t to,std::function<void (index_t,index_t)> func,index_t threads_per_core)744 void parallel_for_slice( 745 index_t from, index_t to, std::function<void(index_t, index_t)> func, 746 index_t threads_per_core 747 ) { 748 #ifdef GEO_OS_WINDOWS 749 // TODO: This is a limitation of WindowsThreadManager, to be fixed. 750 threads_per_core = 1; 751 #endif 752 753 index_t nb_threads = std::min( 754 to - from, 755 Process::maximum_concurrent_threads() * threads_per_core 756 ); 757 758 nb_threads = std::max(index_t(1), nb_threads); 759 760 index_t batch_size = (to - from) / nb_threads; 761 if(Process::is_running_threads() || nb_threads == 1) { 762 func(from, to); 763 } else { 764 ThreadGroup threads; 765 index_t cur = from; 766 for(index_t i = 0; i < nb_threads; i++) { 767 if(i == nb_threads - 1) { 768 threads.push_back( 769 new ParallelForSliceThread( 770 func, cur, to 771 ) 772 ); 773 } else { 774 threads.push_back( 775 new ParallelForSliceThread( 776 func, cur, cur + batch_size 777 ) 778 ); 779 } 780 cur += batch_size; 781 } 782 Process::run_threads(threads); 783 } 784 } 785 parallel(std::function<void ()> f1,std::function<void ()> f2)786 void parallel( 787 std::function<void()> f1, 788 std::function<void()> f2 789 ) { 790 if(Process::is_running_threads()) { 791 f1(); 792 f2(); 793 } else { 794 ThreadGroup threads; 795 threads.push_back(new ParallelThread(f1)); 796 threads.push_back(new ParallelThread(f2)); 797 Process::run_threads(threads); 798 } 799 } 800 801 parallel(std::function<void ()> f1,std::function<void ()> f2,std::function<void ()> f3,std::function<void ()> f4)802 void parallel( 803 std::function<void()> f1, 804 std::function<void()> f2, 805 std::function<void()> f3, 806 std::function<void()> f4 807 ) { 808 if(Process::is_running_threads()) { 809 f1(); 810 f2(); 811 f3(); 812 f4(); 813 } else { 814 ThreadGroup threads; 815 threads.push_back(new ParallelThread(f1)); 816 threads.push_back(new ParallelThread(f2)); 817 threads.push_back(new ParallelThread(f3)); 818 threads.push_back(new ParallelThread(f4)); 819 Process::run_threads(threads); 820 } 821 } 822 823 parallel(std::function<void ()> f1,std::function<void ()> f2,std::function<void ()> f3,std::function<void ()> f4,std::function<void ()> f5,std::function<void ()> f6,std::function<void ()> f7,std::function<void ()> f8)824 void parallel( 825 std::function<void()> f1, 826 std::function<void()> f2, 827 std::function<void()> f3, 828 std::function<void()> f4, 829 std::function<void()> f5, 830 std::function<void()> f6, 831 std::function<void()> f7, 832 std::function<void()> f8 833 ) { 834 if(Process::is_running_threads()) { 835 f1(); 836 f2(); 837 f3(); 838 f4(); 839 f5(); 840 f6(); 841 f7(); 842 f8(); 843 } else { 844 ThreadGroup threads; 845 threads.push_back(new ParallelThread(f1)); 846 threads.push_back(new ParallelThread(f2)); 847 threads.push_back(new ParallelThread(f3)); 848 threads.push_back(new ParallelThread(f4)); 849 threads.push_back(new ParallelThread(f5)); 850 threads.push_back(new ParallelThread(f6)); 851 threads.push_back(new ParallelThread(f7)); 852 threads.push_back(new ParallelThread(f8)); 853 Process::run_threads(threads); 854 } 855 } 856 857 namespace Process { sleep(index_t microseconds)858 void sleep(index_t microseconds) { 859 std::this_thread::sleep_for(std::chrono::microseconds(microseconds)); 860 } 861 } 862 } 863 864