1 /*
2  * SObjectizer-5
3  */
4 
5 /*!
6  * \since
7  * v.5.5.3
8  *
9  * \file
10  * \brief A vector-based storage for agent's subscriptions information.
11  */
12 
13 #include <so_5/impl/subscription_storage_iface.hpp>
14 
15 #include <algorithm>
16 #include <vector>
17 #include <iterator>
18 
19 #include <so_5/details/rollback_on_exception.hpp>
20 
21 namespace so_5
22 {
23 
24 namespace impl
25 {
26 
27 /*!
28  * \since
29  * v.5.5.3
30  *
31  * \brief A vector-based storage for agent's subscriptions information.
32  */
33 namespace vector_based_subscr_storage
34 {
35 
36 /*!
37  * \since
38  * v.5.5.3
39  *
40  * \brief A vector-based storage for agent's subscriptions information.
41  *
42  * This is very simple implementation of subscription storage which
43  * uses std::vector for storing information.
44  *
45  * All manipulation is performed by very simple linear search inside
46  * that vector. For agents with few subscriptions this will be the most
47  * efficient approach.
48  */
49 class storage_t : public subscription_storage_t
50 	{
51 	public :
52 		storage_t(
53 			agent_t * owner,
54 			std::size_t initial_capacity );
55 		~storage_t() override;
56 
57 		virtual void
58 		create_event_subscription(
59 			const mbox_t & mbox_ref,
60 			const std::type_index & type_index,
61 			const message_limit::control_block_t * limit,
62 			const state_t & target_state,
63 			const event_handler_method_t & method,
64 			thread_safety_t thread_safety,
65 			event_handler_kind_t handler_kind ) override;
66 
67 		virtual void
68 		drop_subscription(
69 			const mbox_t & mbox,
70 			const std::type_index & msg_type,
71 			const state_t & target_state ) override;
72 
73 		void
74 		drop_subscription_for_all_states(
75 			const mbox_t & mbox,
76 			const std::type_index & msg_type ) override;
77 
78 		const event_handler_data_t *
79 		find_handler(
80 			mbox_id_t mbox_id,
81 			const std::type_index & msg_type,
82 			const state_t & current_state ) const noexcept override;
83 
84 		void
85 		debug_dump( std::ostream & to ) const override;
86 
87 		void
88 		drop_content() override;
89 
90 		subscription_storage_common::subscr_info_vector_t
91 		query_content() const override;
92 
93 		void
94 		setup_content(
95 			subscription_storage_common::subscr_info_vector_t && info ) override;
96 
97 		std::size_t
98 		query_subscriptions_count() const override;
99 
100 	private :
101 		using info_t = subscription_storage_common::subscr_info_t;
102 		using subscr_info_vector_t =
103 				subscription_storage_common::subscr_info_vector_t;
104 
105 		//! A helper predicate for searching the same
106 		//! mbox and message type pairs.
107 		struct is_same_mbox_msg
108 			{
109 				const mbox_id_t m_id;
110 				const std::type_index & m_type;
111 
112 				bool
operator ()so_5::impl::vector_based_subscr_storage::storage_t::is_same_mbox_msg113 				operator()( const info_t & info ) const
114 					{
115 						return m_id == info.m_mbox->id() &&
116 								m_type == info.m_msg_type;
117 					}
118 			};
119 
120 		//! Subscription information.
121 		subscr_info_vector_t m_events;
122 
123 		void
124 		destroy_all_subscriptions();
125 	};
126 
127 namespace
128 {
129 	template< class Container >
130 	auto
find(Container & c,const mbox_id_t & mbox_id,const std::type_index & msg_type,const state_t & target_state)131 	find( Container & c,
132 		const mbox_id_t & mbox_id,
133 		const std::type_index & msg_type,
134 		const state_t & target_state ) -> decltype( c.begin() )
135 		{
136 			using namespace std;
137 
138 			return find_if( begin( c ), end( c ),
139 				[&]( typename Container::value_type const & o ) {
140 					return ( o.m_mbox->id() == mbox_id &&
141 						o.m_msg_type == msg_type &&
142 						o.m_state == &target_state );
143 				} );
144 		}
145 
146 } /* namespace anonymous */
147 
storage_t(agent_t * owner,std::size_t initial_capacity)148 storage_t::storage_t(
149 	agent_t * owner,
150 	std::size_t initial_capacity )
151 	:	subscription_storage_t( owner )
152 	{
153 		m_events.reserve( initial_capacity );
154 	}
155 
~storage_t()156 storage_t::~storage_t()
157 	{
158 		destroy_all_subscriptions();
159 	}
160 
161 void
create_event_subscription(const mbox_t & mbox,const std::type_index & msg_type,const message_limit::control_block_t * limit,const state_t & target_state,const event_handler_method_t & method,thread_safety_t thread_safety,event_handler_kind_t handler_kind)162 storage_t::create_event_subscription(
163 	const mbox_t & mbox,
164 	const std::type_index & msg_type,
165 	const message_limit::control_block_t * limit,
166 	const state_t & target_state,
167 	const event_handler_method_t & method,
168 	thread_safety_t thread_safety,
169 	event_handler_kind_t handler_kind )
170 	{
171 		using namespace std;
172 		using namespace subscription_storage_common;
173 
174 		const auto mbox_id = mbox->id();
175 
176 		// Check that this subscription is new.
177 		auto existed_position = find(
178 				m_events, mbox_id, msg_type, target_state );
179 
180 		if( existed_position != m_events.end() )
181 			SO_5_THROW_EXCEPTION(
182 				rc_evt_handler_already_provided,
183 				"agent is already subscribed to message, " +
184 				make_subscription_description( mbox, msg_type, target_state ) );
185 
186 		// Just add subscription to the end.
187 		m_events.emplace_back(
188 				mbox, msg_type, target_state, method, thread_safety, handler_kind );
189 
190 		// Note: since v.5.5.9 mbox subscription is initiated even if
191 		// it is MPSC mboxes. It is important for the case of message
192 		// delivery tracing.
193 
194 		// If there is no subscription for that mbox it must be created.
195 		// Last item in m_events should not be checked becase it is
196 		// description of the just added subscription.
197 		auto last_to_check = --end( m_events );
198 		if( last_to_check == find_if(
199 				begin( m_events ), last_to_check,
200 				is_same_mbox_msg{ mbox_id, msg_type } ) )
201 			{
202 				// Mbox must create subscription.
203 				so_5::details::do_with_rollback_on_exception(
204 					[&] {
205 						mbox->subscribe_event_handler(
206 								msg_type,
207 								limit,
208 								*owner() );
209 					},
210 					[&] {
211 						m_events.pop_back();
212 					} );
213 			}
214 	}
215 
216 void
drop_subscription(const mbox_t & mbox,const std::type_index & msg_type,const state_t & target_state)217 storage_t::drop_subscription(
218 	const mbox_t & mbox,
219 	const std::type_index & msg_type,
220 	const state_t & target_state )
221 	{
222 		using namespace std;
223 
224 		const auto mbox_id = mbox->id();
225 
226 		auto existed_position = find(
227 				m_events, mbox_id, msg_type, target_state );
228 		if( existed_position != m_events.end() )
229 			{
230 				m_events.erase( existed_position );
231 
232 				// Note v.5.5.9 unsubscribe_event_handlers is called for
233 				// mbox even if it is MPSC mbox. It is necessary for the case
234 				// of message delivery tracing.
235 
236 				// If there is no more subscriptions to that mbox then
237 				// the mbox must remove information about that agent.
238 				if( end( m_events ) == find_if(
239 						begin( m_events ), end( m_events ),
240 						is_same_mbox_msg{ mbox_id, msg_type } ) )
241 					{
242 						// If we are here then there is no more references
243 						// to the mbox. And mbox must not hold reference
244 						// to the agent.
245 						mbox->unsubscribe_event_handlers( msg_type, *owner() );
246 					}
247 			}
248 	}
249 
250 void
drop_subscription_for_all_states(const mbox_t & mbox,const std::type_index & msg_type)251 storage_t::drop_subscription_for_all_states(
252 	const mbox_t & mbox,
253 	const std::type_index & msg_type )
254 	{
255 		using namespace std;
256 
257 		const auto mbox_id = mbox->id();
258 
259 		const auto old_size = m_events.size();
260 
261 		m_events.erase(
262 				remove_if( begin( m_events ), end( m_events ),
263 						[mbox_id, &msg_type]( const info_t & i ) {
264 							return i.m_mbox->id() == mbox_id &&
265 									i.m_msg_type == msg_type;
266 						} ),
267 				end( m_events ) );
268 
269 		// Note: since v.5.5.9 mbox unsubscription is initiated even if
270 		// it is MPSC mboxes. It is important for the case of message
271 		// delivery tracing.
272 
273 		if( old_size != m_events.size() )
274 			mbox->unsubscribe_event_handlers( msg_type, *owner() );
275 	}
276 
277 const event_handler_data_t *
find_handler(mbox_id_t mbox_id,const std::type_index & msg_type,const state_t & current_state) const278 storage_t::find_handler(
279 	mbox_id_t mbox_id,
280 	const std::type_index & msg_type,
281 	const state_t & current_state ) const noexcept
282 	{
283 		auto it = find( m_events, mbox_id, msg_type, current_state );
284 
285 		if( it != std::end( m_events ) )
286 			return &(it->m_handler);
287 		else
288 			return nullptr;
289 	}
290 
291 void
debug_dump(std::ostream & to) const292 storage_t::debug_dump( std::ostream & to ) const
293 	{
294 		for( const auto & e : m_events )
295 			to << "{" << e.m_mbox->id() << ", "
296 					<< e.m_msg_type.name() << ", "
297 					<< e.m_state->query_name() << "}"
298 					<< std::endl;
299 	}
300 
301 void
destroy_all_subscriptions()302 storage_t::destroy_all_subscriptions()
303 	{
304 		if( m_events.empty() )
305 			// Nothing to do at empty subscription list.
306 			return;
307 
308 		using namespace std;
309 
310 		struct mbox_msg_type_pair_t
311 			{
312 				abstract_message_box_t * m_mbox;
313 				const type_index * m_msg_type;
314 
315 				bool
316 				operator<( const mbox_msg_type_pair_t & o ) const
317 					{
318 						return m_mbox < o.m_mbox ||
319 								( m_mbox == o.m_mbox &&
320 								 (*m_msg_type) < (*o.m_msg_type) );
321 					}
322 
323 				bool
324 				operator==( const mbox_msg_type_pair_t & o ) const
325 					{
326 						return m_mbox == o.m_mbox &&
327 								(*m_msg_type) == (*o.m_msg_type);
328 					}
329 			};
330 
331 		// First step: collect all pointers to mbox-es.
332 		vector< mbox_msg_type_pair_t > mboxes;
333 		mboxes.reserve( m_events.size() );
334 
335 		transform(
336 				begin( m_events ), end( m_events ),
337 				back_inserter( mboxes ),
338 				[]( info_t & i ) {
339 					return mbox_msg_type_pair_t{ i.m_mbox.get(), &i.m_msg_type };
340 				} );
341 
342 		// Second step: remove duplicates.
343 		sort( begin( mboxes ), end( mboxes ) );
344 		mboxes.erase(
345 				unique( begin( mboxes ), end( mboxes ) ),
346 				end( mboxes ) );
347 
348 		// Third step: destroy subscription in mboxes.
349 		for( auto m : mboxes )
350 			m.m_mbox->unsubscribe_event_handlers( *m.m_msg_type, *owner() );
351 
352 		// Fourth step: cleanup subscription vector.
353 		drop_content();
354 	}
355 
356 void
drop_content()357 storage_t::drop_content()
358 	{
359 		subscr_info_vector_t empty_events;
360 		m_events.swap( empty_events );
361 	}
362 
363 subscription_storage_common::subscr_info_vector_t
query_content() const364 storage_t::query_content() const
365 	{
366 		return m_events;
367 	}
368 
369 void
setup_content(subscription_storage_common::subscr_info_vector_t && info)370 storage_t::setup_content(
371 	subscription_storage_common::subscr_info_vector_t && info )
372 	{
373 		m_events = std::move( info );
374 	}
375 
376 std::size_t
query_subscriptions_count() const377 storage_t::query_subscriptions_count() const
378 	{
379 		return m_events.size();
380 	}
381 
382 } /* namespace vector_based_subscr_storage */
383 
384 } /* namespace impl */
385 
386 SO_5_FUNC subscription_storage_factory_t
vector_based_subscription_storage_factory(std::size_t initial_capacity)387 vector_based_subscription_storage_factory(
388 	std::size_t initial_capacity )
389 	{
390 		return [initial_capacity]( agent_t * owner ) {
391 			return impl::subscription_storage_unique_ptr_t(
392 					new impl::vector_based_subscr_storage::storage_t(
393 							owner,
394 							initial_capacity ) );
395 		};
396 	}
397 
398 } /* namespace so_5 */
399 
400 
401