1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4
5 #define CAF_SUITE dynamic_spawn
6
7 #include "caf/actor_system.hpp"
8
9 #include "core-test.hpp"
10
11 #include <atomic>
12 #include <chrono>
13 #include <functional>
14 #include <iostream>
15 #include <stack>
16
17 #include "caf/all.hpp"
18
19 using namespace caf;
20
21 namespace {
22
23 std::atomic<long> s_max_actor_instances;
24 std::atomic<long> s_actor_instances;
25
inc_actor_instances()26 void inc_actor_instances() {
27 long v1 = ++s_actor_instances;
28 long v2 = s_max_actor_instances.load();
29 while (v1 > v2) {
30 s_max_actor_instances.compare_exchange_strong(v2, v1);
31 }
32 }
33
dec_actor_instances()34 void dec_actor_instances() {
35 --s_actor_instances;
36 }
37
38 class event_testee : public event_based_actor {
39 public:
event_testee(actor_config & cfg)40 event_testee(actor_config& cfg) : event_based_actor(cfg) {
41 inc_actor_instances();
42 wait4string.assign([=](const std::string&) { become(wait4int); },
43 [=](get_atom) { return "wait4string"; });
44 wait4float.assign([=](float) { become(wait4string); },
45 [=](get_atom) { return "wait4float"; });
46 wait4int.assign([=](int) { become(wait4float); },
47 [=](get_atom) { return "wait4int"; });
48 }
49
~event_testee()50 ~event_testee() override {
51 dec_actor_instances();
52 }
53
make_behavior()54 behavior make_behavior() override {
55 return wait4int;
56 }
57
58 behavior wait4string;
59 behavior wait4float;
60 behavior wait4int;
61 };
62
63 // quits after 5 timeouts
spawn_event_testee2(scoped_actor & parent)64 actor spawn_event_testee2(scoped_actor& parent) {
65 struct wrapper : event_based_actor {
66 actor parent;
67 wrapper(actor_config& cfg, actor parent_actor)
68 : event_based_actor(cfg), parent(std::move(parent_actor)) {
69 inc_actor_instances();
70 }
71 ~wrapper() override {
72 dec_actor_instances();
73 }
74 behavior wait4timeout(int remaining) {
75 return {
76 after(std::chrono::milliseconds(1)) >>
77 [=] {
78 CAF_MESSAGE("remaining: " << std::to_string(remaining));
79 if (remaining == 1) {
80 send(parent, ok_atom_v);
81 quit();
82 } else
83 become(wait4timeout(remaining - 1));
84 },
85 };
86 }
87 behavior make_behavior() override {
88 return wait4timeout(5);
89 }
90 };
91 return parent->spawn<wrapper>(parent);
92 }
93
94 class testee_actor : public blocking_actor {
95 public:
testee_actor(actor_config & cfg)96 testee_actor(actor_config& cfg) : blocking_actor(cfg) {
97 inc_actor_instances();
98 }
99
~testee_actor()100 ~testee_actor() override {
101 dec_actor_instances();
102 }
103
act()104 void act() override {
105 bool running = true;
106 receive_while(running)([&](int) { wait4float(); },
107 [&](get_atom) { return "wait4int"; },
108 [&](exit_msg& em) {
109 if (em.reason) {
110 fail_state(std::move(em.reason));
111 running = false;
112 }
113 });
114 }
115
116 private:
wait4string()117 void wait4string() {
118 bool string_received = false;
119 do_receive([&](const std::string&) { string_received = true; },
120 [&](get_atom) { return "wait4string"; })
121 .until([&] { return string_received; });
122 }
123
wait4float()124 void wait4float() {
125 bool float_received = false;
126 do_receive([&](float) { float_received = true; },
127 [&](get_atom) { return "wait4float"; })
128 .until([&] { return float_received; });
129 wait4string();
130 }
131 };
132
133 // self->receives one timeout and quits
134 class testee1 : public event_based_actor {
135 public:
testee1(actor_config & cfg)136 testee1(actor_config& cfg) : event_based_actor(cfg) {
137 inc_actor_instances();
138 }
139
~testee1()140 ~testee1() override {
141 dec_actor_instances();
142 }
143
make_behavior()144 behavior make_behavior() override {
145 return {
146 after(std::chrono::milliseconds(10)) >> [=] { unbecome(); },
147 };
148 }
149 };
150
151 class echo_actor : public event_based_actor {
152 public:
echo_actor(actor_config & cfg)153 echo_actor(actor_config& cfg) : event_based_actor(cfg) {
154 inc_actor_instances();
155 }
156
~echo_actor()157 ~echo_actor() override {
158 dec_actor_instances();
159 }
160
make_behavior()161 behavior make_behavior() override {
162 set_default_handler(reflect);
163 return {
164 [] {
165 // nop
166 },
167 };
168 }
169 };
170
171 class simple_mirror : public event_based_actor {
172 public:
simple_mirror(actor_config & cfg)173 simple_mirror(actor_config& cfg) : event_based_actor(cfg) {
174 inc_actor_instances();
175 }
176
~simple_mirror()177 ~simple_mirror() override {
178 dec_actor_instances();
179 }
180
make_behavior()181 behavior make_behavior() override {
182 set_default_handler(reflect);
183 return {
184 [] {
185 // nop
186 },
187 };
188 }
189 };
190
master(event_based_actor * self)191 behavior master(event_based_actor* self) {
192 return {
193 [=](ok_atom) {
194 CAF_MESSAGE("master: received done");
195 self->quit(exit_reason::user_shutdown);
196 },
197 };
198 }
199
slave(event_based_actor * self,const actor & master)200 behavior slave(event_based_actor* self, const actor& master) {
201 self->link_to(master);
202 self->set_exit_handler([=](exit_msg& msg) {
203 CAF_MESSAGE("slave: received exit message");
204 self->quit(msg.reason);
205 });
206 return {
207 [] {
208 // nop
209 },
210 };
211 }
212
213 class counting_actor : public event_based_actor {
214 public:
counting_actor(actor_config & cfg)215 counting_actor(actor_config& cfg) : event_based_actor(cfg) {
216 inc_actor_instances();
217 }
218
~counting_actor()219 ~counting_actor() override {
220 dec_actor_instances();
221 }
222
make_behavior()223 behavior make_behavior() override {
224 for (int i = 0; i < 100; ++i) {
225 send(this, ok_atom_v);
226 }
227 CAF_CHECK_EQUAL(mailbox().size(), 100u);
228 for (int i = 0; i < 100; ++i) {
229 send(this, ok_atom_v);
230 }
231 CAF_CHECK_EQUAL(mailbox().size(), 200u);
232 return {};
233 }
234 };
235
236 struct fixture {
237 actor_system_config cfg;
238 // put inside a union to control ctor/dtor timing
239 union {
240 actor_system system;
241 };
242
fixture__anon1872d5c10111::fixture243 fixture() {
244 new (&system) actor_system(cfg);
245 }
246
~fixture__anon1872d5c10111::fixture247 ~fixture() {
248 system.~actor_system();
249 // destructor of actor_system must make sure all
250 // destructors of all actors have been run
251 CAF_CHECK_EQUAL(s_actor_instances.load(), 0);
252 CAF_MESSAGE("max. # of actor instances: " << s_max_actor_instances.load());
253 }
254 };
255
256 } // namespace
257
CAF_TEST_FIXTURE_SCOPE(dynamic_spawn_tests,test_coordinator_fixture<>)258 CAF_TEST_FIXTURE_SCOPE(dynamic_spawn_tests, test_coordinator_fixture<>)
259
260 CAF_TEST(mirror) {
261 auto mirror = self->spawn<simple_mirror>();
262 auto dummy = self->spawn([=](event_based_actor* ptr) -> behavior {
263 ptr->send(mirror, "hello mirror");
264 return {
265 [](const std::string& msg) { CAF_CHECK_EQUAL(msg, "hello mirror"); }};
266 });
267 run();
268 /*
269 self->send(mirror, "hello mirror");
270 run();
271 self->receive (
272 [](const std::string& msg) {
273 CAF_CHECK_EQUAL(msg, "hello mirror");
274 }
275 );
276 */
277 }
278
279 CAF_TEST_FIXTURE_SCOPE_END()
280
CAF_TEST_FIXTURE_SCOPE(atom_tests,fixture)281 CAF_TEST_FIXTURE_SCOPE(atom_tests, fixture)
282
283 CAF_TEST(count_mailbox) {
284 system.spawn<counting_actor>();
285 }
286
CAF_TEST(detached_actors_and_schedulued_actors)287 CAF_TEST(detached_actors_and_schedulued_actors) {
288 scoped_actor self{system};
289 // check whether detached actors and scheduled actors interact w/o errors
290 auto m = system.spawn<detached>(master);
291 system.spawn(slave, m);
292 system.spawn(slave, m);
293 self->send(m, ok_atom_v);
294 }
295
CAF_TEST(self_receive_with_zero_timeout)296 CAF_TEST(self_receive_with_zero_timeout) {
297 scoped_actor self{system};
298 self->receive([&] { CAF_ERROR("Unexpected message"); },
299 after(std::chrono::seconds(0)) >>
300 [] {
301 // mailbox empty
302 });
303 }
304
CAF_TEST(detached_mirror)305 CAF_TEST(detached_mirror) {
306 scoped_actor self{system};
307 auto mirror = self->spawn<simple_mirror, detached>();
308 self->send(mirror, "hello mirror");
309 self->receive(
310 [](const std::string& msg) { CAF_CHECK_EQUAL(msg, "hello mirror"); });
311 }
312
CAF_TEST(send_to_self)313 CAF_TEST(send_to_self) {
314 scoped_actor self{system};
315 self->send(self, 1, 2, 3, true);
316 self->receive([](int a, int b, int c, bool d) {
317 CAF_CHECK_EQUAL(a, 1);
318 CAF_CHECK_EQUAL(b, 2);
319 CAF_CHECK_EQUAL(c, 3);
320 CAF_CHECK_EQUAL(d, true);
321 });
322 self->send(self, message{});
323 self->receive([] {});
324 }
325
CAF_TEST(echo_actor_messaging)326 CAF_TEST(echo_actor_messaging) {
327 scoped_actor self{system};
328 auto mecho = system.spawn<echo_actor>();
329 self->send(mecho, "hello echo");
330 self->receive(
331 [](const std::string& arg) { CAF_CHECK_EQUAL(arg, "hello echo"); });
332 }
333
CAF_TEST(delayed_send)334 CAF_TEST(delayed_send) {
335 scoped_actor self{system};
336 self->delayed_send(self, std::chrono::milliseconds(1), 1, 2, 3);
337 self->receive([](int a, int b, int c) {
338 CAF_CHECK_EQUAL(a, 1);
339 CAF_CHECK_EQUAL(b, 2);
340 CAF_CHECK_EQUAL(c, 3);
341 });
342 }
343
CAF_TEST(delayed_spawn)344 CAF_TEST(delayed_spawn) {
345 scoped_actor self{system};
346 self->receive(after(std::chrono::milliseconds(1)) >> [] {});
347 system.spawn<testee1>();
348 }
349
CAF_TEST(spawn_event_testee2_test)350 CAF_TEST(spawn_event_testee2_test) {
351 scoped_actor self{system};
352 spawn_event_testee2(self);
353 self->receive([](ok_atom) { CAF_MESSAGE("Received 'ok'"); });
354 }
355
CAF_TEST(function_spawn)356 CAF_TEST(function_spawn) {
357 scoped_actor self{system};
358 auto f = [](const std::string& name) -> behavior {
359 return ([name](get_atom) { return make_result(name_atom_v, name); });
360 };
361 auto a1 = system.spawn(f, "alice");
362 auto a2 = system.spawn(f, "bob");
363 self->send(a1, get_atom_v);
364 self->receive([&](name_atom, const std::string& name) {
365 CAF_CHECK_EQUAL(name, "alice");
366 });
367 self->send(a2, get_atom_v);
368 self->receive(
369 [&](name_atom, const std::string& name) { CAF_CHECK_EQUAL(name, "bob"); });
370 self->send_exit(a1, exit_reason::user_shutdown);
371 self->send_exit(a2, exit_reason::user_shutdown);
372 }
373
374 using typed_testee = typed_actor<replies_to<abc_atom>::with<std::string>>;
375
testee()376 typed_testee::behavior_type testee() {
377 return {[](abc_atom) {
378 CAF_MESSAGE("received 'abc'");
379 return "abc";
380 }};
381 }
382
CAF_TEST(typed_await)383 CAF_TEST(typed_await) {
384 scoped_actor self{system};
385 auto f = make_function_view(system.spawn(testee));
386 CAF_CHECK_EQUAL(f(abc_atom_v), "abc");
387 }
388
389 // tests attach_functor() inside of an actor's constructor
CAF_TEST(constructor_attach)390 CAF_TEST(constructor_attach) {
391 class testee : public event_based_actor {
392 public:
393 testee(actor_config& cfg, actor buddy)
394 : event_based_actor(cfg), buddy_(buddy) {
395 attach_functor(
396 [=](const error& reason) { send(buddy, ok_atom_v, reason); });
397 }
398
399 behavior make_behavior() override {
400 return {
401 [] {
402 // nop
403 },
404 };
405 }
406
407 void on_exit() override {
408 destroy(buddy_);
409 }
410
411 private:
412 actor buddy_;
413 };
414 class spawner : public event_based_actor {
415 public:
416 spawner(actor_config& cfg)
417 : event_based_actor(cfg),
418 downs_(0),
419 testee_(spawn<testee, monitored>(this)) {
420 set_down_handler([=](down_msg& msg) {
421 CAF_CHECK_EQUAL(msg.reason, exit_reason::user_shutdown);
422 if (++downs_ == 2)
423 quit(msg.reason);
424 });
425 set_exit_handler(
426 [=](exit_msg& msg) { send_exit(testee_, std::move(msg.reason)); });
427 }
428
429 behavior make_behavior() override {
430 return {
431 [=](ok_atom, const error& reason) {
432 CAF_CHECK_EQUAL(reason, exit_reason::user_shutdown);
433 if (++downs_ == 2)
434 quit(reason);
435 },
436 };
437 }
438
439 void on_exit() override {
440 CAF_MESSAGE("spawner::on_exit()");
441 destroy(testee_);
442 }
443
444 private:
445 int downs_;
446 actor testee_;
447 };
448 anon_send_exit(system.spawn<spawner>(), exit_reason::user_shutdown);
449 }
450
CAF_TEST(kill_the_immortal)451 CAF_TEST(kill_the_immortal) {
452 auto wannabe_immortal = system.spawn([](event_based_actor* self) -> behavior {
453 self->set_exit_handler([](local_actor*, exit_msg&) {
454 // nop
455 });
456 return {
457 [] {
458 // nop
459 },
460 };
461 });
462 scoped_actor self{system};
463 self->send_exit(wannabe_immortal, exit_reason::kill);
464 self->wait_for(wannabe_immortal);
465 }
466
CAF_TEST(move_only_argument)467 CAF_TEST(move_only_argument) {
468 using unique_int = std::unique_ptr<int>;
469 unique_int uptr{new int(42)};
470 auto wrapper = [](event_based_actor* self, unique_int ptr) -> behavior {
471 auto i = *ptr;
472 return {
473 [=](float) {
474 self->quit();
475 return i;
476 },
477 };
478 };
479 auto f = make_function_view(system.spawn(wrapper, std::move(uptr)));
480 CAF_CHECK_EQUAL(to_tuple<int>(unbox(f(1.f))), std::make_tuple(42));
481 }
482
483 CAF_TEST(move - only function object) {
484 struct move_only_fun {
485 move_only_fun() = default;
486 move_only_fun(const move_only_fun&) = delete;
487 move_only_fun(move_only_fun&&) = default;
488
operator ()move_only_fun489 behavior operator()(event_based_actor*) {
490 return {};
491 }
492 };
493 actor_system_config cfg;
494 actor_system sys{cfg};
495 move_only_fun f;
496 sys.spawn(std::move(f));
497 }
498
499 CAF_TEST_FIXTURE_SCOPE_END()
500