1 /*!
2 * \file
3 * \brief Implementation of mbox which holds last sent message.
4 *
5 * \since
6 * v.1.0.3
7 */
8
9 #pragma once
10
#include <so_5_extra/error_ranges.hpp>

#include <so_5/impl/agent_ptr_compare.hpp>
#include <so_5/impl/message_limit_internals.hpp>
#include <so_5/impl/msg_tracing_helpers.hpp>

#include <so_5/details/sync_helpers.hpp>

#include <so_5/mbox.hpp>

#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <typeindex>
#include <utility>
22
23 namespace so_5 {
24
25 namespace extra {
26
27 namespace mboxes {
28
29 namespace retained_msg {
30
31 namespace errors {
32
33 /*!
34 * \brief An attempt perform service request via retained message mbox.
35 *
36 * \since
37 * v.1.0.3
38 */
39 const int rc_service_request_via_retained_msg_mbox =
40 so_5::extra::errors::retained_msg_mbox_errors;
41
42
43 } /* namespace errors */
44
45 namespace details {
46
/*!
 * \brief A helper type which bundles several type parameters into one.
 *
 * This type keeps the code of the retained message mbox internals simple.
 * Instead of writing something like:
 * \code
 * template< typename Traits >
 * class ... {...};
 *
 * template< typename Traits, typename Lock_Type >
 * class ... {...};
 * \endcode
 * config_type makes it possible to write:
 * \code
 * template< typename Config_Type >
 * class ... {...};
 *
 * template< typename Config_Type >
 * class ... {...};
 * \endcode
 *
 * \tparam Traits traits type to be used.
 *
 * \tparam Lock_Type type of object to be used for thread-safety (like
 * std::mutex or so_5::null_mutex_t).
 *
 * \since
 * v.1.0.3
 */
template< typename Traits, typename Lock_Type >
struct config_type
{
	//! The traits type passed as the first template parameter.
	using traits_type = Traits;

	//! The lock type passed as the second template parameter.
	using lock_type = Lock_Type;
};
84
/*!
 * \name Type extractors for config_type
 * \{
 */
//! Shorthand for the nested \c traits_type of a config type.
template< typename Cfg >
using traits_t = typename Cfg::traits_type;

//! Shorthand for the nested \c lock_type of a config type.
template< typename Cfg >
using lock_t = typename Cfg::lock_type;
/*!
 * \}
 */
97
98 /*!
99 * \brief An information block about one subscriber.
100 *
101 * \since
102 * v.1.0.3
103 */
104 class subscriber_info_t
105 {
106 /*!
107 * \brief Current status of the subscriber.
108 */
109 enum class state_t
110 {
111 nothing,
112 only_subscriptions,
113 only_filter,
114 subscriptions_and_filter
115 };
116
117 //! Optional message limit for that subscriber.
118 const so_5::message_limit::control_block_t * m_limit;
119
120 /*!
121 * \brief Delivery filter for that message for that subscriber.
122 */
123 const delivery_filter_t * m_filter;
124
125 /*!
126 * \brief Current state of the subscriber parameters.
127 */
128 state_t m_state;
129
130 public :
131 //! Constructor for the case when subscriber info is being
132 //! created during event subscription.
subscriber_info_t(const so_5::message_limit::control_block_t * limit)133 subscriber_info_t(
134 const so_5::message_limit::control_block_t * limit )
135 : m_limit( limit )
136 , m_filter( nullptr )
137 , m_state( state_t::only_subscriptions )
138 {}
139
140 //! Constructor for the case when subscriber info is being
141 //! created during event subscription.
subscriber_info_t(const delivery_filter_t * filter)142 subscriber_info_t(
143 const delivery_filter_t * filter )
144 : m_limit( nullptr )
145 , m_filter( filter )
146 , m_state( state_t::only_filter )
147 {}
148
149 bool
empty() const150 empty() const noexcept
151 {
152 return state_t::nothing == m_state;
153 }
154
155 const message_limit::control_block_t *
limit() const156 limit() const noexcept
157 {
158 return m_limit;
159 }
160
161 //! Set the message limit for the subscriber.
162 /*!
163 * Setting the message limit means that there are subscriptions
164 * for the agent.
165 *
166 * \note The message limit can be nullptr.
167 */
168 void
set_limit(const message_limit::control_block_t * limit)169 set_limit( const message_limit::control_block_t * limit ) noexcept
170 {
171 m_limit = limit;
172
173 m_state = ( state_t::nothing == m_state ?
174 state_t::only_subscriptions :
175 state_t::subscriptions_and_filter );
176 }
177
178 //! Drop the message limit for the subscriber.
179 /*!
180 * Dropping the message limit means that there is no more
181 * subscription for the agent.
182 */
183 void
drop_limit()184 drop_limit() noexcept
185 {
186 m_limit = nullptr;
187
188 m_state = ( state_t::only_subscriptions == m_state ?
189 state_t::nothing : state_t::only_filter );
190 }
191
192 //! Set the delivery filter for the subscriber.
193 void
set_filter(const delivery_filter_t & filter)194 set_filter( const delivery_filter_t & filter ) noexcept
195 {
196 m_filter = &filter;
197
198 m_state = ( state_t::nothing == m_state ?
199 state_t::only_filter :
200 state_t::subscriptions_and_filter );
201 }
202
203 //! Drop the delivery filter for the subscriber.
204 void
drop_filter()205 drop_filter() noexcept
206 {
207 m_filter = nullptr;
208
209 m_state = ( state_t::only_filter == m_state ?
210 state_t::nothing : state_t::only_subscriptions );
211 }
212
213 //! Must a message be delivered to the subscriber?
214 delivery_possibility_t
must_be_delivered(agent_t & subscriber,message_t & msg) const215 must_be_delivered(
216 agent_t & subscriber,
217 message_t & msg ) const noexcept
218 {
219 // For the case when there are actual subscriptions.
220 // We assume that will be in 99.9% cases.
221 auto need_deliver = delivery_possibility_t::must_be_delivered;
222
223 if( state_t::only_filter == m_state )
224 // Only filter, no actual subscriptions.
225 // No message delivery for that case.
226 need_deliver = delivery_possibility_t::no_subscription;
227 else if( state_t::subscriptions_and_filter == m_state )
228 // Delivery must be checked by delivery filter.
229 need_deliver = m_filter->check( subscriber, msg ) ?
230 delivery_possibility_t::must_be_delivered :
231 delivery_possibility_t::disabled_by_delivery_filter;
232
233 return need_deliver;
234 }
235 };
236
237 //
238 // messages_table_item_t
239 //
240 /*!
241 * \brief A type of item of message table for retained message mbox.
242 *
243 * For each message type is necessary to store:
244 * - a list of subscriber for that message;
245 * - the last message sent.
246 *
247 * This type is intended to be used as a container for such data.
248 *
249 * \since
250 * v.1.0.3
251 */
252 struct messages_table_item_t
253 {
254 //! A special coparator for agents with respect to
255 //! agent's priority.
256 struct agent_ptr_comparator_t
257 {
operator ()so_5::extra::mboxes::retained_msg::details::messages_table_item_t::agent_ptr_comparator_t258 bool operator()( agent_t * a, agent_t * b ) const noexcept
259 {
260 return ::so_5::impl::special_agent_ptr_compare( *a, *b );
261 }
262 };
263
264 //! Type of subscribers map.
265 using subscribers_map_t =
266 std::map< agent_t *, subscriber_info_t, agent_ptr_comparator_t >;
267
268 //! Subscribers.
269 /*!
270 * Can be empty. This is for case when the first message was sent
271 * when there is no subscribers yet.
272 */
273 subscribers_map_t m_subscribers;
274
275 //! Retained message.
276 /*!
277 * Can be nullptr. It means that there is no any attempts to send
278 * a message of this type.
279 */
280 message_ref_t m_retained_msg;
281 };
282
283 //
284 // template_independent_mbox_data_t
285 //
286 /*!
287 * \brief A mixin with actual data which is necessary for implementation
288 * of retained mbox.
289 *
290 * This data type doesn't depend on any template parameters.
291 *
292 * \since
293 * v.1.0.3
294 */
295 struct template_independent_mbox_data_t
296 {
297 //! SObjectizer Environment to work in.
298 environment_t & m_env;
299
300 //! ID of the mbox.
301 const mbox_id_t m_id;
302
303 //! Type of messages table.
304 using messages_table_t =
305 std::map< std::type_index, messages_table_item_t >;
306
307 //! Table of current subscriptions and messages.
308 messages_table_t m_messages_table;
309
template_independent_mbox_data_tso_5::extra::mboxes::retained_msg::details::template_independent_mbox_data_t310 template_independent_mbox_data_t(
311 environment_t & env,
312 mbox_id_t id )
313 : m_env{ env }
314 , m_id{id}
315 {}
316 };
317
318 //
319 // actual_mbox_t
320 //
321
322 /*!
323 * \brief An actual implementation of retained message mbox.
324 *
325 * \tparam Config type with main definitions for this message box type.
326 *
327 * \tparam Tracing_Base base class with implementation of message
328 * delivery tracing methods.
329 *
330 * \since
331 * v.1.0.3
332 */
333 template<
334 typename Config,
335 typename Tracing_Base >
336 class actual_mbox_t final
337 : public abstract_message_box_t
338 , private Tracing_Base
339 {
340 public:
341 /*!
342 * \brief Initializing constructor.
343 *
344 * \tparam Tracing_Args parameters for Tracing_Base constructor
345 * (can be empty list if Tracing_Base have only the default constructor).
346 */
347 template< typename... Tracing_Args >
actual_mbox_t(environment_t & env,mbox_id_t id,Tracing_Args &&...args)348 actual_mbox_t(
349 //! SObjectizer Environment to work in.
350 environment_t & env,
351 //! ID of this mbox.
352 mbox_id_t id,
353 //! Optional parameters for Tracing_Base's constructor.
354 Tracing_Args &&... args )
355 : Tracing_Base{ std::forward< Tracing_Args >(args)... }
356 , m_data{ env, id }
357 {}
358
359 mbox_id_t
id() const360 id() const override
361 {
362 return this->m_data.m_id;
363 }
364
365 void
subscribe_event_handler(const std::type_index & msg_type,const so_5::message_limit::control_block_t * limit,agent_t & subscriber)366 subscribe_event_handler(
367 const std::type_index & msg_type,
368 const so_5::message_limit::control_block_t * limit,
369 agent_t & subscriber ) override
370 {
371 insert_or_modify_subscriber(
372 msg_type,
373 subscriber,
374 [&] {
375 return subscriber_info_t{ limit };
376 },
377 [&]( subscriber_info_t & info ) {
378 info.set_limit( limit );
379 } );
380 }
381
382 void
unsubscribe_event_handlers(const std::type_index & msg_type,agent_t & subscriber)383 unsubscribe_event_handlers(
384 const std::type_index & msg_type,
385 agent_t & subscriber ) override
386 {
387 modify_and_remove_subscriber_if_needed(
388 msg_type,
389 subscriber,
390 []( subscriber_info_t & info ) {
391 info.drop_limit();
392 } );
393 }
394
395 std::string
query_name() const396 query_name() const override
397 {
398 std::ostringstream s;
399 s << "<mbox:type=RETAINED_MPMC:id=" << this->m_data.m_id << ">";
400
401 return s.str();
402 }
403
404 mbox_type_t
type() const405 type() const override
406 {
407 return mbox_type_t::multi_producer_multi_consumer;
408 }
409
410 void
do_deliver_message(const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep)411 do_deliver_message(
412 const std::type_index & msg_type,
413 const message_ref_t & message,
414 unsigned int overlimit_reaction_deep ) override
415 {
416 typename Tracing_Base::deliver_op_tracer tracer{
417 *this, // as Tracing_base
418 *this, // as abstract_message_box_t
419 "deliver_message",
420 msg_type, message, overlimit_reaction_deep };
421
422 ensure_immutable_message( msg_type, message );
423
424 do_deliver_message_impl(
425 tracer,
426 msg_type,
427 message,
428 overlimit_reaction_deep );
429 }
430
431 void
set_delivery_filter(const std::type_index & msg_type,const delivery_filter_t & filter,agent_t & subscriber)432 set_delivery_filter(
433 const std::type_index & msg_type,
434 const delivery_filter_t & filter,
435 agent_t & subscriber ) override
436 {
437 insert_or_modify_subscriber(
438 msg_type,
439 subscriber,
440 [&] {
441 return subscriber_info_t{ &filter };
442 },
443 [&]( subscriber_info_t & info ) {
444 info.set_filter( filter );
445 } );
446 }
447
448 void
drop_delivery_filter(const std::type_index & msg_type,agent_t & subscriber)449 drop_delivery_filter(
450 const std::type_index & msg_type,
451 agent_t & subscriber ) noexcept override
452 {
453 modify_and_remove_subscriber_if_needed(
454 msg_type,
455 subscriber,
456 []( subscriber_info_t & info ) {
457 info.drop_filter();
458 } );
459 }
460
461 so_5::environment_t &
environment() const462 environment() const noexcept override
463 {
464 return this->m_data.m_env;
465 }
466
467 private :
468 //! Data of this message mbox.
469 template_independent_mbox_data_t m_data;
470
471 //! Object lock.
472 lock_t<Config> m_lock;
473
474 template< typename Info_Maker, typename Info_Changer >
475 void
insert_or_modify_subscriber(const std::type_index & msg_type,agent_t & subscriber,Info_Maker maker,Info_Changer changer)476 insert_or_modify_subscriber(
477 const std::type_index & msg_type,
478 agent_t & subscriber,
479 Info_Maker maker,
480 Info_Changer changer )
481 {
482 std::lock_guard< lock_t<Config> > lock( m_lock );
483
484 // If there is no item for this message type it will be
485 // created automatically.
486 auto & table_item = this->m_data.m_messages_table[ msg_type ];
487
488 auto it_subscriber = table_item.m_subscribers.find( &subscriber );
489 if( it_subscriber == table_item.m_subscribers.end() )
490 // There is no subscriber yet. It must be added.
491 it_subscriber = table_item.m_subscribers.emplace(
492 &subscriber, maker() ).first;
493 else
494 // Subscriber is known. It must be updated.
495 changer( it_subscriber->second );
496
497 // If there is a retained message then delivery attempt
498 // must be performed.
499 // NOTE: an exception at this stage doesn't remove new subscription.
500 if( table_item.m_retained_msg )
501 try_deliver_retained_message_to(
502 msg_type,
503 table_item.m_retained_msg,
504 subscriber,
505 it_subscriber->second );
506 }
507
508 template< typename Info_Changer >
509 void
modify_and_remove_subscriber_if_needed(const std::type_index & msg_type,agent_t & subscriber,Info_Changer changer)510 modify_and_remove_subscriber_if_needed(
511 const std::type_index & msg_type,
512 agent_t & subscriber,
513 Info_Changer changer )
514 {
515 std::lock_guard< lock_t<Config> > lock( m_lock );
516
517 auto it_table_item = this->m_data.m_messages_table.find( msg_type );
518 if( it_table_item != this->m_data.m_messages_table.end() )
519 {
520 auto & table_item = it_table_item->second;
521
522 auto it_subscriber = table_item.m_subscribers.find(
523 &subscriber );
524 if( it_subscriber != table_item.m_subscribers.end() )
525 {
526 // Subscriber is found and must be modified.
527 changer( it_subscriber->second );
528
529 // If info about subscriber becomes empty after
530 // modification then subscriber info must be removed.
531 if( it_subscriber->second.empty() )
532 table_item.m_subscribers.erase( it_subscriber );
533 }
534 }
535 }
536
537 void
do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const & tracer,const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep)538 do_deliver_message_impl(
539 typename Tracing_Base::deliver_op_tracer const & tracer,
540 const std::type_index & msg_type,
541 const message_ref_t & message,
542 unsigned int overlimit_reaction_deep )
543 {
544 std::lock_guard< lock_t<Config> > lock( m_lock );
545
546 // If there is no item for this message type it will be
547 // created automatically.
548 auto & table_item = this->m_data.m_messages_table[ msg_type ];
549
550 // Message must be stored as retained.
551 table_item.m_retained_msg = message;
552
553 auto & subscribers = table_item.m_subscribers;
554 if( !subscribers.empty() )
555 for( const auto & kv : subscribers )
556 do_deliver_message_to_subscriber(
557 *(kv.first),
558 kv.second,
559 tracer,
560 msg_type,
561 message,
562 overlimit_reaction_deep );
563 else
564 tracer.no_subscribers();
565 }
566
567 void
do_deliver_message_to_subscriber(agent_t & subscriber,const subscriber_info_t & subscriber_info,typename Tracing_Base::deliver_op_tracer const & tracer,const std::type_index & msg_type,const message_ref_t & message,unsigned int overlimit_reaction_deep) const568 do_deliver_message_to_subscriber(
569 agent_t & subscriber,
570 const subscriber_info_t & subscriber_info,
571 typename Tracing_Base::deliver_op_tracer const & tracer,
572 const std::type_index & msg_type,
573 const message_ref_t & message,
574 unsigned int overlimit_reaction_deep ) const
575 {
576 const auto delivery_status =
577 subscriber_info.must_be_delivered(
578 subscriber,
579 *(message.get()) );
580
581 if( delivery_possibility_t::must_be_delivered == delivery_status )
582 {
583 using namespace so_5::message_limit::impl;
584
585 try_to_deliver_to_agent(
586 this->m_data.m_id,
587 subscriber,
588 subscriber_info.limit(),
589 msg_type,
590 message,
591 overlimit_reaction_deep,
592 tracer.overlimit_tracer(),
593 [&] {
594 tracer.push_to_queue( std::addressof(subscriber) );
595
596 agent_t::call_push_event(
597 subscriber,
598 subscriber_info.limit(),
599 this->m_data.m_id,
600 msg_type,
601 message );
602 } );
603 }
604 else
605 tracer.message_rejected(
606 std::addressof(subscriber), delivery_status );
607 }
608
609 /*!
610 * \brief An attempt to deliver retained message to the new subscriber.
611 *
612 * This attempt will be performed only if there is the retained message.
613 */
614 void
try_deliver_retained_message_to(const std::type_index & msg_type,const message_ref_t & retained_msg,agent_t & subscriber,const subscriber_info_t & subscriber_info)615 try_deliver_retained_message_to(
616 const std::type_index & msg_type,
617 const message_ref_t & retained_msg,
618 agent_t & subscriber,
619 const subscriber_info_t & subscriber_info )
620 {
621 if( retained_msg )
622 {
623 const unsigned int overlimit_reaction_deep = 0;
624
625 typename Tracing_Base::deliver_op_tracer tracer{
626 *this, // as Tracing_base
627 *this, // as abstract_message_box_t
628 "deliver_message_on_subscription",
629 msg_type,
630 retained_msg,
631 overlimit_reaction_deep };
632
633 do_deliver_message_to_subscriber(
634 subscriber,
635 subscriber_info,
636 tracer,
637 msg_type,
638 retained_msg,
639 overlimit_reaction_deep );
640 }
641 }
642
643 /*!
644 * \brief Ensures that message is an immutable message.
645 *
646 * Checks mutability flag and throws an exception if message is
647 * a mutable one.
648 */
649 void
ensure_immutable_message(const std::type_index & msg_type,const message_ref_t & what) const650 ensure_immutable_message(
651 const std::type_index & msg_type,
652 const message_ref_t & what ) const
653 {
654 if( message_mutability_t::immutable_message !=
655 message_mutability( what ) )
656 SO_5_THROW_EXCEPTION(
657 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
658 "an attempt to deliver mutable message via MPMC mbox"
659 ", msg_type=" + std::string(msg_type.name()) );
660 }
661 };
662
663 } /* namespace details */
664
//
// default_traits_t
//
/*!
 * \brief Default traits for retained message mbox.
 *
 * An empty type. It is the default value of the Traits parameter
 * of make_mbox().
 */
struct default_traits_t
{
};
673
674 //
675 // make_mbox
676 //
677 /*!
678 * \brief Create an instance of retained message mbox.
679 *
680 * Simple usage example:
681 * \code
682 * so_5::environment_t & env = ...;
683 * const so_5::mbox_t retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
684 * so_5::send<Some_Message>(retained_mbox, ...);
685 * \endcode
686 * An instance of default implementation retained message mbox will be created.
687 * This instance will be protected by std::mutex.
688 *
689 * If you want to use retained_mbox in a single-threaded environment
690 * without a multithreaded protection then so_5::null_mutex_t (or any
691 * similar null-mutex implementation) can be used:
692 * \code
693 * so_5::environment_t & env = ...
694 * const so_5::mbox_t retained_mbox =
695 * so_5::extra::mboxes::retained_msg::make_mbox<
696 * so_5::extra::mboxes::retained_msg::default_traits_t,
697 * so_5::null_mutex_t>(env);
698 * so_5::send<Some_Message>(retained_mbox, ...);
699 * \endcode
700 *
701 * If you want to use your own mutex-like object (with interface which
702 * allows to use your mutex-like class with std::lock_guard) then you can
703 * do it similar way:
704 * \code
705 * so_5::environment_t & env = ...
706 * const so_5::mbox_t retained_mbox =
707 * so_5::extra::mboxes::retained_msg::make_mbox<
708 * so_5::extra::mboxes::retained_msg::default_traits_t,
709 * Your_Own_Mutex_Class>(env);
710 * so_5::send<Some_Message>(retained_mbox, ...);
711 * \endcode
712 *
713 * \tparam Traits type with traits of mbox implementation.
714 *
715 * \tparam Lock_Type a type of mutex to be used for protection of
716 * retained message mbox content. This must be a DefaultConstructible
717 * type with interface which allows to use Lock_Type with std::lock_guard.
718 *
719 * \since
720 * v.1.0.3
721 */
722 template<
723 typename Traits = default_traits_t,
724 typename Lock_Type = std::mutex >
725 mbox_t
make_mbox(environment_t & env)726 make_mbox( environment_t & env )
727 {
728 using config_type = details::config_type< Traits, Lock_Type >;
729
730 return env.make_custom_mbox(
731 []( const mbox_creation_data_t & data )
732 {
733 mbox_t result;
734
735 if( data.m_tracer.get().is_msg_tracing_enabled() )
736 {
737 using T = details::actual_mbox_t<
738 config_type,
739 ::so_5::impl::msg_tracing_helpers::tracing_enabled_base >;
740
741 result = mbox_t{ new T{
742 data.m_env.get(), data.m_id, data.m_tracer.get()
743 }
744 };
745 }
746 else
747 {
748 using T = details::actual_mbox_t<
749 config_type,
750 ::so_5::impl::msg_tracing_helpers::tracing_disabled_base >;
751 result = mbox_t{ new T{ data.m_env.get(), data.m_id } };
752 }
753
754 return result;
755 } );
756 }
757
758 } /* namespace retained_msg */
759
760 } /* namespace mboxes */
761
762 } /* namespace extra */
763
764 } /* namespace so_5 */
765
766