1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #define CAF_SUITE dynamic_spawn
6 
7 #include "caf/actor_system.hpp"
8 
9 #include "core-test.hpp"
10 
11 #include <atomic>
12 #include <chrono>
13 #include <functional>
14 #include <iostream>
15 #include <stack>
16 
17 #include "caf/all.hpp"
18 
19 using namespace caf;
20 
21 namespace {
22 
// Bookkeeping shared by all test actors in this file: the number of actor
// objects that currently exist and the highest value that number has reached.
std::atomic<long> s_max_actor_instances;
std::atomic<long> s_actor_instances;

// Counts a newly constructed actor and raises the recorded maximum if the new
// instance count exceeds it.
void inc_actor_instances() {
  const long live = ++s_actor_instances;
  long seen_max = s_max_actor_instances.load();
  // A failed compare-exchange reloads `seen_max`, so the loop ends as soon as
  // either our value has been stored or some other thread has published a
  // maximum that is at least as large. A successful exchange ends the loop
  // right away instead of running one more (guaranteed to fail) exchange.
  while (live > seen_max
         && !s_max_actor_instances.compare_exchange_weak(seen_max, live)) {
    // retry with the refreshed `seen_max`
  }
}

// Counts a destroyed actor; the recorded maximum is left untouched.
void dec_actor_instances() {
  --s_actor_instances;
}
37 
38 class event_testee : public event_based_actor {
39 public:
event_testee(actor_config & cfg)40   event_testee(actor_config& cfg) : event_based_actor(cfg) {
41     inc_actor_instances();
42     wait4string.assign([=](const std::string&) { become(wait4int); },
43                        [=](get_atom) { return "wait4string"; });
44     wait4float.assign([=](float) { become(wait4string); },
45                       [=](get_atom) { return "wait4float"; });
46     wait4int.assign([=](int) { become(wait4float); },
47                     [=](get_atom) { return "wait4int"; });
48   }
49 
~event_testee()50   ~event_testee() override {
51     dec_actor_instances();
52   }
53 
make_behavior()54   behavior make_behavior() override {
55     return wait4int;
56   }
57 
58   behavior wait4string;
59   behavior wait4float;
60   behavior wait4int;
61 };
62 
63 // quits after 5 timeouts
spawn_event_testee2(scoped_actor & parent)64 actor spawn_event_testee2(scoped_actor& parent) {
65   struct wrapper : event_based_actor {
66     actor parent;
67     wrapper(actor_config& cfg, actor parent_actor)
68       : event_based_actor(cfg), parent(std::move(parent_actor)) {
69       inc_actor_instances();
70     }
71     ~wrapper() override {
72       dec_actor_instances();
73     }
74     behavior wait4timeout(int remaining) {
75       return {
76         after(std::chrono::milliseconds(1)) >>
77           [=] {
78             CAF_MESSAGE("remaining: " << std::to_string(remaining));
79             if (remaining == 1) {
80               send(parent, ok_atom_v);
81               quit();
82             } else
83               become(wait4timeout(remaining - 1));
84           },
85       };
86     }
87     behavior make_behavior() override {
88       return wait4timeout(5);
89     }
90   };
91   return parent->spawn<wrapper>(parent);
92 }
93 
94 class testee_actor : public blocking_actor {
95 public:
testee_actor(actor_config & cfg)96   testee_actor(actor_config& cfg) : blocking_actor(cfg) {
97     inc_actor_instances();
98   }
99 
~testee_actor()100   ~testee_actor() override {
101     dec_actor_instances();
102   }
103 
act()104   void act() override {
105     bool running = true;
106     receive_while(running)([&](int) { wait4float(); },
107                            [&](get_atom) { return "wait4int"; },
108                            [&](exit_msg& em) {
109                              if (em.reason) {
110                                fail_state(std::move(em.reason));
111                                running = false;
112                              }
113                            });
114   }
115 
116 private:
wait4string()117   void wait4string() {
118     bool string_received = false;
119     do_receive([&](const std::string&) { string_received = true; },
120                [&](get_atom) { return "wait4string"; })
121       .until([&] { return string_received; });
122   }
123 
wait4float()124   void wait4float() {
125     bool float_received = false;
126     do_receive([&](float) { float_received = true; },
127                [&](get_atom) { return "wait4float"; })
128       .until([&] { return float_received; });
129     wait4string();
130   }
131 };
132 
133 // self->receives one timeout and quits
134 class testee1 : public event_based_actor {
135 public:
testee1(actor_config & cfg)136   testee1(actor_config& cfg) : event_based_actor(cfg) {
137     inc_actor_instances();
138   }
139 
~testee1()140   ~testee1() override {
141     dec_actor_instances();
142   }
143 
make_behavior()144   behavior make_behavior() override {
145     return {
146       after(std::chrono::milliseconds(10)) >> [=] { unbecome(); },
147     };
148   }
149 };
150 
151 class echo_actor : public event_based_actor {
152 public:
echo_actor(actor_config & cfg)153   echo_actor(actor_config& cfg) : event_based_actor(cfg) {
154     inc_actor_instances();
155   }
156 
~echo_actor()157   ~echo_actor() override {
158     dec_actor_instances();
159   }
160 
make_behavior()161   behavior make_behavior() override {
162     set_default_handler(reflect);
163     return {
164       [] {
165         // nop
166       },
167     };
168   }
169 };
170 
171 class simple_mirror : public event_based_actor {
172 public:
simple_mirror(actor_config & cfg)173   simple_mirror(actor_config& cfg) : event_based_actor(cfg) {
174     inc_actor_instances();
175   }
176 
~simple_mirror()177   ~simple_mirror() override {
178     dec_actor_instances();
179   }
180 
make_behavior()181   behavior make_behavior() override {
182     set_default_handler(reflect);
183     return {
184       [] {
185         // nop
186       },
187     };
188   }
189 };
190 
master(event_based_actor * self)191 behavior master(event_based_actor* self) {
192   return {
193     [=](ok_atom) {
194       CAF_MESSAGE("master: received done");
195       self->quit(exit_reason::user_shutdown);
196     },
197   };
198 }
199 
slave(event_based_actor * self,const actor & master)200 behavior slave(event_based_actor* self, const actor& master) {
201   self->link_to(master);
202   self->set_exit_handler([=](exit_msg& msg) {
203     CAF_MESSAGE("slave: received exit message");
204     self->quit(msg.reason);
205   });
206   return {
207     [] {
208       // nop
209     },
210   };
211 }
212 
213 class counting_actor : public event_based_actor {
214 public:
counting_actor(actor_config & cfg)215   counting_actor(actor_config& cfg) : event_based_actor(cfg) {
216     inc_actor_instances();
217   }
218 
~counting_actor()219   ~counting_actor() override {
220     dec_actor_instances();
221   }
222 
make_behavior()223   behavior make_behavior() override {
224     for (int i = 0; i < 100; ++i) {
225       send(this, ok_atom_v);
226     }
227     CAF_CHECK_EQUAL(mailbox().size(), 100u);
228     for (int i = 0; i < 100; ++i) {
229       send(this, ok_atom_v);
230     }
231     CAF_CHECK_EQUAL(mailbox().size(), 200u);
232     return {};
233   }
234 };
235 
236 struct fixture {
237   actor_system_config cfg;
238   // put inside a union to control ctor/dtor timing
239   union {
240     actor_system system;
241   };
242 
fixture__anon1872d5c10111::fixture243   fixture() {
244     new (&system) actor_system(cfg);
245   }
246 
~fixture__anon1872d5c10111::fixture247   ~fixture() {
248     system.~actor_system();
249     // destructor of actor_system must make sure all
250     // destructors of all actors have been run
251     CAF_CHECK_EQUAL(s_actor_instances.load(), 0);
252     CAF_MESSAGE("max. # of actor instances: " << s_max_actor_instances.load());
253   }
254 };
255 
256 } // namespace
257 
CAF_TEST_FIXTURE_SCOPE(dynamic_spawn_tests,test_coordinator_fixture<>)258 CAF_TEST_FIXTURE_SCOPE(dynamic_spawn_tests, test_coordinator_fixture<>)
259 
260 CAF_TEST(mirror) {
261   auto mirror = self->spawn<simple_mirror>();
262   auto dummy = self->spawn([=](event_based_actor* ptr) -> behavior {
263     ptr->send(mirror, "hello mirror");
264     return {
265       [](const std::string& msg) { CAF_CHECK_EQUAL(msg, "hello mirror"); }};
266   });
267   run();
268   /*
269   self->send(mirror, "hello mirror");
270   run();
271   self->receive (
272     [](const std::string& msg) {
273       CAF_CHECK_EQUAL(msg, "hello mirror");
274     }
275   );
276   */
277 }
278 
279 CAF_TEST_FIXTURE_SCOPE_END()
280 
CAF_TEST_FIXTURE_SCOPE(atom_tests,fixture)281 CAF_TEST_FIXTURE_SCOPE(atom_tests, fixture)
282 
283 CAF_TEST(count_mailbox) {
284   system.spawn<counting_actor>();
285 }
286 
// Checks whether detached actors and scheduled actors interact without
// errors: one detached master, two scheduled slaves linked to it, and a
// single `ok_atom` that makes the master quit.
CAF_TEST(detached_actors_and_schedulued_actors) {
  scoped_actor self{system};
  auto boss = system.spawn<detached>(master);
  for (int spawned = 0; spawned < 2; ++spawned)
    system.spawn(slave, boss);
  self->send(boss, ok_atom_v);
}
295 
// A receive with a zero timeout on a fresh scoped actor must take the
// timeout branch; the message handler reports an error if it ever runs.
CAF_TEST(self_receive_with_zero_timeout) {
  scoped_actor self{system};
  self->receive(
    [&] {
      // no message was sent, so nothing may arrive here
      CAF_ERROR("Unexpected message");
    },
    after(std::chrono::seconds(0)) >> [] { /* mailbox empty */ });
}
304 
// Sends a string to a detached simple_mirror and expects to receive the same
// string back.
CAF_TEST(detached_mirror) {
  scoped_actor self{system};
  auto reflector = self->spawn<simple_mirror, detached>();
  self->send(reflector, "hello mirror");
  self->receive([](const std::string& msg) {
    CAF_CHECK_EQUAL(msg, "hello mirror");
  });
}
312 
// A scoped actor sends messages to itself and receives them again: first a
// four-element message, then an empty one.
CAF_TEST(send_to_self) {
  scoped_actor self{system};
  // Round trip #1: (int, int, int, bool).
  self->send(self, 1, 2, 3, true);
  self->receive([](int a, int b, int c, bool d) {
    CAF_CHECK_EQUAL(a, 1);
    CAF_CHECK_EQUAL(b, 2);
    CAF_CHECK_EQUAL(c, 3);
    CAF_CHECK_EQUAL(d, true);
  });
  // Round trip #2: a default-constructed message, matched by a handler that
  // takes no arguments.
  self->send(self, message{});
  self->receive([] { /* nop */ });
}
325 
// Sends a string to an echo_actor and expects to receive the same string
// back.
CAF_TEST(echo_actor_messaging) {
  scoped_actor self{system};
  auto echo = system.spawn<echo_actor>();
  self->send(echo, "hello echo");
  self->receive([](const std::string& arg) {
    CAF_CHECK_EQUAL(arg, "hello echo");
  });
}
333 
// A scoped actor sends itself three ints with a delay of 1ms and then
// receives them.
CAF_TEST(delayed_send) {
  scoped_actor self{system};
  const auto delay = std::chrono::milliseconds(1);
  self->delayed_send(self, delay, 1, 2, 3);
  self->receive([](int a, int b, int c) {
    CAF_CHECK_EQUAL(a, 1);
    CAF_CHECK_EQUAL(b, 2);
    CAF_CHECK_EQUAL(c, 3);
  });
}
343 
// Lets a 1ms receive timeout expire on a scoped actor before spawning a
// testee1.
CAF_TEST(delayed_spawn) {
  scoped_actor self{system};
  // Nothing is sent to `self`; only the timeout handler is installed.
  self->receive(after(std::chrono::milliseconds(1)) >> [] { /* nop */ });
  system.spawn<testee1>();
}
349 
// Spawns the timeout-counting actor and waits for the `ok_atom` it sends to
// its parent. The returned handle is not needed.
CAF_TEST(spawn_event_testee2_test) {
  scoped_actor self{system};
  spawn_event_testee2(self);
  self->receive([](ok_atom) {
    CAF_MESSAGE("Received 'ok'");
  });
}
355 
// Spawns two actors from the same function object with different arguments
// and checks that each one answers `get_atom` with the name it was given.
CAF_TEST(function_spawn) {
  scoped_actor self{system};
  // Actor implementation: replies to `get_atom` with (name_atom, name).
  auto f = [](const std::string& name) -> behavior {
    return behavior{
      [name](get_atom) { return make_result(name_atom_v, name); },
    };
  };
  auto alice = system.spawn(f, "alice");
  auto bob = system.spawn(f, "bob");
  self->send(alice, get_atom_v);
  self->receive([](name_atom, const std::string& name) {
    CAF_CHECK_EQUAL(name, "alice");
  });
  self->send(bob, get_atom_v);
  self->receive([](name_atom, const std::string& name) {
    CAF_CHECK_EQUAL(name, "bob");
  });
  // Both actors are shut down explicitly via exit messages.
  self->send_exit(alice, exit_reason::user_shutdown);
  self->send_exit(bob, exit_reason::user_shutdown);
}
373 
374 using typed_testee = typed_actor<replies_to<abc_atom>::with<std::string>>;
375 
testee()376 typed_testee::behavior_type testee() {
377   return {[](abc_atom) {
378     CAF_MESSAGE("received 'abc'");
379     return "abc";
380   }};
381 }
382 
// Calls a typed testee through a function view and checks the reply.
CAF_TEST(typed_await) {
  scoped_actor self{system};
  auto hdl = system.spawn(testee);
  auto f = make_function_view(hdl);
  CAF_CHECK_EQUAL(f(abc_atom_v), "abc");
}
388 
389 // tests attach_functor() inside of an actor's constructor
CAF_TEST(constructor_attach)390 CAF_TEST(constructor_attach) {
391   class testee : public event_based_actor {
392   public:
393     testee(actor_config& cfg, actor buddy)
394       : event_based_actor(cfg), buddy_(buddy) {
395       attach_functor(
396         [=](const error& reason) { send(buddy, ok_atom_v, reason); });
397     }
398 
399     behavior make_behavior() override {
400       return {
401         [] {
402           // nop
403         },
404       };
405     }
406 
407     void on_exit() override {
408       destroy(buddy_);
409     }
410 
411   private:
412     actor buddy_;
413   };
414   class spawner : public event_based_actor {
415   public:
416     spawner(actor_config& cfg)
417       : event_based_actor(cfg),
418         downs_(0),
419         testee_(spawn<testee, monitored>(this)) {
420       set_down_handler([=](down_msg& msg) {
421         CAF_CHECK_EQUAL(msg.reason, exit_reason::user_shutdown);
422         if (++downs_ == 2)
423           quit(msg.reason);
424       });
425       set_exit_handler(
426         [=](exit_msg& msg) { send_exit(testee_, std::move(msg.reason)); });
427     }
428 
429     behavior make_behavior() override {
430       return {
431         [=](ok_atom, const error& reason) {
432           CAF_CHECK_EQUAL(reason, exit_reason::user_shutdown);
433           if (++downs_ == 2)
434             quit(reason);
435         },
436       };
437     }
438 
439     void on_exit() override {
440       CAF_MESSAGE("spawner::on_exit()");
441       destroy(testee_);
442     }
443 
444   private:
445     int downs_;
446     actor testee_;
447   };
448   anon_send_exit(system.spawn<spawner>(), exit_reason::user_shutdown);
449 }
450 
// Spawns an actor whose exit handler ignores every exit message, sends it
// `exit_reason::kill` and waits for it to terminate.
CAF_TEST(kill_the_immortal) {
  auto wannabe_immortal = system.spawn([](event_based_actor* self) -> behavior {
    // Exit handler that does nothing with the exit messages it is given.
    self->set_exit_handler([](local_actor*, exit_msg&) { /* nop */ });
    return {
      [] { /* nop */ },
    };
  });
  scoped_actor self{system};
  self->send_exit(wannabe_immortal, exit_reason::kill);
  self->wait_for(wannabe_immortal);
}
466 
// Checks that spawn() accepts a move-only argument: a unique_ptr is moved
// into a function-based actor, which replies to any float with the int the
// pointer referred to and then quits.
CAF_TEST(move_only_argument) {
  using unique_int = std::unique_ptr<int>;
  // make_unique instead of a raw `new` expression.
  auto uptr = std::make_unique<int>(42);
  auto wrapper = [](event_based_actor* self, unique_int ptr) -> behavior {
    // Copy the value out of the pointer; the handler keeps only the int.
    auto i = *ptr;
    return {
      [=](float) {
        self->quit();
        return i;
      },
    };
  };
  auto f = make_function_view(system.spawn(wrapper, std::move(uptr)));
  CAF_CHECK_EQUAL(to_tuple<int>(unbox(f(1.f))), std::make_tuple(42));
}
482 
483 CAF_TEST(move - only function object) {
484   struct move_only_fun {
485     move_only_fun() = default;
486     move_only_fun(const move_only_fun&) = delete;
487     move_only_fun(move_only_fun&&) = default;
488 
operator ()move_only_fun489     behavior operator()(event_based_actor*) {
490       return {};
491     }
492   };
493   actor_system_config cfg;
494   actor_system sys{cfg};
495   move_only_fun f;
496   sys.spawn(std::move(f));
497 }
498 
499 CAF_TEST_FIXTURE_SCOPE_END()
500