1 #pragma once
2 
3 /// \file Scheduler.h
4 /// \brief Interface for executing tasks (potentially) asynchronously.
5 /// \author Pavel Sevecek (sevecek at sirrah.troja.mff.cuni.cz)
6 /// \date 2016-2021
7 
8 #include "common/ForwardDecl.h"
9 #include "objects/utility/IteratorAdapters.h"
10 #include "objects/wrappers/Function.h"
11 
12 NAMESPACE_SPH_BEGIN
13 
/// \brief Handle used to control tasks submitted into the scheduler.
///
/// A handle of this type is returned (wrapped in a shared pointer) by \ref IScheduler::submit. It only
/// allows to query or wait for the completion of the task; the interface offers no way to cancel it.
class ITask : public Polymorphic {
public:
    /// \brief Waits till the task and all the child tasks are completed.
    virtual void wait() = 0;

    /// \brief Checks if the task already finished.
    ///
    /// \return True if the task is finished, false otherwise.
    virtual bool completed() const = 0;
};
23 
24 /// \brief Interface that allows unified implementation of sequential and parallelized versions of algorithms.
25 ///
26 /// Currently suitable only for task-based schedulers, cannot be used for OpenMP, MPI, etc.
27 class IScheduler : public Polymorphic {
28 public:
29     /// \brief Submits a task to be potentially executed asynchronously.
30     ///
31     /// \return Handle to the task created from the functor.
32     virtual SharedPtr<ITask> submit(const Function<void()>& task) = 0;
33 
34     /// \brief Returns the index of the calling thread.
35     ///
36     /// If this thread was not invoked by the scheduler, returns NOTHING. The returned index is interval
37     /// [0, getThreadCnt()-1].
38     virtual Optional<Size> getThreadIdx() const = 0;
39 
40     /// \brief Returns the number of threads used by this scheduler.
41     ///
42     /// Note that this number is constant during the lifetime of the scheduler.
43     virtual Size getThreadCnt() const = 0;
44 
45     /// \brief Returns a value of granularity that is expected to perform well with the current thread count.
46     virtual Size getRecommendedGranularity() const = 0;
47 
48     using Functor = Function<void()>;
49     using RangeFunctor = Function<void(Size n1, Size n2)>;
50 
51     /// \brief Processes the given range concurrently.
52     ///
53     /// \param from First index of the processed range.
54     /// \param to One-past-last index of the processed range.
55     /// \param granularity Recommended size of the chunks passed to the functor.
56     /// \param functor Functor executed concurrently by the worker threads. Takes the first and the
57     ///                one-past-last index of the chunk to process sequentially within the thread.
58     virtual void parallelFor(const Size from,
59         const Size to,
60         const Size granularity,
61         const RangeFunctor& functor) = 0;
62 
63     /// \brief Executes two functors concurrently.
64     virtual void parallelInvoke(const Functor& task1, const Functor& task2) = 0;
65 };
66 
/// \brief Dummy scheduler that simply executes the submitted tasks sequentially on calling thread.
///
/// Useful to run an algorithm with no parallelization, mainly for testing/debugging purposes.
/// The class only declares its member functions here; they are defined out of line.
class SequentialScheduler : public IScheduler {
public:
    /// \brief Implementation of \ref IScheduler::submit.
    virtual SharedPtr<ITask> submit(const Function<void()>& task) override;

    /// \brief Implementation of \ref IScheduler::getThreadIdx.
    virtual Optional<Size> getThreadIdx() const override;

    /// \brief Implementation of \ref IScheduler::getThreadCnt.
    virtual Size getThreadCnt() const override;

    /// \brief Implementation of \ref IScheduler::getRecommendedGranularity.
    virtual Size getRecommendedGranularity() const override;

    /// \brief Implementation of \ref IScheduler::parallelFor.
    virtual void parallelFor(const Size from,
        const Size to,
        const Size granularity,
        const RangeFunctor& functor) override;

