/*
 * A test for the adv_thread_pool dispatcher: all thread-safe handlers
 * must finish before any thread-unsafe handler is started.
 */
5 
6 #include <iostream>
7 #include <set>
8 #include <vector>
9 #include <exception>
10 #include <stdexcept>
11 #include <cstdlib>
12 #include <thread>
13 #include <chrono>
14 #include <sstream>
15 
16 #include <so_5/all.hpp>
17 
18 #include <test/3rd_party/various_helpers/time_limited_execution.hpp>
19 
20 #include "../for_each_lock_factory.hpp"
21 
22 namespace atp_disp = so_5::disp::adv_thread_pool;
23 
24 const unsigned int thread_count = 4;
25 
26 struct msg_shutdown : public so_5::signal_t {};
27 
28 struct msg_safe_signal : public so_5::signal_t {};
29 
30 struct msg_unsafe_signal : public so_5::signal_t {};
31 
32 class a_test_t : public so_5::agent_t
33 {
34 	public:
a_test_t(so_5::environment_t & env)35 		a_test_t(
36 			so_5::environment_t & env )
37 			:	so_5::agent_t( env )
38 		{
39 			m_workers = 0;
40 		}
41 
42 		void
so_define_agent()43 		so_define_agent() override
44 		{
45 			so_subscribe_self()
46 				.event( &a_test_t::evt_shutdown )
47 				.event( &a_test_t::evt_safe_signal, so_5::thread_safe )
48 				.event( &a_test_t::evt_unsafe_signal );
49 		}
50 
51 		void
so_evt_start()52 		so_evt_start() override
53 		{
54 			for( size_t i = 0; i < 100; ++i )
55 			{
56 				for( size_t j = 0; j != thread_count; ++j )
57 					so_5::send< msg_safe_signal >( *this );
58 
59 				so_5::send< msg_unsafe_signal >( *this );
60 			}
61 
62 			so_5::send< msg_shutdown >( *this );
63 		}
64 
65 		void
evt_shutdown(mhood_t<msg_shutdown>)66 		evt_shutdown(mhood_t< msg_shutdown >)
67 		{
68 			so_environment().stop();
69 		}
70 
71 		void
evt_safe_signal(mhood_t<msg_safe_signal>)72 		evt_safe_signal(mhood_t< msg_safe_signal >)
73 		{
74 			++m_workers;
75 
76 			while( thread_count != m_workers.load( std::memory_order_acquire ) )
77 				std::this_thread::yield();
78 		}
79 
80 		void
evt_unsafe_signal(mhood_t<msg_unsafe_signal>)81 		evt_unsafe_signal(mhood_t< msg_unsafe_signal >)
82 		{
83 			if( thread_count != m_workers )
84 				throw std::runtime_error( "m_workers != thread_count" );
85 
86 			m_workers = 1;
87 
88 			using hrt = std::chrono::high_resolution_clock;
89 
90 			auto f = hrt::now() + std::chrono::milliseconds( 5 );
91 
92 			do
93 			{
94 				if( 1 != m_workers )
95 					std::runtime_error( "m_workers != 1" );
96 
97 				std::this_thread::sleep_for( std::chrono::microseconds( 100 ) );
98 
99 			} while( f > hrt::now() );
100 
101 			m_workers = 0;
102 		}
103 
104 	private :
105 		std::atomic_uint m_workers;
106 };
107 
108 void
run_sobjectizer(atp_disp::queue_traits::lock_factory_t factory)109 run_sobjectizer( atp_disp::queue_traits::lock_factory_t factory )
110 {
111 	so_5::launch(
112 		[&]( so_5::environment_t & env )
113 		{
114 			using namespace atp_disp;
115 
116 			env.register_agent_as_coop(
117 					env.make_agent< a_test_t >(),
118 					make_dispatcher(
119 							env,
120 							"thread_pool",
121 							disp_params_t{}
122 									.thread_count( thread_count )
123 									.set_queue_params(
124 											queue_traits::queue_params_t{}
125 													.lock_factory( factory ) ) )
126 					.binder() );
127 		} );
128 }
129 
130 int
main()131 main()
132 {
133 	try
134 	{
135 		for_each_lock_factory( []( atp_disp::queue_traits::lock_factory_t factory ) {
136 			run_with_time_limit(
137 				[&]()
138 				{
139 					run_sobjectizer( factory );
140 				},
141 				20,
142 				"unsafe_after_safe test" );
143 		} );
144 	}
145 	catch( const std::exception & ex )
146 	{
147 		std::cerr << "Error: " << ex.what() << std::endl;
148 		return 1;
149 	}
150 
151 	return 0;
152 }
153 
154