1 /*
2  * A simple test for message limits (dropping the message at message peaks).
3  */
4 
5 #include <iostream>
6 #include <map>
7 #include <exception>
8 #include <stdexcept>
9 #include <cstdlib>
10 #include <thread>
11 #include <chrono>
12 
13 #include <so_5/all.hpp>
14 
15 #include <test/3rd_party/various_helpers/time_limited_execution.hpp>
16 
17 struct msg_ping : public so_5::signal_t {};
18 struct msg_pong : public so_5::signal_t {};
19 
20 struct msg_finish : public so_5::signal_t {};
21 
22 class a_sender_t : public so_5::agent_t
23 {
24 public :
a_sender_t(context_t ctx)25 	a_sender_t( context_t ctx )
26 		:	so_5::agent_t( ctx )
27 	{}
28 
29 	void
set_receiver(const so_5::mbox_t & mbox)30 	set_receiver( const so_5::mbox_t & mbox )
31 	{
32 		m_receiver = mbox;
33 	}
34 
35 	virtual void
so_define_agent()36 	so_define_agent() override
37 	{
38 		so_default_state().event(
39 			[&](mhood_t< msg_pong >) {
40 				const unsigned int max_series = 5;
41 
42 				++m_pongs;
43 				if( !( m_pongs % 2) )
44 				{
45 					if( m_series_sent < max_series )
46 						send_pings();
47 					else if( m_series_sent == max_series )
48 						so_5::send< msg_finish >( *this );
49 				}
50 			} );
51 
52 		so_default_state().event(
53 			[&](mhood_t< msg_finish >) {
54 				const auto expected = m_series_sent * 2;
55 				if( m_pongs == expected )
56 					so_deregister_agent_coop_normally();
57 				else
58 					throw std::runtime_error( "pongs count mismatch; "
59 							"expected: " + std::to_string( expected ) +
60 							", received: " + std::to_string( m_pongs ) );
61 			} );
62 	}
63 
64 	virtual void
so_evt_start()65 	so_evt_start() override
66 	{
67 		send_pings();
68 	}
69 
70 private :
71 	so_5::mbox_t m_receiver;
72 
73 	unsigned int m_series_sent = 0;
74 	unsigned int m_pongs = 0;
75 
76 	void
send_pings()77 	send_pings()
78 	{
79 		so_5::send< msg_ping >( m_receiver );
80 		so_5::send< msg_ping >( m_receiver );
81 		so_5::send< msg_ping >( m_receiver );
82 
83 		++m_series_sent;
84 	}
85 };
86 
87 class a_receiver_t : public so_5::agent_t
88 {
89 public :
a_receiver_t(context_t ctx,so_5::mbox_t sender)90 	a_receiver_t(
91 		context_t ctx,
92 		so_5::mbox_t sender )
93 		:	so_5::agent_t( ctx + limit_then_drop< msg_ping >( 2 ) )
94 		,	m_sender( std::move( sender ) )
95 	{}
96 
97 	virtual void
so_define_agent()98 	so_define_agent() override
99 	{
100 		so_default_state()
101 			.event( [&](mhood_t< msg_ping >){ so_5::send< msg_pong >( m_sender ); } );
102 	}
103 
104 private :
105 	const so_5::mbox_t m_sender;
106 };
107 
108 void
init(so_5::environment_t & env)109 init( so_5::environment_t & env )
110 {
111 	auto coop = env.make_coop();
112 	auto sender = coop->make_agent< a_sender_t >();
113 	auto receiver = coop->make_agent< a_receiver_t >( sender->so_direct_mbox() );
114 	sender->set_receiver( receiver->so_direct_mbox() );
115 
116 	env.register_coop( std::move( coop ) );
117 }
118 
119 int
main()120 main()
121 {
122 	try
123 	{
124 		run_with_time_limit(
125 			[]()
126 			{
127 				so_5::launch( &init );
128 			},
129 			20,
130 			"simple message drop at peaks test" );
131 	}
132 	catch( const std::exception & ex )
133 	{
134 		std::cerr << "Error: " << ex.what() << std::endl;
135 		return 1;
136 	}
137 
138 	return 0;
139 }
140 
141