1 2 // Copyright Oliver Kowalke 2013. 3 // Distributed under the Boost Software License, Version 1.0. 4 // (See accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 7 #include "boost/fiber/algo/shared_work.hpp" 8 9 #include <boost/assert.hpp> 10 11 #include "boost/fiber/type.hpp" 12 13 #ifdef BOOST_HAS_ABI_HEADERS 14 # include BOOST_ABI_PREFIX 15 #endif 16 17 namespace boost { 18 namespace fibers { 19 namespace algo { 20 21 //[awakened_ws 22 void awakened(context * ctx)23shared_work::awakened( context * ctx) noexcept { 24 if ( ctx->is_context( type::pinned_context) ) { /*< 25 recognize when we're passed this thread's main fiber (or an 26 implicit library helper fiber): never put those on the shared 27 queue 28 >*/ 29 lqueue_.push_back( * ctx); 30 } else { 31 ctx->detach(); 32 std::unique_lock< std::mutex > lk{ rqueue_mtx_ }; /*< 33 worker fiber, enqueue on shared queue 34 >*/ 35 rqueue_.push_back( ctx); 36 } 37 } 38 //] 39 40 //[pick_next_ws 41 context * pick_next()42shared_work::pick_next() noexcept { 43 context * ctx = nullptr; 44 std::unique_lock< std::mutex > lk{ rqueue_mtx_ }; 45 if ( ! rqueue_.empty() ) { /*< 46 pop an item from the ready queue 47 >*/ 48 ctx = rqueue_.front(); 49 rqueue_.pop_front(); 50 lk.unlock(); 51 BOOST_ASSERT( nullptr != ctx); 52 context::active()->attach( ctx); /*< 53 attach context to current scheduler via the active fiber 54 of this thread 55 >*/ 56 } else { 57 lk.unlock(); 58 if ( ! lqueue_.empty() ) { /*< 59 nothing in the ready queue, return main or dispatcher fiber 60 >*/ 61 ctx = & lqueue_.front(); 62 lqueue_.pop_front(); 63 } 64 } 65 return ctx; 66 } 67 //] 68 69 void suspend_until(std::chrono::steady_clock::time_point const & time_point)70shared_work::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept { 71 if ( suspend_) { 72 if ( (std::chrono::steady_clock::time_point::max)() == time_point) { 73 std::unique_lock< std::mutex > lk{ mtx_ }; 74 cnd_.wait( lk, [this](){ return flag_; }); 75 flag_ = false; 76 } else { 77 std::unique_lock< std::mutex > lk{ mtx_ }; 78 cnd_.wait_until( lk, time_point, [this](){ return flag_; }); 79 flag_ = false; 80 } 81 } 82 } 83 84 void notify()85shared_work::notify() noexcept { 86 if ( suspend_) { 87 std::unique_lock< std::mutex > lk{ mtx_ }; 88 flag_ = true; 89 lk.unlock(); 90 cnd_.notify_all(); 91 } 92 } 93 94 shared_work::rqueue_type shared_work::rqueue_{}; 95 std::mutex shared_work::rqueue_mtx_{}; 96 97 }}} 98 99 #ifdef BOOST_HAS_ABI_HEADERS 100 # include BOOST_ABI_SUFFIX 101 #endif 102