1 /*
2 * SObjectizer-5
3 */
4
5 /*!
6 * \since
7 * v.5.5.3
8 *
9 * \file
10 * \brief A vector-based storage for agent's subscriptions information.
11 */
12
13 #include <so_5/impl/subscription_storage_iface.hpp>
14
15 #include <algorithm>
16 #include <vector>
17 #include <iterator>
18
19 #include <so_5/details/rollback_on_exception.hpp>
20
21 namespace so_5
22 {
23
24 namespace impl
25 {
26
27 /*!
28 * \since
29 * v.5.5.3
30 *
31 * \brief A vector-based storage for agent's subscriptions information.
32 */
33 namespace vector_based_subscr_storage
34 {
35
36 /*!
37 * \since
38 * v.5.5.3
39 *
40 * \brief A vector-based storage for agent's subscriptions information.
41 *
42 * This is very simple implementation of subscription storage which
43 * uses std::vector for storing information.
44 *
45 * All manipulation is performed by very simple linear search inside
46 * that vector. For agents with few subscriptions this will be the most
47 * efficient approach.
48 */
49 class storage_t : public subscription_storage_t
50 {
51 public :
52 storage_t(
53 agent_t * owner,
54 std::size_t initial_capacity );
55 ~storage_t() override;
56
57 virtual void
58 create_event_subscription(
59 const mbox_t & mbox_ref,
60 const std::type_index & type_index,
61 const message_limit::control_block_t * limit,
62 const state_t & target_state,
63 const event_handler_method_t & method,
64 thread_safety_t thread_safety,
65 event_handler_kind_t handler_kind ) override;
66
67 virtual void
68 drop_subscription(
69 const mbox_t & mbox,
70 const std::type_index & msg_type,
71 const state_t & target_state ) override;
72
73 void
74 drop_subscription_for_all_states(
75 const mbox_t & mbox,
76 const std::type_index & msg_type ) override;
77
78 const event_handler_data_t *
79 find_handler(
80 mbox_id_t mbox_id,
81 const std::type_index & msg_type,
82 const state_t & current_state ) const noexcept override;
83
84 void
85 debug_dump( std::ostream & to ) const override;
86
87 void
88 drop_content() override;
89
90 subscription_storage_common::subscr_info_vector_t
91 query_content() const override;
92
93 void
94 setup_content(
95 subscription_storage_common::subscr_info_vector_t && info ) override;
96
97 std::size_t
98 query_subscriptions_count() const override;
99
100 private :
101 using info_t = subscription_storage_common::subscr_info_t;
102 using subscr_info_vector_t =
103 subscription_storage_common::subscr_info_vector_t;
104
105 //! A helper predicate for searching the same
106 //! mbox and message type pairs.
107 struct is_same_mbox_msg
108 {
109 const mbox_id_t m_id;
110 const std::type_index & m_type;
111
112 bool
operator ()so_5::impl::vector_based_subscr_storage::storage_t::is_same_mbox_msg113 operator()( const info_t & info ) const
114 {
115 return m_id == info.m_mbox->id() &&
116 m_type == info.m_msg_type;
117 }
118 };
119
120 //! Subscription information.
121 subscr_info_vector_t m_events;
122
123 void
124 destroy_all_subscriptions();
125 };
126
127 namespace
128 {
129 template< class Container >
130 auto
find(Container & c,const mbox_id_t & mbox_id,const std::type_index & msg_type,const state_t & target_state)131 find( Container & c,
132 const mbox_id_t & mbox_id,
133 const std::type_index & msg_type,
134 const state_t & target_state ) -> decltype( c.begin() )
135 {
136 using namespace std;
137
138 return find_if( begin( c ), end( c ),
139 [&]( typename Container::value_type const & o ) {
140 return ( o.m_mbox->id() == mbox_id &&
141 o.m_msg_type == msg_type &&
142 o.m_state == &target_state );
143 } );
144 }
145
146 } /* namespace anonymous */
147
storage_t(agent_t * owner,std::size_t initial_capacity)148 storage_t::storage_t(
149 agent_t * owner,
150 std::size_t initial_capacity )
151 : subscription_storage_t( owner )
152 {
153 m_events.reserve( initial_capacity );
154 }
155
~storage_t()156 storage_t::~storage_t()
157 {
158 destroy_all_subscriptions();
159 }
160
161 void
create_event_subscription(const mbox_t & mbox,const std::type_index & msg_type,const message_limit::control_block_t * limit,const state_t & target_state,const event_handler_method_t & method,thread_safety_t thread_safety,event_handler_kind_t handler_kind)162 storage_t::create_event_subscription(
163 const mbox_t & mbox,
164 const std::type_index & msg_type,
165 const message_limit::control_block_t * limit,
166 const state_t & target_state,
167 const event_handler_method_t & method,
168 thread_safety_t thread_safety,
169 event_handler_kind_t handler_kind )
170 {
171 using namespace std;
172 using namespace subscription_storage_common;
173
174 const auto mbox_id = mbox->id();
175
176 // Check that this subscription is new.
177 auto existed_position = find(
178 m_events, mbox_id, msg_type, target_state );
179
180 if( existed_position != m_events.end() )
181 SO_5_THROW_EXCEPTION(
182 rc_evt_handler_already_provided,
183 "agent is already subscribed to message, " +
184 make_subscription_description( mbox, msg_type, target_state ) );
185
186 // Just add subscription to the end.
187 m_events.emplace_back(
188 mbox, msg_type, target_state, method, thread_safety, handler_kind );
189
190 // Note: since v.5.5.9 mbox subscription is initiated even if
191 // it is MPSC mboxes. It is important for the case of message
192 // delivery tracing.
193
194 // If there is no subscription for that mbox it must be created.
195 // Last item in m_events should not be checked becase it is
196 // description of the just added subscription.
197 auto last_to_check = --end( m_events );
198 if( last_to_check == find_if(
199 begin( m_events ), last_to_check,
200 is_same_mbox_msg{ mbox_id, msg_type } ) )
201 {
202 // Mbox must create subscription.
203 so_5::details::do_with_rollback_on_exception(
204 [&] {
205 mbox->subscribe_event_handler(
206 msg_type,
207 limit,
208 *owner() );
209 },
210 [&] {
211 m_events.pop_back();
212 } );
213 }
214 }
215
216 void
drop_subscription(const mbox_t & mbox,const std::type_index & msg_type,const state_t & target_state)217 storage_t::drop_subscription(
218 const mbox_t & mbox,
219 const std::type_index & msg_type,
220 const state_t & target_state )
221 {
222 using namespace std;
223
224 const auto mbox_id = mbox->id();
225
226 auto existed_position = find(
227 m_events, mbox_id, msg_type, target_state );
228 if( existed_position != m_events.end() )
229 {
230 m_events.erase( existed_position );
231
232 // Note v.5.5.9 unsubscribe_event_handlers is called for
233 // mbox even if it is MPSC mbox. It is necessary for the case
234 // of message delivery tracing.
235
236 // If there is no more subscriptions to that mbox then
237 // the mbox must remove information about that agent.
238 if( end( m_events ) == find_if(
239 begin( m_events ), end( m_events ),
240 is_same_mbox_msg{ mbox_id, msg_type } ) )
241 {
242 // If we are here then there is no more references
243 // to the mbox. And mbox must not hold reference
244 // to the agent.
245 mbox->unsubscribe_event_handlers( msg_type, *owner() );
246 }
247 }
248 }
249
250 void
drop_subscription_for_all_states(const mbox_t & mbox,const std::type_index & msg_type)251 storage_t::drop_subscription_for_all_states(
252 const mbox_t & mbox,
253 const std::type_index & msg_type )
254 {
255 using namespace std;
256
257 const auto mbox_id = mbox->id();
258
259 const auto old_size = m_events.size();
260
261 m_events.erase(
262 remove_if( begin( m_events ), end( m_events ),
263 [mbox_id, &msg_type]( const info_t & i ) {
264 return i.m_mbox->id() == mbox_id &&
265 i.m_msg_type == msg_type;
266 } ),
267 end( m_events ) );
268
269 // Note: since v.5.5.9 mbox unsubscription is initiated even if
270 // it is MPSC mboxes. It is important for the case of message
271 // delivery tracing.
272
273 if( old_size != m_events.size() )
274 mbox->unsubscribe_event_handlers( msg_type, *owner() );
275 }
276
277 const event_handler_data_t *
find_handler(mbox_id_t mbox_id,const std::type_index & msg_type,const state_t & current_state) const278 storage_t::find_handler(
279 mbox_id_t mbox_id,
280 const std::type_index & msg_type,
281 const state_t & current_state ) const noexcept
282 {
283 auto it = find( m_events, mbox_id, msg_type, current_state );
284
285 if( it != std::end( m_events ) )
286 return &(it->m_handler);
287 else
288 return nullptr;
289 }
290
291 void
debug_dump(std::ostream & to) const292 storage_t::debug_dump( std::ostream & to ) const
293 {
294 for( const auto & e : m_events )
295 to << "{" << e.m_mbox->id() << ", "
296 << e.m_msg_type.name() << ", "
297 << e.m_state->query_name() << "}"
298 << std::endl;
299 }
300
301 void
destroy_all_subscriptions()302 storage_t::destroy_all_subscriptions()
303 {
304 if( m_events.empty() )
305 // Nothing to do at empty subscription list.
306 return;
307
308 using namespace std;
309
310 struct mbox_msg_type_pair_t
311 {
312 abstract_message_box_t * m_mbox;
313 const type_index * m_msg_type;
314
315 bool
316 operator<( const mbox_msg_type_pair_t & o ) const
317 {
318 return m_mbox < o.m_mbox ||
319 ( m_mbox == o.m_mbox &&
320 (*m_msg_type) < (*o.m_msg_type) );
321 }
322
323 bool
324 operator==( const mbox_msg_type_pair_t & o ) const
325 {
326 return m_mbox == o.m_mbox &&
327 (*m_msg_type) == (*o.m_msg_type);
328 }
329 };
330
331 // First step: collect all pointers to mbox-es.
332 vector< mbox_msg_type_pair_t > mboxes;
333 mboxes.reserve( m_events.size() );
334
335 transform(
336 begin( m_events ), end( m_events ),
337 back_inserter( mboxes ),
338 []( info_t & i ) {
339 return mbox_msg_type_pair_t{ i.m_mbox.get(), &i.m_msg_type };
340 } );
341
342 // Second step: remove duplicates.
343 sort( begin( mboxes ), end( mboxes ) );
344 mboxes.erase(
345 unique( begin( mboxes ), end( mboxes ) ),
346 end( mboxes ) );
347
348 // Third step: destroy subscription in mboxes.
349 for( auto m : mboxes )
350 m.m_mbox->unsubscribe_event_handlers( *m.m_msg_type, *owner() );
351
352 // Fourth step: cleanup subscription vector.
353 drop_content();
354 }
355
356 void
drop_content()357 storage_t::drop_content()
358 {
359 subscr_info_vector_t empty_events;
360 m_events.swap( empty_events );
361 }
362
363 subscription_storage_common::subscr_info_vector_t
query_content() const364 storage_t::query_content() const
365 {
366 return m_events;
367 }
368
369 void
setup_content(subscription_storage_common::subscr_info_vector_t && info)370 storage_t::setup_content(
371 subscription_storage_common::subscr_info_vector_t && info )
372 {
373 m_events = std::move( info );
374 }
375
376 std::size_t
query_subscriptions_count() const377 storage_t::query_subscriptions_count() const
378 {
379 return m_events.size();
380 }
381
382 } /* namespace vector_based_subscr_storage */
383
384 } /* namespace impl */
385
386 SO_5_FUNC subscription_storage_factory_t
vector_based_subscription_storage_factory(std::size_t initial_capacity)387 vector_based_subscription_storage_factory(
388 std::size_t initial_capacity )
389 {
390 return [initial_capacity]( agent_t * owner ) {
391 return impl::subscription_storage_unique_ptr_t(
392 new impl::vector_based_subscr_storage::storage_t(
393 owner,
394 initial_capacity ) );
395 };
396 }
397
398 } /* namespace so_5 */
399
400
401