1 /* 2 * A test for adv_thread_pool dispatcher: all thread safe handlers 3 * must be finished before any thread unsafe handler. 4 */ 5 6 #include <iostream> 7 #include <set> 8 #include <vector> 9 #include <exception> 10 #include <stdexcept> 11 #include <cstdlib> 12 #include <thread> 13 #include <chrono> 14 #include <sstream> 15 16 #include <so_5/all.hpp> 17 18 #include <test/3rd_party/various_helpers/time_limited_execution.hpp> 19 20 #include "../for_each_lock_factory.hpp" 21 22 namespace atp_disp = so_5::disp::adv_thread_pool; 23 24 const unsigned int thread_count = 4; 25 26 struct msg_shutdown : public so_5::signal_t {}; 27 28 struct msg_safe_signal : public so_5::signal_t {}; 29 30 struct msg_unsafe_signal : public so_5::signal_t {}; 31 32 class a_test_t : public so_5::agent_t 33 { 34 public: a_test_t(so_5::environment_t & env)35 a_test_t( 36 so_5::environment_t & env ) 37 : so_5::agent_t( env ) 38 { 39 m_workers = 0; 40 } 41 42 void so_define_agent()43 so_define_agent() override 44 { 45 so_subscribe_self() 46 .event( &a_test_t::evt_shutdown ) 47 .event( &a_test_t::evt_safe_signal, so_5::thread_safe ) 48 .event( &a_test_t::evt_unsafe_signal ); 49 } 50 51 void so_evt_start()52 so_evt_start() override 53 { 54 for( size_t i = 0; i < 100; ++i ) 55 { 56 for( size_t j = 0; j != thread_count; ++j ) 57 so_5::send< msg_safe_signal >( *this ); 58 59 so_5::send< msg_unsafe_signal >( *this ); 60 } 61 62 so_5::send< msg_shutdown >( *this ); 63 } 64 65 void evt_shutdown(mhood_t<msg_shutdown>)66 evt_shutdown(mhood_t< msg_shutdown >) 67 { 68 so_environment().stop(); 69 } 70 71 void evt_safe_signal(mhood_t<msg_safe_signal>)72 evt_safe_signal(mhood_t< msg_safe_signal >) 73 { 74 ++m_workers; 75 76 while( thread_count != m_workers.load( std::memory_order_acquire ) ) 77 std::this_thread::yield(); 78 } 79 80 void evt_unsafe_signal(mhood_t<msg_unsafe_signal>)81 evt_unsafe_signal(mhood_t< msg_unsafe_signal >) 82 { 83 if( thread_count != m_workers ) 84 throw std::runtime_error( "m_workers != thread_count" ); 85 86 m_workers = 1; 87 88 using hrt = std::chrono::high_resolution_clock; 89 90 auto f = hrt::now() + std::chrono::milliseconds( 5 ); 91 92 do 93 { 94 if( 1 != m_workers ) 95 std::runtime_error( "m_workers != 1" ); 96 97 std::this_thread::sleep_for( std::chrono::microseconds( 100 ) ); 98 99 } while( f > hrt::now() ); 100 101 m_workers = 0; 102 } 103 104 private : 105 std::atomic_uint m_workers; 106 }; 107 108 void run_sobjectizer(atp_disp::queue_traits::lock_factory_t factory)109run_sobjectizer( atp_disp::queue_traits::lock_factory_t factory ) 110 { 111 so_5::launch( 112 [&]( so_5::environment_t & env ) 113 { 114 using namespace atp_disp; 115 116 env.register_agent_as_coop( 117 env.make_agent< a_test_t >(), 118 make_dispatcher( 119 env, 120 "thread_pool", 121 disp_params_t{} 122 .thread_count( thread_count ) 123 .set_queue_params( 124 queue_traits::queue_params_t{} 125 .lock_factory( factory ) ) ) 126 .binder() ); 127 } ); 128 } 129 130 int main()131main() 132 { 133 try 134 { 135 for_each_lock_factory( []( atp_disp::queue_traits::lock_factory_t factory ) { 136 run_with_time_limit( 137 [&]() 138 { 139 run_sobjectizer( factory ); 140 }, 141 20, 142 "unsafe_after_safe test" ); 143 } ); 144 } 145 catch( const std::exception & ex ) 146 { 147 std::cerr << "Error: " << ex.what() << std::endl; 148 return 1; 149 } 150 151 return 0; 152 } 153 154