    /// \brief Implementation of \ref IScheduler::parallelInvoke.
    virtual void parallelInvoke(const Functor& func1, const Functor& func2) override;

    /// \brief Returns a shared pointer to an instance of the sequential scheduler.
    ///
    /// NOTE(review): judging by the name, the returned instance is presumably shared by all callers; the
    /// definition is not visible in this header, so confirm before relying on it.
    static SharedPtr<SequentialScheduler> getGlobalInstance();
};
89 
/// \brief Global instance of the sequential scheduler.
///
/// It can be used to specify sequential execution policy for parallel algorithms, without creating
/// unnecessary local instances of \ref SequentialScheduler.
///
/// This is a declaration only; the object itself is defined in another translation unit.
extern SequentialScheduler SEQUENTIAL;
95 
96 
97 /// \brief Executes a functor concurrently from all available threads.
98 ///
99 /// Syntax mimics typical usage of for loop; functor is executed with index as parameter, starting at 'from'
100 /// and ending one before 'to', so that total number of executions is (to-from). The function blocks until
101 /// parallel for is completed.
102 /// \param scheduler Scheduler used for parallelization. If SequentialScheduler, this is essentailly ordinary
103 ///                  for loop (with bigger overhead).
104 /// \param from First processed index.
105 /// \param to One-past-last processed index.
106 /// \param functor Functor executed (to-from) times in different threads; takes an index as an argument.
107 template <typename TFunctor>
parallelFor(IScheduler & scheduler,const Size from,const Size to,TFunctor && functor)108 INLINE void parallelFor(IScheduler& scheduler, const Size from, const Size to, TFunctor&& functor) {
109     const Size granularity = scheduler.getRecommendedGranularity();
110     parallelFor(scheduler, from, to, granularity, std::forward<TFunctor>(functor));
111 }
112 
113 /// \brief Executes a functor concurrently with given granularity.
114 ///
115 /// Overload allowing to specify the granularity.
116 /// \param scheduler Scheduler used for parallelization.
117 /// \param from First processed index.
118 /// \param to One-past-last processed index.
119 /// \param granularity Number of indices processed by the functor at once. It shall be a positive number less
120 ///                    than or equal to (to-from).
121 /// \param functor Functor executed concurrently, takes an index as an argument.
122 template <typename TFunctor>
parallelFor(IScheduler & scheduler,const Size from,const Size to,const Size granularity,TFunctor && functor)123 INLINE void parallelFor(IScheduler& scheduler,
124     const Size from,
125     const Size to,
126     const Size granularity,
127     TFunctor&& functor) {
128     scheduler.parallelFor(from, to, granularity, [&functor](Size n1, Size n2) {
129         SPH_ASSERT(n1 <= n2);
130         for (Size i = n1; i < n2; ++i) {
131             functor(i);
132         }
133     });
134 }
135 
136 /// \brief Executes a functor concurrently from all available threads.
137 ///
138 /// Overload using an index sequence.
139 template <typename TFunctor>
parallelFor(IScheduler & scheduler,const IndexSequence & sequence,TFunctor && functor)140 INLINE void parallelFor(IScheduler& scheduler, const IndexSequence& sequence, TFunctor&& functor) {
141     parallelFor(scheduler, *sequence.begin(), *sequence.end(), std::forward<TFunctor>(functor));
142 }
143 
144 /// \brief Syntactic sugar, calls \ref parallelInvoke in given scheduler.
145 template <typename TFunctor1, typename TFunctor2>
parallelInvoke(IScheduler & scheduler,TFunctor1 && func1,TFunctor2 && func2)146 void parallelInvoke(IScheduler& scheduler, TFunctor1&& func1, TFunctor2&& func2) {
147     scheduler.parallelInvoke(std::forward<TFunctor1>(func1), std::forward<TFunctor2>(func2));
148 }
149 
150 NAMESPACE_SPH_END
151