1 /*
2 SObjectizer 5.
3 */
4
5 #include <so_5/environment.hpp>
6
7 #include <string>
8
9 #include <so_5/impl/internal_env_iface.hpp>
10 #include <so_5/impl/coop_private_iface.hpp>
11
12 #include <so_5/impl/mbox_core.hpp>
13 #include <so_5/impl/layer_core.hpp>
14 #include <so_5/impl/stop_guard_repo.hpp>
15 #include <so_5/impl/std_msg_tracer_holder.hpp>
16
17 #include <so_5/impl/run_stage.hpp>
18
19 #include <so_5/stats/impl/std_controller.hpp>
20 #include <so_5/stats/impl/ds_mbox_core_stats.hpp>
21 #include <so_5/stats/impl/ds_agent_core_stats.hpp>
22 #include <so_5/stats/impl/ds_timer_thread_stats.hpp>
23
24 #include <so_5/env_infrastructures.hpp>
25
26 #include <so_5/details/rollback_on_exception.hpp>
27
28 namespace so_5
29 {
30
31 //
32 // environment_params_t
33 //
34
environment_params_t()35 environment_params_t::environment_params_t()
36 : m_event_exception_logger( create_std_event_exception_logger() )
37 , m_exception_reaction( abort_on_exception )
38 , m_autoshutdown_disabled( false )
39 , m_error_logger( create_stderr_logger() )
40 , m_work_thread_activity_tracking(
41 work_thread_activity_tracking_t::unspecified )
42 , m_infrastructure_factory( env_infrastructures::default_mt::factory() )
43 , m_event_queue_hook( make_empty_event_queue_hook_unique_ptr() )
44 {
45 }
46
environment_params_t(environment_params_t && other)47 environment_params_t::environment_params_t(
48 environment_params_t && other )
49 : m_timer_thread_factory( std::move( other.m_timer_thread_factory ) )
50 , m_so_layers( std::move( other.m_so_layers ) )
51 , m_coop_listener( std::move( other.m_coop_listener ) )
52 , m_event_exception_logger( std::move( other.m_event_exception_logger ) )
53 , m_exception_reaction( other.m_exception_reaction )
54 , m_autoshutdown_disabled( other.m_autoshutdown_disabled )
55 , m_error_logger( std::move( other.m_error_logger ) )
56 , m_message_delivery_tracer( std::move( other.m_message_delivery_tracer ) )
57 , m_message_delivery_tracer_filter(
58 std::move( other.m_message_delivery_tracer_filter ) )
59 , m_work_thread_activity_tracking(
60 other.m_work_thread_activity_tracking )
61 , m_queue_locks_defaults_manager( std::move( other.m_queue_locks_defaults_manager ) )
62 , m_infrastructure_factory( std::move(other.m_infrastructure_factory) )
63 , m_event_queue_hook( std::move(other.m_event_queue_hook) )
64 {}
65
~environment_params_t()66 environment_params_t::~environment_params_t()
67 {
68 }
69
70 environment_params_t &
operator =(environment_params_t && other)71 environment_params_t::operator=( environment_params_t && other )
72 {
73 environment_params_t tmp( std::move( other ) );
74 swap( *this, tmp );
75
76 return *this;
77 }
78
79 SO_5_FUNC void
swap(environment_params_t & a,environment_params_t & b)80 swap( environment_params_t & a, environment_params_t & b )
81 {
82 using std::swap;
83
84 swap( a.m_timer_thread_factory, b.m_timer_thread_factory );
85 swap( a.m_so_layers, b.m_so_layers );
86 swap( a.m_coop_listener, b.m_coop_listener );
87 swap( a.m_event_exception_logger, b.m_event_exception_logger );
88
89 swap( a.m_exception_reaction, b.m_exception_reaction );
90 swap( a.m_autoshutdown_disabled, b.m_autoshutdown_disabled );
91
92 swap( a.m_error_logger, b.m_error_logger );
93 swap( a.m_message_delivery_tracer, b.m_message_delivery_tracer );
94 swap( a.m_message_delivery_tracer_filter, b.m_message_delivery_tracer_filter );
95
96 swap( a.m_work_thread_activity_tracking, b.m_work_thread_activity_tracking );
97
98 swap( a.m_queue_locks_defaults_manager, b.m_queue_locks_defaults_manager );
99
100 swap( a.m_infrastructure_factory, b.m_infrastructure_factory );
101
102 swap( a.m_event_queue_hook, b.m_event_queue_hook );
103 }
104
105 environment_params_t &
timer_thread(so_5::timer_thread_factory_t factory)106 environment_params_t::timer_thread(
107 so_5::timer_thread_factory_t factory )
108 {
109 m_timer_thread_factory = std::move( factory );
110 return *this;
111 }
112
113 environment_params_t &
coop_listener(coop_listener_unique_ptr_t coop_listener)114 environment_params_t::coop_listener(
115 coop_listener_unique_ptr_t coop_listener )
116 {
117 m_coop_listener = std::move( coop_listener );
118 return *this;
119 }
120
121 environment_params_t &
event_exception_logger(event_exception_logger_unique_ptr_t logger)122 environment_params_t::event_exception_logger(
123 event_exception_logger_unique_ptr_t logger )
124 {
125 if( nullptr != logger.get() )
126 m_event_exception_logger = std::move( logger );
127
128 return *this;
129 }
130
131 void
add_layer(const std::type_index & type,layer_unique_ptr_t layer_ptr)132 environment_params_t::add_layer(
133 const std::type_index & type,
134 layer_unique_ptr_t layer_ptr )
135 {
136 m_so_layers[ type ] = layer_ref_t( layer_ptr.release() );
137 }
138
139 namespace
140 {
141
142 /*!
143 * \brief A bunch of data sources for core objects.
144 * \since
145 * v.5.5.4
146 */
147 class core_data_sources_t
148 {
149 public :
core_data_sources_t(outliving_reference_t<stats::repository_t> ds_repository,impl::mbox_core_t & mbox_repository,so_5::environment_infrastructure_t & infrastructure)150 core_data_sources_t(
151 outliving_reference_t< stats::repository_t > ds_repository,
152 impl::mbox_core_t & mbox_repository,
153 so_5::environment_infrastructure_t & infrastructure )
154 : m_mbox_repository( ds_repository, mbox_repository )
155 , m_coop_repository( ds_repository, infrastructure )
156 , m_timer_thread( ds_repository, infrastructure )
157 {}
158
159 private :
160 //! Data source for mboxes repository.
161 stats::auto_registered_source_holder_t<
162 stats::impl::ds_mbox_core_stats_t >
163 m_mbox_repository;
164
165 //! Data source for cooperations repository.
166 stats::auto_registered_source_holder_t<
167 stats::impl::ds_agent_core_stats_t >
168 m_coop_repository;
169
170 //! Data source for timer thread.
171 stats::auto_registered_source_holder_t<
172 stats::impl::ds_timer_thread_stats_t >
173 m_timer_thread;
174 };
175
176 /*!
177 * \brief Helper function for creation of appropriate manager
178 * object if necessary.
179 *
180 * \since
181 * v.5.5.18
182 */
183 queue_locks_defaults_manager_unique_ptr_t
ensure_locks_defaults_manager_exists(queue_locks_defaults_manager_unique_ptr_t current)184 ensure_locks_defaults_manager_exists(
185 //! The current value. Note: can be nullptr.
186 queue_locks_defaults_manager_unique_ptr_t current )
187 {
188 queue_locks_defaults_manager_unique_ptr_t result( std::move(current) );
189
190 if( !result )
191 result = make_defaults_manager_for_combined_locks();
192
193 return result;
194 }
195
196 //
197 // default_event_queue_hook_t
198 //
199 /*!
200 * \brief Default implementation of event_queue_hook.
201 *
202 * Do nothing.
203 *
204 * \since
205 * v.5.5.24
206 */
207 class default_event_queue_hook_t final : public event_queue_hook_t
208 {
209 public :
210 [[nodiscard]]
211 event_queue_t *
on_bind(agent_t *,event_queue_t * original_queue)212 on_bind(
213 agent_t * /*agent*/,
214 event_queue_t * original_queue ) noexcept override
215 {
216 return original_queue;
217 }
218
219 void
on_unbind(agent_t *,event_queue_t *)220 on_unbind(
221 agent_t * /*agent*/,
222 event_queue_t * /*queue*/ ) noexcept override
223 {
224 }
225 };
226
227 /*!
228 * \brief Helper function for creation of appropriate event_queue_hook
229 * object if necessary.
230 *
231 * \since
232 * v.5.5.24
233 */
234 [[nodiscard]]
235 event_queue_hook_unique_ptr_t
ensure_event_queue_hook_exists(event_queue_hook_unique_ptr_t current)236 ensure_event_queue_hook_exists(
237 //! The current value. Note: can be nullptr.
238 event_queue_hook_unique_ptr_t current )
239 {
240 event_queue_hook_unique_ptr_t result( std::move(current) );
241
242 if( !result )
243 result = make_event_queue_hook< default_event_queue_hook_t >(
244 &event_queue_hook_t::default_deleter );
245
246 return result;
247 }
248
249 } /* namespace anonymous */
250
251 //
252 // environment_t::internals_t
253 //
254 /*!
255 * \since
256 * v.5.5.0
257 *
258 * \brief Internal details of SObjectizer Environment object.
259 */
260 struct environment_t::internals_t
261 {
262 /*!
263 * \since
264 * v.5.5.0
265 *
266 * \brief Error logger object for this environment.
267 *
268 * \attention Must be the first attribute of the object!
269 * It must be created and initilized first and destroyed last.
270 */
271 error_logger_shptr_t m_error_logger;
272
273 /*!
274 * \brief Holder of stuff related to message delivery tracing.
275 *
276 * \attention This field must be declared and initialized
277 * before m_mbox_core because a reference to that object will be passed
278 * to the constructor of m_mbox_core.
279 *
280 * \since
281 * v.5.5.22
282 */
283 so_5::msg_tracing::impl::std_holder_t m_msg_tracing_stuff;
284
285 //! An utility for mboxes.
286 impl::mbox_core_ref_t m_mbox_core;
287
288 /*!
289 * \brief A repository of stop_guards.
290 *
291 * \since
292 * v.5.5.19.2
293 */
294 impl::stop_guard_repository_t m_stop_guards;
295
296 /*!
297 * \brief A specific infrastructure for environment.
298 *
299 * Note: infrastructure takes care about coop repository,
300 * timer threads/managers and default dispatcher.
301 *
302 * \since
303 * v.5.5.19
304 */
305 environment_infrastructure_unique_ptr_t m_infrastructure;
306
307 //! An utility for layers.
308 impl::layer_core_t m_layer_core;
309
310 /*!
311 * \brief An exception reaction for the whole SO Environment.
312 * \since
313 * v.5.3.0
314 */
315 const exception_reaction_t m_exception_reaction;
316
317 /*!
318 * \brief Is autoshutdown when there is no more cooperation disabled?
319 *
320 * \see environment_params_t::disable_autoshutdown()
321 *
322 * \since
323 * v.5.4.0
324 */
325 const bool m_autoshutdown_disabled;
326
327 /*!
328 * \brief Data sources for core objects.
329 *
330 * \attention This instance must be created after stats_controller
331 * and destroyed before it. Because of that m_core_data_sources declared
332 * after m_stats_controller and after all corresponding objects.
333 * NOTE: since v.5.5.19 stats_controller and stats_repository are parts
334 * of environment_infrastructure. Because of that m_core_data_sources
335 * declared and created after m_infrastructure.
336 *
337 * \since
338 * v.5.5.4
339 */
340 core_data_sources_t m_core_data_sources;
341
342 /*!
343 * \brief Work thread activity tracking for the whole Environment.
344 * \since
345 * v.5.5.18
346 */
347 work_thread_activity_tracking_t m_work_thread_activity_tracking;
348
349 /*!
350 * \brief Manager for defaults of queue locks.
351 *
352 * \since
353 * v.5.5.18
354 */
355 queue_locks_defaults_manager_unique_ptr_t m_queue_locks_defaults_manager;
356
357 /*!
358 * \brief Actual event_queue_hook.
359 *
360 * \note
361 * If there is no event_queue_hook in environment_params_t then
362 * an instance of default_event_queue_hook_t will be created and used.
363 *
364 * \since
365 * v.5.5.24
366 */
367 event_queue_hook_unique_ptr_t m_event_queue_hook;
368
369 /*!
370 * \brief Lock object for protection of exception logger object.
371 *
372 * \note
373 * Manipulations with m_event_exception_logger are performed
374 * only under that lock.
375 *
376 * \since
377 * v.5.6.0
378 */
379 std::mutex m_event_exception_logger_lock;
380
381 /*!
382 * \brief Logger for exceptions thrown from event-handlers.
383 *
384 * \since
385 * v.5.6.0
386 */
387 event_exception_logger_unique_ptr_t m_event_exception_logger;
388
389 //! Constructor.
internals_tso_5::environment_t::internals_t390 internals_t(
391 environment_t & env,
392 environment_params_t && params )
393 : m_error_logger( params.so5__error_logger() )
394 , m_msg_tracing_stuff{
395 params.so5__giveout_message_delivery_tracer_filter(),
396 params.so5__giveout_message_delivery_tracer() }
397 , m_mbox_core(
398 new impl::mbox_core_t{
399 outliving_mutable( m_msg_tracing_stuff ) } )
400 , m_infrastructure(
401 (params.infrastructure_factory())(
402 env,
403 params,
404 // A special mbox for distributing monitoring information
405 // must be created and passed to stats_controller.
406 m_mbox_core->create_mbox(env) ) )
407 , m_layer_core(
408 env,
409 params.so5__layers_map() )
410 , m_exception_reaction( params.exception_reaction() )
411 , m_autoshutdown_disabled( params.autoshutdown_disabled() )
412 , m_core_data_sources(
413 outliving_mutable(m_infrastructure->stats_repository()),
414 *m_mbox_core,
415 *m_infrastructure )
416 , m_work_thread_activity_tracking(
417 params.work_thread_activity_tracking() )
418 , m_queue_locks_defaults_manager(
419 ensure_locks_defaults_manager_exists(
420 params.so5__giveout_queue_locks_defaults_manager() ) )
421 , m_event_queue_hook(
422 ensure_event_queue_hook_exists(
423 params.so5__giveout_event_queue_hook() ) )
424 , m_event_exception_logger{
425 params.so5__giveout_event_exception_logger() }
426 {}
427 };
428
429 //
430 // environment_t
431 //
432
433 environment_t &
self_ref()434 environment_t::self_ref()
435 {
436 return *this;
437 }
438
439
environment_t(environment_params_t && params)440 environment_t::environment_t(
441 environment_params_t && params )
442 : m_impl( new internals_t( self_ref(), std::move(params) ) )
443 {
444 }
445
~environment_t()446 environment_t::~environment_t()
447 {
448 }
449
450 mbox_t
create_mbox()451 environment_t::create_mbox()
452 {
453 return m_impl->m_mbox_core->create_mbox( *this );
454 }
455
456 mbox_t
create_mbox(nonempty_name_t nonempty_name)457 environment_t::create_mbox(
458 nonempty_name_t nonempty_name )
459 {
460 return m_impl->m_mbox_core->create_mbox( *this, std::move(nonempty_name) );
461 }
462
463 mchain_t
create_mchain(const mchain_params_t & params)464 environment_t::create_mchain(
465 const mchain_params_t & params )
466 {
467 return m_impl->m_mbox_core->create_mchain( *this, params );
468 }
469
470 void
install_exception_logger(event_exception_logger_unique_ptr_t logger)471 environment_t::install_exception_logger(
472 event_exception_logger_unique_ptr_t logger )
473 {
474 if( logger )
475 {
476 std::lock_guard< std::mutex > lock{
477 m_impl->m_event_exception_logger_lock };
478
479 using std::swap;
480 swap( m_impl->m_event_exception_logger, logger );
481
482 m_impl->m_event_exception_logger->on_install( std::move( logger ) );
483 }
484 }
485
486 [[nodiscard]]
487 coop_unique_holder_t
make_coop()488 environment_t::make_coop()
489 {
490 return m_impl->m_infrastructure->make_coop(
491 coop_handle_t{}, // No parent.
492 so_make_default_disp_binder() );
493 }
494
495 [[nodiscard]]
496 coop_unique_holder_t
make_coop(disp_binder_shptr_t disp_binder)497 environment_t::make_coop(
498 disp_binder_shptr_t disp_binder )
499 {
500 return m_impl->m_infrastructure->make_coop(
501 coop_handle_t{}, // No parent.
502 std::move(disp_binder) );
503 }
504
505 [[nodiscard]]
506 coop_unique_holder_t
make_coop(coop_handle_t parent)507 environment_t::make_coop(
508 coop_handle_t parent )
509 {
510 return m_impl->m_infrastructure->make_coop(
511 std::move(parent),
512 so_make_default_disp_binder() );
513 }
514
515 [[nodiscard]]
516 coop_unique_holder_t
make_coop(coop_handle_t parent,disp_binder_shptr_t disp_binder)517 environment_t::make_coop(
518 coop_handle_t parent,
519 disp_binder_shptr_t disp_binder )
520 {
521 return m_impl->m_infrastructure->make_coop(
522 std::move(parent),
523 std::move(disp_binder) );
524 }
525
526 coop_handle_t
register_coop(coop_unique_holder_t agent_coop)527 environment_t::register_coop(
528 coop_unique_holder_t agent_coop )
529 {
530 return m_impl->m_infrastructure->register_coop( std::move( agent_coop ) );
531 }
532
533 so_5::timer_id_t
so_schedule_timer(const low_level_api::schedule_timer_params_t params)534 environment_t::so_schedule_timer(
535 const low_level_api::schedule_timer_params_t params )
536 {
537 // Since v.5.5.21 pause and period should be checked for negative
538 // values.
539 using duration = std::chrono::steady_clock::duration;
540 if( params.m_pause < duration::zero() )
541 SO_5_THROW_EXCEPTION(
542 so_5::rc_negative_value_for_pause,
543 "an attempt to call schedule_timer() with negative pause value" );
544 if( params.m_period < duration::zero() )
545 SO_5_THROW_EXCEPTION(
546 so_5::rc_negative_value_for_period,
547 "an attempt to call schedule_timer() with negative period value" );
548
549 // If it is a mutable message then there must be some restrictions:
550 if( message_mutability_t::mutable_message == message_mutability(params.m_msg) )
551 {
552 // Mutable message can be sent only as delayed message.
553 if( std::chrono::steady_clock::duration::zero() != params.m_period )
554 SO_5_THROW_EXCEPTION(
555 so_5::rc_mutable_msg_cannot_be_periodic,
556 "unable to schedule periodic timer for mutable message,"
557 " msg_type=" + std::string(params.m_msg_type.name()) );
558 // Mutable message can't be passed to MPMC-mbox.
559 else if( mbox_type_t::multi_producer_multi_consumer == params.m_mbox->type() )
560 SO_5_THROW_EXCEPTION(
561 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
562 "unable to schedule timer for mutable message and "
563 "MPMC mbox, msg_type=" + std::string(params.m_msg_type.name()) );
564 }
565
566 return m_impl->m_infrastructure->schedule_timer(
567 params.m_msg_type,
568 params.m_msg,
569 params.m_mbox,
570 params.m_pause,
571 params.m_period );
572 }
573
574 void
so_single_timer(const low_level_api::single_timer_params_t params)575 environment_t::so_single_timer(
576 const low_level_api::single_timer_params_t params )
577 {
578 // Since v.5.5.21 pause should be checked for negative values.
579 using duration = std::chrono::steady_clock::duration;
580 if( params.m_pause < duration::zero() )
581 SO_5_THROW_EXCEPTION(
582 so_5::rc_negative_value_for_pause,
583 "an attempt to call single_timer() with negative pause value" );
584
585 // Mutable message can't be passed to MPMC-mbox.
586 if( message_mutability_t::mutable_message == message_mutability(params.m_msg) &&
587 mbox_type_t::multi_producer_multi_consumer == params.m_mbox->type() )
588 SO_5_THROW_EXCEPTION(
589 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
590 "unable to schedule single timer for mutable message and "
591 "MPMC mbox, msg_type=" + std::string(params.m_msg_type.name()) );
592
593 m_impl->m_infrastructure->single_timer(
594 params.m_msg_type,
595 params.m_msg,
596 params.m_mbox,
597 params.m_pause );
598 }
599
600 layer_t *
query_layer(const std::type_index & type) const601 environment_t::query_layer(
602 const std::type_index & type ) const
603 {
604 return m_impl->m_layer_core.query_layer( type );
605 }
606
607 void
add_extra_layer(const std::type_index & type,const layer_ref_t & layer)608 environment_t::add_extra_layer(
609 const std::type_index & type,
610 const layer_ref_t & layer )
611 {
612 m_impl->m_layer_core.add_extra_layer( type, layer );
613 }
614
615 void
run()616 environment_t::run()
617 {
618 try
619 {
620 impl__run_stats_controller_and_go_further();
621 }
622 catch( const so_5::exception_t & )
623 {
624 // Rethrow our exception because it already has all information.
625 throw;
626 }
627 catch( const std::exception & x )
628 {
629 SO_5_THROW_EXCEPTION(
630 rc_environment_error,
631 std::string( "some unexpected error during "
632 "environment launching: " ) + x.what() );
633 }
634 }
635
636 void
stop()637 environment_t::stop()
638 {
639 // Since v.5.5.19.2 there is a new shutdown procedure:
640 const auto action = m_impl->m_stop_guards.initiate_stop();
641 if( impl::stop_guard_repository_t::action_t::do_actual_stop == action )
642 m_impl->m_infrastructure->stop();
643 }
644
645 void
call_exception_logger(const std::exception & event_exception,const coop_handle_t & coop)646 environment_t::call_exception_logger(
647 const std::exception & event_exception,
648 const coop_handle_t & coop ) noexcept
649 {
650 std::lock_guard< std::mutex > lock{ m_impl->m_event_exception_logger_lock };
651
652 m_impl->m_event_exception_logger->log_exception( event_exception, coop );
653 }
654
655 exception_reaction_t
exception_reaction() const656 environment_t::exception_reaction() const
657 {
658 return m_impl->m_exception_reaction;
659 }
660
661 error_logger_t &
error_logger() const662 environment_t::error_logger() const
663 {
664 return *(m_impl->m_error_logger);
665 }
666
667 stats::controller_t &
stats_controller()668 environment_t::stats_controller()
669 {
670 return m_impl->m_infrastructure->stats_controller();
671 }
672
673 stats::repository_t &
stats_repository()674 environment_t::stats_repository()
675 {
676 return m_impl->m_infrastructure->stats_repository();
677 }
678
679 work_thread_activity_tracking_t
work_thread_activity_tracking() const680 environment_t::work_thread_activity_tracking() const
681 {
682 return m_impl->m_work_thread_activity_tracking;
683 }
684
685 disp_binder_shptr_t
so_make_default_disp_binder()686 environment_t::so_make_default_disp_binder()
687 {
688 return m_impl->m_infrastructure->make_default_disp_binder();
689 }
690
691 bool
autoshutdown_disabled() const692 environment_t::autoshutdown_disabled() const
693 {
694 return m_impl->m_autoshutdown_disabled;
695 }
696
697 mbox_t
do_make_custom_mbox(custom_mbox_details::creator_iface_t & creator)698 environment_t::do_make_custom_mbox(
699 custom_mbox_details::creator_iface_t & creator )
700 {
701 return m_impl->m_mbox_core->create_custom_mbox( *this, creator );
702 }
703
704 stop_guard_t::setup_result_t
setup_stop_guard(stop_guard_shptr_t guard,stop_guard_t::what_if_stop_in_progress_t reaction_on_stop_in_progress)705 environment_t::setup_stop_guard(
706 stop_guard_shptr_t guard,
707 stop_guard_t::what_if_stop_in_progress_t reaction_on_stop_in_progress )
708 {
709 const auto result = m_impl->m_stop_guards.setup_guard( std::move(guard) );
710 if( stop_guard_t::setup_result_t::stop_already_in_progress == result
711 && stop_guard_t::what_if_stop_in_progress_t::throw_exception ==
712 reaction_on_stop_in_progress )
713 {
714 SO_5_THROW_EXCEPTION(
715 rc_cannot_set_stop_guard_when_stop_is_started,
716 "stop_guard can't be set because the stop operation is "
717 "already in progress" );
718 }
719
720 return result;
721 }
722
723 void
remove_stop_guard(stop_guard_shptr_t guard)724 environment_t::remove_stop_guard(
725 stop_guard_shptr_t guard )
726 {
727 const auto action = m_impl->m_stop_guards.remove_guard( std::move(guard) );
728 if( impl::stop_guard_repository_t::action_t::do_actual_stop == action )
729 m_impl->m_infrastructure->stop();
730 }
731
732 void
change_message_delivery_tracer_filter(so_5::msg_tracing::filter_shptr_t filter)733 environment_t::change_message_delivery_tracer_filter(
734 so_5::msg_tracing::filter_shptr_t filter )
735 {
736 if( !m_impl->m_msg_tracing_stuff.is_msg_tracing_enabled() )
737 SO_5_THROW_EXCEPTION(
738 rc_msg_tracing_disabled,
739 "msg_tracing's filter can't be changed when msg_tracing "
740 "is disabled" );
741
742 m_impl->m_msg_tracing_stuff.change_filter( std::move(filter) );
743 }
744
745 void
impl__run_stats_controller_and_go_further()746 environment_t::impl__run_stats_controller_and_go_further()
747 {
748 impl::run_stage(
749 "run_stats_controller",
750 [] {
751 /* there is no need to turn_on controller automatically */
752 },
753 [this] { m_impl->m_infrastructure->stats_controller().turn_off(); },
754 [this] { impl__run_layers_and_go_further(); } );
755 }
756
757 void
impl__run_layers_and_go_further()758 environment_t::impl__run_layers_and_go_further()
759 {
760 impl::run_stage(
761 "run_layers",
762 [this] { m_impl->m_layer_core.start(); },
763 [this] { m_impl->m_layer_core.finish(); },
764 [this] { impl__run_infrastructure(); } );
765 }
766
767 namespace
768 {
769 class autoshutdown_guard_t final
770 {
771 environment_t & m_env;
772 const bool m_autoshutdown_disabled;
773 coop_handle_t m_guard_coop;
774
775 public :
autoshutdown_guard_t(environment_t & env,bool autoshutdown_disabled)776 autoshutdown_guard_t(
777 environment_t & env,
778 bool autoshutdown_disabled )
779 : m_env{ env }
780 , m_autoshutdown_disabled{ autoshutdown_disabled }
781 {
782 if( !m_autoshutdown_disabled )
783 {
784 m_guard_coop = env.register_coop( env.make_coop() );
785 }
786 }
787
~autoshutdown_guard_t()788 ~autoshutdown_guard_t()
789 {
790 if( !m_autoshutdown_disabled )
791 m_env.deregister_coop( m_guard_coop, dereg_reason::normal );
792 }
793 };
794
795 } /* namespace anonymous */
796
797 void
impl__run_infrastructure()798 environment_t::impl__run_infrastructure()
799 {
800 m_impl->m_infrastructure->launch(
801 [this]()
802 {
803 // init method must be protected from autoshutdown feature.
804 autoshutdown_guard_t guard{
805 *this,
806 m_impl->m_autoshutdown_disabled };
807
808 // Initilizing environment.
809 init();
810 } );
811 }
812
813 namespace impl
814 {
815
816 mbox_t
create_mpsc_mbox(agent_t * single_consumer,const so_5::message_limit::impl::info_storage_t * limits_storage)817 internal_env_iface_t::create_mpsc_mbox(
818 agent_t * single_consumer,
819 const so_5::message_limit::impl::info_storage_t * limits_storage )
820 {
821 return m_env.m_impl->m_mbox_core->create_mpsc_mbox(
822 single_consumer,
823 limits_storage );
824 }
825
826 void
ready_to_deregister_notify(coop_shptr_t coop)827 internal_env_iface_t::ready_to_deregister_notify(
828 coop_shptr_t coop ) noexcept
829 {
830 m_env.m_impl->m_infrastructure->ready_to_deregister_notify( std::move(coop) );
831 }
832
833 void
final_deregister_coop(coop_shptr_t coop)834 internal_env_iface_t::final_deregister_coop(
835 coop_shptr_t coop )
836 {
837 bool any_cooperation_alive =
838 m_env.m_impl->m_infrastructure->final_deregister_coop(
839 std::move(coop) );
840
841 if( !any_cooperation_alive && !m_env.m_impl->m_autoshutdown_disabled )
842 m_env.stop();
843 }
844
845 bool
is_msg_tracing_enabled() const846 internal_env_iface_t::is_msg_tracing_enabled() const
847 {
848 return m_env.m_impl->m_msg_tracing_stuff.is_msg_tracing_enabled();
849 }
850
851 [[nodiscard]] so_5::msg_tracing::holder_t &
msg_tracing_stuff() const852 internal_env_iface_t::msg_tracing_stuff() const
853 {
854 if( !is_msg_tracing_enabled() )
855 SO_5_THROW_EXCEPTION( rc_msg_tracing_disabled,
856 "msg_tracer cannot be accessed because msg_tracing is disabled" );
857
858 return m_env.m_impl->m_msg_tracing_stuff;
859 }
860
861 [[nodiscard]] so_5::msg_tracing::holder_t &
msg_tracing_stuff_nonchecked() const862 internal_env_iface_t::msg_tracing_stuff_nonchecked() const noexcept
863 {
864 return m_env.m_impl->m_msg_tracing_stuff;
865 }
866
867 so_5::disp::mpsc_queue_traits::lock_factory_t
default_mpsc_queue_lock_factory() const868 internal_env_iface_t::default_mpsc_queue_lock_factory() const
869 {
870 return m_env.m_impl->m_queue_locks_defaults_manager->
871 mpsc_queue_lock_factory();
872 }
873
874 so_5::disp::mpmc_queue_traits::lock_factory_t
default_mpmc_queue_lock_factory() const875 internal_env_iface_t::default_mpmc_queue_lock_factory() const
876 {
877 return m_env.m_impl->m_queue_locks_defaults_manager->
878 mpmc_queue_lock_factory();
879 }
880
881 [[nodiscard]]
882 event_queue_t *
event_queue_on_bind(agent_t * agent,event_queue_t * original_queue)883 internal_env_iface_t::event_queue_on_bind(
884 agent_t * agent,
885 event_queue_t * original_queue ) noexcept
886 {
887 return m_env.m_impl->m_event_queue_hook->on_bind( agent, original_queue );
888 }
889
890 void
event_queue_on_unbind(agent_t * agent,event_queue_t * queue)891 internal_env_iface_t::event_queue_on_unbind(
892 agent_t * agent,
893 event_queue_t * queue ) noexcept
894 {
895 m_env.m_impl->m_event_queue_hook->on_unbind( agent, queue );
896 }
897
898 [[nodiscard]] mbox_id_t
allocate_mbox_id()899 internal_env_iface_t::allocate_mbox_id() noexcept
900 {
901 return m_env.m_impl->m_mbox_core->allocate_mbox_id();
902 }
903
904 } /* namespace impl */
905
906 } /* namespace so_5 */
907
908