1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #include "caf/blocking_actor.hpp"
6 
7 #include <utility>
8 
9 #include "caf/actor_registry.hpp"
10 #include "caf/actor_system.hpp"
11 #include "caf/detail/default_invoke_result_visitor.hpp"
12 #include "caf/detail/invoke_result_visitor.hpp"
13 #include "caf/detail/private_thread.hpp"
14 #include "caf/detail/set_thread_name.hpp"
15 #include "caf/detail/sync_request_bouncer.hpp"
16 #include "caf/invoke_message_result.hpp"
17 #include "caf/logger.hpp"
18 #include "caf/scheduled_actor.hpp"
19 #include "caf/telemetry/timer.hpp"
20 
21 namespace caf {
22 
~receive_cond()23 blocking_actor::receive_cond::~receive_cond() {
24   // nop
25 }
26 
pre()27 bool blocking_actor::receive_cond::pre() {
28   return true;
29 }
30 
post()31 bool blocking_actor::receive_cond::post() {
32   return true;
33 }
34 
~accept_one_cond()35 blocking_actor::accept_one_cond::~accept_one_cond() {
36   // nop
37 }
38 
post()39 bool blocking_actor::accept_one_cond::post() {
40   return false;
41 }
42 
blocking_actor(actor_config & cfg)43 blocking_actor::blocking_actor(actor_config& cfg)
44   : super(cfg.add_flag(local_actor::is_blocking_flag)),
45     mailbox_(unit, unit, unit) {
46   // nop
47 }
48 
~blocking_actor()49 blocking_actor::~blocking_actor() {
50   // avoid weak-vtables warning
51 }
52 
enqueue(mailbox_element_ptr ptr,execution_unit *)53 void blocking_actor::enqueue(mailbox_element_ptr ptr, execution_unit*) {
54   CAF_ASSERT(ptr != nullptr);
55   CAF_ASSERT(getf(is_blocking_flag));
56   CAF_LOG_TRACE(CAF_ARG(*ptr));
57   CAF_LOG_SEND_EVENT(ptr);
58   auto mid = ptr->mid;
59   auto src = ptr->sender;
60   auto collects_metrics = getf(abstract_actor::collects_metrics_flag);
61   if (collects_metrics) {
62     ptr->set_enqueue_time();
63     metrics_.mailbox_size->inc();
64   }
65   // returns false if mailbox has been closed
66   if (!mailbox().synchronized_push_back(mtx_, cv_, std::move(ptr))) {
67     CAF_LOG_REJECT_EVENT();
68     home_system().base_metrics().rejected_messages->inc();
69     if (collects_metrics)
70       metrics_.mailbox_size->dec();
71     if (mid.is_request()) {
72       detail::sync_request_bouncer srb{exit_reason()};
73       srb(src, mid);
74     }
75   } else {
76     CAF_LOG_ACCEPT_EVENT(false);
77   }
78 }
79 
peek_at_next_mailbox_element()80 mailbox_element* blocking_actor::peek_at_next_mailbox_element() {
81   return mailbox().closed() || mailbox().blocked() ? nullptr : mailbox().peek();
82 }
83 
name() const84 const char* blocking_actor::name() const {
85   return "user.blocking-actor";
86 }
87 
88 namespace {
89 
90 // Runner for passing a blocking actor to a private_thread. We don't actually
91 // need a reference count here, because the private thread calls
92 // intrusive_ptr_release_impl exactly once after running this function object.
93 class blocking_actor_runner : public resumable {
94 public:
blocking_actor_runner(blocking_actor * self,detail::private_thread * thread,bool hidden)95   explicit blocking_actor_runner(blocking_actor* self,
96                                  detail::private_thread* thread, bool hidden)
97     : self_(self), thread_(thread), hidden_(hidden) {
98     intrusive_ptr_add_ref(self->ctrl());
99   }
100 
subtype() const101   resumable::subtype_t subtype() const override {
102     return resumable::function_object;
103   }
104 
resume(execution_unit * ctx,size_t)105   resumable::resume_result resume(execution_unit* ctx, size_t) override {
106     CAF_PUSH_AID_FROM_PTR(self_);
107     self_->context(ctx);
108     self_->initialize();
109     error rsn;
110 #ifdef CAF_ENABLE_EXCEPTIONS
111     try {
112       self_->act();
113       rsn = self_->fail_state();
114     } catch (...) {
115       auto ptr = std::current_exception();
116       rsn = scheduled_actor::default_exception_handler(self_, ptr);
117     }
118     try {
119       self_->on_exit();
120     } catch (...) {
121       // simply ignore exception
122     }
123 #else
124     self_->act();
125     rsn = self_->fail_state();
126     self_->on_exit();
127 #endif
128     self_->cleanup(std::move(rsn), ctx);
129     intrusive_ptr_release(self_->ctrl());
130     auto& sys = ctx->system();
131     sys.release_private_thread(thread_);
132     if (!hidden_) {
133       [[maybe_unused]] auto count = sys.registry().dec_running();
134       CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to"
135                             << count);
136     }
137     return resumable::done;
138   }
139 
intrusive_ptr_add_ref_impl()140   void intrusive_ptr_add_ref_impl() override {
141     // nop
142   }
143 
intrusive_ptr_release_impl()144   void intrusive_ptr_release_impl() override {
145     delete this;
146   }
147 
148 private:
149   blocking_actor* self_;
150   detail::private_thread* thread_;
151   bool hidden_;
152 };
153 
154 } // namespace
155 
launch(execution_unit *,bool,bool hide)156 void blocking_actor::launch(execution_unit*, bool, bool hide) {
157   CAF_PUSH_AID_FROM_PTR(this);
158   CAF_LOG_TRACE(CAF_ARG(hide));
159   CAF_ASSERT(getf(is_blocking_flag));
160   // Try to acquire a thread before incrementing the running count, since this
161   // may throw.
162   auto& sys = home_system();
163   auto thread = sys.acquire_private_thread();
164   // Note: must *not* call register_at_system() to stop actor cleanup from
165   // decrementing the count before releasing the thread.
166   if (!hide) {
167     [[maybe_unused]] auto count = sys.registry().inc_running();
168     CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
169   }
170   thread->resume(new blocking_actor_runner(this, thread, hide));
171 }
172 
173 blocking_actor::receive_while_helper
receive_while(std::function<bool ()> stmt)174 blocking_actor::receive_while(std::function<bool()> stmt) {
175   return {this, std::move(stmt)};
176 }
177 
178 blocking_actor::receive_while_helper
receive_while(const bool & ref)179 blocking_actor::receive_while(const bool& ref) {
180   return receive_while([&] { return ref; });
181 }
182 
await_all_other_actors_done()183 void blocking_actor::await_all_other_actors_done() {
184   system().registry().await_running_count_equal(getf(is_registered_flag) ? 1
185                                                                          : 0);
186 }
187 
act()188 void blocking_actor::act() {
189   CAF_LOG_TRACE("");
190   if (initial_behavior_fac_)
191     initial_behavior_fac_(this);
192 }
193 
fail_state(error err)194 void blocking_actor::fail_state(error err) {
195   fail_state_ = std::move(err);
196 }
197 
198 intrusive::task_result
operator ()(mailbox_element & x)199 blocking_actor::mailbox_visitor::operator()(mailbox_element& x) {
200   CAF_LOG_TRACE(CAF_ARG(x));
201   CAF_LOG_RECEIVE_EVENT((&x));
202   CAF_BEFORE_PROCESSING(self, x);
203   // Wrap the actual body for the function.
204   auto body = [this, &x] {
205     auto check_if_done = [&]() -> intrusive::task_result {
206       // Stop consuming items when reaching the end of the user-defined receive
207       // loop either via post or pre condition.
208       if (rcc.post() && rcc.pre())
209         return intrusive::task_result::resume;
210       done = true;
211       return intrusive::task_result::stop;
212     };
213     // Skip messages that don't match our message ID.
214     if (mid.is_response()) {
215       if (mid != x.mid) {
216         return intrusive::task_result::skip;
217       }
218     } else if (x.mid.is_response()) {
219       return intrusive::task_result::skip;
220     }
221     // Automatically unlink from actors after receiving an exit.
222     if (auto view = make_const_typed_message_view<exit_msg>(x.content()))
223       self->unlink_from(get<0>(view).source);
224     // Blocking actors can nest receives => push/pop `current_element_`
225     auto prev_element = self->current_element_;
226     self->current_element_ = &x;
227     auto g = detail::make_scope_guard(
228       [&] { self->current_element_ = prev_element; });
229     // Dispatch on x.
230     detail::default_invoke_result_visitor<blocking_actor> visitor{self};
231     if (bhvr.nested(visitor, x.content()))
232       return check_if_done();
233     // Blocking actors can have fallback handlers for catch-all rules.
234     auto sres = bhvr.fallback(self->current_element_->payload);
235     auto f = detail::make_overload(
236       [&](skip_t&) {
237         // Response handlers must get re-invoked with an error when
238         // receiving an unexpected message.
239         if (mid.is_response()) {
240           auto err = make_error(sec::unexpected_response, std::move(x.payload));
241           mailbox_element tmp{std::move(x.sender), x.mid, std::move(x.stages),
242                               make_message(std::move(err))};
243           self->current_element_ = &tmp;
244           bhvr.nested(tmp.content());
245           return check_if_done();
246         }
247         return intrusive::task_result::skip;
248       },
249       [&](auto& res) {
250         visitor(res);
251         return check_if_done();
252       });
253     return visit(f, sres);
254   };
255   // Post-process the returned value from the function body.
256   if (!self->getf(abstract_actor::collects_metrics_flag)) {
257     auto result = body();
258     if (result == intrusive::task_result::skip) {
259       CAF_AFTER_PROCESSING(self, invoke_message_result::skipped);
260       CAF_LOG_SKIP_EVENT();
261     } else {
262       CAF_AFTER_PROCESSING(self, invoke_message_result::consumed);
263       CAF_LOG_FINALIZE_EVENT();
264     }
265     return result;
266   } else {
267     auto t0 = std::chrono::steady_clock::now();
268     auto mbox_time = x.seconds_until(t0);
269     auto result = body();
270     if (result == intrusive::task_result::skip) {
271       CAF_AFTER_PROCESSING(self, invoke_message_result::skipped);
272       CAF_LOG_SKIP_EVENT();
273       auto& builtins = self->builtin_metrics();
274       telemetry::timer::observe(builtins.processing_time, t0);
275       builtins.mailbox_time->observe(mbox_time);
276       builtins.mailbox_size->dec();
277     } else {
278       CAF_AFTER_PROCESSING(self, invoke_message_result::consumed);
279       CAF_LOG_FINALIZE_EVENT();
280     }
281     return result;
282   }
283 }
284 
receive_impl(receive_cond & rcc,message_id mid,detail::blocking_behavior & bhvr)285 void blocking_actor::receive_impl(receive_cond& rcc, message_id mid,
286                                   detail::blocking_behavior& bhvr) {
287   CAF_LOG_TRACE(CAF_ARG(mid));
288   // Set to `true` by the visitor when done.
289   bool done = false;
290   // Make sure each receive sees all mailbox elements.
291   mailbox_visitor f{this, done, rcc, mid, bhvr};
292   mailbox().flush_cache();
293   // Check pre-condition once before entering the message consumption loop. The
294   // consumer performs any future check on pre and post conditions via
295   // check_if_done.
296   if (!rcc.pre())
297     return;
298   // Read incoming messages for as long as the user's receive loop accepts more
299   // messages.
300   do {
301     // Reset the timeout each iteration.
302     auto rel_tout = bhvr.timeout();
303     if (rel_tout == infinite) {
304       await_data();
305     } else {
306       auto abs_tout = std::chrono::high_resolution_clock::now();
307       abs_tout += rel_tout;
308       if (!await_data(abs_tout)) {
309         // Short-circuit "loop body".
310         bhvr.handle_timeout();
311         if (rcc.post() && rcc.pre())
312           continue;
313         else
314           return;
315       }
316     }
317     mailbox_.new_round(3, f);
318   } while (!done);
319 }
320 
await_data()321 void blocking_actor::await_data() {
322   mailbox().synchronized_await(mtx_, cv_);
323 }
324 
await_data(timeout_type timeout)325 bool blocking_actor::await_data(timeout_type timeout) {
326   return mailbox().synchronized_await(mtx_, cv_, timeout);
327 }
328 
dequeue()329 mailbox_element_ptr blocking_actor::dequeue() {
330   mailbox().flush_cache();
331   await_data();
332   mailbox().fetch_more();
333   auto& qs = mailbox().queue().queues();
334   auto result = get<mailbox_policy::urgent_queue_index>(qs).take_front();
335   if (!result)
336     result = get<mailbox_policy::normal_queue_index>(qs).take_front();
337   CAF_ASSERT(result != nullptr);
338   return result;
339 }
340 
varargs_tup_receive(receive_cond & rcc,message_id mid,std::tuple<behavior &> & tup)341 void blocking_actor::varargs_tup_receive(receive_cond& rcc, message_id mid,
342                                          std::tuple<behavior&>& tup) {
343   using namespace detail;
344   auto& bhvr = std::get<0>(tup);
345   if (bhvr.timeout() == infinite) {
346     auto fun = make_blocking_behavior(&bhvr);
347     receive_impl(rcc, mid, fun);
348   } else {
349     auto tmp = after(bhvr.timeout()) >> [&] { bhvr.handle_timeout(); };
350     auto fun = make_blocking_behavior(&bhvr, std::move(tmp));
351     receive_impl(rcc, mid, fun);
352   }
353 }
354 
build_pipeline(stream_slot,stream_slot,stream_manager_ptr)355 sec blocking_actor::build_pipeline(stream_slot, stream_slot,
356                                    stream_manager_ptr) {
357   CAF_LOG_ERROR("blocking_actor::build_pipeline called");
358   return sec::bad_function_call;
359 }
360 
attach_functor(const actor & x)361 size_t blocking_actor::attach_functor(const actor& x) {
362   return attach_functor(actor_cast<strong_actor_ptr>(x));
363 }
364 
attach_functor(const actor_addr & x)365 size_t blocking_actor::attach_functor(const actor_addr& x) {
366   return attach_functor(actor_cast<strong_actor_ptr>(x));
367 }
368 
attach_functor(const strong_actor_ptr & ptr)369 size_t blocking_actor::attach_functor(const strong_actor_ptr& ptr) {
370   if (!ptr)
371     return 0;
372   actor self{this};
373   auto f = [self](const error&) { caf::anon_send(self, wait_for_atom_v); };
374   ptr->get()->attach_functor(std::move(f));
375   return 1;
376 }
377 
cleanup(error && fail_state,execution_unit * host)378 bool blocking_actor::cleanup(error&& fail_state, execution_unit* host) {
379   if (!mailbox_.closed()) {
380     mailbox_.close();
381     // TODO: messages that are stuck in the cache can get lost
382     detail::sync_request_bouncer bounce{fail_state};
383     auto dropped = mailbox_.queue().new_round(1000, bounce).consumed_items;
384     while (dropped > 0) {
385       if (getf(abstract_actor::collects_metrics_flag)) {
386         auto val = static_cast<int64_t>(dropped);
387         metrics_.mailbox_size->dec(val);
388       }
389       dropped = mailbox_.queue().new_round(1000, bounce).consumed_items;
390     }
391   }
392   // Dispatch to parent's `cleanup` function.
393   return super::cleanup(std::move(fail_state), host);
394 }
395 
396 } // namespace caf
397