1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4
5 #include "caf/blocking_actor.hpp"
6
7 #include <utility>
8
9 #include "caf/actor_registry.hpp"
10 #include "caf/actor_system.hpp"
11 #include "caf/detail/default_invoke_result_visitor.hpp"
12 #include "caf/detail/invoke_result_visitor.hpp"
13 #include "caf/detail/private_thread.hpp"
14 #include "caf/detail/set_thread_name.hpp"
15 #include "caf/detail/sync_request_bouncer.hpp"
16 #include "caf/invoke_message_result.hpp"
17 #include "caf/logger.hpp"
18 #include "caf/scheduled_actor.hpp"
19 #include "caf/telemetry/timer.hpp"
20
21 namespace caf {
22
~receive_cond()23 blocking_actor::receive_cond::~receive_cond() {
24 // nop
25 }
26
pre()27 bool blocking_actor::receive_cond::pre() {
28 return true;
29 }
30
post()31 bool blocking_actor::receive_cond::post() {
32 return true;
33 }
34
~accept_one_cond()35 blocking_actor::accept_one_cond::~accept_one_cond() {
36 // nop
37 }
38
post()39 bool blocking_actor::accept_one_cond::post() {
40 return false;
41 }
42
blocking_actor(actor_config & cfg)43 blocking_actor::blocking_actor(actor_config& cfg)
44 : super(cfg.add_flag(local_actor::is_blocking_flag)),
45 mailbox_(unit, unit, unit) {
46 // nop
47 }
48
~blocking_actor()49 blocking_actor::~blocking_actor() {
50 // avoid weak-vtables warning
51 }
52
enqueue(mailbox_element_ptr ptr,execution_unit *)53 void blocking_actor::enqueue(mailbox_element_ptr ptr, execution_unit*) {
54 CAF_ASSERT(ptr != nullptr);
55 CAF_ASSERT(getf(is_blocking_flag));
56 CAF_LOG_TRACE(CAF_ARG(*ptr));
57 CAF_LOG_SEND_EVENT(ptr);
58 auto mid = ptr->mid;
59 auto src = ptr->sender;
60 auto collects_metrics = getf(abstract_actor::collects_metrics_flag);
61 if (collects_metrics) {
62 ptr->set_enqueue_time();
63 metrics_.mailbox_size->inc();
64 }
65 // returns false if mailbox has been closed
66 if (!mailbox().synchronized_push_back(mtx_, cv_, std::move(ptr))) {
67 CAF_LOG_REJECT_EVENT();
68 home_system().base_metrics().rejected_messages->inc();
69 if (collects_metrics)
70 metrics_.mailbox_size->dec();
71 if (mid.is_request()) {
72 detail::sync_request_bouncer srb{exit_reason()};
73 srb(src, mid);
74 }
75 } else {
76 CAF_LOG_ACCEPT_EVENT(false);
77 }
78 }
79
peek_at_next_mailbox_element()80 mailbox_element* blocking_actor::peek_at_next_mailbox_element() {
81 return mailbox().closed() || mailbox().blocked() ? nullptr : mailbox().peek();
82 }
83
name() const84 const char* blocking_actor::name() const {
85 return "user.blocking-actor";
86 }
87
88 namespace {
89
90 // Runner for passing a blocking actor to a private_thread. We don't actually
91 // need a reference count here, because the private thread calls
92 // intrusive_ptr_release_impl exactly once after running this function object.
93 class blocking_actor_runner : public resumable {
94 public:
blocking_actor_runner(blocking_actor * self,detail::private_thread * thread,bool hidden)95 explicit blocking_actor_runner(blocking_actor* self,
96 detail::private_thread* thread, bool hidden)
97 : self_(self), thread_(thread), hidden_(hidden) {
98 intrusive_ptr_add_ref(self->ctrl());
99 }
100
subtype() const101 resumable::subtype_t subtype() const override {
102 return resumable::function_object;
103 }
104
resume(execution_unit * ctx,size_t)105 resumable::resume_result resume(execution_unit* ctx, size_t) override {
106 CAF_PUSH_AID_FROM_PTR(self_);
107 self_->context(ctx);
108 self_->initialize();
109 error rsn;
110 #ifdef CAF_ENABLE_EXCEPTIONS
111 try {
112 self_->act();
113 rsn = self_->fail_state();
114 } catch (...) {
115 auto ptr = std::current_exception();
116 rsn = scheduled_actor::default_exception_handler(self_, ptr);
117 }
118 try {
119 self_->on_exit();
120 } catch (...) {
121 // simply ignore exception
122 }
123 #else
124 self_->act();
125 rsn = self_->fail_state();
126 self_->on_exit();
127 #endif
128 self_->cleanup(std::move(rsn), ctx);
129 intrusive_ptr_release(self_->ctrl());
130 auto& sys = ctx->system();
131 sys.release_private_thread(thread_);
132 if (!hidden_) {
133 [[maybe_unused]] auto count = sys.registry().dec_running();
134 CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to"
135 << count);
136 }
137 return resumable::done;
138 }
139
intrusive_ptr_add_ref_impl()140 void intrusive_ptr_add_ref_impl() override {
141 // nop
142 }
143
intrusive_ptr_release_impl()144 void intrusive_ptr_release_impl() override {
145 delete this;
146 }
147
148 private:
149 blocking_actor* self_;
150 detail::private_thread* thread_;
151 bool hidden_;
152 };
153
154 } // namespace
155
launch(execution_unit *,bool,bool hide)156 void blocking_actor::launch(execution_unit*, bool, bool hide) {
157 CAF_PUSH_AID_FROM_PTR(this);
158 CAF_LOG_TRACE(CAF_ARG(hide));
159 CAF_ASSERT(getf(is_blocking_flag));
160 // Try to acquire a thread before incrementing the running count, since this
161 // may throw.
162 auto& sys = home_system();
163 auto thread = sys.acquire_private_thread();
164 // Note: must *not* call register_at_system() to stop actor cleanup from
165 // decrementing the count before releasing the thread.
166 if (!hide) {
167 [[maybe_unused]] auto count = sys.registry().inc_running();
168 CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
169 }
170 thread->resume(new blocking_actor_runner(this, thread, hide));
171 }
172
173 blocking_actor::receive_while_helper
receive_while(std::function<bool ()> stmt)174 blocking_actor::receive_while(std::function<bool()> stmt) {
175 return {this, std::move(stmt)};
176 }
177
178 blocking_actor::receive_while_helper
receive_while(const bool & ref)179 blocking_actor::receive_while(const bool& ref) {
180 return receive_while([&] { return ref; });
181 }
182
await_all_other_actors_done()183 void blocking_actor::await_all_other_actors_done() {
184 system().registry().await_running_count_equal(getf(is_registered_flag) ? 1
185 : 0);
186 }
187
act()188 void blocking_actor::act() {
189 CAF_LOG_TRACE("");
190 if (initial_behavior_fac_)
191 initial_behavior_fac_(this);
192 }
193
fail_state(error err)194 void blocking_actor::fail_state(error err) {
195 fail_state_ = std::move(err);
196 }
197
198 intrusive::task_result
operator ()(mailbox_element & x)199 blocking_actor::mailbox_visitor::operator()(mailbox_element& x) {
200 CAF_LOG_TRACE(CAF_ARG(x));
201 CAF_LOG_RECEIVE_EVENT((&x));
202 CAF_BEFORE_PROCESSING(self, x);
203 // Wrap the actual body for the function.
204 auto body = [this, &x] {
205 auto check_if_done = [&]() -> intrusive::task_result {
206 // Stop consuming items when reaching the end of the user-defined receive
207 // loop either via post or pre condition.
208 if (rcc.post() && rcc.pre())
209 return intrusive::task_result::resume;
210 done = true;
211 return intrusive::task_result::stop;
212 };
213 // Skip messages that don't match our message ID.
214 if (mid.is_response()) {
215 if (mid != x.mid) {
216 return intrusive::task_result::skip;
217 }
218 } else if (x.mid.is_response()) {
219 return intrusive::task_result::skip;
220 }
221 // Automatically unlink from actors after receiving an exit.
222 if (auto view = make_const_typed_message_view<exit_msg>(x.content()))
223 self->unlink_from(get<0>(view).source);
224 // Blocking actors can nest receives => push/pop `current_element_`
225 auto prev_element = self->current_element_;
226 self->current_element_ = &x;
227 auto g = detail::make_scope_guard(
228 [&] { self->current_element_ = prev_element; });
229 // Dispatch on x.
230 detail::default_invoke_result_visitor<blocking_actor> visitor{self};
231 if (bhvr.nested(visitor, x.content()))
232 return check_if_done();
233 // Blocking actors can have fallback handlers for catch-all rules.
234 auto sres = bhvr.fallback(self->current_element_->payload);
235 auto f = detail::make_overload(
236 [&](skip_t&) {
237 // Response handlers must get re-invoked with an error when
238 // receiving an unexpected message.
239 if (mid.is_response()) {
240 auto err = make_error(sec::unexpected_response, std::move(x.payload));
241 mailbox_element tmp{std::move(x.sender), x.mid, std::move(x.stages),
242 make_message(std::move(err))};
243 self->current_element_ = &tmp;
244 bhvr.nested(tmp.content());
245 return check_if_done();
246 }
247 return intrusive::task_result::skip;
248 },
249 [&](auto& res) {
250 visitor(res);
251 return check_if_done();
252 });
253 return visit(f, sres);
254 };
255 // Post-process the returned value from the function body.
256 if (!self->getf(abstract_actor::collects_metrics_flag)) {
257 auto result = body();
258 if (result == intrusive::task_result::skip) {
259 CAF_AFTER_PROCESSING(self, invoke_message_result::skipped);
260 CAF_LOG_SKIP_EVENT();
261 } else {
262 CAF_AFTER_PROCESSING(self, invoke_message_result::consumed);
263 CAF_LOG_FINALIZE_EVENT();
264 }
265 return result;
266 } else {
267 auto t0 = std::chrono::steady_clock::now();
268 auto mbox_time = x.seconds_until(t0);
269 auto result = body();
270 if (result == intrusive::task_result::skip) {
271 CAF_AFTER_PROCESSING(self, invoke_message_result::skipped);
272 CAF_LOG_SKIP_EVENT();
273 auto& builtins = self->builtin_metrics();
274 telemetry::timer::observe(builtins.processing_time, t0);
275 builtins.mailbox_time->observe(mbox_time);
276 builtins.mailbox_size->dec();
277 } else {
278 CAF_AFTER_PROCESSING(self, invoke_message_result::consumed);
279 CAF_LOG_FINALIZE_EVENT();
280 }
281 return result;
282 }
283 }
284
receive_impl(receive_cond & rcc,message_id mid,detail::blocking_behavior & bhvr)285 void blocking_actor::receive_impl(receive_cond& rcc, message_id mid,
286 detail::blocking_behavior& bhvr) {
287 CAF_LOG_TRACE(CAF_ARG(mid));
288 // Set to `true` by the visitor when done.
289 bool done = false;
290 // Make sure each receive sees all mailbox elements.
291 mailbox_visitor f{this, done, rcc, mid, bhvr};
292 mailbox().flush_cache();
293 // Check pre-condition once before entering the message consumption loop. The
294 // consumer performs any future check on pre and post conditions via
295 // check_if_done.
296 if (!rcc.pre())
297 return;
298 // Read incoming messages for as long as the user's receive loop accepts more
299 // messages.
300 do {
301 // Reset the timeout each iteration.
302 auto rel_tout = bhvr.timeout();
303 if (rel_tout == infinite) {
304 await_data();
305 } else {
306 auto abs_tout = std::chrono::high_resolution_clock::now();
307 abs_tout += rel_tout;
308 if (!await_data(abs_tout)) {
309 // Short-circuit "loop body".
310 bhvr.handle_timeout();
311 if (rcc.post() && rcc.pre())
312 continue;
313 else
314 return;
315 }
316 }
317 mailbox_.new_round(3, f);
318 } while (!done);
319 }
320
await_data()321 void blocking_actor::await_data() {
322 mailbox().synchronized_await(mtx_, cv_);
323 }
324
await_data(timeout_type timeout)325 bool blocking_actor::await_data(timeout_type timeout) {
326 return mailbox().synchronized_await(mtx_, cv_, timeout);
327 }
328
dequeue()329 mailbox_element_ptr blocking_actor::dequeue() {
330 mailbox().flush_cache();
331 await_data();
332 mailbox().fetch_more();
333 auto& qs = mailbox().queue().queues();
334 auto result = get<mailbox_policy::urgent_queue_index>(qs).take_front();
335 if (!result)
336 result = get<mailbox_policy::normal_queue_index>(qs).take_front();
337 CAF_ASSERT(result != nullptr);
338 return result;
339 }
340
varargs_tup_receive(receive_cond & rcc,message_id mid,std::tuple<behavior &> & tup)341 void blocking_actor::varargs_tup_receive(receive_cond& rcc, message_id mid,
342 std::tuple<behavior&>& tup) {
343 using namespace detail;
344 auto& bhvr = std::get<0>(tup);
345 if (bhvr.timeout() == infinite) {
346 auto fun = make_blocking_behavior(&bhvr);
347 receive_impl(rcc, mid, fun);
348 } else {
349 auto tmp = after(bhvr.timeout()) >> [&] { bhvr.handle_timeout(); };
350 auto fun = make_blocking_behavior(&bhvr, std::move(tmp));
351 receive_impl(rcc, mid, fun);
352 }
353 }
354
build_pipeline(stream_slot,stream_slot,stream_manager_ptr)355 sec blocking_actor::build_pipeline(stream_slot, stream_slot,
356 stream_manager_ptr) {
357 CAF_LOG_ERROR("blocking_actor::build_pipeline called");
358 return sec::bad_function_call;
359 }
360
attach_functor(const actor & x)361 size_t blocking_actor::attach_functor(const actor& x) {
362 return attach_functor(actor_cast<strong_actor_ptr>(x));
363 }
364
attach_functor(const actor_addr & x)365 size_t blocking_actor::attach_functor(const actor_addr& x) {
366 return attach_functor(actor_cast<strong_actor_ptr>(x));
367 }
368
attach_functor(const strong_actor_ptr & ptr)369 size_t blocking_actor::attach_functor(const strong_actor_ptr& ptr) {
370 if (!ptr)
371 return 0;
372 actor self{this};
373 auto f = [self](const error&) { caf::anon_send(self, wait_for_atom_v); };
374 ptr->get()->attach_functor(std::move(f));
375 return 1;
376 }
377
cleanup(error && fail_state,execution_unit * host)378 bool blocking_actor::cleanup(error&& fail_state, execution_unit* host) {
379 if (!mailbox_.closed()) {
380 mailbox_.close();
381 // TODO: messages that are stuck in the cache can get lost
382 detail::sync_request_bouncer bounce{fail_state};
383 auto dropped = mailbox_.queue().new_round(1000, bounce).consumed_items;
384 while (dropped > 0) {
385 if (getf(abstract_actor::collects_metrics_flag)) {
386 auto val = static_cast<int64_t>(dropped);
387 metrics_.mailbox_size->dec(val);
388 }
389 dropped = mailbox_.queue().new_round(1000, bounce).consumed_items;
390 }
391 }
392 // Dispatch to parent's `cleanup` function.
393 return super::cleanup(std::move(fail_state), host);
394 }
395
396 } // namespace caf
397