1 /*
2 //@HEADER
3 // ************************************************************************
4 //
5 //                        Kokkos v. 3.0
6 //       Copyright (2020) National Technology & Engineering
7 //               Solutions of Sandia, LLC (NTESS).
8 //
9 // Under the terms of Contract DE-NA0003525 with NTESS,
10 // the U.S. Government retains certain rights in this software.
11 //
12 // Redistribution and use in source and binary forms, with or without
13 // modification, are permitted provided that the following conditions are
14 // met:
15 //
16 // 1. Redistributions of source code must retain the above copyright
17 // notice, this list of conditions and the following disclaimer.
18 //
19 // 2. Redistributions in binary form must reproduce the above copyright
20 // notice, this list of conditions and the following disclaimer in the
21 // documentation and/or other materials provided with the distribution.
22 //
23 // 3. Neither the name of the Corporation nor the names of the
24 // contributors may be used to endorse or promote products derived from
25 // this software without specific prior written permission.
26 //
27 // THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY
28 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE
31 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 //
39 // Questions? Contact Christian R. Trott (crtrott@sandia.gov)
40 //
41 // ************************************************************************
42 //@HEADER
43 */
44 
45 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
46 #define KOKKOS_WORKGRAPHPOLICY_HPP
47 
48 #include <impl/Kokkos_AnalyzePolicy.hpp>
49 #include <Kokkos_Crs.hpp>
50 
namespace Kokkos {
namespace Impl {

// Forward declaration only: no definition is provided in this header.
// NOTE(review): presumably specialized per execution space by the
// backend-specific WorkGraphPolicy headers included at the end of this
// file -- confirm against those headers.
template <class FunctorType, class ExecutionSpace, class... PolicyArgs>
class WorkGraphExec;

}  // namespace Impl
}  // namespace Kokkos
59 
60 namespace Kokkos {
61 
62 template <class... Properties>
63 class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
64  public:
65   using execution_policy = WorkGraphPolicy<Properties...>;
66   using self_type        = WorkGraphPolicy<Properties...>;
67   using traits           = Kokkos::Impl::PolicyTraits<Properties...>;
68   using index_type       = typename traits::index_type;
69   using member_type      = index_type;
70   using execution_space  = typename traits::execution_space;
71   using memory_space     = typename execution_space::memory_space;
72   using graph_type = Kokkos::Crs<index_type, execution_space, void, index_type>;
73 
74   enum : std::int32_t {
75     END_TOKEN       = -1,
76     BEGIN_TOKEN     = -2,
77     COMPLETED_TOKEN = -3
78   };
79 
80  private:
81   using ints_type = Kokkos::View<std::int32_t*, memory_space>;
82 
83   // Let N = m_graph.numRows(), the total work
84   // m_queue[  0 ..   N-1] = the ready queue
85   // m_queue[  N .. 2*N-1] = the waiting queue counts
86   // m_queue[2*N .. 2*N+2] = the ready queue hints
87 
88   graph_type const m_graph;
89   ints_type m_queue;
90 
91   KOKKOS_INLINE_FUNCTION
push_work(const std::int32_t w) const92   void push_work(const std::int32_t w) const noexcept {
93     const std::int32_t N = m_graph.numRows();
94 
95     std::int32_t volatile* const ready_queue = &m_queue[0];
96     std::int32_t volatile* const end_hint    = &m_queue[2 * N + 1];
97 
98     // Push work to end of queue
99     const std::int32_t j = atomic_fetch_add(end_hint, 1);
100 
101     if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
102       // ERROR: past the end of queue or did not replace END_TOKEN
103       Kokkos::abort("WorkGraphPolicy push_work error");
104     }
105 
106     memory_fence();
107   }
108 
109  public:
110   /**\brief  Attempt to pop the work item at the head of the queue.
111    *
112    *  Find entry 'i' such that
113    *    ( m_queue[i] != BEGIN_TOKEN ) AND
114    *    ( i == 0 OR m_queue[i-1] == BEGIN_TOKEN )
115    *  if found then
116    *    increment begin hint
117    *    return atomic_exchange( m_queue[i] , BEGIN_TOKEN )
118    *  else if i < total work
119    *    return END_TOKEN
120    *  else
121    *    return COMPLETED_TOKEN
122    *
123    */
124   KOKKOS_INLINE_FUNCTION
pop_work() const125   std::int32_t pop_work() const noexcept {
126     const std::int32_t N = m_graph.numRows();
127 
128     std::int32_t volatile* const ready_queue = &m_queue[0];
129     std::int32_t volatile* const begin_hint  = &m_queue[2 * N];
130 
131     // begin hint is guaranteed to be less than or equal to
132     // actual begin location in the queue.
133 
134     for (std::int32_t i = *begin_hint; i < N; ++i) {
135       const std::int32_t w = ready_queue[i];
136 
137       if (w == END_TOKEN) {
138         return END_TOKEN;
139       }
140 
141       if ((w != BEGIN_TOKEN) &&
142           (w == atomic_compare_exchange(ready_queue + i, w,
143                                         (std::int32_t)BEGIN_TOKEN))) {
144         // Attempt to claim ready work index succeeded,
145         // update the hint and return work index
146         atomic_increment(begin_hint);
147         return w;
148       }
149       // arrive here when ready_queue[i] == BEGIN_TOKEN
150     }
151 
152     return COMPLETED_TOKEN;
153   }
154 
155   KOKKOS_INLINE_FUNCTION
completed_work(std::int32_t w) const156   void completed_work(std::int32_t w) const noexcept {
157     Kokkos::memory_fence();
158 
159     // Make sure the completed work function's memory accesses are flushed.
160 
161     const std::int32_t N = m_graph.numRows();
162 
163     std::int32_t volatile* const count_queue = &m_queue[N];
164 
165     const std::int32_t B = m_graph.row_map(w);
166     const std::int32_t E = m_graph.row_map(w + 1);
167 
168     for (std::int32_t i = B; i < E; ++i) {
169       const std::int32_t j = m_graph.entries(i);
170       if (1 == atomic_fetch_add(count_queue + j, -1)) {
171         push_work(j);
172       }
173     }
174   }
175 
176   struct TagInit {};
177   struct TagCount {};
178   struct TagReady {};
179 
180   /**\brief  Initialize queue
181    *
182    *  m_queue[0..N-1] = END_TOKEN, the ready queue
183    *  m_queue[N..2*N-1] = 0, the waiting count queue
184    *  m_queue[2*N..2*N+1] = 0, begin/end hints for ready queue
185    */
186   KOKKOS_INLINE_FUNCTION
operator ()(const TagInit,int i) const187   void operator()(const TagInit, int i) const noexcept {
188     m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
189   }
190 
191   KOKKOS_INLINE_FUNCTION
operator ()(const TagCount,int i) const192   void operator()(const TagCount, int i) const noexcept {
193     std::int32_t volatile* const count_queue = &m_queue[m_graph.numRows()];
194 
195     atomic_increment(count_queue + m_graph.entries[i]);
196   }
197 
198   KOKKOS_INLINE_FUNCTION
operator ()(const TagReady,int w) const199   void operator()(const TagReady, int w) const noexcept {
200     std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
201 
202     if (0 == count_queue[w]) push_work(w);
203   }
204 
space() const205   execution_space space() const { return execution_space(); }
206 
WorkGraphPolicy(const graph_type & arg_graph)207   WorkGraphPolicy(const graph_type& arg_graph)
208       : m_graph(arg_graph),
209         m_queue(view_alloc("queue", WithoutInitializing),
210                 arg_graph.numRows() * 2 + 2) {
211     {  // Initialize
212       using policy_type  = RangePolicy<std::int32_t, execution_space, TagInit>;
213       using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>;
214       const closure_type closure(*this, policy_type(0, m_queue.size()));
215       closure.execute();
216       execution_space().fence();
217     }
218 
219     {  // execute-after counts
220       using policy_type  = RangePolicy<std::int32_t, execution_space, TagCount>;
221       using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>;
222       const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
223       closure.execute();
224       execution_space().fence();
225     }
226 
227     {  // Scheduling ready tasks
228       using policy_type  = RangePolicy<std::int32_t, execution_space, TagReady>;
229       using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>;
230       const closure_type closure(*this, policy_type(0, m_graph.numRows()));
231       closure.execute();
232       execution_space().fence();
233     }
234   }
235 };
236 
237 }  // namespace Kokkos
238 
239 #ifdef KOKKOS_ENABLE_SERIAL
240 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
241 #endif
242 
243 #ifdef KOKKOS_ENABLE_OPENMP
244 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
245 #endif
246 
247 #ifdef KOKKOS_ENABLE_CUDA
248 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
249 #endif
250 
251 #ifdef KOKKOS_ENABLE_HIP
252 #include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp"
253 #endif
254 
255 #ifdef KOKKOS_ENABLE_THREADS
256 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
257 #endif
258 
259 #ifdef KOKKOS_ENABLE_HPX
260 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
261 #endif
262 
263 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
264