1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #include "caf/actor_registry.hpp"
6 
7 #include <mutex>
8 #include <limits>
9 #include <stdexcept>
10 #include <unordered_map>
11 #include <unordered_set>
12 
13 #include "caf/actor_system.hpp"
14 #include "caf/attachable.hpp"
15 #include "caf/detail/shared_spinlock.hpp"
16 #include "caf/event_based_actor.hpp"
17 #include "caf/exit_reason.hpp"
18 #include "caf/locks.hpp"
19 #include "caf/logger.hpp"
20 #include "caf/scoped_actor.hpp"
21 #include "caf/sec.hpp"
22 #include "caf/stateful_actor.hpp"
23 
24 namespace caf {
25 
26 namespace {
27 
28 using exclusive_guard = unique_lock<detail::shared_spinlock>;
29 using shared_guard = shared_lock<detail::shared_spinlock>;
30 
31 } // namespace
32 
~actor_registry()33 actor_registry::~actor_registry() {
34   // nop
35 }
36 
actor_registry(actor_system & sys)37 actor_registry::actor_registry(actor_system& sys) : system_(sys) {
38   // nop
39 }
40 
get_impl(actor_id key) const41 strong_actor_ptr actor_registry::get_impl(actor_id key) const {
42   shared_guard guard(instances_mtx_);
43   auto i = entries_.find(key);
44   if (i != entries_.end())
45     return i->second;
46   CAF_LOG_DEBUG("key invalid, assume actor no longer exists:" << CAF_ARG(key));
47   return nullptr;
48 }
49 
put_impl(actor_id key,strong_actor_ptr val)50 void actor_registry::put_impl(actor_id key, strong_actor_ptr val) {
51   CAF_LOG_TRACE(CAF_ARG(key));
52   if (!val)
53     return;
54   { // lifetime scope of guard
55     exclusive_guard guard(instances_mtx_);
56     if (!entries_.emplace(key, val).second)
57       return;
58   }
59   // attach functor without lock
60   CAF_LOG_DEBUG("added actor:" << CAF_ARG(key));
61   actor_registry* reg = this;
62   val->get()->attach_functor([key, reg]() {
63     reg->erase(key);
64   });
65 }
66 
erase(actor_id key)67 void actor_registry::erase(actor_id key) {
68   // Stores a reference to the actor we're going to remove. This guarantees
69   // that we aren't releasing the last reference to an actor while erasing it.
70   // Releasing the final ref can trigger the actor to call its cleanup function
71   // that in turn calls this function and we can end up in a deadlock.
72   strong_actor_ptr ref;
73   { // Lifetime scope of guard.
74     exclusive_guard guard{instances_mtx_};
75     auto i = entries_.find(key);
76     if (i != entries_.end()) {
77       ref.swap(i->second);
78       entries_.erase(i);
79     }
80   }
81 }
82 
inc_running()83 size_t actor_registry::inc_running() {
84   return ++*system_.base_metrics().running_actors;
85 }
86 
running() const87 size_t actor_registry::running() const {
88   return static_cast<size_t>(system_.base_metrics().running_actors->value());
89 }
90 
dec_running()91 size_t actor_registry::dec_running() {
92   size_t new_val = --*system_.base_metrics().running_actors;
93   if (new_val <= 1) {
94     std::unique_lock<std::mutex> guard(running_mtx_);
95     running_cv_.notify_all();
96   }
97   return new_val;
98 }
99 
await_running_count_equal(size_t expected) const100 void actor_registry::await_running_count_equal(size_t expected) const {
101   CAF_ASSERT(expected == 0 || expected == 1);
102   CAF_LOG_TRACE(CAF_ARG(expected));
103   std::unique_lock<std::mutex> guard{running_mtx_};
104   while (running() != expected) {
105     CAF_LOG_DEBUG(CAF_ARG(running()));
106     running_cv_.wait(guard);
107   }
108 }
109 
get_impl(const std::string & key) const110 strong_actor_ptr actor_registry::get_impl(const std::string& key) const {
111   shared_guard guard{named_entries_mtx_};
112   auto i = named_entries_.find(key);
113   if (i == named_entries_.end())
114     return nullptr;
115   return i->second;
116 }
117 
put_impl(const std::string & key,strong_actor_ptr value)118 void actor_registry::put_impl(const std::string& key, strong_actor_ptr value) {
119   if (value == nullptr) {
120     erase(key);
121     return;
122   }
123   exclusive_guard guard{named_entries_mtx_};
124   named_entries_.emplace(std::move(key), std::move(value));
125 }
126 
erase(const std::string & key)127 void actor_registry::erase(const std::string& key) {
128   // Stores a reference to the actor we're going to remove for the same
129   // reasoning as in erase(actor_id).
130   strong_actor_ptr ref;
131   { // Lifetime scope of guard.
132     exclusive_guard guard{named_entries_mtx_};
133     auto i = named_entries_.find(key);
134     if (i != named_entries_.end()) {
135       ref.swap(i->second);
136       named_entries_.erase(i);
137     }
138   }
139 }
140 
named_actors() const141 auto actor_registry::named_actors() const -> name_map {
142   shared_guard guard{named_entries_mtx_};
143   return named_entries_;
144 }
145 
start()146 void actor_registry::start() {
147   // nop
148 }
149 
stop()150 void actor_registry::stop() {
151   {
152     exclusive_guard guard{instances_mtx_};
153     entries_.clear();
154   }
155   {
156     exclusive_guard guard{named_entries_mtx_};
157     named_entries_.clear();
158   }
159 }
160 
161 } // namespace caf
162