1 #include <iostream>
2 #include <set>
3 #include <chrono>
4 
5 #include <cstdio>
6 #include <cstdlib>
7 
8 #include <so_5/all.hpp>
9 
10 #include <test/3rd_party/various_helpers/time_limited_execution.hpp>
11 
12 class a_ring_member_t : public so_5::agent_t
13 	{
14 	public :
15 		struct msg_start : public so_5::signal_t {};
16 
17 		struct msg_your_turn
18 			{
19 				unsigned long long m_request_number;
20 			};
21 
a_ring_member_t(context_t ctx)22 		a_ring_member_t( context_t ctx )
23 			:	so_5::agent_t( ctx )
24 			{}
25 
26 		void
set_next_mbox(const so_5::mbox_t & mbox)27 		set_next_mbox( const so_5::mbox_t & mbox )
28 			{
29 				m_next_mbox = mbox;
30 			}
31 
32 		void
so_define_agent()33 		so_define_agent() override
34 			{
35 				so_default_state()
36 					.event( &a_ring_member_t::evt_start )
37 					.event( &a_ring_member_t::evt_your_turn );
38 			}
39 
40 		void
evt_start(mhood_t<msg_start>)41 		evt_start(mhood_t< msg_start >)
42 			{
43 				so_5::send< msg_your_turn >( m_next_mbox, 0ull );
44 			}
45 
46 		void
evt_your_turn(const msg_your_turn & evt)47 		evt_your_turn( const msg_your_turn & evt )
48 			{
49 				++m_rounds_passed;
50 				if( m_rounds_passed < m_rounds )
51 					so_5::send< msg_your_turn >( m_next_mbox, evt.m_request_number + 1 );
52 				else
53 					{
54 						so_environment().stop();
55 					}
56 			}
57 
58 	private :
59 		so_5::mbox_t m_self_mbox;
60 		so_5::mbox_t m_next_mbox;
61 
62 		unsigned int m_rounds_passed = 0;
63 		const unsigned int m_rounds = 20;
64 	};
65 
66 using lock_factory_t = so_5::disp::mpsc_queue_traits::lock_factory_t;
67 
68 class case_setter_t;
69 
70 struct cleanup_caller_t
71 	{
72 		void operator()(case_setter_t *) noexcept;
73 	};
74 
75 using case_setter_cleaner_t = std::unique_ptr<case_setter_t, cleanup_caller_t>;
76 
77 class case_setter_t
78 	{
79 	public :
case_setter_t(lock_factory_t lock_factory)80 		case_setter_t( lock_factory_t lock_factory )
81 			:	m_lock_factory{ std::move(lock_factory) }
82 			{}
~case_setter_t()83 		virtual ~case_setter_t() {}
84 
85 		virtual void
tune_env_params(so_5::environment_params_t &)86 		tune_env_params( so_5::environment_params_t & )
87 			{
88 				// Nothing to do by default.
89 			}
90 
91 		virtual case_setter_cleaner_t
92 		make_dispatcher( so_5::environment_t & env ) = 0;
93 
94 		virtual so_5::disp_binder_shptr_t
95 		make_binder() = 0;
96 
97 		virtual void
98 		cleanup() noexcept = 0;
99 
100 	protected :
101 		const lock_factory_t &
lock_factory() const102 		lock_factory() const { return m_lock_factory; }
103 
104 		template< typename P >
105 		P
setup_lock_factory(P params) const106 		setup_lock_factory( P params ) const
107 			{
108 				params.tune_queue_params(
109 					[&]( so_5::disp::mpsc_queue_traits::queue_params_t & p ) {
110 						p.lock_factory( m_lock_factory );
111 					} );
112 
113 				return params;
114 			}
115 
make_cleaner()116 		auto make_cleaner() noexcept
117 			{
118 				return case_setter_cleaner_t{ this };
119 			}
120 
121 	private :
122 		const lock_factory_t m_lock_factory;
123 	};
124 
125 using case_setter_unique_ptr_t = std::unique_ptr< case_setter_t >;
126 
127 void
operator ()(case_setter_t * setter)128 cleanup_caller_t::operator()( case_setter_t * setter ) noexcept
129 	{
130 		setter->cleanup();
131 	}
132 
133 class default_disp_setter_t final : public case_setter_t
134 	{
135 		so_5::disp_binder_shptr_t m_binder;
136 
137 	public :
138 		using case_setter_t::case_setter_t;
139 
140 		void
tune_env_params(so_5::environment_params_t & params)141 		tune_env_params( so_5::environment_params_t & params ) override
142 			{
143 				params.default_disp_params(
144 					setup_lock_factory( so_5::disp::one_thread::disp_params_t{} ) );
145 			}
146 
147 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)148 		make_dispatcher( so_5::environment_t & env ) override
149 			{
150 				m_binder = so_5::make_default_disp_binder( env );
151 				return make_cleaner();
152 			}
153 
154 		so_5::disp_binder_shptr_t
make_binder()155 		make_binder() override
156 			{
157 				return m_binder;
158 			}
159 
160 		void
cleanup()161 		cleanup() noexcept override
162 			{
163 				m_binder.reset();
164 			}
165 	};
166 
167 class one_thread_case_setter_t final : public case_setter_t
168 	{
169 		so_5::disp::one_thread::dispatcher_handle_t m_disp;
170 
171 	public :
172 		using case_setter_t::case_setter_t;
173 
174 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)175 		make_dispatcher( so_5::environment_t & env ) override
176 			{
177 				m_disp = so_5::disp::one_thread::make_dispatcher(
178 						env,
179 						"one_thread",
180 						setup_lock_factory( so_5::disp::one_thread::disp_params_t{} ) );
181 				return make_cleaner();
182 			}
183 
184 		so_5::disp_binder_shptr_t
make_binder()185 		make_binder() override
186 			{
187 				return m_disp.binder();
188 			}
189 
190 		void
cleanup()191 		cleanup() noexcept override
192 			{
193 				m_disp.reset();
194 			}
195 	};
196 
197 class active_obj_case_setter_t : public case_setter_t
198 	{
199 		so_5::disp::active_obj::dispatcher_handle_t m_disp;
200 
201 	public :
202 		using case_setter_t::case_setter_t;
203 
204 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)205 		make_dispatcher( so_5::environment_t & env ) override
206 			{
207 				m_disp = so_5::disp::active_obj::make_dispatcher(
208 						env,
209 						"active_obj",
210 						setup_lock_factory(
211 								so_5::disp::active_obj::disp_params_t{} ) );
212 
213 				return make_cleaner();
214 			}
215 
216 		so_5::disp_binder_shptr_t
make_binder()217 		make_binder() override
218 			{
219 				return m_disp.binder();
220 			}
221 
222 		void
cleanup()223 		cleanup() noexcept override
224 			{
225 				m_disp.reset();
226 			}
227 	};
228 
229 class active_group_case_setter_t : public case_setter_t
230 	{
231 		so_5::disp::active_group::dispatcher_handle_t m_disp;
232 
233 	public :
234 		using case_setter_t::case_setter_t;
235 
236 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)237 		make_dispatcher( so_5::environment_t & env ) override
238 			{
239 				m_disp = so_5::disp::active_group::make_dispatcher(
240 						env,
241 						"active_group",
242 						setup_lock_factory(
243 								so_5::disp::active_group::disp_params_t{} ) );
244 
245 				return make_cleaner();
246 			}
247 
248 		so_5::disp_binder_shptr_t
make_binder()249 		make_binder() override
250 			{
251 				auto id = ++m_id;
252 				return m_disp.binder( std::to_string(id) );
253 			}
254 
255 		void
cleanup()256 		cleanup() noexcept override
257 			{
258 				m_disp.reset();
259 			}
260 
261 	private :
262 		unsigned int m_id = {0};
263 	};
264 
265 class prio_strictly_ordered_case_setter_t : public case_setter_t
266 	{
267 		so_5::disp::prio_one_thread::strictly_ordered::dispatcher_handle_t m_disp;
268 
269 	public :
270 		using case_setter_t::case_setter_t;
271 
272 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)273 		make_dispatcher( so_5::environment_t & env ) override
274 			{
275 				namespace disp_ns = so_5::disp::prio_one_thread::strictly_ordered;
276 				m_disp = disp_ns::make_dispatcher(
277 						env,
278 						"prio::strictly_ordered",
279 						setup_lock_factory( disp_ns::disp_params_t{} ) );
280 
281 				return make_cleaner();
282 			}
283 
284 		so_5::disp_binder_shptr_t
make_binder()285 		make_binder() override
286 			{
287 				return m_disp.binder();
288 			}
289 
290 		void
cleanup()291 		cleanup() noexcept override
292 			{
293 				m_disp.reset();
294 			}
295 	};
296 
297 class prio_quoted_round_robin_case_setter_t : public case_setter_t
298 	{
299 		so_5::disp::prio_one_thread::quoted_round_robin::dispatcher_handle_t m_disp;
300 
301 	public :
302 		using case_setter_t::case_setter_t;
303 
304 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)305 		make_dispatcher( so_5::environment_t & env ) override
306 			{
307 				namespace disp_ns = so_5::disp::prio_one_thread::quoted_round_robin;
308 				m_disp = disp_ns::make_dispatcher(
309 						env,
310 						"prio::quoted_round_robin",
311 						disp_ns::quotes_t{ 10 },
312 						setup_lock_factory( disp_ns::disp_params_t{} ) );
313 
314 				return make_cleaner();
315 			}
316 
317 		so_5::disp_binder_shptr_t
make_binder()318 		make_binder() override
319 			{
320 				return m_disp.binder();
321 			}
322 
323 		void
cleanup()324 		cleanup() noexcept override
325 			{
326 				m_disp.reset();
327 			}
328 	};
329 
330 class one_per_prio_case_setter_t : public case_setter_t
331 	{
332 		so_5::disp::prio_dedicated_threads::one_per_prio::dispatcher_handle_t m_disp;
333 
334 	public :
335 		using case_setter_t::case_setter_t;
336 
337 		case_setter_cleaner_t
make_dispatcher(so_5::environment_t & env)338 		make_dispatcher( so_5::environment_t & env ) override
339 			{
340 				namespace disp_ns = so_5::disp::prio_dedicated_threads::one_per_prio;
341 				m_disp = disp_ns::make_dispatcher(
342 						env,
343 						"prio::one_per_prio",
344 						setup_lock_factory( disp_ns::disp_params_t{} ) );
345 
346 				return make_cleaner();
347 			}
348 
349 		so_5::disp_binder_shptr_t
make_binder()350 		make_binder() override
351 			{
352 				return m_disp.binder();
353 			}
354 
355 		void
cleanup()356 		cleanup() noexcept override
357 			{
358 				m_disp.reset();
359 			}
360 	};
361 
362 void
create_coop(so_5::environment_t & env,case_setter_t & setter)363 create_coop(
364 	so_5::environment_t & env,
365 	case_setter_t & setter )
366 	{
367 		auto setter_cleaner = setter.make_dispatcher( env );
368 
369 		so_5::mbox_t first_agent_mbox = env.introduce_coop(
370 			[&]( so_5::coop_t & coop )
371 			{
372 				const std::size_t ring_size = 16;
373 
374 				std::vector< a_ring_member_t * > agents;
375 				agents.reserve( ring_size );
376 
377 				std::vector< so_5::mbox_t > mboxes;
378 				mboxes.reserve( ring_size );
379 
380 				for( unsigned int i = 0; i != ring_size; ++i )
381 					{
382 						auto member = coop.make_agent_with_binder< a_ring_member_t >(
383 								setter.make_binder() );
384 						agents.push_back( member );
385 						mboxes.push_back( member->so_direct_mbox() );
386 					}
387 
388 				for( unsigned int i = 0; i != ring_size; ++i )
389 					{
390 						agents[ i ]->set_next_mbox(
391 								mboxes[ (i + 1) % ring_size ] );
392 					}
393 
394 				return mboxes[ 0 ];
395 			} );
396 
397 		so_5::send< a_ring_member_t::msg_start >( first_agent_mbox );
398 	}
399 
400 using case_maker_t = std::function<
401 	case_setter_unique_ptr_t(lock_factory_t) >;
402 
403 template< typename Setter >
maker()404 case_maker_t maker()
405 	{
406 		return []( lock_factory_t lock_factory ) {
407 			return std::make_unique< Setter >( std::move(lock_factory) );
408 		};
409 	}
410 
411 void
do_test()412 do_test()
413 	{
414 		struct case_info_t
415 			{
416 				std::string m_disp_name;
417 				case_maker_t m_maker;
418 			};
419 		std::vector< case_info_t > cases;
420 		cases.push_back( case_info_t{
421 				"default_disp", maker< default_disp_setter_t >() } );
422 		cases.push_back( case_info_t{
423 				"one_thread", maker< one_thread_case_setter_t >() } );
424 		cases.push_back( case_info_t{
425 				"active_obj", maker< active_obj_case_setter_t >() } );
426 		cases.push_back( case_info_t{
427 				"active_group", maker< active_group_case_setter_t >() } );
428 		cases.push_back( case_info_t{
429 				"prio::strictly_ordered",
430 				maker< prio_strictly_ordered_case_setter_t >() } );
431 		cases.push_back( case_info_t{
432 				"prio::quoted_round_robin",
433 				maker< prio_quoted_round_robin_case_setter_t >() } );
434 		cases.push_back( case_info_t{
435 				"prio::one_per_prio",
436 				maker< one_per_prio_case_setter_t >() } );
437 
438 		struct lock_factory_info_t
439 			{
440 				std::string m_name;
441 				lock_factory_t m_factory;
442 			};
443 		std::vector< lock_factory_info_t > factories;
444 		factories.push_back( lock_factory_info_t{
445 				"combined_lock", so_5::disp::mpsc_queue_traits::combined_lock_factory() } );
446 		factories.push_back( lock_factory_info_t{
447 				"combined_lock(1s)",
448 				so_5::disp::mpsc_queue_traits::combined_lock_factory(
449 						std::chrono::seconds(1) ) } );
450 		factories.push_back( lock_factory_info_t{
451 				"simple_lock",
452 				so_5::disp::mpsc_queue_traits::simple_lock_factory() } );
453 
454 		for( const auto & c : cases )
455 			for( const auto & f : factories )
456 				{
457 					std::cout << "--- " << c.m_disp_name << "+" << f.m_name << "---"
458 							<< std::endl;
459 
460 					run_with_time_limit( [&] {
461 								auto setter = c.m_maker( f.m_factory );
462 
463 								so_5::launch(
464 									[&]( so_5::environment_t & env ) {
465 										create_coop( env, *setter );
466 									},
467 									[&]( so_5::environment_params_t & params ) {
468 										setter->tune_env_params( params );
469 									} );
470 							},
471 							100,
472 							"dispatcher: " + c.m_disp_name + ", lock: " + f.m_name );
473 
474 					std::cout << "--- DONE ---" << std::endl;
475 				}
476 	}
477 
478 int
main()479 main()
480 {
481 	try
482 	{
483 		do_test();
484 
485 		return 0;
486 	}
487 	catch( const std::exception & x )
488 	{
489 		std::cerr << "*** Exception caught: " << x.what() << std::endl;
490 	}
491 
492 	return 2;
493 }
494 
495