1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4
5 #pragma once
6
#include <algorithm>
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <typeinfo>
#include <utility>
#include <vector>

#include "caf/abstract_actor.hpp"
#include "caf/actor_cast.hpp"
#include "caf/actor_clock.hpp"
#include "caf/actor_config.hpp"
#include "caf/actor_profiler.hpp"
#include "caf/actor_registry.hpp"
#include "caf/actor_traits.hpp"
#include "caf/detail/core_export.hpp"
#include "caf/detail/init_fun_factory.hpp"
#include "caf/detail/private_thread_pool.hpp"
#include "caf/detail/set_thread_name.hpp"
#include "caf/detail/spawn_fwd.hpp"
#include "caf/detail/spawnable.hpp"
#include "caf/fwd.hpp"
#include "caf/group_manager.hpp"
#include "caf/infer_handle.hpp"
#include "caf/is_typed_actor.hpp"
#include "caf/logger.hpp"
#include "caf/make_actor.hpp"
#include "caf/prohibit_top_level_spawn_marker.hpp"
#include "caf/scoped_execution_unit.hpp"
#include "caf/spawn_options.hpp"
#include "caf/string_algorithms.hpp"
#include "caf/telemetry/metric_registry.hpp"
#include "caf/type_id.hpp"
43
44 namespace caf::detail {
45
/// Function object that renders the signature of a typed message passing
/// interface (MPI) as a string. The primary template is only declared;
/// specializations exist for function types of the form `Out(In...)` and
/// `result<Out...>(In...)`.
template <class>
struct typed_mpi_access;
48
49 template <class Out, class... In>
50 struct typed_mpi_access<Out(In...)> {
operator ()caf::detail::typed_mpi_access51 std::string operator()() const {
52 static_assert(sizeof...(In) > 0, "typed MPI without inputs");
53 std::vector<std::string> inputs{type_name_v<In>...};
54 std::string result = "(";
55 result += join(inputs, ",");
56 result += ") -> ";
57 result += type_name_v<Out>;
58 return result;
59 }
60 };
61
62 template <class... Out, class... In>
63 struct typed_mpi_access<result<Out...>(In...)> {
64 std::string operator()() const {
65 static_assert(sizeof...(In) > 0, "typed MPI without inputs");
66 static_assert(sizeof...(Out) > 0, "typed MPI without outputs");
67 std::vector<std::string> inputs{to_string(type_name_v<In>)...};
68 std::vector<std::string> outputs1{to_string(type_name_v<Out>)...};
69 std::string result = "(";
70 result += join(inputs, ",");
71 result += ") -> (";
72 result += join(outputs1, ",");
73 result += ")";
74 return result;
75 }
76 };
77
78 template <class T>
get_rtti_from_mpi()79 std::string get_rtti_from_mpi() {
80 typed_mpi_access<T> f;
81 return f();
82 }
83
84 } // namespace caf::detail
85
86 namespace caf {
87
88 /// Actor environment including scheduler, registry, and optional components
89 /// such as a middleman.
90 class CAF_CORE_EXPORT actor_system {
91 public:
92 friend class logger;
93 friend class io::middleman;
94 friend class net::middleman;
95 friend class abstract_actor;
96
97 /// Returns the internal actor for dynamic spawn operations.
spawn_serv() const98 const strong_actor_ptr& spawn_serv() const {
99 return spawn_serv_;
100 }
101
102 /// Returns the internal actor for storing the runtime configuration
103 /// for this actor system.
config_serv() const104 const strong_actor_ptr& config_serv() const {
105 return config_serv_;
106 }
107
108 actor_system() = delete;
109 actor_system(const actor_system&) = delete;
110 actor_system& operator=(const actor_system&) = delete;
111
112 /// An (optional) component of the actor system.
113 class CAF_CORE_EXPORT module {
114 public:
115 enum id_t {
116 scheduler,
117 middleman,
118 openssl_manager,
119 network_manager,
120 num_ids
121 };
122
123 virtual ~module();
124
125 /// Returns the human-redable name of the module.
126 const char* name() const noexcept;
127
128 /// Starts any background threads needed by the module.
129 virtual void start() = 0;
130
131 /// Stops all background threads of the module.
132 virtual void stop() = 0;
133
134 /// Allows the module to change the
135 /// configuration of the actor system during startup.
136 virtual void init(actor_system_config&) = 0;
137
138 /// Returns the identifier of this module.
139 virtual id_t id() const = 0;
140
141 /// Returns a pointer to the subtype.
142 virtual void* subtype_ptr() = 0;
143 };
144
145 using module_ptr = std::unique_ptr<module>;
146
147 using module_array = std::array<module_ptr, module::num_ids>;
148
149 /// An (optional) component of the actor system with networking capabilities.
150 class CAF_CORE_EXPORT networking_module : public module {
151 public:
152 ~networking_module() override;
153
154 /// Causes the module to send a `node_down_msg` to `observer` if this system
155 /// loses connection to `node`.
156 virtual void monitor(const node_id& node, const actor_addr& observer) = 0;
157
158 /// Causes the module remove one entry for `observer` from the list of
159 /// actors that receive a `node_down_msg` if this system loses connection to
160 /// `node`. Each call to `monitor` requires one call to `demonitor` in order
161 /// to unsubscribe the `observer` completely.
162 virtual void demonitor(const node_id& node, const actor_addr& observer) = 0;
163 };
164
165 /// Metrics that the actor system collects by default.
166 /// @warning Do not modify these metrics in user code. Some may be used by the
167 /// system for synchronization.
168 struct base_metrics_t {
169 /// Counts the number of messages that where rejected because the target
170 /// mailbox was closed or did not exist.
171 telemetry::int_counter* rejected_messages;
172
173 /// Counts the total number of processed messages.
174 telemetry::int_counter* processed_messages;
175
176 /// Tracks the current number of running actors in the system.
177 telemetry::int_gauge* running_actors;
178
179 /// Counts the total number of messages that wait in a mailbox.
180 telemetry::int_gauge* queued_messages;
181 };
182
183 /// Metrics that some actors may collect in addition to the base metrics. All
184 /// families in this set use the label dimension *name* (the user-defined name
185 /// of the actor).
186 struct actor_metric_families_t {
187 /// Samples how long the actor needs to process messages.
188 telemetry::dbl_histogram_family* processing_time = nullptr;
189
190 /// Samples how long a message waits in the mailbox before the actor
191 /// processes it.
192 telemetry::dbl_histogram_family* mailbox_time = nullptr;
193
194 /// Counts how many messages are currently waiting in the mailbox.
195 telemetry::int_gauge_family* mailbox_size = nullptr;
196
197 struct {
198 // -- inbound ------------------------------------------------------------
199
200 /// Counts the total number of processed stream elements from upstream.
201 telemetry::int_counter_family* processed_elements = nullptr;
202
203 /// Tracks how many stream elements from upstream are currently buffered.
204 telemetry::int_gauge_family* input_buffer_size = nullptr;
205
206 // -- outbound -----------------------------------------------------------
207
208 /// Counts the total number of elements that have been pushed downstream.
209 telemetry::int_counter_family* pushed_elements = nullptr;
210
211 /// Tracks how many stream elements are currently waiting in the output
212 /// buffer due to insufficient credit.
213 telemetry::int_gauge_family* output_buffer_size = nullptr;
214 }
215
216 /// Wraps streaming-related actor metric families.
217 stream;
218 };
219
220 /// @warning The system stores a reference to `cfg`, which means the
221 /// config object must outlive the actor system.
222 explicit actor_system(actor_system_config& cfg);
223
224 virtual ~actor_system();
225
226 /// A message passing interface (MPI) in run-time checkable representation.
227 using mpi = std::set<std::string>;
228
229 template <class T,
230 class E = typename std::enable_if<!is_typed_actor<T>::value>::type>
message_types(detail::type_list<T>) const231 mpi message_types(detail::type_list<T>) const {
232 return mpi{};
233 }
234
235 template <class... Ts>
message_types(detail::type_list<typed_actor<Ts...>>) const236 mpi message_types(detail::type_list<typed_actor<Ts...>>) const {
237 static_assert(sizeof...(Ts) > 0, "empty typed actor handle given");
238 mpi result{detail::get_rtti_from_mpi<Ts>()...};
239 return result;
240 }
241
242 template <class T,
243 class E
244 = typename std::enable_if<!detail::is_type_list<T>::value>::type>
message_types(const T &) const245 mpi message_types(const T&) const {
246 detail::type_list<T> token;
247 return message_types(token);
248 }
249
250 /// Returns a string representation of the messaging
251 /// interface using portable names;
252 template <class T>
message_types() const253 mpi message_types() const {
254 detail::type_list<T> token;
255 return message_types(token);
256 }
257
258 /// Returns whether actor handles described by `xs`
259 /// can be assigned to actor handles described by `ys`.
260 /// @experimental
assignable(const mpi & xs,const mpi & ys) const261 bool assignable(const mpi& xs, const mpi& ys) const {
262 if (ys.empty())
263 return xs.empty();
264 if (xs.size() == ys.size())
265 return xs == ys;
266 return std::includes(xs.begin(), xs.end(), ys.begin(), ys.end());
267 }
268
269 /// Returns whether actor handles described by `xs`
270 /// can be assigned to actor handles of type `T`.
271 /// @experimental
272 template <class T>
assignable(const std::set<std::string> & xs) const273 bool assignable(const std::set<std::string>& xs) const {
274 return assignable(xs, message_types<T>());
275 }
276
277 /// Returns the metrics registry for this system.
metrics()278 telemetry::metric_registry& metrics() noexcept {
279 return metrics_;
280 }
281
282 /// Returns the metrics registry for this system.
metrics() const283 const telemetry::metric_registry& metrics() const noexcept {
284 return metrics_;
285 }
286
287 /// Returns the host-local identifier for this system.
node() const288 const node_id& node() const {
289 return node_;
290 }
291
292 /// Returns the scheduler instance.
293 scheduler::abstract_coordinator& scheduler();
294
295 /// Returns the system-wide event logger.
296 caf::logger& logger();
297
298 /// Returns the system-wide actor registry.
299 actor_registry& registry();
300
301 /// Returns a string representation for `err`.
302 [[deprecated("please use to_string() on the error")]] std::string
303 render(const error& x) const;
304
305 /// Returns the system-wide group manager.
306 group_manager& groups();
307
308 /// Returns `true` if the I/O module is available, `false` otherwise.
309 bool has_middleman() const;
310
311 /// Returns the middleman instance from the I/O module.
312 /// @throws `std::logic_error` if module is not loaded.
313 io::middleman& middleman();
314
315 /// Returns `true` if the openssl module is available, `false` otherwise.
316 bool has_openssl_manager() const;
317
318 /// Returns the manager instance from the OpenSSL module.
319 /// @throws `std::logic_error` if module is not loaded.
320 openssl::manager& openssl_manager() const;
321
322 /// Returns `true` if the network module is available, `false` otherwise.
323 bool has_network_manager() const noexcept;
324
325 /// Returns the network manager (middleman) instance.
326 /// @throws `std::logic_error` if module is not loaded.
327 net::middleman& network_manager();
328
329 /// Returns a dummy execution unit that forwards
330 /// everything to the scheduler.
331 scoped_execution_unit* dummy_execution_unit();
332
333 /// Returns a new actor ID.
334 actor_id next_actor_id();
335
336 /// Returns the last given actor ID.
337 actor_id latest_actor_id() const;
338
339 /// Blocks this caller until all actors are done.
340 void await_all_actors_done() const;
341
342 /// Send a `node_down_msg` to `observer` if this system loses connection to
343 /// `node`.
344 /// @note Calling this function *n* times causes the system to send
345 /// `node_down_msg` *n* times to the observer. In order to not receive
346 /// the messages, the observer must call `demonitor` *n* times.
347 void monitor(const node_id& node, const actor_addr& observer);
348
349 /// Removes `observer` from the list of actors that receive a `node_down_msg`
350 /// if this system loses connection to `node`.
351 void demonitor(const node_id& node, const actor_addr& observer);
352
353 /// Called by `spawn` when used to create a class-based actor to
354 /// apply automatic conversions to `xs` before spawning the actor.
355 /// Should not be called by users of the library directly.
356 /// @param cfg To-be-filled config for the actor.
357 /// @param xs Constructor arguments for `C`.
358 template <class C, spawn_options Os, class... Ts>
spawn_class(actor_config & cfg,Ts &&...xs)359 infer_handle_from_class_t<C> spawn_class(actor_config& cfg, Ts&&... xs) {
360 return spawn_impl<C, Os>(cfg, detail::spawn_fwd<Ts>(xs)...);
361 }
362
363 /// Returns a new actor of type `C` using `xs...` as constructor
364 /// arguments. The behavior of `spawn` can be modified by setting `Os`, e.g.,
365 /// to opt-out of the cooperative scheduling.
366 /// @param xs Constructor arguments for `C`.
367 template <class C, spawn_options Os = no_spawn_options, class... Ts>
spawn(Ts &&...xs)368 infer_handle_from_class_t<C> spawn(Ts&&... xs) {
369 check_invariants<C>();
370 actor_config cfg;
371 return spawn_impl<C, Os>(cfg, detail::spawn_fwd<Ts>(xs)...);
372 }
373
374 /// Called by `spawn` when used to create a functor-based actor to select a
375 /// proper implementation and then delegates to `spawn_impl`.
376 /// @param cfg To-be-filled config for the actor.
377 /// @param fun Function object for the actor's behavior; will be moved.
378 /// @param xs Arguments for `fun`.
379 /// @private
380 template <spawn_options Os = no_spawn_options, class F, class... Ts>
381 infer_handle_from_fun_t<F>
spawn_functor(std::true_type,actor_config & cfg,F & fun,Ts &&...xs)382 spawn_functor(std::true_type, actor_config& cfg, F& fun, Ts&&... xs) {
383 using impl = infer_impl_from_fun_t<F>;
384 detail::init_fun_factory<impl, F> fac;
385 cfg.init_fun = fac(std::move(fun), std::forward<Ts>(xs)...);
386 return spawn_impl<impl, Os>(cfg);
387 }
388
389 /// Fallback no-op overload.
390 /// @private
391 template <spawn_options Os = no_spawn_options, class F, class... Ts>
392 infer_handle_from_fun_t<F>
spawn_functor(std::false_type,actor_config &,F &,Ts &&...)393 spawn_functor(std::false_type, actor_config&, F&, Ts&&...) {
394 return {};
395 }
396
397 /// Returns a new functor-based actor. The first argument must be the functor,
398 /// the remainder of `xs...` is used to invoke the functor.
399 /// The behavior of `spawn` can be modified by setting `Os`, e.g.,
400 /// to opt-out of the cooperative scheduling.
401 template <spawn_options Os = no_spawn_options, class F, class... Ts>
spawn(F fun,Ts &&...xs)402 infer_handle_from_fun_t<F> spawn(F fun, Ts&&... xs) {
403 using impl = infer_impl_from_fun_t<F>;
404 check_invariants<impl>();
405 static constexpr bool spawnable = detail::spawnable<F, impl, Ts...>();
406 static_assert(spawnable,
407 "cannot spawn function-based actor with given arguments");
408 actor_config cfg;
409 return spawn_functor<Os>(detail::bool_token<spawnable>{}, cfg, fun,
410 std::forward<Ts>(xs)...);
411 }
412
413 /// Returns a new actor with run-time type `name`, constructed
414 /// with the arguments stored in `args`.
415 /// @experimental
416 template <class Handle,
417 class E = typename std::enable_if<is_handle<Handle>::value>::type>
418 expected<Handle>
spawn(const std::string & name,message args,execution_unit * ctx=nullptr,bool check_interface=true,const mpi * expected_ifs=nullptr)419 spawn(const std::string& name, message args, execution_unit* ctx = nullptr,
420 bool check_interface = true, const mpi* expected_ifs = nullptr) {
421 mpi tmp;
422 if (check_interface && !expected_ifs) {
423 tmp = message_types<Handle>();
424 expected_ifs = &tmp;
425 }
426 auto res = dyn_spawn_impl(name, args, ctx, check_interface, expected_ifs);
427 if (!res)
428 return std::move(res.error());
429 return actor_cast<Handle>(std::move(*res));
430 }
431
432 /// Spawns a class-based actor `T` immediately joining the groups in
433 /// range `[first, last)`.
434 /// @private
435 template <class T, spawn_options Os, class Iter, class... Ts>
436 infer_handle_from_class_t<T>
spawn_class_in_groups(actor_config & cfg,Iter first,Iter last,Ts &&...xs)437 spawn_class_in_groups(actor_config& cfg, Iter first, Iter last, Ts&&... xs) {
438 static_assert(std::is_same<infer_handle_from_class_t<T>, actor>::value,
439 "only dynamically-typed actors can be spawned in a group");
440 check_invariants<T>();
441 auto irange = make_input_range(first, last);
442 cfg.groups = &irange;
443 return spawn_class<T, Os>(cfg, std::forward<Ts>(xs)...);
444 }
445
446 /// Spawns a class-based actor `T` immediately joining the groups in
447 /// range `[first, last)`.
448 /// @private
449 template <spawn_options Os, class Iter, class F, class... Ts>
spawn_fun_in_groups(actor_config & cfg,Iter first,Iter second,F & fun,Ts &&...xs)450 infer_handle_from_fun_t<F> spawn_fun_in_groups(actor_config& cfg, Iter first,
451 Iter second, F& fun,
452 Ts&&... xs) {
453 using impl = infer_impl_from_fun_t<F>;
454 check_invariants<impl>();
455 using traits = actor_traits<impl>;
456 static_assert(traits::is_dynamically_typed,
457 "only dynamically-typed actors can join groups");
458 static constexpr bool spawnable = detail::spawnable<F, impl, Ts...>();
459 static_assert(spawnable,
460 "cannot spawn function-based actor with given arguments");
461 static constexpr bool enabled = traits::is_dynamically_typed && spawnable;
462 auto irange = make_input_range(first, second);
463 cfg.groups = &irange;
464 return spawn_functor<Os>(detail::bool_token<enabled>{}, cfg, fun,
465 std::forward<Ts>(xs)...);
466 }
467
468 /// Returns a new functor-based actor subscribed to all groups in `gs`.
469 template <spawn_options Os = no_spawn_options, class F, class... Ts>
470 infer_handle_from_fun_t<F>
spawn_in_groups(std::initializer_list<group> gs,F fun,Ts &&...xs)471 spawn_in_groups(std::initializer_list<group> gs, F fun, Ts&&... xs) {
472 actor_config cfg;
473 return spawn_fun_in_groups<Os>(cfg, gs.begin(), gs.end(), fun,
474 std::forward<Ts>(xs)...);
475 }
476
477 /// Returns a new functor-based actor subscribed to all groups in `gs`.
478 template <spawn_options Os = no_spawn_options, class Gs, class F, class... Ts>
spawn_in_groups(const Gs & gs,F fun,Ts &&...xs)479 infer_handle_from_fun_t<F> spawn_in_groups(const Gs& gs, F fun, Ts&&... xs) {
480 actor_config cfg;
481 return spawn_fun_in_groups<Os>(cfg, gs.begin(), gs.end(), fun,
482 std::forward<Ts>(xs)...);
483 }
484
485 /// Returns a new functor-based actor subscribed to all groups in `gs`.
486 template <spawn_options Os = no_spawn_options, class F, class... Ts>
487 infer_handle_from_fun_t<F>
spawn_in_group(const group & grp,F fun,Ts &&...xs)488 spawn_in_group(const group& grp, F fun, Ts&&... xs) {
489 return spawn_in_groups<Os>({grp}, std::move(fun), std::forward<Ts>(xs)...);
490 }
491
492 /// Returns a new class-based actor subscribed to all groups in `gs`.
493 template <class T, spawn_options Os = no_spawn_options, class... Ts>
494 infer_handle_from_class_t<T>
spawn_in_groups(std::initializer_list<group> gs,Ts &&...xs)495 spawn_in_groups(std::initializer_list<group> gs, Ts&&... xs) {
496 actor_config cfg;
497 return spawn_class_in_groups<T, Os>(cfg, gs.begin(), gs.end(),
498 std::forward<Ts>(xs)...);
499 }
500
501 /// Returns a new class-based actor subscribed to all groups in `gs`.
502 template <class T, spawn_options Os = no_spawn_options, class Gs, class... Ts>
spawn_in_groups(const Gs & gs,Ts &&...xs)503 infer_handle_from_class_t<T> spawn_in_groups(const Gs& gs, Ts&&... xs) {
504 actor_config cfg;
505 return spawn_class_in_groups<T, Os>(cfg, gs.begin(), gs.end(),
506 std::forward<Ts>(xs)...);
507 }
508
509 /// Returns a new class-based actor subscribed to all groups in `gs`.
510 template <class T, spawn_options Os = no_spawn_options, class... Ts>
spawn_in_group(const group & grp,Ts &&...xs)511 infer_handle_from_class_t<T> spawn_in_group(const group& grp, Ts&&... xs) {
512 return spawn_in_groups<T, Os>({grp}, std::forward<Ts>(xs)...);
513 }
514
515 /// Returns whether this actor system calls `await_all_actors_done`
516 /// in its destructor before shutting down.
await_actors_before_shutdown() const517 bool await_actors_before_shutdown() const {
518 return await_actors_before_shutdown_;
519 }
520
521 /// Configures whether this actor system calls `await_all_actors_done`
522 /// in its destructor before shutting down.
await_actors_before_shutdown(bool x)523 void await_actors_before_shutdown(bool x) {
524 await_actors_before_shutdown_ = x;
525 }
526
527 /// Returns the configuration of this actor system.
config() const528 const actor_system_config& config() const {
529 return cfg_;
530 }
531
532 /// Returns the system-wide clock.
533 actor_clock& clock() noexcept;
534
535 /// Returns the number of detached actors.
536 size_t detached_actors() const noexcept;
537
538 /// @cond PRIVATE
539
540 /// Calls all thread started hooks
541 /// @warning must be called by thread which is about to start
542 void thread_started();
543
544 /// Calls all thread terminates hooks
545 /// @warning must be called by thread which is about to terminate
546 void thread_terminates();
547
548 template <class F>
launch_thread(const char * thread_name,F fun)549 std::thread launch_thread(const char* thread_name, F fun) {
550 auto body = [this, thread_name, f{std::move(fun)}](auto guard) {
551 CAF_IGNORE_UNUSED(guard);
552 CAF_SET_LOGGER_SYS(this);
553 detail::set_thread_name(thread_name);
554 thread_started();
555 f();
556 thread_terminates();
557 };
558 return std::thread{std::move(body), meta_objects_guard_};
559 }
560
meta_objects_guard() const561 auto meta_objects_guard() const noexcept {
562 return meta_objects_guard_;
563 }
564
metrics_actors_includes() const565 const auto& metrics_actors_includes() const noexcept {
566 return metrics_actors_includes_;
567 }
568
metrics_actors_excludes() const569 const auto& metrics_actors_excludes() const noexcept {
570 return metrics_actors_excludes_;
571 }
572
573 template <class C, spawn_options Os, class... Ts>
spawn_impl(actor_config & cfg,Ts &&...xs)574 infer_handle_from_class_t<C> spawn_impl(actor_config& cfg, Ts&&... xs) {
575 static_assert(is_unbound(Os),
576 "top-level spawns cannot have monitor or link flag");
577 // TODO: use `if constexpr` when switching to C++17
578 if (has_detach_flag(Os) || std::is_base_of<blocking_actor, C>::value)
579 cfg.flags |= abstract_actor::is_detached_flag;
580 if (has_hide_flag(Os))
581 cfg.flags |= abstract_actor::is_hidden_flag;
582 if (cfg.host == nullptr)
583 cfg.host = dummy_execution_unit();
584 CAF_SET_LOGGER_SYS(this);
585 auto res = make_actor<C>(next_actor_id(), node(), this, cfg,
586 std::forward<Ts>(xs)...);
587 auto ptr = static_cast<C*>(actor_cast<abstract_actor*>(res));
588 #ifdef CAF_ENABLE_ACTOR_PROFILER
589 profiler_add_actor(*ptr, cfg.parent);
590 #endif
591 ptr->launch(cfg.host, has_lazy_init_flag(Os), has_hide_flag(Os));
592 return res;
593 }
594
profiler_add_actor(const local_actor & self,const local_actor * parent)595 void profiler_add_actor(const local_actor& self, const local_actor* parent) {
596 if (profiler_)
597 profiler_->add_actor(self, parent);
598 }
599
profiler_remove_actor(const local_actor & self)600 void profiler_remove_actor(const local_actor& self) {
601 if (profiler_)
602 profiler_->remove_actor(self);
603 }
604
profiler_before_processing(const local_actor & self,const mailbox_element & element)605 void profiler_before_processing(const local_actor& self,
606 const mailbox_element& element) {
607 if (profiler_)
608 profiler_->before_processing(self, element);
609 }
610
profiler_after_processing(const local_actor & self,invoke_message_result result)611 void profiler_after_processing(const local_actor& self,
612 invoke_message_result result) {
613 if (profiler_)
614 profiler_->after_processing(self, result);
615 }
616
profiler_before_sending(const local_actor & self,mailbox_element & element)617 void profiler_before_sending(const local_actor& self,
618 mailbox_element& element) {
619 if (profiler_)
620 profiler_->before_sending(self, element);
621 }
622
profiler_before_sending_scheduled(const local_actor & self,caf::actor_clock::time_point timeout,mailbox_element & element)623 void profiler_before_sending_scheduled(const local_actor& self,
624 caf::actor_clock::time_point timeout,
625 mailbox_element& element) {
626 if (profiler_)
627 profiler_->before_sending_scheduled(self, timeout, element);
628 }
629
base_metrics()630 base_metrics_t& base_metrics() noexcept {
631 return base_metrics_;
632 }
633
base_metrics() const634 const auto& base_metrics() const noexcept {
635 return base_metrics_;
636 }
637
actor_metric_families() const638 const auto& actor_metric_families() const noexcept {
639 return actor_metric_families_;
640 }
641
tracing_context() const642 tracing_data_factory* tracing_context() const noexcept {
643 return tracing_context_;
644 }
645
646 detail::private_thread* acquire_private_thread();
647
648 void release_private_thread(detail::private_thread*);
649
650 /// @endcond
651
652 private:
653 template <class T>
check_invariants()654 void check_invariants() {
655 static_assert(!std::is_base_of<prohibit_top_level_spawn_marker, T>::value,
656 "This actor type cannot be spawned through an actor system. "
657 "Probably you have tried to spawn a broker.");
658 }
659
660 expected<strong_actor_ptr>
661 dyn_spawn_impl(const std::string& name, message& args, execution_unit* ctx,
662 bool check_interface, optional<const mpi&> expected_ifs);
663
664 /// Sets the internal actor for dynamic spawn operations.
spawn_serv(strong_actor_ptr x)665 void spawn_serv(strong_actor_ptr x) {
666 spawn_serv_ = std::move(x);
667 }
668
669 /// Sets the internal actor for storing the runtime configuration.
config_serv(strong_actor_ptr x)670 void config_serv(strong_actor_ptr x) {
671 config_serv_ = std::move(x);
672 }
673
674 // -- member variables -------------------------------------------------------
675
676 /// Provides system-wide callbacks for several actor operations.
677 actor_profiler* profiler_;
678
679 /// Used to generate ascending actor IDs.
680 std::atomic<size_t> ids_;
681
682 /// Manages all metrics collected by the system.
683 telemetry::metric_registry metrics_;
684
685 /// Stores all metrics that the actor system collects by default.
686 base_metrics_t base_metrics_;
687
688 /// Identifies this actor system in a distributed setting.
689 node_id node_;
690
691 /// Manages log output.
692 intrusive_ptr<caf::logger> logger_;
693
694 /// Maps well-known actor names to actor handles.
695 actor_registry registry_;
696
697 /// Maps well-known group names to group handles.
698 group_manager groups_;
699
700 /// Stores optional actor system components.
701 module_array modules_;
702
703 /// Provides pseudo scheduling context to actors.
704 scoped_execution_unit dummy_execution_unit_;
705
706 /// Stores whether the system should wait for running actors on shutdown.
707 bool await_actors_before_shutdown_;
708
709 /// Stores config parameters.
710 strong_actor_ptr config_serv_;
711
712 /// Allows fully dynamic spawning of actors.
713 strong_actor_ptr spawn_serv_;
714
715 /// The system-wide, user-provided configuration.
716 actor_system_config& cfg_;
717
718 /// Stores whether the logger has run its destructor and stopped any thread,
719 /// file handle, etc.
720 std::atomic<bool> logger_dtor_done_;
721
722 /// Guards `logger_dtor_done_`.
723 mutable std::mutex logger_dtor_mtx_;
724
725 /// Allows waiting on specific values for `logger_dtor_done_`.
726 mutable std::condition_variable logger_dtor_cv_;
727
728 /// Stores the system-wide factory for deserializing tracing data.
729 tracing_data_factory* tracing_context_;
730
731 /// Caches the configuration parameter `caf.metrics-filters.actors.includes`
732 /// for faster lookups at runtime.
733 std::vector<std::string> metrics_actors_includes_;
734
735 /// Caches the configuration parameter `caf.metrics-filters.actors.excludes`
736 /// for faster lookups at runtime.
737 std::vector<std::string> metrics_actors_excludes_;
738
739 /// Caches families for optional actor metrics.
740 actor_metric_families_t actor_metric_families_;
741
742 /// Manages threads for detached actors.
743 detail::private_thread_pool private_threads_;
744
745 /// Ties the lifetime of the meta objects table to the actor system.
746 detail::global_meta_objects_guard_type meta_objects_guard_;
747 };
748
749 } // namespace caf
750