1 /*
2 SObjectizer 5.
3 */
4
5 #include <so_5/agent.hpp>
6 #include <so_5/mbox.hpp>
7 #include <so_5/enveloped_msg.hpp>
8 #include <so_5/environment.hpp>
9 #include <so_5/send_functions.hpp>
10
11 #include <so_5/impl/internal_env_iface.hpp>
12 #include <so_5/impl/coop_private_iface.hpp>
13
14 #include <so_5/impl/subscription_storage_iface.hpp>
15 #include <so_5/impl/process_unhandled_exception.hpp>
16 #include <so_5/impl/message_limit_internals.hpp>
17 #include <so_5/impl/delivery_filter_storage.hpp>
18 #include <so_5/impl/msg_tracing_helpers.hpp>
19
20 #include <so_5/impl/enveloped_msg_details.hpp>
21
22 #include <so_5/details/abort_on_fatal_error.hpp>
23
24 #include <so_5/spinlocks.hpp>
25
26 #include <algorithm>
27 #include <sstream>
28 #include <cstdlib>
29
30 namespace so_5
31 {
32
33 namespace
34 {
35
36 /*!
37 * \since
38 * v.5.4.0
39 *
40 * \brief A helper class for temporary setting and then dropping
41 * the ID of the current working thread.
42 *
43 * \note New working thread_id is set only if it is not an
44 * null thread_id.
45 */
46 struct working_thread_id_sentinel_t
47 {
48 so_5::current_thread_id_t & m_id;
49
working_thread_id_sentinel_tso_5::__anon0cacc8900111::working_thread_id_sentinel_t50 working_thread_id_sentinel_t(
51 so_5::current_thread_id_t & id_var,
52 so_5::current_thread_id_t value_to_set )
53 : m_id( id_var )
54 {
55 if( value_to_set != null_current_thread_id() )
56 m_id = value_to_set;
57 }
~working_thread_id_sentinel_tso_5::__anon0cacc8900111::working_thread_id_sentinel_t58 ~working_thread_id_sentinel_t()
59 {
60 if( m_id != null_current_thread_id() )
61 m_id = null_current_thread_id();
62 }
63 };
64
65 /*!
66 * \since
67 * v.5.4.0
68 *
69 */
70 std::string
create_anonymous_state_name(const agent_t * agent,const state_t * st)71 create_anonymous_state_name( const agent_t * agent, const state_t * st )
72 {
73 std::ostringstream ss;
74 ss << "<state:target=" << agent << ":this=" << st << ">";
75 return ss.str();
76 }
77
78 } /* namespace anonymous */
79
80 // NOTE: Implementation of state_t is moved to that file in v.5.4.0.
81
82 //
83 // state_t::time_limit_t
84 //
85 struct state_t::time_limit_t
86 {
87 struct timeout : public signal_t {};
88
89 duration_t m_limit;
90 const state_t & m_state_to_switch;
91
92 mbox_t m_unique_mbox;
93 timer_id_t m_timer;
94
time_limit_tso_5::state_t::time_limit_t95 time_limit_t(
96 duration_t limit,
97 const state_t & state_to_switch )
98 : m_limit( limit )
99 , m_state_to_switch( state_to_switch )
100 {}
101
102 void
set_up_limit_for_agentso_5::state_t::time_limit_t103 set_up_limit_for_agent(
104 agent_t & agent,
105 const state_t & current_state ) noexcept
106 {
107 // Because this method is called from on_enter handler it can't
108 // throw exceptions. Any exception will lead to abort of the application.
109 // So we don't care about exception safety.
110 so_5::details::invoke_noexcept_code( [&] {
111
112 // New unique mbox is necessary for time limit.
113 m_unique_mbox = impl::internal_env_iface_t{ agent.so_environment() }
114 // A new MPSC mbox will be used for that.
115 .create_mpsc_mbox(
116 // New MPSC mbox will be directly connected to target agent.
117 &agent,
118 // Message limits will not be used.
119 nullptr );
120
121 // A subscription must be created for timeout signal.
122 agent.so_subscribe( m_unique_mbox )
123 .in( current_state )
124 .event( [&agent, this](mhood_t<timeout>) {
125 agent.so_change_state( m_state_to_switch );
126 } );
127
128 // Delayed timeout signal must be sent.
129 m_timer = send_periodic< timeout >(
130 m_unique_mbox,
131 m_limit,
132 duration_t::zero() );
133 } );
134 }
135
136 void
drop_limit_for_agentso_5::state_t::time_limit_t137 drop_limit_for_agent(
138 agent_t & agent,
139 const state_t & current_state ) noexcept
140 {
141 // Because this method is called from on_exit handler it can't
142 // throw exceptions. Any exception will lead to abort of the application.
143 // So we don't care about exception safety.
144 so_5::details::invoke_noexcept_code( [&] {
145 m_timer.release();
146
147 if( m_unique_mbox )
148 {
149 // Old subscription must be removed.
150 agent.so_drop_subscription< timeout >( m_unique_mbox, current_state );
151 // Unique mbox is no more needed.
152 m_unique_mbox = mbox_t{};
153 }
154 } );
155 }
156 };
157
158 //
159 // state_t
160 //
161
state_t(agent_t * target_agent,std::string state_name,state_t * parent_state,std::size_t nested_level,history_t state_history)162 state_t::state_t(
163 agent_t * target_agent,
164 std::string state_name,
165 state_t * parent_state,
166 std::size_t nested_level,
167 history_t state_history )
168 : m_target_agent{ target_agent }
169 , m_state_name( std::move(state_name) )
170 , m_parent_state{ parent_state }
171 , m_initial_substate{ nullptr }
172 , m_state_history{ state_history }
173 , m_last_active_substate{ nullptr }
174 , m_nested_level{ nested_level }
175 , m_substate_count{ 0 }
176 {
177 if( parent_state )
178 {
179 // We should check the deep of nested states.
180 if( m_nested_level >= max_deep )
181 SO_5_THROW_EXCEPTION( rc_state_nesting_is_too_deep,
182 "max nesting deep for agent states is " +
183 std::to_string( max_deep ) );
184
185 // Now we can safely mark parent state as composite.
186 parent_state->m_substate_count += 1;
187 }
188 }
189
state_t(agent_t * agent)190 state_t::state_t(
191 agent_t * agent )
192 : state_t{ agent, history_t::none }
193 {
194 }
195
state_t(agent_t * agent,history_t state_history)196 state_t::state_t(
197 agent_t * agent,
198 history_t state_history )
199 : state_t{ agent, std::string(), nullptr, 0, state_history }
200 {
201 }
202
state_t(agent_t * agent,std::string state_name)203 state_t::state_t(
204 agent_t * agent,
205 std::string state_name )
206 : state_t{ agent, std::move(state_name), history_t::none }
207 {}
208
state_t(agent_t * agent,std::string state_name,history_t state_history)209 state_t::state_t(
210 agent_t * agent,
211 std::string state_name,
212 history_t state_history )
213 : state_t{ agent, std::move(state_name), nullptr, 0, state_history }
214 {}
215
state_t(initial_substate_of parent)216 state_t::state_t(
217 initial_substate_of parent )
218 : state_t{ parent, std::string(), history_t::none }
219 {}
220
state_t(initial_substate_of parent,std::string state_name)221 state_t::state_t(
222 initial_substate_of parent,
223 std::string state_name )
224 : state_t{ parent, std::move(state_name), history_t::none }
225 {}
226
state_t(initial_substate_of parent,std::string state_name,history_t state_history)227 state_t::state_t(
228 initial_substate_of parent,
229 std::string state_name,
230 history_t state_history )
231 : state_t{
232 parent.m_parent_state->m_target_agent,
233 std::move(state_name),
234 parent.m_parent_state,
235 parent.m_parent_state->m_nested_level + 1,
236 state_history }
237 {
238 if( m_parent_state->m_initial_substate )
239 SO_5_THROW_EXCEPTION( rc_initial_substate_already_defined,
240 "initial substate for state " + m_parent_state->query_name() +
241 " is already defined: " +
242 m_parent_state->m_initial_substate->query_name() );
243
244 m_parent_state->m_initial_substate = this;
245 }
246
state_t(substate_of parent)247 state_t::state_t(
248 substate_of parent )
249 : state_t{ parent, std::string(), history_t::none }
250 {}
251
state_t(substate_of parent,std::string state_name)252 state_t::state_t(
253 substate_of parent,
254 std::string state_name )
255 : state_t{ parent, std::move(state_name), history_t::none }
256 {}
257
state_t(substate_of parent,std::string state_name,history_t state_history)258 state_t::state_t(
259 substate_of parent,
260 std::string state_name,
261 history_t state_history )
262 : state_t{
263 parent.m_parent_state->m_target_agent,
264 std::move(state_name),
265 parent.m_parent_state,
266 parent.m_parent_state->m_nested_level + 1,
267 state_history }
268 {}
269
state_t(state_t && other)270 state_t::state_t(
271 state_t && other )
272 : m_target_agent( other.m_target_agent )
273 , m_state_name( std::move( other.m_state_name ) )
274 , m_parent_state{ other.m_parent_state }
275 , m_initial_substate{ other.m_initial_substate }
276 , m_state_history{ other.m_state_history }
277 , m_last_active_substate{ other.m_last_active_substate }
278 , m_nested_level{ other.m_nested_level }
279 , m_substate_count{ other.m_substate_count }
280 , m_on_enter{ std::move(other.m_on_enter) }
281 , m_on_exit{ std::move(other.m_on_exit) }
282 {
283 if( m_parent_state && m_parent_state->m_initial_substate == &other )
284 m_parent_state->m_initial_substate = this;
285 }
286
~state_t()287 state_t::~state_t()
288 {
289 }
290
291 bool
operator ==(const state_t & state) const292 state_t::operator == ( const state_t & state ) const noexcept
293 {
294 return &state == this;
295 }
296
297 std::string
query_name() const298 state_t::query_name() const
299 {
300 auto getter = [this]() -> std::string {
301 if( m_state_name.empty() )
302 return create_anonymous_state_name( m_target_agent, this );
303 else
304 return m_state_name;
305 };
306
307 if( m_parent_state )
308 return m_parent_state->query_name() + "." + getter();
309 else
310 return getter();
311 }
312
313 namespace {
314
315 #if defined(__clang__)
316 #pragma clang diagnostic push
317 #pragma clang diagnostic ignored "-Wexit-time-destructors"
318 #pragma clang diagnostic ignored "-Wglobal-constructors"
319 #endif
320
321 /*!
322 * \since
323 * v.5.4.0
324 *
325 * \brief A special object for the state in which agent is awaiting
326 * for deregistration after unhandled exception.
327 *
328 * This object will be shared between all agents.
329 */
330 const state_t awaiting_deregistration_state(
331 nullptr, "<AWAITING_DEREGISTRATION_AFTER_UNHANDLED_EXCEPTION>" );
332
333 /*!
334 * \since
335 * v.5.5.21
336 *
337 * \brief A special object to be used as state for make subscriptions
338 * for deadletter handlers.
339 *
340 * This object will be shared between all agents.
341 */
342 const state_t deadletter_state(
343 nullptr, "<DEADLETTER_STATE>" );
344
345 #if defined(__clang__)
346 #pragma clang diagnostic pop
347 #endif
348
349 } /* namespace anonymous */
350
351 bool
is_target(const agent_t * agent) const352 state_t::is_target( const agent_t * agent ) const noexcept
353 {
354 if( m_target_agent )
355 return m_target_agent == agent;
356 else if( this == &awaiting_deregistration_state )
357 return true;
358 else
359 return false;
360 }
361
362 void
activate() const363 state_t::activate() const
364 {
365 m_target_agent->so_change_state( *this );
366 }
367
368 state_t &
time_limit(duration_t timeout,const state_t & state_to_switch)369 state_t::time_limit(
370 duration_t timeout,
371 const state_t & state_to_switch )
372 {
373 if( duration_t::zero() == timeout )
374 SO_5_THROW_EXCEPTION( rc_invalid_time_limit_for_state,
375 "zero can't be used as time limit for state: " +
376 query_name() );
377
378 // Old time limit must be dropped if it exists.
379 {
380 // As a defense from exception create new time_limit object first.
381 auto fresh_limit = std::make_unique< time_limit_t >(
382 timeout, std::cref(state_to_switch) );
383 drop_time_limit();
384 m_time_limit = std::move(fresh_limit);
385 }
386
387 // If this state is active then new time limit must be activated.
388 if( is_active() )
389 so_5::details::do_with_rollback_on_exception(
390 [&] {
391 m_time_limit->set_up_limit_for_agent( *m_target_agent, *this );
392 },
393 [&] {
394 // Time limit must be dropped because it is not activated
395 // for the current state.
396 drop_time_limit();
397 } );
398
399 return *this;
400 }
401
402 state_t &
drop_time_limit()403 state_t::drop_time_limit()
404 {
405 if( m_time_limit )
406 {
407 m_time_limit->drop_limit_for_agent( *m_target_agent, *this );
408 m_time_limit.reset();
409 }
410
411 return *this;
412 }
413
414 const state_t *
actual_state_to_enter() const415 state_t::actual_state_to_enter() const
416 {
417 const state_t * s = this;
418 while( 0 != s->m_substate_count )
419 {
420 if( s->m_last_active_substate )
421 // Note: for states with shallow history m_last_active_substate
422 // can point to composite substate. This substate must be
423 // processed usual way with checking for substate count, presence
424 // of initial substate and so on...
425 s = s->m_last_active_substate;
426 else if( !s->m_initial_substate )
427 SO_5_THROW_EXCEPTION( rc_no_initial_substate,
428 "there is no initial substate for composite state: " +
429 query_name() );
430 else
431 s = s->m_initial_substate;
432 }
433
434 return s;
435 }
436
437 void
update_history_in_parent_states() const438 state_t::update_history_in_parent_states() const
439 {
440 auto p = m_parent_state;
441
442 // This pointer will be used for update states with shallow history.
443 // This pointer will be changed on every iteration.
444 auto c = this;
445
446 while( p )
447 {
448 if( history_t::shallow == p->m_state_history )
449 p->m_last_active_substate = c;
450 else if( history_t::deep == p->m_state_history )
451 p->m_last_active_substate = this;
452
453 c = p;
454 p = p->m_parent_state;
455 }
456 }
457
458 void
handle_time_limit_on_enter() const459 state_t::handle_time_limit_on_enter() const
460 {
461 m_time_limit->set_up_limit_for_agent( *m_target_agent, *this );
462 }
463
464 void
handle_time_limit_on_exit() const465 state_t::handle_time_limit_on_exit() const
466 {
467 m_time_limit->drop_limit_for_agent( *m_target_agent, *this );
468 }
469
470 //
471 // agent_t
472 //
473
agent_t(environment_t & env)474 agent_t::agent_t(
475 environment_t & env )
476 : agent_t( env, tuning_options() )
477 {
478 }
479
agent_t(environment_t & env,agent_tuning_options_t options)480 agent_t::agent_t(
481 environment_t & env,
482 agent_tuning_options_t options )
483 : agent_t( context_t( env, std::move( options ) ) )
484 {
485 }
486
agent_t(context_t ctx)487 agent_t::agent_t(
488 context_t ctx )
489 : m_current_state_ptr( &st_default )
490 , m_current_status( agent_status_t::not_defined_yet )
491 , m_handler_finder(
492 // Actual handler finder is dependent on msg_tracing status.
493 impl::internal_env_iface_t( ctx.env() ).is_msg_tracing_enabled() ?
494 &agent_t::handler_finder_msg_tracing_enabled :
495 &agent_t::handler_finder_msg_tracing_disabled )
496 , m_subscriptions(
497 ctx.options().query_subscription_storage_factory()( self_ptr() ) )
498 , m_message_limits(
499 message_limit::impl::create_info_storage_if_necessary(
500 ctx.options().giveout_message_limits() ) )
501 , m_env( ctx.env() )
502 , m_event_queue( nullptr )
503 , m_direct_mbox(
504 impl::internal_env_iface_t( ctx.env() ).create_mpsc_mbox(
505 self_ptr(),
506 m_message_limits.get() ) )
507 // It is necessary to enable agent subscription in the
508 // constructor of derived class.
509 , m_working_thread_id( so_5::query_current_thread_id() )
510 , m_agent_coop( nullptr )
511 , m_priority( ctx.options().query_priority() )
512 {
513 }
514
~agent_t()515 agent_t::~agent_t()
516 {
517 // Sometimes it is possible that agent is destroyed without
518 // correct deregistration from SO Environment.
519 drop_all_delivery_filters();
520 m_subscriptions.reset();
521 }
522
523 void
so_evt_start()524 agent_t::so_evt_start()
525 {
526 // Default implementation do nothing.
527 }
528
529 void
so_evt_finish()530 agent_t::so_evt_finish()
531 {
532 // Default implementation do nothing.
533 }
534
535 bool
so_is_active_state(const state_t & state_to_check) const536 agent_t::so_is_active_state( const state_t & state_to_check ) const noexcept
537 {
538 state_t::path_t path;
539 m_current_state_ptr->fill_path( path );
540
541 auto e = begin(path) + static_cast< state_t::path_t::difference_type >(
542 m_current_state_ptr->nested_level() ) + 1;
543
544 return e != std::find( begin(path), e, &state_to_check );
545 }
546
547 void
so_add_nondestroyable_listener(agent_state_listener_t & state_listener)548 agent_t::so_add_nondestroyable_listener(
549 agent_state_listener_t & state_listener )
550 {
551 m_state_listener_controller.add(
552 impl::state_listener_controller_t::wrap_nondestroyable(
553 state_listener ) );
554 }
555
556 void
so_add_destroyable_listener(agent_state_listener_unique_ptr_t state_listener)557 agent_t::so_add_destroyable_listener(
558 agent_state_listener_unique_ptr_t state_listener )
559 {
560 m_state_listener_controller.add(
561 impl::state_listener_controller_t::wrap_destroyable(
562 std::move( state_listener ) ) );
563 }
564
565 exception_reaction_t
so_exception_reaction() const566 agent_t::so_exception_reaction() const
567 {
568 if( m_agent_coop )
569 return m_agent_coop->exception_reaction();
570 else
571 // This is very strange case. So it would be better to abort.
572 return abort_on_exception;
573 }
574
575 void
so_switch_to_awaiting_deregistration_state()576 agent_t::so_switch_to_awaiting_deregistration_state()
577 {
578 so_change_state( awaiting_deregistration_state );
579 }
580
581 const mbox_t &
so_direct_mbox() const582 agent_t::so_direct_mbox() const
583 {
584 return m_direct_mbox;
585 }
586
587 mbox_t
so_make_new_direct_mbox()588 agent_t::so_make_new_direct_mbox()
589 {
590 return impl::internal_env_iface_t{ so_environment() }.create_mpsc_mbox(
591 self_ptr(),
592 m_message_limits.get() );
593 }
594
595 const state_t &
so_default_state() const596 agent_t::so_default_state() const
597 {
598 return st_default;
599 }
600
601 namespace impl {
602
603 class state_switch_guard_t
604 {
605 agent_t & m_agent;
606 agent_t::agent_status_t m_previous_status;
607
608 public :
state_switch_guard_t(agent_t & agent)609 state_switch_guard_t(
610 agent_t & agent )
611 : m_agent( agent )
612 , m_previous_status( agent.m_current_status )
613 {
614 if( agent_t::agent_status_t::state_switch_in_progress
615 == agent.m_current_status )
616 SO_5_THROW_EXCEPTION(
617 rc_another_state_switch_in_progress,
618 "an attempt to switch agent state when another state "
619 "switch operation is in progress for the same agent" );
620
621 agent.m_current_status = agent_t::agent_status_t::state_switch_in_progress;
622 }
~state_switch_guard_t()623 ~state_switch_guard_t()
624 {
625 m_agent.m_current_status = m_previous_status;
626 }
627 };
628
629 } /* namespace impl */
630
631 void
so_change_state(const state_t & new_state)632 agent_t::so_change_state(
633 const state_t & new_state )
634 {
635 ensure_operation_is_on_working_thread( "so_change_state" );
636
637 if( new_state.is_target( this ) )
638 {
639 // Since v.5.5.18 we must check nested state switch operations.
640 // This object will drop pointer to the current state.
641 impl::state_switch_guard_t switch_op_guard( *this );
642
643 auto actual_new_state = new_state.actual_state_to_enter();
644 if( !( *actual_new_state == *m_current_state_ptr ) )
645 {
646 // New state differs from the current one.
647 // Actual state switch must be performed.
648 do_state_switch( *actual_new_state );
649
650 // State listener should be informed.
651 m_state_listener_controller.changed(
652 *this,
653 *m_current_state_ptr );
654 }
655 }
656 else
657 SO_5_THROW_EXCEPTION(
658 rc_agent_unknown_state,
659 "unable to switch agent to alien state "
660 "(the state that doesn't belong to this agent)" );
661 }
662
663 void
so_initiate_agent_definition()664 agent_t::so_initiate_agent_definition()
665 {
666 working_thread_id_sentinel_t sentinel(
667 m_working_thread_id,
668 so_5::query_current_thread_id() );
669
670 so_define_agent();
671
672 m_current_status = agent_status_t::defined;
673 }
674
675 void
so_define_agent()676 agent_t::so_define_agent()
677 {
678 // Default implementation do nothing.
679 }
680
681 bool
so_was_defined() const682 agent_t::so_was_defined() const
683 {
684 return agent_status_t::not_defined_yet != m_current_status;
685 }
686
687 environment_t &
so_environment() const688 agent_t::so_environment() const
689 {
690 return m_env;
691 }
692
693 [[nodiscard]]
694 coop_handle_t
so_coop() const695 agent_t::so_coop() const
696 {
697 if( !m_agent_coop )
698 SO_5_THROW_EXCEPTION(
699 rc_agent_has_no_cooperation,
700 "agent_t::so_coop() can be completed because agent is not bound "
701 "to any cooperation" );
702
703 return m_agent_coop->handle();
704 }
705
706 void
so_bind_to_dispatcher(event_queue_t & queue)707 agent_t::so_bind_to_dispatcher(
708 event_queue_t & queue ) noexcept
709 {
710 // Since v.5.5.24 we should use event_queue_hook to get an
711 // actual event_queue.
712 auto * actual_queue = impl::internal_env_iface_t{ m_env }
713 .event_queue_on_bind( this, &queue );
714
715 std::lock_guard< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
716
717 // Cooperation usage counter should be incremented.
718 // It will be decremented during final agent event execution.
719 impl::coop_private_iface_t::increment_usage_count( *m_agent_coop );
720
721 // A starting demand must be sent first.
722 actual_queue->push(
723 execution_demand_t(
724 this,
725 message_limit::control_block_t::none(),
726 0,
727 typeid(void),
728 message_ref_t(),
729 &agent_t::demand_handler_on_start ) );
730
731 // Only then pointer to the queue could be stored.
732 m_event_queue = actual_queue;
733 }
734
735 execution_hint_t
so_create_execution_hint(execution_demand_t & d)736 agent_t::so_create_execution_hint(
737 execution_demand_t & d )
738 {
739 enum class demand_type_t {
740 message, enveloped_msg, other
741 };
742
743 // We can't use message_kind_t here because there are special
744 // demands like demands for so_evt_start/so_evt_finish.
745 // Because of that a pointer to demand handler will be analyzed.
746 const auto demand_type =
747 (d.m_demand_handler == &agent_t::demand_handler_on_message ?
748 demand_type_t::message :
749 (d.m_demand_handler == &agent_t::demand_handler_on_enveloped_msg ?
750 demand_type_t::enveloped_msg : demand_type_t::other));
751
752 if( demand_type_t::other != demand_type )
753 {
754 // Try to find handler for the demand.
755 auto handler = d.m_receiver->m_handler_finder(
756 d, "create_execution_hint" );
757 if( demand_type_t::message == demand_type )
758 {
759 if( handler )
760 return execution_hint_t(
761 d,
762 [handler](
763 execution_demand_t & demand,
764 current_thread_id_t thread_id ) {
765 process_message(
766 thread_id,
767 demand,
768 handler->m_method );
769 },
770 handler->m_thread_safety );
771 else
772 // Handler not found.
773 return execution_hint_t::create_empty_execution_hint( d );
774 }
775 else
776 {
777 // Execution hint for enveloped message is
778 // very similar to hint for service request.
779 return execution_hint_t(
780 d,
781 [handler](
782 execution_demand_t & demand,
783 current_thread_id_t thread_id ) {
784 process_enveloped_msg(
785 thread_id,
786 demand,
787 handler );
788 },
789 handler ? handler->m_thread_safety :
790 // If there is no real handler then
791 // there will only be actions from
792 // envelope.
793 // These actions should be thread safe.
794 thread_safe );
795 }
796 }
797 else
798 // This is demand_handler_on_start or demand_handler_on_finish.
799 return execution_hint_t(
800 d,
801 []( execution_demand_t & demand,
802 current_thread_id_t thread_id ) {
803 demand.call_handler( thread_id );
804 },
805 not_thread_safe );
806 }
807
808 void
so_deregister_agent_coop(int dereg_reason)809 agent_t::so_deregister_agent_coop( int dereg_reason )
810 {
811 so_environment().deregister_coop(
812 m_agent_coop->handle(), dereg_reason );
813 }
814
815 void
so_deregister_agent_coop_normally()816 agent_t::so_deregister_agent_coop_normally()
817 {
818 so_deregister_agent_coop( dereg_reason::normal );
819 }
820
821 agent_ref_t
create_ref()822 agent_t::create_ref()
823 {
824 agent_ref_t agent_ref( this );
825 return agent_ref;
826 }
827
828 void
bind_to_coop(coop_t & coop)829 agent_t::bind_to_coop( coop_t & coop )
830 {
831 m_agent_coop = &coop;
832 }
833
834 void
shutdown_agent()835 agent_t::shutdown_agent() noexcept
836 {
837 event_queue_t * actual_queue = nullptr;
838 {
839 std::lock_guard< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
840
841 // Since v.5.5.8 shutdown is done by two simple step:
842 // - remove actual value from m_event_queue;
843 // - pushing final demand to actual event queue.
844 //
845 // No new demands will be sent to the agent, but all the subscriptions
846 // remains. They will be destroyed at the very end of agent's lifetime.
847
848 if( m_event_queue )
849 {
850 // This pointer will be used later.
851 actual_queue = m_event_queue;
852
853 // Final event must be pushed to queue.
854 so_5::details::invoke_noexcept_code( [&] {
855 m_event_queue->push(
856 execution_demand_t(
857 this,
858 message_limit::control_block_t::none(),
859 0,
860 typeid(void),
861 message_ref_t(),
862 &agent_t::demand_handler_on_finish ) );
863
864 // No more events will be stored to the queue.
865 m_event_queue = nullptr;
866 } );
867
868 }
869 else
870 so_5::details::abort_on_fatal_error( [&] {
871 SO_5_LOG_ERROR( so_environment(), log_stream )
872 {
873 log_stream << "Unexpected error: m_event_queue contains "
874 "nullptr. Unable to push demand_handler_on_finish for "
875 "the agent (" << this << "). Application will be aborted"
876 << std::endl;
877 }
878 } );
879 }
880
881 if( actual_queue )
882 // Since v.5.5.24 we should utilize event_queue via
883 // event_queue_hook.
884 impl::internal_env_iface_t{ m_env }
885 .event_queue_on_unbind( this, actual_queue );
886 }
887
888 void
so_create_event_subscription(const mbox_t & mbox_ref,std::type_index msg_type,const state_t & target_state,const event_handler_method_t & method,thread_safety_t thread_safety,event_handler_kind_t handler_kind)889 agent_t::so_create_event_subscription(
890 const mbox_t & mbox_ref,
891 std::type_index msg_type,
892 const state_t & target_state,
893 const event_handler_method_t & method,
894 thread_safety_t thread_safety,
895 event_handler_kind_t handler_kind )
896 {
897 // Since v.5.4.0 there is no need for locking agent's mutex
898 // because this operation can be performed only on agent's
899 // working thread.
900
901 ensure_operation_is_on_working_thread( "so_create_event_subscription" );
902
903 m_subscriptions->create_event_subscription(
904 mbox_ref,
905 msg_type,
906 detect_limit_for_message_type( msg_type ),
907 target_state,
908 method,
909 thread_safety,
910 handler_kind );
911 }
912
913 void
so_create_deadletter_subscription(const mbox_t & mbox,const std::type_index & msg_type,const event_handler_method_t & method,thread_safety_t thread_safety)914 agent_t::so_create_deadletter_subscription(
915 const mbox_t & mbox,
916 const std::type_index & msg_type,
917 const event_handler_method_t & method,
918 thread_safety_t thread_safety )
919 {
920 ensure_operation_is_on_working_thread( "so_create_deadletter_subscription" );
921
922 m_subscriptions->create_event_subscription(
923 mbox,
924 msg_type,
925 detect_limit_for_message_type( msg_type ),
926 deadletter_state,
927 method,
928 thread_safety,
929 event_handler_kind_t::final_handler );
930 }
931
932 void
so_destroy_deadletter_subscription(const mbox_t & mbox,const std::type_index & msg_type)933 agent_t::so_destroy_deadletter_subscription(
934 const mbox_t & mbox,
935 const std::type_index & msg_type )
936 {
937 // Since v.5.4.0 there is no need for locking agent's mutex
938 // because this operation can be performed only on agent's
939 // working thread.
940
941 ensure_operation_is_on_working_thread( "do_drop_deadletter_handler" );
942
943 m_subscriptions->drop_subscription( mbox, msg_type, deadletter_state );
944 }
945
946 const message_limit::control_block_t *
detect_limit_for_message_type(const std::type_index & msg_type)947 agent_t::detect_limit_for_message_type(
948 const std::type_index & msg_type )
949 {
950 const message_limit::control_block_t * result = nullptr;
951
952 if( m_message_limits )
953 {
954 result = m_message_limits->find_or_create( msg_type );
955 if( !result )
956 SO_5_THROW_EXCEPTION(
957 so_5::rc_message_has_no_limit_defined,
958 std::string( "an attempt to subscribe to message type without "
959 "predefined limit for that type, type: " ) +
960 msg_type.name() );
961 }
962
963 return result;
964 }
965
966 void
do_drop_subscription(const mbox_t & mbox,const std::type_index & msg_type,const state_t & target_state)967 agent_t::do_drop_subscription(
968 const mbox_t & mbox,
969 const std::type_index & msg_type,
970 const state_t & target_state )
971 {
972 // Since v.5.4.0 there is no need for locking agent's mutex
973 // because this operation can be performed only on agent's
974 // working thread.
975
976 ensure_operation_is_on_working_thread( "do_drop_subscription" );
977
978 m_subscriptions->drop_subscription( mbox, msg_type, target_state );
979 }
980
981 void
do_drop_subscription_for_all_states(const mbox_t & mbox,const std::type_index & msg_type)982 agent_t::do_drop_subscription_for_all_states(
983 const mbox_t & mbox,
984 const std::type_index & msg_type )
985 {
986 // Since v.5.4.0 there is no need for locking agent's mutex
987 // because this operation can be performed only on agent's
988 // working thread.
989
990 ensure_operation_is_on_working_thread(
991 "do_drop_subscription_for_all_states" );
992
993 m_subscriptions->drop_subscription_for_all_states( mbox, msg_type );
994 }
995
996 bool
do_check_subscription_presence(const mbox_t & mbox,const std::type_index & msg_type,const state_t & target_state) const997 agent_t::do_check_subscription_presence(
998 const mbox_t & mbox,
999 const std::type_index & msg_type,
1000 const state_t & target_state ) const noexcept
1001 {
1002 return nullptr != m_subscriptions->find_handler(
1003 mbox->id(), msg_type, target_state );
1004 }
1005
1006 bool
do_check_deadletter_presence(const mbox_t & mbox,const std::type_index & msg_type) const1007 agent_t::do_check_deadletter_presence(
1008 const mbox_t & mbox,
1009 const std::type_index & msg_type ) const noexcept
1010 {
1011 return nullptr != m_subscriptions->find_handler(
1012 mbox->id(), msg_type, deadletter_state );
1013 }
1014
1015 namespace {
1016
1017 /*!
1018 * \brief A helper function to select actual demand handler in
1019 * dependency of message kind.
1020 *
1021 * \since
1022 * v.5.5.23
1023 */
1024 inline demand_handler_pfn_t
select_demand_handler_for_message(const agent_t & agent,const message_ref_t & msg)1025 select_demand_handler_for_message(
1026 const agent_t & agent,
1027 const message_ref_t & msg )
1028 {
1029 demand_handler_pfn_t result = &agent_t::demand_handler_on_message;
1030 if( msg )
1031 {
1032 switch( message_kind( *msg ) )
1033 {
1034 case message_t::kind_t::classical_message : // Already has value.
1035 break;
1036
1037 case message_t::kind_t::user_type_message : // Already has value.
1038 break;
1039
1040 case message_t::kind_t::enveloped_msg :
1041 result = &agent_t::demand_handler_on_enveloped_msg;
1042 break;
1043
1044 case message_t::kind_t::signal :
1045 so_5::details::abort_on_fatal_error( [&] {
1046 SO_5_LOG_ERROR( agent.so_environment(), log_stream )
1047 {
1048 log_stream << "message that has data and message_kind_t::signal!"
1049 "Signals can't have data. Application will be aborted!"
1050 << std::endl;
1051 }
1052 } );
1053 break;
1054 }
1055 }
1056
1057 return result;
1058 }
1059
1060 } /* namespace anonymous */
1061
1062 void
push_event(const message_limit::control_block_t * limit,mbox_id_t mbox_id,std::type_index msg_type,const message_ref_t & message)1063 agent_t::push_event(
1064 const message_limit::control_block_t * limit,
1065 mbox_id_t mbox_id,
1066 std::type_index msg_type,
1067 const message_ref_t & message )
1068 {
1069 const auto handler = select_demand_handler_for_message( *this, message );
1070
1071 read_lock_guard_t< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
1072
1073 if( m_event_queue )
1074 m_event_queue->push(
1075 execution_demand_t(
1076 this,
1077 limit,
1078 mbox_id,
1079 msg_type,
1080 message,
1081 handler ) );
1082 }
1083
1084 void
demand_handler_on_start(current_thread_id_t working_thread_id,execution_demand_t & d)1085 agent_t::demand_handler_on_start(
1086 current_thread_id_t working_thread_id,
1087 execution_demand_t & d )
1088 {
1089 d.m_receiver->ensure_binding_finished();
1090
1091 working_thread_id_sentinel_t sentinel(
1092 d.m_receiver->m_working_thread_id,
1093 working_thread_id );
1094
1095 try
1096 {
1097 d.m_receiver->so_evt_start();
1098 }
1099 catch( const std::exception & x )
1100 {
1101 impl::process_unhandled_exception(
1102 working_thread_id, x, *(d.m_receiver) );
1103 }
1104 catch( ... ) // Since v.5.5.24.3
1105 {
1106 impl::process_unhandled_unknown_exception(
1107 working_thread_id, *(d.m_receiver) );
1108 }
1109 }
1110
1111 void
ensure_binding_finished()1112 agent_t::ensure_binding_finished()
1113 {
1114 // Nothing more to do.
1115 // Just lock coop's binding_lock. If cooperation is not finished yet
1116 // it would stop the current thread.
1117 std::lock_guard< std::mutex > binding_lock{ m_agent_coop->m_lock };
1118 }
1119
1120 demand_handler_pfn_t
get_demand_handler_on_start_ptr()1121 agent_t::get_demand_handler_on_start_ptr() noexcept
1122 {
1123 return &agent_t::demand_handler_on_start;
1124 }
1125
1126 void
demand_handler_on_finish(current_thread_id_t working_thread_id,execution_demand_t & d)1127 agent_t::demand_handler_on_finish(
1128 current_thread_id_t working_thread_id,
1129 execution_demand_t & d )
1130 {
1131 {
1132 // Sentinel must finish its work before decrementing
1133 // reference count to cooperation.
1134 working_thread_id_sentinel_t sentinel(
1135 d.m_receiver->m_working_thread_id,
1136 working_thread_id );
1137
1138 try
1139 {
1140 d.m_receiver->so_evt_finish();
1141 }
1142 catch( const std::exception & x )
1143 {
1144 impl::process_unhandled_exception(
1145 working_thread_id, x, *(d.m_receiver) );
1146 }
1147 catch( ... ) // Since v.5.5.24.3
1148 {
1149 impl::process_unhandled_unknown_exception(
1150 working_thread_id, *(d.m_receiver) );
1151 }
1152
1153 // Since v.5.5.15 agent should be returned in default state.
1154 d.m_receiver->return_to_default_state_if_possible();
1155 }
1156
1157 // Cooperation should receive notification about agent deregistration.
1158 impl::coop_private_iface_t::decrement_usage_count(
1159 *(d.m_receiver->m_agent_coop) );
1160 }
1161
1162 demand_handler_pfn_t
get_demand_handler_on_finish_ptr()1163 agent_t::get_demand_handler_on_finish_ptr() noexcept
1164 {
1165 return &agent_t::demand_handler_on_finish;
1166 }
1167
1168 void
demand_handler_on_message(current_thread_id_t working_thread_id,execution_demand_t & d)1169 agent_t::demand_handler_on_message(
1170 current_thread_id_t working_thread_id,
1171 execution_demand_t & d )
1172 {
1173 message_limit::control_block_t::decrement( d.m_limit );
1174
1175 auto handler = d.m_receiver->m_handler_finder(
1176 d, "demand_handler_on_message" );
1177 if( handler )
1178 process_message( working_thread_id, d, handler->m_method );
1179 }
1180
1181 demand_handler_pfn_t
get_demand_handler_on_message_ptr()1182 agent_t::get_demand_handler_on_message_ptr() noexcept
1183 {
1184 return &agent_t::demand_handler_on_message;
1185 }
1186
1187 void
demand_handler_on_enveloped_msg(current_thread_id_t working_thread_id,execution_demand_t & d)1188 agent_t::demand_handler_on_enveloped_msg(
1189 current_thread_id_t working_thread_id,
1190 execution_demand_t & d )
1191 {
1192 message_limit::control_block_t::decrement( d.m_limit );
1193
1194 auto handler = d.m_receiver->m_handler_finder(
1195 d, "demand_handler_on_enveloped_msg" );
1196 process_enveloped_msg( working_thread_id, d, handler );
1197 }
1198
1199 demand_handler_pfn_t
get_demand_handler_on_enveloped_msg_ptr()1200 agent_t::get_demand_handler_on_enveloped_msg_ptr() noexcept
1201 {
1202 return &agent_t::demand_handler_on_enveloped_msg;
1203 }
1204
1205 void
process_message(current_thread_id_t working_thread_id,execution_demand_t & d,event_handler_method_t method)1206 agent_t::process_message(
1207 current_thread_id_t working_thread_id,
1208 execution_demand_t & d,
1209 event_handler_method_t method )
1210 {
1211 working_thread_id_sentinel_t sentinel(
1212 d.m_receiver->m_working_thread_id,
1213 working_thread_id );
1214
1215 try
1216 {
1217 method( d.m_message_ref );
1218 }
1219 catch( const std::exception & x )
1220 {
1221 impl::process_unhandled_exception(
1222 working_thread_id, x, *(d.m_receiver) );
1223 }
1224 catch( ... ) // Since v.5.5.24.3
1225 {
1226 impl::process_unhandled_unknown_exception(
1227 working_thread_id, *(d.m_receiver) );
1228 }
1229 }
1230
1231 void
process_enveloped_msg(current_thread_id_t working_thread_id,execution_demand_t & d,const impl::event_handler_data_t * handler_data)1232 agent_t::process_enveloped_msg(
1233 current_thread_id_t working_thread_id,
1234 execution_demand_t & d,
1235 const impl::event_handler_data_t * handler_data )
1236 {
1237 using namespace enveloped_msg::impl;
1238
1239 if( handler_data )
1240 {
1241 // If this is intermediate_handler then we should pass the
1242 // whole envelope to it.
1243 if( event_handler_kind_t::intermediate_handler == handler_data->m_kind )
1244 // Just call process_message() in that case because
1245 // process_message() already does what we need (including
1246 // setting working_thread_id and handling of exceptions).
1247 process_message( working_thread_id, d, handler_data->m_method );
1248 else
1249 // For a final_handler the payload should be extracted
1250 // from the envelope and the extracted payload should go
1251 // to the handler.
1252 // We don't expect exceptions here and can't restore after them.
1253 so_5::details::invoke_noexcept_code( [&] {
1254 auto & envelope = message_to_envelope( d.m_message_ref );
1255 agent_demand_handler_invoker_t invoker{
1256 working_thread_id,
1257 d,
1258 *handler_data
1259 };
1260 envelope.access_hook(
1261 so_5::enveloped_msg::access_context_t::handler_found,
1262 invoker );
1263 } );
1264 }
1265 }
1266
1267 void
ensure_operation_is_on_working_thread(const char * operation_name) const1268 agent_t::ensure_operation_is_on_working_thread(
1269 const char * operation_name ) const
1270 {
1271 if( so_5::query_current_thread_id() != m_working_thread_id )
1272 {
1273 std::ostringstream s;
1274
1275 s << operation_name
1276 << ": operation is enabled only on agent's working thread; "
1277 << "working_thread_id: ";
1278
1279 if( m_working_thread_id == null_current_thread_id() )
1280 s << "<NONE>";
1281 else
1282 s << m_working_thread_id;
1283
1284 s << ", current_thread_id: " << so_5::query_current_thread_id();
1285
1286 SO_5_THROW_EXCEPTION(
1287 so_5::rc_operation_enabled_only_on_agent_working_thread,
1288 s.str() );
1289 }
1290 }
1291
1292 void
drop_all_delivery_filters()1293 agent_t::drop_all_delivery_filters() noexcept
1294 {
1295 if( m_delivery_filters )
1296 {
1297 m_delivery_filters->drop_all( *this );
1298 m_delivery_filters.reset();
1299 }
1300 }
1301
1302 void
do_set_delivery_filter(const mbox_t & mbox,const std::type_index & msg_type,delivery_filter_unique_ptr_t filter)1303 agent_t::do_set_delivery_filter(
1304 const mbox_t & mbox,
1305 const std::type_index & msg_type,
1306 delivery_filter_unique_ptr_t filter )
1307 {
1308 ensure_operation_is_on_working_thread( "set_delivery_filter" );
1309
1310 if( !m_delivery_filters )
1311 m_delivery_filters.reset( new impl::delivery_filter_storage_t() );
1312
1313 m_delivery_filters->set_delivery_filter(
1314 mbox,
1315 msg_type,
1316 std::move(filter),
1317 *this );
1318 }
1319
1320 void
do_drop_delivery_filter(const mbox_t & mbox,const std::type_index & msg_type)1321 agent_t::do_drop_delivery_filter(
1322 const mbox_t & mbox,
1323 const std::type_index & msg_type ) noexcept
1324 {
1325 ensure_operation_is_on_working_thread( "set_delivery_filter" );
1326
1327 if( m_delivery_filters )
1328 m_delivery_filters->drop_delivery_filter( mbox, msg_type, *this );
1329 }
1330
1331 const impl::event_handler_data_t *
handler_finder_msg_tracing_disabled(execution_demand_t & d,const char *)1332 agent_t::handler_finder_msg_tracing_disabled(
1333 execution_demand_t & d,
1334 const char * /*context_marker*/ )
1335 {
1336 auto search_result = find_event_handler_for_current_state( d );
1337 if( !search_result )
1338 // Since v.5.5.21 we should check for deadletter handler for that demand.
1339 search_result = find_deadletter_handler( d );
1340
1341 return search_result;
1342 }
1343
1344 const impl::event_handler_data_t *
handler_finder_msg_tracing_enabled(execution_demand_t & d,const char * context_marker)1345 agent_t::handler_finder_msg_tracing_enabled(
1346 execution_demand_t & d,
1347 const char * context_marker )
1348 {
1349 auto search_result = find_event_handler_for_current_state( d );
1350
1351 if( !search_result )
1352 {
1353 // Since v.5.5.21 we should check for deadletter handler for that demand.
1354 search_result = find_deadletter_handler( d );
1355
1356 if( search_result )
1357 {
1358 // Deadletter handler found. This must be reflected in trace.
1359 impl::msg_tracing_helpers::trace_deadletter_handler_search_result(
1360 d,
1361 context_marker,
1362 search_result );
1363
1364 return search_result;
1365 }
1366 }
1367
1368 // This trace will be made if an event_handler is found for the
1369 // current state or not found at all (including deadletter handlers).
1370 impl::msg_tracing_helpers::trace_event_handler_search_result(
1371 d,
1372 context_marker,
1373 search_result );
1374
1375 return search_result;
1376 }
1377
1378 const impl::event_handler_data_t *
find_event_handler_for_current_state(execution_demand_t & d)1379 agent_t::find_event_handler_for_current_state(
1380 execution_demand_t & d )
1381 {
1382 const impl::event_handler_data_t * search_result = nullptr;
1383 const state_t * s = &d.m_receiver->so_current_state();
1384
1385 do {
1386 search_result = d.m_receiver->m_subscriptions->find_handler(
1387 d.m_mbox_id,
1388 d.m_msg_type,
1389 *s );
1390
1391 if( !search_result )
1392 s = s->parent_state();
1393
1394 } while( search_result == nullptr && s != nullptr );
1395
1396 return search_result;
1397 }
1398
1399 const impl::event_handler_data_t *
find_deadletter_handler(execution_demand_t & demand)1400 agent_t::find_deadletter_handler(
1401 execution_demand_t & demand )
1402 {
1403 return demand.m_receiver->m_subscriptions->find_handler(
1404 demand.m_mbox_id,
1405 demand.m_msg_type,
1406 deadletter_state );
1407 }
1408
1409 void
do_state_switch(const state_t & state_to_be_set)1410 agent_t::do_state_switch(
1411 const state_t & state_to_be_set ) noexcept
1412 {
1413 state_t::path_t old_path;
1414 state_t::path_t new_path;
1415
1416 // Since v.5.5.22 we will change the value of m_current_state_ptr
1417 // during state change procedure.
1418 auto current_st = m_current_state_ptr;
1419
1420 current_st->fill_path( old_path );
1421 state_to_be_set.fill_path( new_path );
1422
1423 // Find the first item which is different in the paths.
1424 std::size_t first_diff = 0;
1425 for(; first_diff < std::min(
1426 current_st->nested_level(),
1427 state_to_be_set.nested_level() );
1428 ++first_diff )
1429 if( old_path[ first_diff ] != new_path[ first_diff ] )
1430 break;
1431
1432 // Do call for on_exit and on_enter for states.
1433 // on_exit and on_enter should not throw exceptions.
1434 so_5::details::invoke_noexcept_code( [&] {
1435
1436 impl::msg_tracing_helpers::safe_trace_state_leaving(
1437 *this, *current_st );
1438
1439 for( std::size_t i = current_st->nested_level();
1440 i >= first_diff; )
1441 {
1442 // Modify current state before calling on_exit handler.
1443 m_current_state_ptr = old_path[ i ];
1444 // Perform on_exit actions.
1445 old_path[ i ]->call_on_exit();
1446 if( i )
1447 --i;
1448 else
1449 break;
1450 }
1451
1452 impl::msg_tracing_helpers::safe_trace_state_entering(
1453 *this, state_to_be_set );
1454
1455 for( std::size_t i = first_diff;
1456 i <= state_to_be_set.nested_level();
1457 ++i )
1458 {
1459 // Modify current state before calling on_exit handler.
1460 m_current_state_ptr = new_path[ i ];
1461
1462 // Perform on_enter actions.
1463 new_path[ i ]->call_on_enter();
1464 }
1465 } );
1466
1467 // Now the current state for the agent can be changed.
1468 m_current_state_ptr = &state_to_be_set;
1469 m_current_state_ptr->update_history_in_parent_states();
1470 }
1471
1472 void
return_to_default_state_if_possible()1473 agent_t::return_to_default_state_if_possible() noexcept
1474 {
1475 if( !( st_default == so_current_state() ||
1476 awaiting_deregistration_state == so_current_state() ) )
1477 {
1478 // The agent must be returned to the default state.
1479 // All on_exit handlers must be called at this point.
1480 so_change_state( st_default );
1481 }
1482 }
1483
1484 } /* namespace so_5 */
1485
1486