1 /*
2  *  Copyright (c) 2012-2014, Bruno Levy
3  *  All rights reserved.
4  *
5  *  Redistribution and use in source and binary forms, with or without
6  *  modification, are permitted provided that the following conditions are met:
7  *
8  *  * Redistributions of source code must retain the above copyright notice,
9  *  this list of conditions and the following disclaimer.
10  *  * Redistributions in binary form must reproduce the above copyright notice,
11  *  this list of conditions and the following disclaimer in the documentation
12  *  and/or other materials provided with the distribution.
13  *  * Neither the name of the ALICE Project-Team nor the names of its
14  *  contributors may be used to endorse or promote products derived from this
15  *  software without specific prior written permission.
16  *
17  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  *  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21  *  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  *  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  *  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  *  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  *  POSSIBILITY OF SUCH DAMAGE.
28  *
29  *  If you modify this software, you should include a notice giving the
30  *  name of the person performing the modification, the date of modification,
31  *  and the reason for such modification.
32  *
33  *  Contact: Bruno Levy
34  *
35  *     Bruno.Levy@inria.fr
36  *     http://www.loria.fr/~levy
37  *
38  *     ALICE Project
39  *     LORIA, INRIA Lorraine,
40  *     Campus Scientifique, BP 239
41  *     54506 VANDOEUVRE LES NANCY CEDEX
42  *     FRANCE
43  *
44  */
45 
46 #include <geogram/basic/process.h>
47 #include <geogram/basic/logger.h>
48 #include <geogram/basic/environment.h>
49 #include <geogram/basic/string.h>
50 #include <geogram/basic/command_line.h>
51 #include <geogram/basic/stopwatch.h>
52 #include <thread>
53 #include <chrono>
54 
55 #ifdef GEO_OPENMP
56 #include <omp.h>
57 #endif
58 
59 namespace {
60     using namespace GEO;
61 
62     ThreadManager_var thread_manager_;
63     int running_threads_invocations_ = 0;
64 
65     bool multithreading_initialized_ = false;
66     bool multithreading_enabled_ = true;
67 
68     index_t max_threads_initialized_ = false;
69     index_t max_threads_ = 0;
70 
71     bool fpe_initialized_ = false;
72     bool fpe_enabled_ = false;
73 
74     bool cancel_initialized_ = false;
75     bool cancel_enabled_ = false;
76 
77     double start_time_ = 0.0;
78 
79     /************************************************************************/
80 
81     /**
82      * \brief Process Environment
83      * \details This environment exposes and controls the configuration of the
84      * Process module.
85      */
86     class ProcessEnvironment : public Environment {
87     protected:
88         /**
89          * \brief Gets a Process property
90          * \details Retrieves the value of the property \p name and stores it
91          * in \p value. The property must be a valid Process property (see
92          * sys:xxx properties in Vorpaline's help).
93          * \param[in] name name of the property
94          * \param[out] value receives the value of the property
95          * \retval true if the property is a valid Process property
96          * \retval false otherwise
97          * \see Environment::get_value()
98          */
get_local_value(const std::string & name,std::string & value) const99         virtual bool get_local_value(
100             const std::string& name, std::string& value
101         ) const {
102             if(name == "sys:nb_cores") {
103                 value = String::to_string(Process::number_of_cores());
104                 return true;
105             }
106             if(name == "sys:multithread") {
107                 value = String::to_string(multithreading_enabled_);
108                 return true;
109             }
110             if(name == "sys:max_threads") {
111                 value = String::to_string(
112                     Process::maximum_concurrent_threads()
113                 );
114                 return true;
115             }
116             if(name == "sys:FPE") {
117                 value = String::to_string(fpe_enabled_);
118                 return true;
119             }
120             if(name == "sys:cancel") {
121                 value = String::to_string(cancel_enabled_);
122                 return true;
123             }
124             if(name == "sys:assert") {
125                 value = assert_mode() == ASSERT_THROW ? "throw" : "abort";
126                 return true;
127             }
128             return false;
129         }
130 
131         /**
132          * \brief Sets a Process property
133          * \details Sets the property \p name with value \p value in the
134          * Process. The property must be a valid Process property (see sys:xxx
135          * properties in Vorpaline's help) and \p value must be a legal value
136          * for the property.
137          * \param[in] name name of the property
138          * \param[in] value value of the property
139          * \retval true if the property was successfully set
140          * \retval false otherwise
141          * \see Environment::set_value()
142          */
set_local_value(const std::string & name,const std::string & value)143         virtual bool set_local_value(
144             const std::string& name, const std::string& value
145         ) {
146             if(name == "sys:multithread") {
147                 Process::enable_multithreading(String::to_bool(value));
148                 return true;
149             }
150             if(name == "sys:max_threads") {
151                 Process::set_max_threads(String::to_uint(value));
152                 return true;
153             }
154             if(name == "sys:FPE") {
155                 Process::enable_FPE(String::to_bool(value));
156                 return true;
157             }
158             if(name == "sys:cancel") {
159                 Process::enable_cancel(String::to_bool(value));
160                 return true;
161             }
162             if(name == "sys:assert") {
163                 if(value == "throw") {
164                     set_assert_mode(ASSERT_THROW);
165                     return true;
166                 }
167 		if(value == "abort") {
168                     set_assert_mode(ASSERT_ABORT);
169                     return true;
170                 }
171 		if(value == "breakpoint") {
172                     set_assert_mode(ASSERT_BREAKPOINT);
173                     return true;
174 		}
175                 Logger::err("Process")
176                     << "Invalid value for property sys:abort: "
177                     << value
178                     << std::endl;
179                 return false;
180             }
181             return false;
182         }
183 
184         /** ProcessEnvironment destructor */
~ProcessEnvironment()185         virtual ~ProcessEnvironment() {
186         }
187     };
188 
189     /************************************************************************/
190 
191 #ifdef GEO_OPENMP
192 
193     /**
194      * \brief OpenMP Thread Manager
195      * \details
196      * OMPThreadManager is an implementation of ThreadManager that uses OpenMP
197      * for running concurrent threads and control critical sections.
198      */
199     class GEOGRAM_API OMPThreadManager : public ThreadManager {
200     public:
201         /**
202          * \brief Creates and initializes the OpenMP ThreadManager
203          */
OMPThreadManager()204         OMPThreadManager() {
205             omp_init_lock(&lock_);
206         }
207 
208         /** \copydoc GEO::ThreadManager::maximum_concurrent_threads() */
maximum_concurrent_threads()209         virtual index_t maximum_concurrent_threads() {
210             return Process::number_of_cores();
211         }
212 
213         /** \copydoc GEO::ThreadManager::enter_critical_section() */
enter_critical_section()214         virtual void enter_critical_section() {
215             omp_set_lock(&lock_);
216         }
217 
218         /** \copydoc GEO::ThreadManager::leave_critical_section() */
leave_critical_section()219         virtual void leave_critical_section() {
220             omp_unset_lock(&lock_);
221         }
222 
223     protected:
224         /** \brief OMPThreadManager destructor */
~OMPThreadManager()225         virtual ~OMPThreadManager() {
226             omp_destroy_lock(&lock_);
227         }
228 
229         /** \copydoc GEO::ThreadManager::run_concurrent_threads() */
run_concurrent_threads(ThreadGroup & threads,index_t max_threads)230         virtual void run_concurrent_threads(
231             ThreadGroup& threads, index_t max_threads
232         ) {
233             // TODO: take max_threads_ into account
234             geo_argused(max_threads);
235 
236 #pragma omp parallel for schedule(dynamic)
237             for(int i = 0; i < int(threads.size()); i++) {
238 	        index_t ii = index_t(i);
239                 set_thread_id(threads[ii],ii);
240                 set_current_thread(threads[ii]);
241                 threads[ii]->run();
242             }
243         }
244 
245     private:
246         omp_lock_t lock_;
247     };
248 
249 #endif
250 }
251 
252 
253 namespace {
254     /**
255      * \brief The (thread-local) variable that stores a
256      *  pointer to the current thread.
257      * \details It cannot be a static member of class
258      *  Thread, because Visual C++ does not accept
259      *  to export thread local storage variables in
260      *  DLLs.
261      */
262     thread_local Thread* geo_current_thread_ = nullptr;
263 }
264 
265 namespace GEO {
266 
set_current(Thread * thread)267     void Thread::set_current(Thread* thread) {
268         geo_current_thread_ = thread;
269     }
270 
current()271     Thread* Thread::current() {
272         return geo_current_thread_;
273     }
274 
~Thread()275     Thread::~Thread() {
276     }
277 
278     /************************************************************************/
279 
~ThreadManager()280     ThreadManager::~ThreadManager() {
281     }
282 
run_threads(ThreadGroup & threads)283     void ThreadManager::run_threads(ThreadGroup& threads) {
284         index_t max_threads = maximum_concurrent_threads();
285         if(Process::multithreading_enabled() && max_threads > 1) {
286             run_concurrent_threads(threads, max_threads);
287         } else {
288             for(index_t i = 0; i < threads.size(); i++) {
289                 threads[i]->run();
290             }
291         }
292     }
293 
294     /************************************************************************/
295 
~MonoThreadingThreadManager()296     MonoThreadingThreadManager::~MonoThreadingThreadManager() {
297     }
298 
run_concurrent_threads(ThreadGroup & threads,index_t max_threads)299     void MonoThreadingThreadManager::run_concurrent_threads(
300         ThreadGroup& threads, index_t max_threads
301     ) {
302         geo_argused(threads);
303         geo_argused(max_threads);
304         geo_assert_not_reached;
305     }
306 
maximum_concurrent_threads()307     index_t MonoThreadingThreadManager::maximum_concurrent_threads() {
308         return 1;
309     }
310 
enter_critical_section()311     void MonoThreadingThreadManager::enter_critical_section() {
312     }
313 
leave_critical_section()314     void MonoThreadingThreadManager::leave_critical_section() {
315     }
316 
317     /************************************************************************/
318 
319     namespace Process {
320 
321         // OS dependent functions implemented in process_unix.cpp and
322         // process_win.cpp
323 
324         bool os_init_threads();
325         void os_brute_force_kill();
326         bool os_enable_FPE(bool flag);
327         bool os_enable_cancel(bool flag);
328         void os_install_signal_handlers();
329         index_t os_number_of_cores();
330         size_t os_used_memory();
331         size_t os_max_used_memory();
332         std::string os_executable_filename();
333 
initialize(int flags)334         void initialize(int flags) {
335 
336             Environment* env = Environment::instance();
337             env->add_environment(new ProcessEnvironment);
338 
339             if(!os_init_threads()) {
340 #ifdef GEO_OPENMP
341                 Logger::out("Process")
342                     << "Using OpenMP threads"
343                     << std::endl;
344                 set_thread_manager(new OMPThreadManager);
345 #else
346                 Logger::out("Process")
347                     << "Multithreading not supported, going monothread"
348                     << std::endl;
349                 set_thread_manager(new MonoThreadingThreadManager);
350 #endif
351             }
352 
353 	    if(
354 		(::getenv("GEO_NO_SIGNAL_HANDLER") == nullptr) &&
355 		(flags & GEOGRAM_INSTALL_HANDLERS) != 0
356 	    ) {
357 		os_install_signal_handlers();
358 	    }
359 
360             // Initialize Process default values
361             enable_multithreading(multithreading_enabled_);
362             set_max_threads(number_of_cores());
363             enable_FPE(fpe_enabled_);
364             enable_cancel(cancel_enabled_);
365 
366             start_time_ = SystemStopwatch::now();
367         }
368 
show_stats()369         void show_stats() {
370 
371             Logger::out("Process") << "Total elapsed time: "
372                                    << SystemStopwatch::now() - start_time_
373                                    << "s" << std::endl;
374 
375             const size_t K=size_t(1024);
376             const size_t M=K*K;
377             const size_t G=K*M;
378 
379             size_t max_mem = Process::max_used_memory() ;
380             size_t r = max_mem;
381 
382             size_t mem_G = r / G;
383             r = r % G;
384             size_t mem_M = r / M;
385             r = r % M;
386             size_t mem_K = r / K;
387             r = r % K;
388 
389             std::string s;
390             if(mem_G != 0) {
391                 s += String::to_string(mem_G)+"G ";
392             }
393             if(mem_M != 0) {
394                 s += String::to_string(mem_M)+"M ";
395             }
396             if(mem_K != 0) {
397                 s += String::to_string(mem_K)+"K ";
398             }
399             if(r != 0) {
400                 s += String::to_string(r);
401             }
402 
403             Logger::out("Process") << "Maximum used memory: "
404                                    << max_mem << " (" << s << ")"
405                                    << std::endl;
406         }
407 
terminate()408         void terminate() {
409             thread_manager_.reset();
410         }
411 
brute_force_kill()412         void brute_force_kill() {
413             os_brute_force_kill();
414         }
415 
number_of_cores()416         index_t number_of_cores() {
417             static index_t result = 0;
418             if(result == 0) {
419 #ifdef GEO_NO_THREAD_LOCAL
420 		// Deactivate multithreading if thread_local is
421 		// not supported (e.g. with old OS-X).
422 		result = 1;
423 #else
424                 result = os_number_of_cores();
425 #endif
426             }
427             return result;
428         }
429 
used_memory()430         size_t used_memory() {
431             return os_used_memory();
432         }
433 
max_used_memory()434         size_t max_used_memory() {
435             return os_max_used_memory();
436         }
437 
executable_filename()438         std::string executable_filename() {
439             return os_executable_filename();
440         }
441 
set_thread_manager(ThreadManager * thread_manager)442         void set_thread_manager(ThreadManager* thread_manager) {
443             thread_manager_ = thread_manager;
444         }
445 
run_threads(ThreadGroup & threads)446         void run_threads(ThreadGroup& threads) {
447             running_threads_invocations_++;
448             thread_manager_->run_threads(threads);
449             running_threads_invocations_--;
450         }
451 
enter_critical_section()452         void enter_critical_section() {
453             thread_manager_->enter_critical_section();
454         }
455 
leave_critical_section()456         void leave_critical_section() {
457             thread_manager_->leave_critical_section();
458         }
459 
is_running_threads()460         bool is_running_threads() {
461 #ifdef GEO_OPENMP
462             return (
463 		omp_in_parallel() ||
464 		(running_threads_invocations_ > 0)
465 	    );
466 #else
467             return running_threads_invocations_ > 0;
468 #endif
469         }
470 
multithreading_enabled()471         bool multithreading_enabled() {
472             return multithreading_enabled_;
473         }
474 
enable_multithreading(bool flag)475         void enable_multithreading(bool flag) {
476             if(
477                 multithreading_initialized_ &&
478                 multithreading_enabled_ == flag
479             ) {
480                 return;
481             }
482             multithreading_initialized_ = true;
483             multithreading_enabled_ = flag;
484             if(multithreading_enabled_) {
485                 Logger::out("Process")
486                     << "Multithreading enabled" << std::endl
487                     << "Available cores = " << number_of_cores()
488                     << std::endl;
489                 // Logger::out("Process")
490                 //    << "Max. concurrent threads = "
491                 //    << maximum_concurrent_threads() << std::endl ;
492                 if(number_of_cores() == 1) {
493                     Logger::warn("Process")
494                         << "Processor is not a multicore"
495 			<< "(or multithread is not supported)"
496                         << std::endl;
497                 }
498                 if(thread_manager_ == nullptr) {
499                     Logger::warn("Process")
500                         << "Missing multithreading manager"
501                         << std::endl;
502                 }
503             } else {
504                 Logger::out("Process")
505                     << "Multithreading disabled" << std::endl;
506             }
507         }
508 
max_threads()509         index_t max_threads() {
510             return max_threads_initialized_
511                    ? max_threads_
512                    : number_of_cores();
513         }
514 
set_max_threads(index_t num_threads)515         void set_max_threads(index_t num_threads) {
516             if(
517                 max_threads_initialized_ &&
518                 max_threads_ == num_threads
519             ) {
520                 return;
521             }
522             max_threads_initialized_ = true;
523             if(num_threads == 0) {
524                 num_threads = 1;
525             } else if(num_threads > number_of_cores()) {
526                 Logger::warn("Process")
527                     << "Cannot allocate " << num_threads
528                     << " for multithreading"
529                     << std::endl;
530                 num_threads = number_of_cores();
531             }
532             max_threads_ = num_threads;
533             Logger::out("Process")
534                 << "Max used threads = " << max_threads_
535                 << std::endl;
536         }
537 
maximum_concurrent_threads()538         index_t maximum_concurrent_threads() {
539             if(!multithreading_enabled_ || thread_manager_ == nullptr) {
540                 return 1;
541             }
542             return max_threads_;
543             /*
544                // commented out for now, since under Windows,
545                // it seems that maximum_concurrent_threads() does not
546                // report the number of hyperthreaded cores.
547                         return
548                             geo_min(
549                                 thread_manager_->maximum_concurrent_threads(),
550                                 max_threads_
551                             ) ;
552              */
553         }
554 
FPE_enabled()555         bool FPE_enabled() {
556             return fpe_enabled_;
557         }
558 
enable_FPE(bool flag)559         void enable_FPE(bool flag) {
560             if(fpe_initialized_ && fpe_enabled_ == flag) {
561                 return;
562             }
563             fpe_initialized_ = true;
564             fpe_enabled_ = flag;
565 	    os_enable_FPE(flag);
566         }
567 
cancel_enabled()568         bool cancel_enabled() {
569             return cancel_enabled_;
570         }
571 
enable_cancel(bool flag)572         void enable_cancel(bool flag) {
573             if(cancel_initialized_ && cancel_enabled_ == flag) {
574                 return;
575             }
576             cancel_initialized_ = true;
577             cancel_enabled_ = flag;
578 
579             if(os_enable_cancel(flag)) {
580                 Logger::out("Process")
581                     << (flag ? "Cancel mode enabled" : "Cancel mode disabled")
582                     << std::endl;
583             } else {
584                 Logger::warn("Process")
585                     << "Cancel mode not implemented" << std::endl;
586             }
587         }
588     }
589 }
590 
591 
592 namespace {
593     using namespace GEO;
594 
595     /**
596      * \brief Used by the implementation of GEO::parallel()
597      * \see GEO::parallel()
598      */
599     class ParallelThread : public Thread {
600     public:
601 	/**
602 	 * \brief ParallelThread constructor.
603 	 * \param[in] func a void function with no parameter.
604 	 */
ParallelThread(std::function<void (void)> func)605 	ParallelThread(
606 	    std::function<void(void)> func
607 	) : func_(func) {
608 	}
609 
610 	/**
611 	 * \copydoc Thread::run()
612 	 */
run()613         void run() override {
614 	    func_();
615         }
616     private:
617 	std::function<void()> func_;
618     };
619 
620 
621     /**
622      * \brief Used by the implementation of GEO::parallel_for()
623      * \see GEO::parallel_for()
624      */
625     class ParallelForThread : public Thread {
626     public:
627 
628 	/**
629 	 * \param[in] func a void function that takes an index_t
630 	 * \param[in] from the first iteration index
631 	 * \param[in] to one position past the last interation index
632 	 * \param[in] step iteration step
633 	 */
ParallelForThread(std::function<void (index_t)> func,index_t from,index_t to,index_t step=1)634 	ParallelForThread(
635 	    std::function<void(index_t)> func,
636 	    index_t from, index_t to, index_t step=1
637 	) : func_(func), from_(from), to_(to), step_(step) {
638 	}
639 
640 	/**
641 	 * \copydoc Thread::run()
642 	 */
run()643         void run() override {
644             for(index_t i = from_; i < to_; i += step_) {
645                 func_(i);
646             }
647         }
648     private:
649 	std::function<void(index_t)> func_;
650 	index_t from_;
651 	index_t to_;
652 	index_t step_;
653     };
654 
655     /**
656      * \brief Used by the implementation of GEO::parallel_for_slice()
657      * \see GEO::parallel_for_slice()
658      */
659     class ParallelForSliceThread : public Thread {
660     public:
661 
662 	/**
663 	 * \param[in] func a void function that takes two index_t arguments
664 	 * \param[in] from the first iteration index
665 	 * \param[in] to one position past the last interation index
666 	 */
ParallelForSliceThread(std::function<void (index_t,index_t)> func,index_t from,index_t to)667 	ParallelForSliceThread(
668 	    std::function<void(index_t,index_t)> func,
669 	    index_t from, index_t to
670 	) : func_(func), from_(from), to_(to) {
671 	}
672 
673 	/**
674 	 * \copydoc Thread::run()
675 	 */
run()676         void run() override {
677 	    func_(from_, to_);
678         }
679     private:
680 	std::function<void(index_t,index_t)> func_;
681 	index_t from_;
682 	index_t to_;
683     };
684 
685 }
686 
687 namespace GEO {
688 
parallel_for(index_t from,index_t to,std::function<void (index_t)> func,index_t threads_per_core,bool interleaved)689     void parallel_for(
690         index_t from, index_t to, std::function<void(index_t)> func,
691         index_t threads_per_core, bool interleaved
692     ) {
693 #ifdef GEO_OS_WINDOWS
694         // TODO: This is a limitation of WindowsThreadManager, to be fixed.
695         threads_per_core = 1;
696 #endif
697 
698         index_t nb_threads = std::min(
699             to - from,
700             Process::maximum_concurrent_threads() * threads_per_core
701         );
702 
703 	nb_threads = std::max(index_t(1), nb_threads);
704 
705         index_t batch_size = (to - from) / nb_threads;
706         if(Process::is_running_threads() || nb_threads == 1) {
707             for(index_t i = from; i < to; i++) {
708                 func(i);
709             }
710         } else {
711             ThreadGroup threads;
712             if(interleaved) {
713                 for(index_t i = 0; i < nb_threads; i++) {
714                     threads.push_back(
715                         new ParallelForThread(
716                             func, from + i, to, nb_threads
717                         )
718                     );
719                 }
720             } else {
721                 index_t cur = from;
722                 for(index_t i = 0; i < nb_threads; i++) {
723                     if(i == nb_threads - 1) {
724                         threads.push_back(
725                             new ParallelForThread(
726                                 func, cur, to
727                             )
728                         );
729                     } else {
730                         threads.push_back(
731                             new ParallelForThread(
732                                 func, cur, cur + batch_size
733                             )
734                         );
735                     }
736                     cur += batch_size;
737                 }
738             }
739             Process::run_threads(threads);
740         }
741     }
742 
743 
parallel_for_slice(index_t from,index_t to,std::function<void (index_t,index_t)> func,index_t threads_per_core)744     void parallel_for_slice(
745 	index_t from, index_t to, std::function<void(index_t, index_t)> func,
746         index_t threads_per_core
747     ) {
748 #ifdef GEO_OS_WINDOWS
749         // TODO: This is a limitation of WindowsThreadManager, to be fixed.
750         threads_per_core = 1;
751 #endif
752 
753         index_t nb_threads = std::min(
754             to - from,
755             Process::maximum_concurrent_threads() * threads_per_core
756         );
757 
758 	nb_threads = std::max(index_t(1), nb_threads);
759 
760         index_t batch_size = (to - from) / nb_threads;
761         if(Process::is_running_threads() || nb_threads == 1) {
762 	    func(from, to);
763         } else {
764             ThreadGroup threads;
765 	    index_t cur = from;
766 	    for(index_t i = 0; i < nb_threads; i++) {
767 		if(i == nb_threads - 1) {
768 		    threads.push_back(
769 			new ParallelForSliceThread(
770 			    func, cur, to
771 			  )
772 			);
773 		} else {
774 		    threads.push_back(
775 			new ParallelForSliceThread(
776 			    func, cur, cur + batch_size
777                            )
778                         );
779 		}
780 		cur += batch_size;
781 	    }
782             Process::run_threads(threads);
783         }
784     }
785 
parallel(std::function<void ()> f1,std::function<void ()> f2)786     void parallel(
787 	std::function<void()> f1,
788 	std::function<void()> f2
789     ) {
790         if(Process::is_running_threads()) {
791 	    f1();
792 	    f2();
793         } else {
794             ThreadGroup threads;
795 	    threads.push_back(new ParallelThread(f1));
796 	    threads.push_back(new ParallelThread(f2));
797             Process::run_threads(threads);
798         }
799     }
800 
801 
parallel(std::function<void ()> f1,std::function<void ()> f2,std::function<void ()> f3,std::function<void ()> f4)802     void parallel(
803 	std::function<void()> f1,
804 	std::function<void()> f2,
805 	std::function<void()> f3,
806 	std::function<void()> f4
807     ) {
808         if(Process::is_running_threads()) {
809 	    f1();
810 	    f2();
811 	    f3();
812 	    f4();
813         } else {
814             ThreadGroup threads;
815 	    threads.push_back(new ParallelThread(f1));
816 	    threads.push_back(new ParallelThread(f2));
817 	    threads.push_back(new ParallelThread(f3));
818 	    threads.push_back(new ParallelThread(f4));
819             Process::run_threads(threads);
820         }
821     }
822 
823 
parallel(std::function<void ()> f1,std::function<void ()> f2,std::function<void ()> f3,std::function<void ()> f4,std::function<void ()> f5,std::function<void ()> f6,std::function<void ()> f7,std::function<void ()> f8)824     void parallel(
825 	std::function<void()> f1,
826 	std::function<void()> f2,
827 	std::function<void()> f3,
828 	std::function<void()> f4,
829 	std::function<void()> f5,
830 	std::function<void()> f6,
831 	std::function<void()> f7,
832 	std::function<void()> f8
833     ) {
834         if(Process::is_running_threads()) {
835 	    f1();
836 	    f2();
837 	    f3();
838 	    f4();
839 	    f5();
840 	    f6();
841 	    f7();
842 	    f8();
843         } else {
844             ThreadGroup threads;
845 	    threads.push_back(new ParallelThread(f1));
846 	    threads.push_back(new ParallelThread(f2));
847 	    threads.push_back(new ParallelThread(f3));
848 	    threads.push_back(new ParallelThread(f4));
849 	    threads.push_back(new ParallelThread(f5));
850 	    threads.push_back(new ParallelThread(f6));
851 	    threads.push_back(new ParallelThread(f7));
852 	    threads.push_back(new ParallelThread(f8));
853             Process::run_threads(threads);
854         }
855     }
856 
857     namespace Process {
sleep(index_t microseconds)858 	void sleep(index_t microseconds) {
859 	    std::this_thread::sleep_for(std::chrono::microseconds(microseconds));
860 	}
861     }
862 }
863 
864