1 /* 2 //@HEADER 3 // ************************************************************************ 4 // 5 // Kokkos v. 3.0 6 // Copyright (2020) National Technology & Engineering 7 // Solutions of Sandia, LLC (NTESS). 8 // 9 // Under the terms of Contract DE-NA0003525 with NTESS, 10 // the U.S. Government retains certain rights in this software. 11 // 12 // Redistribution and use in source and binary forms, with or without 13 // modification, are permitted provided that the following conditions are 14 // met: 15 // 16 // 1. Redistributions of source code must retain the above copyright 17 // notice, this list of conditions and the following disclaimer. 18 // 19 // 2. Redistributions in binary form must reproduce the above copyright 20 // notice, this list of conditions and the following disclaimer in the 21 // documentation and/or other materials provided with the distribution. 22 // 23 // 3. Neither the name of the Corporation nor the names of the 24 // contributors may be used to endorse or promote products derived from 25 // this software without specific prior written permission. 26 // 27 // THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY 28 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 29 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 30 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE 31 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 32 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 33 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 34 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 35 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 36 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 37 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 // 39 // Questions? Contact Christian R. Trott (crtrott@sandia.gov) 40 // 41 // ************************************************************************ 42 //@HEADER 43 */ 44 45 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP 46 #define KOKKOS_WORKGRAPHPOLICY_HPP 47 48 #include <impl/Kokkos_AnalyzePolicy.hpp> 49 #include <Kokkos_Crs.hpp> 50 51 namespace Kokkos { 52 namespace Impl { 53 54 template <class functor_type, class execution_space, class... policy_args> 55 class WorkGraphExec; 56 57 } 58 } // namespace Kokkos 59 60 namespace Kokkos { 61 62 template <class... Properties> 63 class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> { 64 public: 65 using execution_policy = WorkGraphPolicy<Properties...>; 66 using self_type = WorkGraphPolicy<Properties...>; 67 using traits = Kokkos::Impl::PolicyTraits<Properties...>; 68 using index_type = typename traits::index_type; 69 using member_type = index_type; 70 using execution_space = typename traits::execution_space; 71 using memory_space = typename execution_space::memory_space; 72 using graph_type = Kokkos::Crs<index_type, execution_space, void, index_type>; 73 74 enum : std::int32_t { 75 END_TOKEN = -1, 76 BEGIN_TOKEN = -2, 77 COMPLETED_TOKEN = -3 78 }; 79 80 private: 81 using ints_type = Kokkos::View<std::int32_t*, memory_space>; 82 83 // Let N = m_graph.numRows(), the total work 84 // m_queue[ 0 .. N-1] = the ready queue 85 // m_queue[ N .. 2*N-1] = the waiting queue counts 86 // m_queue[2*N .. 2*N+2] = the ready queue hints 87 88 graph_type const m_graph; 89 ints_type m_queue; 90 91 KOKKOS_INLINE_FUNCTION push_work(const std::int32_t w) const92 void push_work(const std::int32_t w) const noexcept { 93 const std::int32_t N = m_graph.numRows(); 94 95 std::int32_t volatile* const ready_queue = &m_queue[0]; 96 std::int32_t volatile* const end_hint = &m_queue[2 * N + 1]; 97 98 // Push work to end of queue 99 const std::int32_t j = atomic_fetch_add(end_hint, 1); 100 101 if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) { 102 // ERROR: past the end of queue or did not replace END_TOKEN 103 Kokkos::abort("WorkGraphPolicy push_work error"); 104 } 105 106 memory_fence(); 107 } 108 109 public: 110 /**\brief Attempt to pop the work item at the head of the queue. 111 * 112 * Find entry 'i' such that 113 * ( m_queue[i] != BEGIN_TOKEN ) AND 114 * ( i == 0 OR m_queue[i-1] == BEGIN_TOKEN ) 115 * if found then 116 * increment begin hint 117 * return atomic_exchange( m_queue[i] , BEGIN_TOKEN ) 118 * else if i < total work 119 * return END_TOKEN 120 * else 121 * return COMPLETED_TOKEN 122 * 123 */ 124 KOKKOS_INLINE_FUNCTION pop_work() const125 std::int32_t pop_work() const noexcept { 126 const std::int32_t N = m_graph.numRows(); 127 128 std::int32_t volatile* const ready_queue = &m_queue[0]; 129 std::int32_t volatile* const begin_hint = &m_queue[2 * N]; 130 131 // begin hint is guaranteed to be less than or equal to 132 // actual begin location in the queue. 133 134 for (std::int32_t i = *begin_hint; i < N; ++i) { 135 const std::int32_t w = ready_queue[i]; 136 137 if (w == END_TOKEN) { 138 return END_TOKEN; 139 } 140 141 if ((w != BEGIN_TOKEN) && 142 (w == atomic_compare_exchange(ready_queue + i, w, 143 (std::int32_t)BEGIN_TOKEN))) { 144 // Attempt to claim ready work index succeeded, 145 // update the hint and return work index 146 atomic_increment(begin_hint); 147 return w; 148 } 149 // arrive here when ready_queue[i] == BEGIN_TOKEN 150 } 151 152 return COMPLETED_TOKEN; 153 } 154 155 KOKKOS_INLINE_FUNCTION completed_work(std::int32_t w) const156 void completed_work(std::int32_t w) const noexcept { 157 Kokkos::memory_fence(); 158 159 // Make sure the completed work function's memory accesses are flushed. 160 161 const std::int32_t N = m_graph.numRows(); 162 163 std::int32_t volatile* const count_queue = &m_queue[N]; 164 165 const std::int32_t B = m_graph.row_map(w); 166 const std::int32_t E = m_graph.row_map(w + 1); 167 168 for (std::int32_t i = B; i < E; ++i) { 169 const std::int32_t j = m_graph.entries(i); 170 if (1 == atomic_fetch_add(count_queue + j, -1)) { 171 push_work(j); 172 } 173 } 174 } 175 176 struct TagInit {}; 177 struct TagCount {}; 178 struct TagReady {}; 179 180 /**\brief Initialize queue 181 * 182 * m_queue[0..N-1] = END_TOKEN, the ready queue 183 * m_queue[N..2*N-1] = 0, the waiting count queue 184 * m_queue[2*N..2*N+1] = 0, begin/end hints for ready queue 185 */ 186 KOKKOS_INLINE_FUNCTION operator ()(const TagInit,int i) const187 void operator()(const TagInit, int i) const noexcept { 188 m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0; 189 } 190 191 KOKKOS_INLINE_FUNCTION operator ()(const TagCount,int i) const192 void operator()(const TagCount, int i) const noexcept { 193 std::int32_t volatile* const count_queue = &m_queue[m_graph.numRows()]; 194 195 atomic_increment(count_queue + m_graph.entries[i]); 196 } 197 198 KOKKOS_INLINE_FUNCTION operator ()(const TagReady,int w) const199 void operator()(const TagReady, int w) const noexcept { 200 std::int32_t const* const count_queue = &m_queue[m_graph.numRows()]; 201 202 if (0 == count_queue[w]) push_work(w); 203 } 204 space() const205 execution_space space() const { return execution_space(); } 206 WorkGraphPolicy(const graph_type & arg_graph)207 WorkGraphPolicy(const graph_type& arg_graph) 208 : m_graph(arg_graph), 209 m_queue(view_alloc("queue", WithoutInitializing), 210 arg_graph.numRows() * 2 + 2) { 211 { // Initialize 212 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>; 213 using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>; 214 const closure_type closure(*this, policy_type(0, m_queue.size())); 215 closure.execute(); 216 execution_space().fence(); 217 } 218 219 { // execute-after counts 220 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>; 221 using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>; 222 const closure_type closure(*this, policy_type(0, m_graph.entries.size())); 223 closure.execute(); 224 execution_space().fence(); 225 } 226 227 { // Scheduling ready tasks 228 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>; 229 using closure_type = Kokkos::Impl::ParallelFor<self_type, policy_type>; 230 const closure_type closure(*this, policy_type(0, m_graph.numRows())); 231 closure.execute(); 232 execution_space().fence(); 233 } 234 } 235 }; 236 237 } // namespace Kokkos 238 239 #ifdef KOKKOS_ENABLE_SERIAL 240 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp" 241 #endif 242 243 #ifdef KOKKOS_ENABLE_OPENMP 244 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp" 245 #endif 246 247 #ifdef KOKKOS_ENABLE_CUDA 248 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp" 249 #endif 250 251 #ifdef KOKKOS_ENABLE_HIP 252 #include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp" 253 #endif 254 255 #ifdef KOKKOS_ENABLE_THREADS 256 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp" 257 #endif 258 259 #ifdef KOKKOS_ENABLE_HPX 260 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp" 261 #endif 262 263 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */ 264