1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #pragma once
6 
#include <algorithm>
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <typeinfo>
#include <utility>
#include <vector>
17 
18 #include "caf/abstract_actor.hpp"
19 #include "caf/actor_cast.hpp"
20 #include "caf/actor_clock.hpp"
21 #include "caf/actor_config.hpp"
22 #include "caf/actor_profiler.hpp"
23 #include "caf/actor_registry.hpp"
24 #include "caf/actor_traits.hpp"
25 #include "caf/detail/core_export.hpp"
26 #include "caf/detail/init_fun_factory.hpp"
27 #include "caf/detail/private_thread_pool.hpp"
28 #include "caf/detail/set_thread_name.hpp"
29 #include "caf/detail/spawn_fwd.hpp"
30 #include "caf/detail/spawnable.hpp"
31 #include "caf/fwd.hpp"
32 #include "caf/group_manager.hpp"
33 #include "caf/infer_handle.hpp"
34 #include "caf/is_typed_actor.hpp"
35 #include "caf/logger.hpp"
36 #include "caf/make_actor.hpp"
37 #include "caf/prohibit_top_level_spawn_marker.hpp"
38 #include "caf/scoped_execution_unit.hpp"
39 #include "caf/spawn_options.hpp"
40 #include "caf/string_algorithms.hpp"
41 #include "caf/telemetry/metric_registry.hpp"
42 #include "caf/type_id.hpp"
43 
44 namespace caf::detail {
45 
/// Function object that renders a typed message passing interface (MPI)
/// signature as a string. The primary template is only declared; the
/// specializations for function types provide the implementation.
template <class>
struct typed_mpi_access;
48 
49 template <class Out, class... In>
50 struct typed_mpi_access<Out(In...)> {
operator ()caf::detail::typed_mpi_access51   std::string operator()() const {
52     static_assert(sizeof...(In) > 0, "typed MPI without inputs");
53     std::vector<std::string> inputs{type_name_v<In>...};
54     std::string result = "(";
55     result += join(inputs, ",");
56     result += ") -> ";
57     result += type_name_v<Out>;
58     return result;
59   }
60 };
61 
62 template <class... Out, class... In>
63 struct typed_mpi_access<result<Out...>(In...)> {
64   std::string operator()() const {
65     static_assert(sizeof...(In) > 0, "typed MPI without inputs");
66     static_assert(sizeof...(Out) > 0, "typed MPI without outputs");
67     std::vector<std::string> inputs{to_string(type_name_v<In>)...};
68     std::vector<std::string> outputs1{to_string(type_name_v<Out>)...};
69     std::string result = "(";
70     result += join(inputs, ",");
71     result += ") -> (";
72     result += join(outputs1, ",");
73     result += ")";
74     return result;
75   }
76 };
77 
78 template <class T>
get_rtti_from_mpi()79 std::string get_rtti_from_mpi() {
80   typed_mpi_access<T> f;
81   return f();
82 }
83 
84 } // namespace caf::detail
85 
86 namespace caf {
87 
88 /// Actor environment including scheduler, registry, and optional components
89 /// such as a middleman.
90 class CAF_CORE_EXPORT actor_system {
91 public:
92   friend class logger;
93   friend class io::middleman;
94   friend class net::middleman;
95   friend class abstract_actor;
96 
  /// Returns the internal actor for dynamic spawn operations. The returned
  /// reference refers to the handle stored in this actor system.
  const strong_actor_ptr& spawn_serv() const {
    return spawn_serv_;
  }
101 
  /// Returns the internal actor for storing the runtime configuration
  /// for this actor system. The returned reference refers to the handle stored
  /// in this actor system.
  const strong_actor_ptr& config_serv() const {
    return config_serv_;
  }
107 
  // An actor system can neither be default-constructed nor copied; the only
  // available constructor takes an `actor_system_config&`.
  actor_system() = delete;
  actor_system(const actor_system&) = delete;
  actor_system& operator=(const actor_system&) = delete;
111 
  /// An (optional) component of the actor system.
  class CAF_CORE_EXPORT module {
  public:
    /// Identifies a module. The last enumerator, `num_ids`, equals the number
    /// of module IDs and is used as the size of `module_array`.
    enum id_t {
      scheduler,
      middleman,
      openssl_manager,
      network_manager,
      num_ids
    };

    virtual ~module();

    /// Returns the human-readable name of the module.
    const char* name() const noexcept;

    /// Starts any background threads needed by the module.
    virtual void start() = 0;

    /// Stops all background threads of the module.
    virtual void stop() = 0;

    /// Allows the module to change the
    /// configuration of the actor system during startup.
    virtual void init(actor_system_config&) = 0;

    /// Returns the identifier of this module.
    virtual id_t id() const = 0;

    /// Returns a pointer to the subtype.
    virtual void* subtype_ptr() = 0;
  };
144 
145   using module_ptr = std::unique_ptr<module>;
146 
147   using module_array = std::array<module_ptr, module::num_ids>;
148 
  /// An (optional) component of the actor system with networking capabilities.
  class CAF_CORE_EXPORT networking_module : public module {
  public:
    ~networking_module() override;

    /// Causes the module to send a `node_down_msg` to `observer` if this system
    /// loses connection to `node`.
    virtual void monitor(const node_id& node, const actor_addr& observer) = 0;

    /// Causes the module to remove one entry for `observer` from the list of
    /// actors that receive a `node_down_msg` if this system loses connection to
    /// `node`. Each call to `monitor` requires one call to `demonitor` in order
    /// to unsubscribe the `observer` completely.
    virtual void demonitor(const node_id& node, const actor_addr& observer) = 0;
  };
164 
165   /// Metrics that the actor system collects by default.
166   /// @warning Do not modify these metrics in user code. Some may be used by the
167   ///          system for synchronization.
168   struct base_metrics_t {
169     /// Counts the number of messages that where rejected because the target
170     /// mailbox was closed or did not exist.
171     telemetry::int_counter* rejected_messages;
172 
173     /// Counts the total number of processed messages.
174     telemetry::int_counter* processed_messages;
175 
176     /// Tracks the current number of running actors in the system.
177     telemetry::int_gauge* running_actors;
178 
179     /// Counts the total number of messages that wait in a mailbox.
180     telemetry::int_gauge* queued_messages;
181   };
182 
  /// Metrics that some actors may collect in addition to the base metrics. All
  /// families in this set use the label dimension *name* (the user-defined name
  /// of the actor). Every pointer in this struct defaults to `nullptr`.
  struct actor_metric_families_t {
    /// Samples how long the actor needs to process messages.
    telemetry::dbl_histogram_family* processing_time = nullptr;

    /// Samples how long a message waits in the mailbox before the actor
    /// processes it.
    telemetry::dbl_histogram_family* mailbox_time = nullptr;

    /// Counts how many messages are currently waiting in the mailbox.
    telemetry::int_gauge_family* mailbox_size = nullptr;

    // Unnamed struct type of the member `stream` declared below.
    struct {
      // -- inbound ------------------------------------------------------------

      /// Counts the total number of processed stream elements from upstream.
      telemetry::int_counter_family* processed_elements = nullptr;

      /// Tracks how many stream elements from upstream are currently buffered.
      telemetry::int_gauge_family* input_buffer_size = nullptr;

      // -- outbound -----------------------------------------------------------

      /// Counts the total number of elements that have been pushed downstream.
      telemetry::int_counter_family* pushed_elements = nullptr;

      /// Tracks how many stream elements are currently waiting in the output
      /// buffer due to insufficient credit.
      telemetry::int_gauge_family* output_buffer_size = nullptr;
    }

    /// Wraps streaming-related actor metric families.
    stream;
  };
219 
220   /// @warning The system stores a reference to `cfg`, which means the
221   ///          config object must outlive the actor system.
222   explicit actor_system(actor_system_config& cfg);
223 
224   virtual ~actor_system();
225 
226   /// A message passing interface (MPI) in run-time checkable representation.
227   using mpi = std::set<std::string>;
228 
229   template <class T,
230             class E = typename std::enable_if<!is_typed_actor<T>::value>::type>
message_types(detail::type_list<T>) const231   mpi message_types(detail::type_list<T>) const {
232     return mpi{};
233   }
234 
235   template <class... Ts>
message_types(detail::type_list<typed_actor<Ts...>>) const236   mpi message_types(detail::type_list<typed_actor<Ts...>>) const {
237     static_assert(sizeof...(Ts) > 0, "empty typed actor handle given");
238     mpi result{detail::get_rtti_from_mpi<Ts>()...};
239     return result;
240   }
241 
242   template <class T,
243             class E
244             = typename std::enable_if<!detail::is_type_list<T>::value>::type>
message_types(const T &) const245   mpi message_types(const T&) const {
246     detail::type_list<T> token;
247     return message_types(token);
248   }
249 
250   /// Returns a string representation of the messaging
251   /// interface using portable names;
252   template <class T>
message_types() const253   mpi message_types() const {
254     detail::type_list<T> token;
255     return message_types(token);
256   }
257 
258   /// Returns whether actor handles described by `xs`
259   /// can be assigned to actor handles described by `ys`.
260   /// @experimental
assignable(const mpi & xs,const mpi & ys) const261   bool assignable(const mpi& xs, const mpi& ys) const {
262     if (ys.empty())
263       return xs.empty();
264     if (xs.size() == ys.size())
265       return xs == ys;
266     return std::includes(xs.begin(), xs.end(), ys.begin(), ys.end());
267   }
268 
269   /// Returns whether actor handles described by `xs`
270   /// can be assigned to actor handles of type `T`.
271   /// @experimental
272   template <class T>
assignable(const std::set<std::string> & xs) const273   bool assignable(const std::set<std::string>& xs) const {
274     return assignable(xs, message_types<T>());
275   }
276 
  /// Returns the metrics registry for this system (mutable access).
  telemetry::metric_registry& metrics() noexcept {
    return metrics_;
  }
281 
  /// Returns the metrics registry for this system (read-only access).
  const telemetry::metric_registry& metrics() const noexcept {
    return metrics_;
  }
286 
  /// Returns the host-local identifier for this system, i.e., the ID that
  /// identifies this actor system in a distributed setting.
  const node_id& node() const {
    return node_;
  }
291 
292   /// Returns the scheduler instance.
293   scheduler::abstract_coordinator& scheduler();
294 
295   /// Returns the system-wide event logger.
296   caf::logger& logger();
297 
298   /// Returns the system-wide actor registry.
299   actor_registry& registry();
300 
  /// Returns a string representation for `x`.
302   [[deprecated("please use to_string() on the error")]] std::string
303   render(const error& x) const;
304 
305   /// Returns the system-wide group manager.
306   group_manager& groups();
307 
308   /// Returns `true` if the I/O module is available, `false` otherwise.
309   bool has_middleman() const;
310 
311   /// Returns the middleman instance from the I/O module.
312   /// @throws `std::logic_error` if module is not loaded.
313   io::middleman& middleman();
314 
315   /// Returns `true` if the openssl module is available, `false` otherwise.
316   bool has_openssl_manager() const;
317 
318   /// Returns the manager instance from the OpenSSL module.
319   /// @throws `std::logic_error` if module is not loaded.
320   openssl::manager& openssl_manager() const;
321 
322   /// Returns `true` if the network module is available, `false` otherwise.
323   bool has_network_manager() const noexcept;
324 
325   /// Returns the network manager (middleman) instance.
326   /// @throws `std::logic_error` if module is not loaded.
327   net::middleman& network_manager();
328 
329   /// Returns a dummy execution unit that forwards
330   /// everything to the scheduler.
331   scoped_execution_unit* dummy_execution_unit();
332 
333   /// Returns a new actor ID.
334   actor_id next_actor_id();
335 
336   /// Returns the last given actor ID.
337   actor_id latest_actor_id() const;
338 
339   /// Blocks this caller until all actors are done.
340   void await_all_actors_done() const;
341 
342   /// Send a `node_down_msg` to `observer` if this system loses connection to
343   /// `node`.
344   /// @note Calling this function *n* times causes the system to send
345   ///       `node_down_msg` *n* times to the observer. In order to not receive
346   ///       the messages, the observer must call `demonitor` *n* times.
347   void monitor(const node_id& node, const actor_addr& observer);
348 
349   /// Removes `observer` from the list of actors that receive a `node_down_msg`
350   /// if this system loses connection to `node`.
351   void demonitor(const node_id& node, const actor_addr& observer);
352 
  /// Called by `spawn` when used to create a class-based actor to
  /// apply automatic conversions to `xs` before spawning the actor.
  /// Each argument passes through `detail::spawn_fwd` before it reaches
  /// `spawn_impl`.
  /// Should not be called by users of the library directly.
  /// @param cfg To-be-filled config for the actor.
  /// @param xs Constructor arguments for `C`.
  /// @returns The handle returned by `spawn_impl`.
  template <class C, spawn_options Os, class... Ts>
  infer_handle_from_class_t<C> spawn_class(actor_config& cfg, Ts&&... xs) {
    return spawn_impl<C, Os>(cfg, detail::spawn_fwd<Ts>(xs)...);
  }
362 
  /// Returns a new actor of type `C` using `xs...` as constructor
  /// arguments. The behavior of `spawn` can be modified by setting `Os`, e.g.,
  /// to opt-out of the cooperative scheduling.
  /// Fails to compile if `C` violates `check_invariants`.
  /// @param xs Constructor arguments for `C`.
  template <class C, spawn_options Os = no_spawn_options, class... Ts>
  infer_handle_from_class_t<C> spawn(Ts&&... xs) {
    check_invariants<C>();
    // Start from a default-constructed config; `spawn_impl` fills in the rest.
    actor_config cfg;
    return spawn_impl<C, Os>(cfg, detail::spawn_fwd<Ts>(xs)...);
  }
373 
374   /// Called by `spawn` when used to create a functor-based actor to select a
375   /// proper implementation and then delegates to `spawn_impl`.
376   /// @param cfg To-be-filled config for the actor.
377   /// @param fun Function object for the actor's behavior; will be moved.
378   /// @param xs Arguments for `fun`.
379   /// @private
380   template <spawn_options Os = no_spawn_options, class F, class... Ts>
381   infer_handle_from_fun_t<F>
spawn_functor(std::true_type,actor_config & cfg,F & fun,Ts &&...xs)382   spawn_functor(std::true_type, actor_config& cfg, F& fun, Ts&&... xs) {
383     using impl = infer_impl_from_fun_t<F>;
384     detail::init_fun_factory<impl, F> fac;
385     cfg.init_fun = fac(std::move(fun), std::forward<Ts>(xs)...);
386     return spawn_impl<impl, Os>(cfg);
387   }
388 
  /// Fallback no-op overload, selected via `std::false_type`. Ignores all
  /// arguments and returns a value-initialized handle.
  /// @private
  template <spawn_options Os = no_spawn_options, class F, class... Ts>
  infer_handle_from_fun_t<F>
  spawn_functor(std::false_type, actor_config&, F&, Ts&&...) {
    return {};
  }
396 
397   /// Returns a new functor-based actor. The first argument must be the functor,
398   /// the remainder of `xs...` is used to invoke the functor.
399   /// The behavior of `spawn` can be modified by setting `Os`, e.g.,
400   /// to opt-out of the cooperative scheduling.
401   template <spawn_options Os = no_spawn_options, class F, class... Ts>
spawn(F fun,Ts &&...xs)402   infer_handle_from_fun_t<F> spawn(F fun, Ts&&... xs) {
403     using impl = infer_impl_from_fun_t<F>;
404     check_invariants<impl>();
405     static constexpr bool spawnable = detail::spawnable<F, impl, Ts...>();
406     static_assert(spawnable,
407                   "cannot spawn function-based actor with given arguments");
408     actor_config cfg;
409     return spawn_functor<Os>(detail::bool_token<spawnable>{}, cfg, fun,
410                              std::forward<Ts>(xs)...);
411   }
412 
413   /// Returns a new actor with run-time type `name`, constructed
414   /// with the arguments stored in `args`.
415   /// @experimental
416   template <class Handle,
417             class E = typename std::enable_if<is_handle<Handle>::value>::type>
418   expected<Handle>
spawn(const std::string & name,message args,execution_unit * ctx=nullptr,bool check_interface=true,const mpi * expected_ifs=nullptr)419   spawn(const std::string& name, message args, execution_unit* ctx = nullptr,
420         bool check_interface = true, const mpi* expected_ifs = nullptr) {
421     mpi tmp;
422     if (check_interface && !expected_ifs) {
423       tmp = message_types<Handle>();
424       expected_ifs = &tmp;
425     }
426     auto res = dyn_spawn_impl(name, args, ctx, check_interface, expected_ifs);
427     if (!res)
428       return std::move(res.error());
429     return actor_cast<Handle>(std::move(*res));
430   }
431 
432   /// Spawns a class-based actor `T` immediately joining the groups in
433   /// range `[first, last)`.
434   /// @private
435   template <class T, spawn_options Os, class Iter, class... Ts>
436   infer_handle_from_class_t<T>
spawn_class_in_groups(actor_config & cfg,Iter first,Iter last,Ts &&...xs)437   spawn_class_in_groups(actor_config& cfg, Iter first, Iter last, Ts&&... xs) {
438     static_assert(std::is_same<infer_handle_from_class_t<T>, actor>::value,
439                   "only dynamically-typed actors can be spawned in a group");
440     check_invariants<T>();
441     auto irange = make_input_range(first, last);
442     cfg.groups = &irange;
443     return spawn_class<T, Os>(cfg, std::forward<Ts>(xs)...);
444   }
445 
446   /// Spawns a class-based actor `T` immediately joining the groups in
447   /// range `[first, last)`.
448   /// @private
449   template <spawn_options Os, class Iter, class F, class... Ts>
spawn_fun_in_groups(actor_config & cfg,Iter first,Iter second,F & fun,Ts &&...xs)450   infer_handle_from_fun_t<F> spawn_fun_in_groups(actor_config& cfg, Iter first,
451                                                  Iter second, F& fun,
452                                                  Ts&&... xs) {
453     using impl = infer_impl_from_fun_t<F>;
454     check_invariants<impl>();
455     using traits = actor_traits<impl>;
456     static_assert(traits::is_dynamically_typed,
457                   "only dynamically-typed actors can join groups");
458     static constexpr bool spawnable = detail::spawnable<F, impl, Ts...>();
459     static_assert(spawnable,
460                   "cannot spawn function-based actor with given arguments");
461     static constexpr bool enabled = traits::is_dynamically_typed && spawnable;
462     auto irange = make_input_range(first, second);
463     cfg.groups = &irange;
464     return spawn_functor<Os>(detail::bool_token<enabled>{}, cfg, fun,
465                              std::forward<Ts>(xs)...);
466   }
467 
468   /// Returns a new functor-based actor subscribed to all groups in `gs`.
469   template <spawn_options Os = no_spawn_options, class F, class... Ts>
470   infer_handle_from_fun_t<F>
spawn_in_groups(std::initializer_list<group> gs,F fun,Ts &&...xs)471   spawn_in_groups(std::initializer_list<group> gs, F fun, Ts&&... xs) {
472     actor_config cfg;
473     return spawn_fun_in_groups<Os>(cfg, gs.begin(), gs.end(), fun,
474                                    std::forward<Ts>(xs)...);
475   }
476 
477   /// Returns a new functor-based actor subscribed to all groups in `gs`.
478   template <spawn_options Os = no_spawn_options, class Gs, class F, class... Ts>
spawn_in_groups(const Gs & gs,F fun,Ts &&...xs)479   infer_handle_from_fun_t<F> spawn_in_groups(const Gs& gs, F fun, Ts&&... xs) {
480     actor_config cfg;
481     return spawn_fun_in_groups<Os>(cfg, gs.begin(), gs.end(), fun,
482                                    std::forward<Ts>(xs)...);
483   }
484 
  /// Returns a new functor-based actor subscribed to the group `grp`.
  /// Delegates to `spawn_in_groups` with a single-element list.
  template <spawn_options Os = no_spawn_options, class F, class... Ts>
  infer_handle_from_fun_t<F>
  spawn_in_group(const group& grp, F fun, Ts&&... xs) {
    return spawn_in_groups<Os>({grp}, std::move(fun), std::forward<Ts>(xs)...);
  }
491 
492   /// Returns a new class-based actor subscribed to all groups in `gs`.
493   template <class T, spawn_options Os = no_spawn_options, class... Ts>
494   infer_handle_from_class_t<T>
spawn_in_groups(std::initializer_list<group> gs,Ts &&...xs)495   spawn_in_groups(std::initializer_list<group> gs, Ts&&... xs) {
496     actor_config cfg;
497     return spawn_class_in_groups<T, Os>(cfg, gs.begin(), gs.end(),
498                                         std::forward<Ts>(xs)...);
499   }
500 
501   /// Returns a new class-based actor subscribed to all groups in `gs`.
502   template <class T, spawn_options Os = no_spawn_options, class Gs, class... Ts>
spawn_in_groups(const Gs & gs,Ts &&...xs)503   infer_handle_from_class_t<T> spawn_in_groups(const Gs& gs, Ts&&... xs) {
504     actor_config cfg;
505     return spawn_class_in_groups<T, Os>(cfg, gs.begin(), gs.end(),
506                                         std::forward<Ts>(xs)...);
507   }
508 
  /// Returns a new class-based actor subscribed to the group `grp`.
  /// Delegates to `spawn_in_groups` with a single-element list.
  template <class T, spawn_options Os = no_spawn_options, class... Ts>
  infer_handle_from_class_t<T> spawn_in_group(const group& grp, Ts&&... xs) {
    return spawn_in_groups<T, Os>({grp}, std::forward<Ts>(xs)...);
  }
514 
  /// Returns whether this actor system calls `await_all_actors_done`
  /// in its destructor before shutting down.
  /// @returns The current value of the flag.
  bool await_actors_before_shutdown() const {
    return await_actors_before_shutdown_;
  }
520 
  /// Configures whether this actor system calls `await_all_actors_done`
  /// in its destructor before shutting down.
  /// @param x The new value of the flag.
  void await_actors_before_shutdown(bool x) {
    await_actors_before_shutdown_ = x;
  }
526 
  /// Returns the configuration of this actor system, i.e., the user-provided
  /// config object that this system stores a reference to.
  const actor_system_config& config() const {
    return cfg_;
  }
531 
532   /// Returns the system-wide clock.
533   actor_clock& clock() noexcept;
534 
535   /// Returns the number of detached actors.
536   size_t detached_actors() const noexcept;
537 
538   /// @cond PRIVATE
539 
540   /// Calls all thread started hooks
541   /// @warning must be called by thread which is about to start
542   void thread_started();
543 
544   /// Calls all thread terminates hooks
545   /// @warning must be called by thread which is about to terminate
546   void thread_terminates();
547 
548   template <class F>
launch_thread(const char * thread_name,F fun)549   std::thread launch_thread(const char* thread_name, F fun) {
550     auto body = [this, thread_name, f{std::move(fun)}](auto guard) {
551       CAF_IGNORE_UNUSED(guard);
552       CAF_SET_LOGGER_SYS(this);
553       detail::set_thread_name(thread_name);
554       thread_started();
555       f();
556       thread_terminates();
557     };
558     return std::thread{std::move(body), meta_objects_guard_};
559   }
560 
  /// Returns a copy of the guard that ties the lifetime of the meta objects
  /// table to the actor system.
  auto meta_objects_guard() const noexcept {
    return meta_objects_guard_;
  }
564 
  /// Returns the cached value of the configuration parameter
  /// `caf.metrics-filters.actors.includes`.
  const auto& metrics_actors_includes() const noexcept {
    return metrics_actors_includes_;
  }
568 
  /// Returns the cached value of the configuration parameter
  /// `caf.metrics-filters.actors.excludes`.
  const auto& metrics_actors_excludes() const noexcept {
    return metrics_actors_excludes_;
  }
572 
573   template <class C, spawn_options Os, class... Ts>
spawn_impl(actor_config & cfg,Ts &&...xs)574   infer_handle_from_class_t<C> spawn_impl(actor_config& cfg, Ts&&... xs) {
575     static_assert(is_unbound(Os),
576                   "top-level spawns cannot have monitor or link flag");
577     // TODO: use `if constexpr` when switching to C++17
578     if (has_detach_flag(Os) || std::is_base_of<blocking_actor, C>::value)
579       cfg.flags |= abstract_actor::is_detached_flag;
580     if (has_hide_flag(Os))
581       cfg.flags |= abstract_actor::is_hidden_flag;
582     if (cfg.host == nullptr)
583       cfg.host = dummy_execution_unit();
584     CAF_SET_LOGGER_SYS(this);
585     auto res = make_actor<C>(next_actor_id(), node(), this, cfg,
586                              std::forward<Ts>(xs)...);
587     auto ptr = static_cast<C*>(actor_cast<abstract_actor*>(res));
588 #ifdef CAF_ENABLE_ACTOR_PROFILER
589     profiler_add_actor(*ptr, cfg.parent);
590 #endif
591     ptr->launch(cfg.host, has_lazy_init_flag(Os), has_hide_flag(Os));
592     return res;
593   }
594 
profiler_add_actor(const local_actor & self,const local_actor * parent)595   void profiler_add_actor(const local_actor& self, const local_actor* parent) {
596     if (profiler_)
597       profiler_->add_actor(self, parent);
598   }
599 
profiler_remove_actor(const local_actor & self)600   void profiler_remove_actor(const local_actor& self) {
601     if (profiler_)
602       profiler_->remove_actor(self);
603   }
604 
profiler_before_processing(const local_actor & self,const mailbox_element & element)605   void profiler_before_processing(const local_actor& self,
606                                   const mailbox_element& element) {
607     if (profiler_)
608       profiler_->before_processing(self, element);
609   }
610 
profiler_after_processing(const local_actor & self,invoke_message_result result)611   void profiler_after_processing(const local_actor& self,
612                                  invoke_message_result result) {
613     if (profiler_)
614       profiler_->after_processing(self, result);
615   }
616 
profiler_before_sending(const local_actor & self,mailbox_element & element)617   void profiler_before_sending(const local_actor& self,
618                                mailbox_element& element) {
619     if (profiler_)
620       profiler_->before_sending(self, element);
621   }
622 
profiler_before_sending_scheduled(const local_actor & self,caf::actor_clock::time_point timeout,mailbox_element & element)623   void profiler_before_sending_scheduled(const local_actor& self,
624                                          caf::actor_clock::time_point timeout,
625                                          mailbox_element& element) {
626     if (profiler_)
627       profiler_->before_sending_scheduled(self, timeout, element);
628   }
629 
  /// Returns the metrics that the actor system collects by default (mutable
  /// access).
  base_metrics_t& base_metrics() noexcept {
    return base_metrics_;
  }
633 
  /// Returns the metrics that the actor system collects by default (read-only
  /// access).
  const auto& base_metrics() const noexcept {
    return base_metrics_;
  }
637 
  /// Returns the cached families for optional actor metrics.
  const auto& actor_metric_families() const noexcept {
    return actor_metric_families_;
  }
641 
  /// Returns the system-wide factory for deserializing tracing data.
  tracing_data_factory* tracing_context() const noexcept {
    return tracing_context_;
  }
645 
646   detail::private_thread* acquire_private_thread();
647 
648   void release_private_thread(detail::private_thread*);
649 
650   /// @endcond
651 
652 private:
653   template <class T>
check_invariants()654   void check_invariants() {
655     static_assert(!std::is_base_of<prohibit_top_level_spawn_marker, T>::value,
656                   "This actor type cannot be spawned through an actor system. "
657                   "Probably you have tried to spawn a broker.");
658   }
659 
660   expected<strong_actor_ptr>
661   dyn_spawn_impl(const std::string& name, message& args, execution_unit* ctx,
662                  bool check_interface, optional<const mpi&> expected_ifs);
663 
  /// Sets the internal actor for dynamic spawn operations.
  /// @param x The new handle; moved into the actor system.
  void spawn_serv(strong_actor_ptr x) {
    spawn_serv_ = std::move(x);
  }
668 
  /// Sets the internal actor for storing the runtime configuration.
  /// @param x The new handle; moved into the actor system.
  void config_serv(strong_actor_ptr x) {
    config_serv_ = std::move(x);
  }
673 
674   // -- member variables -------------------------------------------------------
675 
676   /// Provides system-wide callbacks for several actor operations.
677   actor_profiler* profiler_;
678 
679   /// Used to generate ascending actor IDs.
680   std::atomic<size_t> ids_;
681 
682   /// Manages all metrics collected by the system.
683   telemetry::metric_registry metrics_;
684 
685   /// Stores all metrics that the actor system collects by default.
686   base_metrics_t base_metrics_;
687 
688   /// Identifies this actor system in a distributed setting.
689   node_id node_;
690 
691   /// Manages log output.
692   intrusive_ptr<caf::logger> logger_;
693 
694   /// Maps well-known actor names to actor handles.
695   actor_registry registry_;
696 
697   /// Maps well-known group names to group handles.
698   group_manager groups_;
699 
700   /// Stores optional actor system components.
701   module_array modules_;
702 
703   /// Provides pseudo scheduling context to actors.
704   scoped_execution_unit dummy_execution_unit_;
705 
706   /// Stores whether the system should wait for running actors on shutdown.
707   bool await_actors_before_shutdown_;
708 
709   /// Stores config parameters.
710   strong_actor_ptr config_serv_;
711 
712   /// Allows fully dynamic spawning of actors.
713   strong_actor_ptr spawn_serv_;
714 
715   /// The system-wide, user-provided configuration.
716   actor_system_config& cfg_;
717 
718   /// Stores whether the logger has run its destructor and stopped any thread,
719   /// file handle, etc.
720   std::atomic<bool> logger_dtor_done_;
721 
722   /// Guards `logger_dtor_done_`.
723   mutable std::mutex logger_dtor_mtx_;
724 
725   /// Allows waiting on specific values for `logger_dtor_done_`.
726   mutable std::condition_variable logger_dtor_cv_;
727 
728   /// Stores the system-wide factory for deserializing tracing data.
729   tracing_data_factory* tracing_context_;
730 
731   /// Caches the configuration parameter `caf.metrics-filters.actors.includes`
732   /// for faster lookups at runtime.
733   std::vector<std::string> metrics_actors_includes_;
734 
735   /// Caches the configuration parameter `caf.metrics-filters.actors.excludes`
736   /// for faster lookups at runtime.
737   std::vector<std::string> metrics_actors_excludes_;
738 
739   /// Caches families for optional actor metrics.
740   actor_metric_families_t actor_metric_families_;
741 
742   /// Manages threads for detached actors.
743   detail::private_thread_pool private_threads_;
744 
745   /// Ties the lifetime of the meta objects table to the actor system.
746   detail::global_meta_objects_guard_type meta_objects_guard_;
747 };
748 
749 } // namespace caf
750