1 /*!
2  * \file
3  * \brief Implementation of mbox which holds last sent message.
4  *
5  * \since
6  * v.1.0.3
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/error_ranges.hpp>
12 
13 #include <so_5/impl/agent_ptr_compare.hpp>
14 #include <so_5/impl/message_limit_internals.hpp>
15 #include <so_5/impl/msg_tracing_helpers.hpp>
16 
17 #include <so_5/details/sync_helpers.hpp>
18 
19 #include <so_5/mbox.hpp>
20 
21 #include <memory>
22 
23 namespace so_5 {
24 
25 namespace extra {
26 
27 namespace mboxes {
28 
29 namespace retained_msg {
30 
31 namespace errors {
32 
33 /*!
34  * \brief An attempt perform service request via retained message mbox.
35  *
36  * \since
37  * v.1.0.3
38  */
39 const int rc_service_request_via_retained_msg_mbox =
40 		so_5::extra::errors::retained_msg_mbox_errors;
41 
42 
43 } /* namespace errors */
44 
45 namespace details {
46 
47 /*!
48  * \brief A helper type which is a collection of type parameters.
49  *
50  * This type is used to simplify code of last_msg_mbox internals.
51  * Instead of writting something like:
52  * \code
53  * template< typename Traits >
54  * class ... {...};
55  *
56  * template< typename Traits, typename Lock_Type >
57  * class ... {...};
58  * \endcode
59  * this config_type allows to write like that:
60  * \code
61  * template< typename Config_Type >
62  * class ... {...};
63  *
64  * template< typename Config_Type >
65  * class ... {...};
66  * \endcode
67  *
68  * \tparam Traits traits type to be used.
69  *
70  * \tparam Lock_Type type of object to be used for thread-safety (like
71  * std::mutex or so_5::null_mutex_t).
72  *
73  * \since
74  * v.1.0.3
75  */
76 template<
77 	typename Traits,
78 	typename Lock_Type >
79 struct config_type
80 	{
81 		using traits_type = Traits;
82 		using lock_type = Lock_Type;
83 	};
84 
85 /*!
86  * \name Type extractors for config_type
87  * \{
88  */
89 template< typename Config_Type >
90 using traits_t = typename Config_Type::traits_type;
91 
92 template< typename Config_Type >
93 using lock_t = typename Config_Type::lock_type;
94 /*!
95  * \}
96  */
97 
98 /*!
99  * \brief An information block about one subscriber.
100  *
101  * \since
102  * v.1.0.3
103  */
104 class subscriber_info_t
105 {
106 	/*!
107 	 * \brief Current status of the subscriber.
108 	 */
109 	enum class state_t
110 	{
111 		nothing,
112 		only_subscriptions,
113 		only_filter,
114 		subscriptions_and_filter
115 	};
116 
117 	//! Optional message limit for that subscriber.
118 	const so_5::message_limit::control_block_t * m_limit;
119 
120 	/*!
121 	 * \brief Delivery filter for that message for that subscriber.
122 	 */
123 	const delivery_filter_t * m_filter;
124 
125 	/*!
126 	 * \brief Current state of the subscriber parameters.
127 	 */
128 	state_t m_state;
129 
130 public :
131 	//! Constructor for the case when subscriber info is being
132 	//! created during event subscription.
subscriber_info_t(const so_5::message_limit::control_block_t * limit)133 	subscriber_info_t(
134 		const so_5::message_limit::control_block_t * limit )
135 		:	m_limit( limit )
136 		,	m_filter( nullptr )
137 		,	m_state( state_t::only_subscriptions )
138 	{}
139 
140 	//! Constructor for the case when subscriber info is being
141 	//! created during event subscription.
subscriber_info_t(const delivery_filter_t * filter)142 	subscriber_info_t(
143 		const delivery_filter_t * filter )
144 		:	m_limit( nullptr )
145 		,	m_filter( filter )
146 		,	m_state( state_t::only_filter )
147 	{}
148 
149 	bool
empty() const150 	empty() const noexcept
151 	{
152 		return state_t::nothing == m_state;
153 	}
154 
155 	const message_limit::control_block_t *
limit() const156 	limit() const noexcept
157 	{
158 		return m_limit;
159 	}
160 
161 	//! Set the message limit for the subscriber.
162 	/*!
163 	 * Setting the message limit means that there are subscriptions
164 	 * for the agent.
165 	 *
166 	 * \note The message limit can be nullptr.
167 	 */
168 	void
set_limit(const message_limit::control_block_t * limit)169 	set_limit( const message_limit::control_block_t * limit ) noexcept
170 	{
171 		m_limit = limit;
172 
173 		m_state = ( state_t::nothing == m_state ?
174 				state_t::only_subscriptions :
175 				state_t::subscriptions_and_filter );
176 	}
177 
178 	//! Drop the message limit for the subscriber.
179 	/*!
180 	 * Dropping the message limit means that there is no more
181 	 * subscription for the agent.
182 	 */
183 	void
drop_limit()184 	drop_limit() noexcept
185 	{
186 		m_limit = nullptr;
187 
188 		m_state = ( state_t::only_subscriptions == m_state ?
189 				state_t::nothing : state_t::only_filter );
190 	}
191 
192 	//! Set the delivery filter for the subscriber.
193 	void
set_filter(const delivery_filter_t & filter)194 	set_filter( const delivery_filter_t & filter ) noexcept
195 	{
196 		m_filter = &filter;
197 
198 		m_state = ( state_t::nothing == m_state ?
199 				state_t::only_filter :
200 				state_t::subscriptions_and_filter );
201 	}
202 
203 	//! Drop the delivery filter for the subscriber.
204 	void
drop_filter()205 	drop_filter() noexcept
206 	{
207 		m_filter = nullptr;
208 
209 		m_state = ( state_t::only_filter == m_state ?
210 				state_t::nothing : state_t::only_subscriptions );
211 	}
212 
213 	//! Must a message be delivered to the subscriber?
214 	delivery_possibility_t
must_be_delivered(agent_t & subscriber,message_t & msg) const215 	must_be_delivered(
216 		agent_t & subscriber,
217 		message_t & msg ) const noexcept
218 	{
219 		// For the case when there are actual subscriptions.
220 		// We assume that will be in 99.9% cases.
221 		auto need_deliver = delivery_possibility_t::must_be_delivered;
222 
223 		if( state_t::only_filter == m_state )
224 			// Only filter, no actual subscriptions.
225 			// No message delivery for that case.
226 			need_deliver = delivery_possibility_t::no_subscription;
227 		else if( state_t::subscriptions_and_filter == m_state )
228 			// Delivery must be checked by delivery filter.
229 			need_deliver = m_filter->check( subscriber, msg ) ?
230 					delivery_possibility_t::must_be_delivered :
231 					delivery_possibility_t::disabled_by_delivery_filter;
232 
233 		return need_deliver;
234 	}
235 };
236 
237 //
238 // messages_table_item_t
239 //
240 /*!
241  * \brief A type of item of message table for retained message mbox.
242  *
243  * For each message type is necessary to store:
244  * - a list of subscriber for that message;
245  * - the last message sent.
246  *
247  * This type is intended to be used as a container for such data.
248  *
249  * \since
250  * v.1.0.3
251  */
252 struct messages_table_item_t
253 	{
254 		//! A special coparator for agents with respect to
255 		//! agent's priority.
256 		struct agent_ptr_comparator_t
257 			{
operator ()so_5::extra::mboxes::retained_msg::details::messages_table_item_t::agent_ptr_comparator_t258 				bool operator()( agent_t * a, agent_t * b ) const noexcept
259 				{
260 					return ::so_5::impl::special_agent_ptr_compare( *a, *b );
261 				}
262 			};
263 
264 		//! Type of subscribers map.
265 		using subscribers_map_t =
266 				std::map< agent_t *, subscriber_info_t, agent_ptr_comparator_t >;
267 
268 		//! Subscribers.
269 		/*!
270 		 * Can be empty. This is for case when the first message was sent
271 		 * when there is no subscribers yet.
272 		 */
273 		subscribers_map_t m_subscribers;
274 
275 		//! Retained message.
276 		/*!
277 		 * Can be nullptr. It means that there is no any attempts to send
278 		 * a message of this type.
279 		 */
280 		message_ref_t m_retained_msg;
281 	};
282 
283 //
284 // template_independent_mbox_data_t
285 //
286 /*!
287  * \brief A mixin with actual data which is necessary for implementation
288  * of retained mbox.
289  *
290  * This data type doesn't depend on any template parameters.
291  *
292  * \since
293  * v.1.0.3
294  */
295 struct template_independent_mbox_data_t
296 	{
297 		//! SObjectizer Environment to work in.
298 		environment_t & m_env;
299 
300 		//! ID of the mbox.
301 		const mbox_id_t m_id;
302 
303 		//! Type of messages table.
304 		using messages_table_t =
305 				std::map< std::type_index, messages_table_item_t >;
306 
307 		//! Table of current subscriptions and messages.
308 		messages_table_t m_messages_table;
309 
template_independent_mbox_data_tso_5::extra::mboxes::retained_msg::details::template_independent_mbox_data_t310 		template_independent_mbox_data_t(
311 			environment_t & env,
312 			mbox_id_t id )
313 			:	m_env{ env }
314 			,	m_id{id}
315 		{}
316 	};
317 
318 //
319 // actual_mbox_t
320 //
321 
322 /*!
323  * \brief An actual implementation of retained message mbox.
324  *
325  * \tparam Config type with main definitions for this message box type.
326  *
327  * \tparam Tracing_Base base class with implementation of message
328  * delivery tracing methods.
329  *
330  * \since
331  * v.1.0.3
332  */
333 template<
334 	typename Config,
335 	typename Tracing_Base >
336 class actual_mbox_t final
337 	:	public abstract_message_box_t
338 	,	private Tracing_Base
339 	{
340 	public:
341 		/*!
342 		 * \brief Initializing constructor.
343 		 *
344 		 * \tparam Tracing_Args parameters for Tracing_Base constructor
345 		 * (can be empty list if Tracing_Base have only the default constructor).
346 		 */
347 		template< typename... Tracing_Args >
actual_mbox_t(environment_t & env,mbox_id_t id,Tracing_Args &&...args)348 		actual_mbox_t(
349 			//! SObjectizer Environment to work in.
350 			environment_t & env,
351 			//! ID of this mbox.
352 			mbox_id_t id,
353 			//! Optional parameters for Tracing_Base's constructor.
354 			Tracing_Args &&... args )
355 			:	Tracing_Base{ std::forward< Tracing_Args >(args)... }
356 			,	m_data{ env, id }
357 			{}
358 
359 		mbox_id_t
id() const360 		id() const override
361 			{
362 				return this->m_data.m_id;
363 			}
364 
365 		void
subscribe_event_handler(const std::type_index & msg_type,const so_5::message_limit::control_block_t * limit,agent_t & subscriber)366 		subscribe_event_handler(
367 			const std::type_index & msg_type,
368 			const so_5::message_limit::control_block_t * limit,
369 			agent_t & subscriber ) override
370 			{
371 				insert_or_modify_subscriber(
372 						msg_type,
373 						subscriber,
374 						[&] {
375 							return subscriber_info_t{ limit };
376 						},
377 						[&]( subscriber_info_t & info ) {
378 							info.set_limit( limit );
379 						} );
380 			}
381 
382 		void
unsubscribe_event_handlers(const std::type_index & msg_type,agent_t & subscriber)383 		unsubscribe_event_handlers(
384 			const std::type_index & msg_type,
385 			agent_t & subscriber ) override
386 			{
387 				modify_and_remove_subscriber_if_needed(
388 						msg_type,
389 						subscriber,
390 						[]( subscriber_info_t & info ) {
391 							info.drop_limit();
392 						} );
393 			}
394 
395 		std::string
query_name() const396 		query_name() const override
397 			{
398 				std::ostringstream s;
399 				s << "<mbox:type=RETAINED_MPMC:id=" << this->m_data.m_id << ">";
400 
401 				return s.str();
402 			}
403 
404 		mbox_type_t
type() const405 		type() const override
406 			{
407 				return mbox_type_t::multi_producer_multi_consumer;
408 			}
409 
410 		void
do_deliver_message(const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep)411 		do_deliver_message(
412 			const std::type_index & msg_type,
413 			const message_ref_t & message,
414 			unsigned int overlimit_reaction_deep ) override
415 			{
416 				typename Tracing_Base::deliver_op_tracer tracer{
417 						*this, // as Tracing_base
418 						*this, // as abstract_message_box_t
419 						"deliver_message",
420 						msg_type, message, overlimit_reaction_deep };
421 
422 				ensure_immutable_message( msg_type, message );
423 
424 				do_deliver_message_impl(
425 						tracer,
426 						msg_type,
427 						message,
428 						overlimit_reaction_deep );
429 			}
430 
431 		void
set_delivery_filter(const std::type_index & msg_type,const delivery_filter_t & filter,agent_t & subscriber)432 		set_delivery_filter(
433 			const std::type_index & msg_type,
434 			const delivery_filter_t & filter,
435 			agent_t & subscriber ) override
436 			{
437 				insert_or_modify_subscriber(
438 						msg_type,
439 						subscriber,
440 						[&] {
441 							return subscriber_info_t{ &filter };
442 						},
443 						[&]( subscriber_info_t & info ) {
444 							info.set_filter( filter );
445 						} );
446 			}
447 
448 		void
drop_delivery_filter(const std::type_index & msg_type,agent_t & subscriber)449 		drop_delivery_filter(
450 			const std::type_index & msg_type,
451 			agent_t & subscriber ) noexcept override
452 			{
453 				modify_and_remove_subscriber_if_needed(
454 						msg_type,
455 						subscriber,
456 						[]( subscriber_info_t & info ) {
457 							info.drop_filter();
458 						} );
459 			}
460 
461 		so_5::environment_t &
environment() const462 		environment() const noexcept override
463 			{
464 				return this->m_data.m_env;
465 			}
466 
467 	private :
468 		//! Data of this message mbox.
469 		template_independent_mbox_data_t m_data;
470 
471 		//! Object lock.
472 		lock_t<Config> m_lock;
473 
474 		template< typename Info_Maker, typename Info_Changer >
475 		void
insert_or_modify_subscriber(const std::type_index & msg_type,agent_t & subscriber,Info_Maker maker,Info_Changer changer)476 		insert_or_modify_subscriber(
477 			const std::type_index & msg_type,
478 			agent_t & subscriber,
479 			Info_Maker maker,
480 			Info_Changer changer )
481 			{
482 				std::lock_guard< lock_t<Config> > lock( m_lock );
483 
484 				// If there is no item for this message type it will be
485 				// created automatically.
486 				auto & table_item = this->m_data.m_messages_table[ msg_type ];
487 
488 				auto it_subscriber = table_item.m_subscribers.find( &subscriber );
489 				if( it_subscriber == table_item.m_subscribers.end() )
490 					// There is no subscriber yet. It must be added.
491 					it_subscriber = table_item.m_subscribers.emplace(
492 							&subscriber, maker() ).first;
493 				else
494 					// Subscriber is known. It must be updated.
495 					changer( it_subscriber->second );
496 
497 				// If there is a retained message then delivery attempt
498 				// must be performed.
499 				// NOTE: an exception at this stage doesn't remove new subscription.
500 				if( table_item.m_retained_msg )
501 					try_deliver_retained_message_to(
502 							msg_type,
503 							table_item.m_retained_msg,
504 							subscriber,
505 							it_subscriber->second );
506 			}
507 
508 		template< typename Info_Changer >
509 		void
modify_and_remove_subscriber_if_needed(const std::type_index & msg_type,agent_t & subscriber,Info_Changer changer)510 		modify_and_remove_subscriber_if_needed(
511 			const std::type_index & msg_type,
512 			agent_t & subscriber,
513 			Info_Changer changer )
514 			{
515 				std::lock_guard< lock_t<Config> > lock( m_lock );
516 
517 				auto it_table_item = this->m_data.m_messages_table.find( msg_type );
518 				if( it_table_item != this->m_data.m_messages_table.end() )
519 					{
520 						auto & table_item = it_table_item->second;
521 
522 						auto it_subscriber = table_item.m_subscribers.find(
523 								&subscriber );
524 						if( it_subscriber != table_item.m_subscribers.end() )
525 							{
526 								// Subscriber is found and must be modified.
527 								changer( it_subscriber->second );
528 
529 								// If info about subscriber becomes empty after
530 								// modification then subscriber info must be removed.
531 								if( it_subscriber->second.empty() )
532 									table_item.m_subscribers.erase( it_subscriber );
533 							}
534 					}
535 			}
536 
537 		void
do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const & tracer,const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep)538 		do_deliver_message_impl(
539 			typename Tracing_Base::deliver_op_tracer const & tracer,
540 			const std::type_index & msg_type,
541 			const message_ref_t & message,
542 			unsigned int overlimit_reaction_deep )
543 			{
544 				std::lock_guard< lock_t<Config> > lock( m_lock );
545 
546 				// If there is no item for this message type it will be
547 				// created automatically.
548 				auto & table_item = this->m_data.m_messages_table[ msg_type ];
549 
550 				// Message must be stored as retained.
551 				table_item.m_retained_msg = message;
552 
553 				auto & subscribers = table_item.m_subscribers;
554 				if( !subscribers.empty() )
555 					for( const auto & kv : subscribers )
556 						do_deliver_message_to_subscriber(
557 								*(kv.first),
558 								kv.second,
559 								tracer,
560 								msg_type,
561 								message,
562 								overlimit_reaction_deep );
563 				else
564 					tracer.no_subscribers();
565 			}
566 
567 		void
do_deliver_message_to_subscriber(agent_t & subscriber,const subscriber_info_t & subscriber_info,typename Tracing_Base::deliver_op_tracer const & tracer,const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep) const568 		do_deliver_message_to_subscriber(
569 			agent_t & subscriber,
570 			const subscriber_info_t & subscriber_info,
571 			typename Tracing_Base::deliver_op_tracer const & tracer,
572 			const std::type_index & msg_type,
573 			const message_ref_t & message,
574 			unsigned int overlimit_reaction_deep ) const
575 			{
576 				const auto delivery_status =
577 						subscriber_info.must_be_delivered(
578 								subscriber,
579 								*(message.get()) );
580 
581 				if( delivery_possibility_t::must_be_delivered == delivery_status )
582 					{
583 						using namespace so_5::message_limit::impl;
584 
585 						try_to_deliver_to_agent(
586 								this->m_data.m_id,
587 								subscriber,
588 								subscriber_info.limit(),
589 								msg_type,
590 								message,
591 								overlimit_reaction_deep,
592 								tracer.overlimit_tracer(),
593 								[&] {
594 									tracer.push_to_queue( std::addressof(subscriber) );
595 
596 									agent_t::call_push_event(
597 											subscriber,
598 											subscriber_info.limit(),
599 											this->m_data.m_id,
600 											msg_type,
601 											message );
602 								} );
603 					}
604 				else
605 					tracer.message_rejected(
606 							std::addressof(subscriber), delivery_status );
607 			}
608 
609 		/*!
610 		 * \brief An attempt to deliver retained message to the new subscriber.
611 		 *
612 		 * This attempt will be performed only if there is the retained message.
613 		 */
614 		void
try_deliver_retained_message_to(const std::type_index & msg_type,const message_ref_t & retained_msg,agent_t & subscriber,const subscriber_info_t & subscriber_info)615 		try_deliver_retained_message_to(
616 			const std::type_index & msg_type,
617 			const message_ref_t & retained_msg,
618 			agent_t & subscriber,
619 			const subscriber_info_t & subscriber_info )
620 			{
621 				if( retained_msg )
622 					{
623 						const unsigned int overlimit_reaction_deep = 0;
624 
625 						typename Tracing_Base::deliver_op_tracer tracer{
626 								*this, // as Tracing_base
627 								*this, // as abstract_message_box_t
628 								"deliver_message_on_subscription",
629 								msg_type,
630 								retained_msg,
631 								overlimit_reaction_deep };
632 
633 						do_deliver_message_to_subscriber(
634 								subscriber,
635 								subscriber_info,
636 								tracer,
637 								msg_type,
638 								retained_msg,
639 								overlimit_reaction_deep );
640 					}
641 			}
642 
643 		/*!
644 		 * \brief Ensures that message is an immutable message.
645 		 *
646 		 * Checks mutability flag and throws an exception if message is
647 		 * a mutable one.
648 		 */
649 		void
ensure_immutable_message(const std::type_index & msg_type,const message_ref_t & what) const650 		ensure_immutable_message(
651 			const std::type_index & msg_type,
652 			const message_ref_t & what ) const
653 			{
654 				if( message_mutability_t::immutable_message !=
655 						message_mutability( what ) )
656 					SO_5_THROW_EXCEPTION(
657 							so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
658 							"an attempt to deliver mutable message via MPMC mbox"
659 							", msg_type=" + std::string(msg_type.name()) );
660 			}
661 	};
662 
663 } /* namespace details */
664 
665 //
666 //
667 // default_traits_t
668 //
669 /*!
670  * \brief Default traits for retained message mbox.
671  */
672 struct default_traits_t {};
673 
674 //
675 // make_mbox
676 //
677 /*!
678  * \brief Create an instance of retained message mbox.
679  *
680  * Simple usage example:
681  * \code
682  * so_5::environment_t & env = ...;
683  * const so_5::mbox_t retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
684  * so_5::send<Some_Message>(retained_mbox, ...);
685  * \endcode
686  * An instance of default implementation retained message mbox will be created.
687  * This instance will be protected by std::mutex.
688  *
689  * If you want to use retained_mbox in a single-threaded environment
690  * without a multithreaded protection then so_5::null_mutex_t (or any
691  * similar null-mutex implementation) can be used:
692  * \code
693  * so_5::environment_t & env = ...
694  * const so_5::mbox_t retained_mbox =
695  * 		so_5::extra::mboxes::retained_msg::make_mbox<
696  * 				so_5::extra::mboxes::retained_msg::default_traits_t,
697  * 				so_5::null_mutex_t>(env);
698  * so_5::send<Some_Message>(retained_mbox, ...);
699  * \endcode
700  *
701  * If you want to use your own mutex-like object (with interface which
702  * allows to use your mutex-like class with std::lock_guard) then you can
703  * do it similar way:
704  * \code
705  * so_5::environment_t & env = ...
706  * const so_5::mbox_t retained_mbox =
707  * 		so_5::extra::mboxes::retained_msg::make_mbox<
708  * 				so_5::extra::mboxes::retained_msg::default_traits_t,
709  * 				Your_Own_Mutex_Class>(env);
710  * so_5::send<Some_Message>(retained_mbox, ...);
711  * \endcode
712  *
713  * \tparam Traits type with traits of mbox implementation.
714  *
715  * \tparam Lock_Type a type of mutex to be used for protection of
716  * retained message mbox content. This must be a DefaultConstructible
717  * type with interface which allows to use Lock_Type with std::lock_guard.
718  *
719  * \since
720  * v.1.0.3
721  */
722 template<
723 	typename Traits = default_traits_t,
724 	typename Lock_Type = std::mutex >
725 mbox_t
make_mbox(environment_t & env)726 make_mbox( environment_t & env )
727 	{
728 		using config_type = details::config_type< Traits, Lock_Type >;
729 
730 		return env.make_custom_mbox(
731 				[]( const mbox_creation_data_t & data )
732 				{
733 					mbox_t result;
734 
735 					if( data.m_tracer.get().is_msg_tracing_enabled() )
736 						{
737 							using T = details::actual_mbox_t<
738 									config_type,
739 									::so_5::impl::msg_tracing_helpers::tracing_enabled_base >;
740 
741 							result = mbox_t{ new T{
742 									data.m_env.get(), data.m_id, data.m_tracer.get()
743 								}
744 							};
745 						}
746 					else
747 						{
748 							using T = details::actual_mbox_t<
749 									config_type,
750 									::so_5::impl::msg_tracing_helpers::tracing_disabled_base >;
751 							result = mbox_t{ new T{ data.m_env.get(), data.m_id } };
752 						}
753 
754 					return result;
755 				} );
756 	}
757 
758 } /* namespace retained_msg */
759 
760 } /* namespace mboxes */
761 
762 } /* namespace extra */
763 
764 } /* namespace so_5 */
765 
766