1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #pragma once
6 
7 #include <type_traits>
8 
9 #include "caf/all.hpp"
10 #include "caf/binary_serializer.hpp"
11 #include "caf/byte_buffer.hpp"
12 #include "caf/config.hpp"
13 #include "caf/detail/gcd.hpp"
14 #include "caf/init_global_meta_objects.hpp"
15 #include "caf/test/unit_test.hpp"
16 
17 CAF_PUSH_WARNINGS
18 
/// Tag type of the placeholder `_`. It carries no state.
struct wildcard {};

/// Placeholder for message elements that a clause should not inspect, e.g.
/// `expect((int, int), from(foo).to(bar).with(1, _))`.
constexpr wildcard _{};

/// Two wildcards always compare equal.
/// @relates wildcard
constexpr bool operator==(const wildcard& /*lhs*/, const wildcard& /*rhs*/) {
  return true;
}
30 
31 template <size_t I, class T>
cmp_one(const caf::message & x,const T & y)32 bool cmp_one(const caf::message& x, const T& y) {
33   if (std::is_same<T, wildcard>::value)
34     return true;
35   return x.match_element<T>(I) && x.get_as<T>(I) == y;
36 }
37 
38 template <size_t I, class... Ts>
39 typename std::enable_if<(I == sizeof...(Ts)), bool>::type
msg_cmp_rec(const caf::message &,const std::tuple<Ts...> &)40 msg_cmp_rec(const caf::message&, const std::tuple<Ts...>&) {
41   return true;
42 }
43 
44 template <size_t I, class... Ts>
45 typename std::enable_if<(I < sizeof...(Ts)), bool>::type
msg_cmp_rec(const caf::message & x,const std::tuple<Ts...> & ys)46 msg_cmp_rec(const caf::message& x, const std::tuple<Ts...>& ys) {
47   return cmp_one<I>(x, std::get<I>(ys)) && msg_cmp_rec<I + 1>(x, ys);
48 }
49 
50 // allow comparing arbitrary `T`s to `message` objects for the purpose of the
51 // testing DSL
52 namespace caf {
53 
54 template <class... Ts>
operator ==(const message & x,const std::tuple<Ts...> & y)55 bool operator==(const message& x, const std::tuple<Ts...>& y) {
56   return x.size() == sizeof...(Ts) && msg_cmp_rec<0>(x, y);
57 }
58 
59 template <class T>
operator ==(const message & x,const T & y)60 bool operator==(const message& x, const T& y) {
61   return x.match_elements<T>() && x.get_as<T>(0) == y;
62 }
63 
64 } // namespace caf
65 
66 // dummy function to force ADL later on
67 // int inspect(int, int);
68 
/// Detects at compile time whether `T` declares a nested `outer_type`.
template <class T>
struct has_outer_type {
  /// Viable only if `U::outer_type*` is a valid type.
  template <class U>
  static auto sfinae(U*) -> typename U::outer_type*;

  /// Fallback for every other type.
  template <class U>
  static auto sfinae(...) -> std::false_type;

  /// `T::outer_type*` if the nested type exists, `std::false_type` otherwise.
  using type = decltype(sfinae<T>(nullptr));

  /// `true` if and only if the first `sfinae` overload was selected.
  static constexpr bool value = !std::is_same_v<type, std::false_type>;
};
80 
81 // enables ADL in `with_content`
82 template <class T, class U>
83 T get(const has_outer_type<U>&);
84 
85 // enables ADL in `with_content`
86 template <class T, class U>
87 bool is(const has_outer_type<U>&);
88 
89 template <class Tup>
90 class elementwise_compare_inspector {
91 public:
92   using result_type = bool;
93 
94   template <size_t X>
95   using pos = std::integral_constant<size_t, X>;
96 
elementwise_compare_inspector(const Tup & xs)97   explicit elementwise_compare_inspector(const Tup& xs) : xs_(xs) {
98     // nop
99   }
100 
101   template <class... Ts>
operator ()(const Ts &...xs)102   bool operator()(const Ts&... xs) {
103     return iterate(pos<0>{}, xs...);
104   }
105 
106 private:
107   template <size_t X>
iterate(pos<X>)108   bool iterate(pos<X>) {
109     // end of recursion
110     return true;
111   }
112 
113   template <size_t X, class T, class... Ts>
114   typename std::enable_if<caf::meta::is_annotation<T>::value, bool>::type
iterate(pos<X> pos,const T &,const Ts &...ys)115   iterate(pos<X> pos, const T&, const Ts&... ys) {
116     return iterate(pos, ys...);
117   }
118 
119   template <size_t X, class T, class... Ts>
120   typename std::enable_if<!caf::meta::is_annotation<T>::value, bool>::type
iterate(pos<X>,const T & y,const Ts &...ys)121   iterate(pos<X>, const T& y, const Ts&... ys) {
122     std::integral_constant<size_t, X + 1> next;
123     check(y, get<X>(xs_));
124     return iterate(next, ys...);
125   }
126 
127   template <class T, class U>
check(const T & x,const U & y)128   static void check(const T& x, const U& y) {
129     CAF_CHECK_EQUAL(x, y);
130   }
131 
132   template <class T>
check(const T &,const wildcard &)133   static void check(const T&, const wildcard&) {
134     // nop
135   }
136 
137   const Tup& xs_;
138 };
139 
140 // -- unified access to all actor handles in CAF -------------------------------
141 
142 /// Reduces any of CAF's handle types to an `abstract_actor` pointer.
143 class caf_handle : caf::detail::comparable<caf_handle>,
144                    caf::detail::comparable<caf_handle, std::nullptr_t> {
145 public:
146   using pointer = caf::abstract_actor*;
147 
caf_handle(pointer ptr=nullptr)148   constexpr caf_handle(pointer ptr = nullptr) : ptr_(ptr) {
149     // nop
150   }
151 
caf_handle(const caf::strong_actor_ptr & x)152   caf_handle(const caf::strong_actor_ptr& x) {
153     set(x);
154   }
155 
caf_handle(const caf::actor & x)156   caf_handle(const caf::actor& x) {
157     set(x);
158   }
159 
caf_handle(const caf::actor_addr & x)160   caf_handle(const caf::actor_addr& x) {
161     set(x);
162   }
163 
caf_handle(const caf::scoped_actor & x)164   caf_handle(const caf::scoped_actor& x) {
165     set(x);
166   }
167 
168   template <class... Ts>
caf_handle(const caf::typed_actor<Ts...> & x)169   caf_handle(const caf::typed_actor<Ts...>& x) {
170     set(x);
171   }
172 
173   caf_handle(const caf_handle&) = default;
174 
operator =(pointer x)175   caf_handle& operator=(pointer x) {
176     ptr_ = x;
177     return *this;
178   }
179 
180   template <class T,
181             class E = caf::detail::enable_if_t<!std::is_pointer<T>::value>>
operator =(const T & x)182   caf_handle& operator=(const T& x) {
183     set(x);
184     return *this;
185   }
186 
187   caf_handle& operator=(const caf_handle&) = default;
188 
get() const189   pointer get() const {
190     return ptr_;
191   }
192 
operator ->() const193   pointer operator->() const {
194     return get();
195   }
196 
compare(const caf_handle & other) const197   ptrdiff_t compare(const caf_handle& other) const {
198     return reinterpret_cast<ptrdiff_t>(ptr_)
199            - reinterpret_cast<ptrdiff_t>(other.ptr_);
200   }
201 
compare(std::nullptr_t) const202   ptrdiff_t compare(std::nullptr_t) const {
203     return reinterpret_cast<ptrdiff_t>(ptr_);
204   }
205 
206 private:
207   template <class T>
set(const T & x)208   void set(const T& x) {
209     ptr_ = caf::actor_cast<pointer>(x);
210   }
211 
212   caf::abstract_actor* ptr_;
213 };
214 
215 // -- introspection of the next mailbox element --------------------------------
216 
217 /// @private
218 template <class... Ts>
default_extract(caf_handle x)219 caf::optional<std::tuple<Ts...>> default_extract(caf_handle x) {
220   auto ptr = x->peek_at_next_mailbox_element();
221   if (ptr == nullptr)
222     return caf::none;
223   if (auto view = caf::make_const_typed_message_view<Ts...>(ptr->content()))
224     return to_tuple(view);
225   return caf::none;
226 }
227 
228 /// @private
229 template <class T>
unboxing_extract(caf_handle x)230 caf::optional<std::tuple<T>> unboxing_extract(caf_handle x) {
231   auto tup = default_extract<typename T::outer_type>(x);
232   if (tup == caf::none || !is<T>(get<0>(*tup)))
233     return caf::none;
234   return std::make_tuple(get<T>(get<0>(*tup)));
235 }
236 
237 /// Dispatches to `unboxing_extract` if
238 /// `sizeof...(Ts) == 0 && has_outer_type<T>::value`, otherwise dispatches to
239 /// `default_extract`.
240 /// @private
241 template <class T, bool HasOuterType, class... Ts>
242 struct try_extract_impl {
operator ()try_extract_impl243   caf::optional<std::tuple<T, Ts...>> operator()(caf_handle x) {
244     return default_extract<T, Ts...>(x);
245   }
246 };
247 
248 template <class T>
249 struct try_extract_impl<T, true> {
operator ()try_extract_impl250   caf::optional<std::tuple<T>> operator()(caf_handle x) {
251     return unboxing_extract<T>(x);
252   }
253 };
254 
255 /// Returns the content of the next mailbox element as `tuple<T, Ts...>` on a
256 /// match. Returns `none` otherwise.
257 template <class T, class... Ts>
try_extract(caf_handle x)258 caf::optional<std::tuple<T, Ts...>> try_extract(caf_handle x) {
259   try_extract_impl<T, has_outer_type<T>::value, Ts...> f;
260   return f(x);
261 }
262 
263 /// Returns the content of the next mailbox element without taking it out of
264 /// the mailbox. Fails on an empty mailbox or if the content of the next
265 /// element does not match `<T, Ts...>`.
266 template <class T, class... Ts>
extract(caf_handle x)267 std::tuple<T, Ts...> extract(caf_handle x) {
268   auto result = try_extract<T, Ts...>(x);
269   if (result == caf::none) {
270     auto ptr = x->peek_at_next_mailbox_element();
271     if (ptr == nullptr)
272       CAF_FAIL("Mailbox is empty");
273     CAF_FAIL(
274       "Message does not match expected pattern: " << to_string(ptr->content()));
275   }
276   return std::move(*result);
277 }
278 
279 template <class T, class... Ts>
received(caf_handle x)280 bool received(caf_handle x) {
281   return try_extract<T, Ts...>(x) != caf::none;
282 }
283 
284 template <class... Ts>
285 class expect_clause {
286 public:
expect_clause(caf::scheduler::test_coordinator & sched)287   explicit expect_clause(caf::scheduler::test_coordinator& sched)
288     : sched_(sched), dest_(nullptr) {
289     peek_ = [=] {
290       /// The extractor will call CAF_FAIL on a type mismatch, essentially
291       /// performing a type check when ignoring the result.
292       extract<Ts...>(dest_);
293     };
294   }
295 
296   expect_clause(expect_clause&& other) = default;
297 
~expect_clause()298   ~expect_clause() {
299     if (peek_ != nullptr) {
300       peek_();
301       run_once();
302     }
303   }
304 
from(const wildcard &)305   expect_clause& from(const wildcard&) {
306     return *this;
307   }
308 
309   template <class Handle>
from(const Handle & whom)310   expect_clause& from(const Handle& whom) {
311     src_ = caf::actor_cast<caf::strong_actor_ptr>(whom);
312     return *this;
313   }
314 
315   template <class Handle>
to(const Handle & whom)316   expect_clause& to(const Handle& whom) {
317     CAF_REQUIRE(sched_.prioritize(whom));
318     dest_ = &sched_.next_job<caf::abstract_actor>();
319     auto ptr = dest_->peek_at_next_mailbox_element();
320     CAF_REQUIRE(ptr != nullptr);
321     if (src_)
322       CAF_REQUIRE_EQUAL(ptr->sender, src_);
323     return *this;
324   }
325 
to(const caf::scoped_actor & whom)326   expect_clause& to(const caf::scoped_actor& whom) {
327     dest_ = caf::actor_cast<caf::abstract_actor*>(whom);
328     return *this;
329   }
330 
331   template <class... Us>
with(Us &&...xs)332   void with(Us&&... xs) {
333     // TODO: replace this workaround with the make_tuple() line when dropping
334     //       support for GCC 4.8.
335     std::tuple<typename std::decay<Us>::type...> tmp{std::forward<Us>(xs)...};
336     // auto tmp = std::make_tuple(std::forward<Us>(xs)...);
337     // TODO: move tmp into lambda when switching to C++14
338     peek_ = [=] {
339       using namespace caf::detail;
340       elementwise_compare_inspector<decltype(tmp)> inspector{tmp};
341       auto ys = extract<Ts...>(dest_);
342       auto ys_indices = get_indices(ys);
343       CAF_REQUIRE(apply_args(inspector, ys_indices, ys));
344     };
345   }
346 
347 protected:
run_once()348   void run_once() {
349     auto dptr = dynamic_cast<caf::blocking_actor*>(dest_);
350     if (dptr == nullptr)
351       sched_.run_once();
352     else
353       dptr->dequeue(); // Drop message.
354   }
355 
356   caf::scheduler::test_coordinator& sched_;
357   caf::strong_actor_ptr src_;
358   caf::abstract_actor* dest_;
359   std::function<void()> peek_;
360 };
361 
362 template <>
363 class expect_clause<void> {
364 public:
expect_clause(caf::scheduler::test_coordinator & sched)365   explicit expect_clause(caf::scheduler::test_coordinator& sched)
366     : sched_(sched), dest_(nullptr) {
367     // nop
368   }
369 
370   expect_clause(expect_clause&& other) = default;
371 
~expect_clause()372   ~expect_clause() {
373     auto ptr = dest_->peek_at_next_mailbox_element();
374     if (ptr == nullptr)
375       CAF_FAIL("no message found");
376     if (!ptr->content().empty())
377       CAF_FAIL("non-empty message found: " << to_string(ptr->content()));
378     run_once();
379   }
380 
from(const wildcard &)381   expect_clause& from(const wildcard&) {
382     return *this;
383   }
384 
385   template <class Handle>
from(const Handle & whom)386   expect_clause& from(const Handle& whom) {
387     src_ = caf::actor_cast<caf::strong_actor_ptr>(whom);
388     return *this;
389   }
390 
391   template <class Handle>
to(const Handle & whom)392   expect_clause& to(const Handle& whom) {
393     CAF_REQUIRE(sched_.prioritize(whom));
394     dest_ = &sched_.next_job<caf::abstract_actor>();
395     auto ptr = dest_->peek_at_next_mailbox_element();
396     CAF_REQUIRE(ptr != nullptr);
397     if (src_)
398       CAF_REQUIRE_EQUAL(ptr->sender, src_);
399     return *this;
400   }
401 
to(const caf::scoped_actor & whom)402   expect_clause& to(const caf::scoped_actor& whom) {
403     dest_ = caf::actor_cast<caf::abstract_actor*>(whom);
404     return *this;
405   }
406 
407 protected:
run_once()408   void run_once() {
409     auto dptr = dynamic_cast<caf::blocking_actor*>(dest_);
410     if (dptr == nullptr)
411       sched_.run_once();
412     else
413       dptr->dequeue(); // Drop message.
414   }
415 
416   caf::scheduler::test_coordinator& sched_;
417   caf::strong_actor_ptr src_;
418   caf::abstract_actor* dest_;
419 };
420 
421 template <class... Ts>
422 class inject_clause {
423 public:
inject_clause(caf::scheduler::test_coordinator & sched)424   explicit inject_clause(caf::scheduler::test_coordinator& sched)
425     : sched_(sched), dest_(nullptr) {
426     // nop
427   }
428 
429   inject_clause(inject_clause&& other) = default;
430 
431   template <class Handle>
from(const Handle & whom)432   inject_clause& from(const Handle& whom) {
433     src_ = caf::actor_cast<caf::strong_actor_ptr>(whom);
434     return *this;
435   }
436 
437   template <class Handle>
to(const Handle & whom)438   inject_clause& to(const Handle& whom) {
439     dest_ = caf::actor_cast<caf::strong_actor_ptr>(whom);
440     return *this;
441   }
442 
with(Ts...xs)443   void with(Ts... xs) {
444     if (dest_ == nullptr)
445       CAF_FAIL("missing .to() in inject() statement");
446     auto msg = caf::make_message(std::move(xs)...);
447     if (src_ == nullptr)
448       caf::anon_send(caf::actor_cast<caf::actor>(dest_), msg);
449     else
450       caf::send_as(caf::actor_cast<caf::actor>(src_),
451                    caf::actor_cast<caf::actor>(dest_), msg);
452     if (!sched_.prioritize(dest_))
453       CAF_FAIL("inject: failed to schedule destination actor");
454     auto dest_ptr = &sched_.next_job<caf::abstract_actor>();
455     auto ptr = dest_ptr->peek_at_next_mailbox_element();
456     if (ptr == nullptr)
457       CAF_FAIL("inject: failed to get next message from destination actor");
458     if (ptr->sender != src_)
459       CAF_FAIL("inject: found unexpected sender for the next message");
460     if (ptr->payload.cptr() != msg.cptr())
461       CAF_FAIL("inject: found unexpected message => " << ptr->payload << " !! "
462                                                       << msg);
463     msg.reset(); // drop local reference before running the actor
464     run_once();
465   }
466 
467 protected:
run_once()468   void run_once() {
469     auto ptr = caf::actor_cast<caf::abstract_actor*>(dest_);
470     auto dptr = dynamic_cast<caf::blocking_actor*>(ptr);
471     if (dptr == nullptr)
472       sched_.run_once();
473     else
474       dptr->dequeue(); // Drop message.
475   }
476 
477   caf::scheduler::test_coordinator& sched_;
478   caf::strong_actor_ptr src_;
479   caf::strong_actor_ptr dest_;
480 };
481 
482 template <class... Ts>
483 class allow_clause {
484 public:
allow_clause(caf::scheduler::test_coordinator & sched)485   explicit allow_clause(caf::scheduler::test_coordinator& sched)
486     : sched_(sched), dest_(nullptr) {
487     peek_ = [=] {
488       if (dest_ != nullptr)
489         return try_extract<Ts...>(dest_) != caf::none;
490       return false;
491     };
492   }
493 
494   allow_clause(allow_clause&& other) = default;
495 
~allow_clause()496   ~allow_clause() {
497     eval();
498   }
499 
from(const wildcard &)500   allow_clause& from(const wildcard&) {
501     return *this;
502   }
503 
504   template <class Handle>
from(const Handle & whom)505   allow_clause& from(const Handle& whom) {
506     src_ = caf::actor_cast<caf::strong_actor_ptr>(whom);
507     return *this;
508   }
509 
510   template <class Handle>
to(const Handle & whom)511   allow_clause& to(const Handle& whom) {
512     if (sched_.prioritize(whom))
513       dest_ = &sched_.next_job<caf::abstract_actor>();
514     return *this;
515   }
516 
517   template <class... Us>
with(Us &&...xs)518   void with(Us&&... xs) {
519     // TODO: replace this workaround with make_tuple() when dropping support
520     //       for GCC 4.8.
521     std::tuple<typename std::decay<Us>::type...> tmp{std::forward<Us>(xs)...};
522     // TODO: move tmp into lambda when switching to C++14
523     peek_ = [=] {
524       using namespace caf::detail;
525       elementwise_compare_inspector<decltype(tmp)> inspector{tmp};
526       auto ys = try_extract<Ts...>(dest_);
527       if (ys != caf::none) {
528         auto ys_indices = get_indices(*ys);
529         return apply_args(inspector, ys_indices, *ys);
530       }
531       return false;
532     };
533   }
534 
eval()535   bool eval() {
536     if (peek_ != nullptr) {
537       if (peek_()) {
538         run_once();
539         return true;
540       }
541     }
542     return false;
543   }
544 
545 protected:
run_once()546   void run_once() {
547     auto dptr = dynamic_cast<caf::blocking_actor*>(dest_);
548     if (dptr == nullptr)
549       sched_.run_once();
550     else
551       dptr->dequeue(); // Drop message.
552   }
553 
554   caf::scheduler::test_coordinator& sched_;
555   caf::strong_actor_ptr src_;
556   caf::abstract_actor* dest_;
557   std::function<bool()> peek_;
558 };
559 
560 template <class... Ts>
561 class disallow_clause {
562 public:
disallow_clause()563   disallow_clause() {
564     check_ = [=] {
565       auto ptr = dest_->peek_at_next_mailbox_element();
566       if (ptr == nullptr)
567         return;
568       if (src_ != nullptr && ptr->sender != src_)
569         return;
570       auto res = try_extract<Ts...>(dest_);
571       if (res != caf::none)
572         CAF_FAIL("received disallowed message: " << caf::deep_to_string(*ptr));
573     };
574   }
575 
576   disallow_clause(disallow_clause&& other) = default;
577 
~disallow_clause()578   ~disallow_clause() {
579     if (check_ != nullptr)
580       check_();
581   }
582 
from(const wildcard &)583   disallow_clause& from(const wildcard&) {
584     return *this;
585   }
586 
from(caf_handle x)587   disallow_clause& from(caf_handle x) {
588     src_ = x;
589     return *this;
590   }
591 
to(caf_handle x)592   disallow_clause& to(caf_handle x) {
593     dest_ = x;
594     return *this;
595   }
596 
597   template <class... Us>
with(Us &&...xs)598   void with(Us&&... xs) {
599     // TODO: replace this workaround with make_tuple() when dropping support
600     //       for GCC 4.8.
601     std::tuple<typename std::decay<Us>::type...> tmp{std::forward<Us>(xs)...};
602     // TODO: move tmp into lambda when switching to C++14
603     check_ = [=] {
604       auto ptr = dest_->peek_at_next_mailbox_element();
605       if (ptr == nullptr)
606         return;
607       if (src_ != nullptr && ptr->sender != src_)
608         return;
609       auto res = try_extract<Ts...>(dest_);
610       if (res != caf::none) {
611         using namespace caf::detail;
612         elementwise_compare_inspector<decltype(tmp)> inspector{tmp};
613         auto& ys = *res;
614         auto ys_indices = get_indices(ys);
615         if (apply_args(inspector, ys_indices, ys))
616           CAF_FAIL("received disallowed message: " << CAF_ARG(*res));
617       }
618     };
619   }
620 
621 protected:
622   caf_handle src_;
623   caf_handle dest_;
624   std::function<void()> check_;
625 };
626 
627 template <class... Ts>
628 struct test_coordinator_fixture_fetch_helper {
629   template <class Self, template <class> class Policy, class Interface>
630   std::tuple<Ts...>
operator ()test_coordinator_fixture_fetch_helper631   operator()(caf::response_handle<Self, Policy<Interface>>& from) const {
632     std::tuple<Ts...> result;
633     from.receive([&](Ts&... xs) { result = std::make_tuple(std::move(xs)...); },
634                  [&](caf::error& err) { CAF_FAIL(err); });
635     return result;
636   }
637 };
638 
639 template <class T>
640 struct test_coordinator_fixture_fetch_helper<T> {
641   template <class Self, template <class> class Policy, class Interface>
operator ()test_coordinator_fixture_fetch_helper642   T operator()(caf::response_handle<Self, Policy<Interface>>& from) const {
643     T result;
644     from.receive([&](T& x) { result = std::move(x); },
645                  [&](caf::error& err) { CAF_FAIL(err); });
646     return result;
647   }
648 };
649 
650 /// A fixture with a deterministic scheduler setup.
651 template <class Config = caf::actor_system_config>
652 class test_coordinator_fixture {
653 public:
654   // -- member types -----------------------------------------------------------
655 
656   /// A deterministic scheduler type.
657   using scheduler_type = caf::scheduler::test_coordinator;
658 
659   // -- constructors, destructors, and assignment operators --------------------
660 
init_config(Config & cfg)661   static Config& init_config(Config& cfg) {
662     if (auto err = cfg.parse(caf::test::engine::argc(),
663                              caf::test::engine::argv()))
664       CAF_FAIL("failed to parse config: " << to_string(err));
665     cfg.set("caf.scheduler.policy", "testing");
666     cfg.set("caf.logger.inline-output", true);
667     if (cfg.custom_options().has_category("caf.middleman")) {
668       cfg.set("caf.middleman.network-backend", "testing");
669       cfg.set("caf.middleman.manual-multiplexing", true);
670       cfg.set("caf.middleman.workers", size_t{0});
671       cfg.set("caf.middleman.heartbeat-interval", caf::timespan{0});
672     }
673     cfg.set("caf.stream.credit-policy", "token-based");
674     cfg.set("caf.stream.token-based-policy.batch-size", 50);
675     cfg.set("caf.stream.token-based-policy.buffer-size", 200);
676     return cfg;
677   }
678 
679   template <class... Ts>
test_coordinator_fixture(Ts &&...xs)680   explicit test_coordinator_fixture(Ts&&... xs)
681     : cfg(std::forward<Ts>(xs)...),
682       sys(init_config(cfg)),
683       self(sys, true),
684       sched(dynamic_cast<scheduler_type&>(sys.scheduler())) {
685     // Make sure the current time isn't 0.
686     sched.clock().current_time += std::chrono::hours(1);
687   }
688 
~test_coordinator_fixture()689   virtual ~test_coordinator_fixture() {
690     run();
691   }
692 
693   // -- DSL functions ----------------------------------------------------------
694 
695   /// Allows the next actor to consume one message from its mailbox. Fails the
696   /// test if no message was consumed.
consume_message()697   virtual bool consume_message() {
698     return sched.try_run_once();
699   }
700 
701   /// Allows each actors to consume all messages from its mailbox. Fails the
702   /// test if no message was consumed.
703   /// @returns The number of consumed messages.
consume_messages()704   size_t consume_messages() {
705     size_t result = 0;
706     while (consume_message())
707       ++result;
708     return result;
709   }
710 
711   /// Allows an simulated I/O devices to handle an event.
handle_io_event()712   virtual bool handle_io_event() {
713     return false;
714   }
715 
716   /// Allows each simulated I/O device to handle all events.
handle_io_events()717   size_t handle_io_events() {
718     size_t result = 0;
719     while (handle_io_event())
720       ++result;
721     return result;
722   }
723 
724   /// Triggers the next pending timeout.
trigger_timeout()725   virtual bool trigger_timeout() {
726     return sched.trigger_timeout();
727   }
728 
729   /// Triggers all pending timeouts.
trigger_timeouts()730   size_t trigger_timeouts() {
731     size_t timeouts = 0;
732     while (trigger_timeout())
733       ++timeouts;
734     return timeouts;
735   }
736 
737   /// Advances the clock by `interval`.
advance_time(caf::timespan interval)738   size_t advance_time(caf::timespan interval) {
739     return sched.clock().advance_time(interval);
740   }
741 
742   /// Consume messages and trigger timeouts until no activity remains.
743   /// @returns The total number of events, i.e., messages consumed and
744   ///          timeouts triggered.
run()745   size_t run() {
746     return run_until([] { return false; });
747   }
748 
749   /// Consume ones message or triggers the next timeout.
750   /// @returns `true` if a message was consumed or timeouts was triggered,
751   ///          `false` otherwise.
run_once()752   bool run_once() {
753     return run_until([] { return true; }) > 0;
754   }
755 
756   /// Consume messages and trigger timeouts until `pred` becomes `true` or
757   /// until no activity remains.
758   /// @returns The total number of events, i.e., messages consumed and
759   ///          timeouts triggered.
760   template <class BoolPredicate>
run_until(BoolPredicate predicate)761   size_t run_until(BoolPredicate predicate) {
762     CAF_LOG_TRACE("");
763     // Bookkeeping.
764     size_t events = 0;
765     // Loop until no activity remains.
766     for (;;) {
767       size_t progress = 0;
768       while (consume_message()) {
769         ++progress;
770         if (predicate()) {
771           CAF_LOG_DEBUG("stop due to predicate:" << CAF_ARG(events));
772           return events;
773         }
774       }
775       while (handle_io_event()) {
776         ++progress;
777         if (predicate()) {
778           CAF_LOG_DEBUG("stop due to predicate:" << CAF_ARG(events));
779           return events;
780         }
781       }
782       if (trigger_timeout())
783         ++progress;
784       if (progress == 0) {
785         CAF_LOG_DEBUG("no activity left:" << CAF_ARG(events));
786         return events;
787       }
788       events += progress;
789     }
790   }
791 
792   /// Call `run()` when the next scheduled actor becomes ready.
run_after_next_ready_event()793   void run_after_next_ready_event() {
794     sched.after_next_enqueue([=] { run(); });
795   }
796 
797   /// Call `run_until(predicate)` when the next scheduled actor becomes ready.
798   template <class BoolPredicate>
run_until_after_next_ready_event(BoolPredicate predicate)799   void run_until_after_next_ready_event(BoolPredicate predicate) {
800     sched.after_next_enqueue([=] { run_until(predicate); });
801   }
802 
803   /// Sends a request to `hdl`, then calls `run()`, and finally fetches and
804   /// returns the result.
805   template <class T, class... Ts, class Handle, class... Us>
806   typename std::conditional<sizeof...(Ts) == 0, T, std::tuple<T, Ts...>>::type
request(Handle hdl,Us...args)807   request(Handle hdl, Us... args) {
808     auto res_hdl = self->request(hdl, caf::infinite, std::move(args)...);
809     run();
810     test_coordinator_fixture_fetch_helper<T, Ts...> f;
811     return f(res_hdl);
812   }
813 
814   /// Returns the next message from the next pending actor's mailbox as `T`.
815   template <class T>
peek()816   const T& peek() {
817     return sched.template peek<T>();
818   }
819 
820   /// Dereferences `hdl` and downcasts it to `T`.
821   template <class T = caf::scheduled_actor, class Handle = caf::actor>
822   T& deref(const Handle& hdl) {
823     auto ptr = caf::actor_cast<caf::abstract_actor*>(hdl);
824     CAF_REQUIRE(ptr != nullptr);
825     return dynamic_cast<T&>(*ptr);
826   }
827 
828   template <class... Ts>
serialize(const Ts &...xs)829   caf::byte_buffer serialize(const Ts&... xs) {
830     caf::byte_buffer buf;
831     caf::binary_serializer sink{sys, buf};
832     if (!(sink.apply(xs) && ...))
833       CAF_FAIL("serialization failed: " << sink.get_error());
834     return buf;
835   }
836 
837   template <class... Ts>
deserialize(const caf::byte_buffer & buf,Ts &...xs)838   void deserialize(const caf::byte_buffer& buf, Ts&... xs) {
839     caf::binary_deserializer source{sys, buf};
840     if (!(source.apply(xs) && ...))
841       CAF_FAIL("deserialization failed: " << source.get_error());
842   }
843 
844   template <class T>
roundtrip(const T & x)845   T roundtrip(const T& x) {
846     T result;
847     deserialize(serialize(x), result);
848     return result;
849   }
850 
851   // -- member variables -------------------------------------------------------
852 
853   /// The user-generated system config.
854   Config cfg;
855 
856   /// Host system for (scheduled) actors.
857   caf::actor_system sys;
858 
859   /// A scoped actor for conveniently sending and receiving messages.
860   caf::scoped_actor self;
861 
862   /// Deterministic scheduler.
863   scheduler_type& sched;
864 };
865 
866 /// Unboxes an expected value or fails the test if it doesn't exist.
867 template <class T>
unbox(caf::expected<T> x)868 T unbox(caf::expected<T> x) {
869   if (!x)
870     CAF_FAIL(to_string(x.error()));
871   return std::move(*x);
872 }
873 
874 /// Unboxes an optional value or fails the test if it doesn't exist.
875 template <class T>
unbox(caf::optional<T> x)876 T unbox(caf::optional<T> x) {
877   if (!x)
878     CAF_FAIL("x == none");
879   return std::move(*x);
880 }
881 
882 /// Unboxes an optional value or fails the test if it doesn't exist.
883 template <class T>
unbox(T * x)884 T unbox(T* x) {
885   if (x == nullptr)
886     CAF_FAIL("x == nullptr");
887   return *x;
888 }
889 
/// Identity macro: expands to its (fully macro-expanded) argument.
#define CAF_EXPAND(arg) arg

/// Expands to its argument list. Applied to a parenthesized type list such as
/// `(int, int)`, this strips the parentheses.
#define CAF_DSL_LIST(...) __VA_ARGS__
895 
/// Convenience macro for defining expect clauses. `signature` is a
/// parenthesized type list, `clauses` a call chain on `expect_clause` such as
/// `from(a).to(b).with(1)`. Requires a variable `sched` in scope.
#define expect(signature, clauses)                                             \
  do {                                                                         \
    CAF_MESSAGE("expect" << #signature << "." << #clauses);                    \
    expect_clause<CAF_EXPAND(CAF_DSL_LIST signature)>{sched}.clauses;          \
  } while (false)
902 
/// Convenience macro for defining inject clauses. `signature` is a
/// parenthesized type list, `clauses` a call chain on `inject_clause` such as
/// `from(a).to(b).with(1)`. Requires a variable `sched` in scope.
#define inject(signature, clauses)                                             \
  do {                                                                         \
    CAF_MESSAGE("inject" << #signature << "." << #clauses);                    \
    inject_clause<CAF_EXPAND(CAF_DSL_LIST signature)>{sched}.clauses;          \
  } while (false)
908 
/// Convenience macro for defining allow clauses. Expands to an expression
/// that yields the result of `allow_clause::eval()`. Requires a variable
/// `sched` in scope.
///
/// The clause object has a deliberately unusual name: `fields` is pasted into
/// the lambda body, so a short local name such as `x` would hide a variable
/// of the same name used inside `fields`, e.g. in `from(x)`.
#define allow(types, fields)                                                   \
  ([&] {                                                                       \
    CAF_MESSAGE("allow" << #types << "." << #fields);                          \
    allow_clause<CAF_EXPAND(CAF_DSL_LIST types)> caf_dsl_allow_clause_{sched}; \
    caf_dsl_allow_clause_.fields;                                              \
    return caf_dsl_allow_clause_.eval();                                       \
  })()
917 
/// Convenience macro for defining disallow clauses. `signature` is a
/// parenthesized type list, `clauses` a call chain on `disallow_clause` such
/// as `from(a).to(b).with(1)`.
#define disallow(signature, clauses)                                           \
  do {                                                                         \
    CAF_MESSAGE("disallow" << #signature << "." << #clauses);                  \
    disallow_clause<CAF_EXPAND(CAF_DSL_LIST signature)>{}.clauses;             \
  } while (false)
924 
/// Defines the required base type for testee states in the current namespace.
/// The primary template is empty; `TESTEE_STATE` specializes it.
#define TESTEE_SETUP()                                                         \
  template <class StateType>                                                   \
  struct testee_state_base {}

/// Convenience macro for adding additional state to a testee. Must be
/// followed by the body of the `testee_state_base` specialization.
#define TESTEE_STATE(testee)                                                   \
  struct testee##_state;                                                       \
  template <>                                                                  \
  struct testee_state_base<testee##_state>

/// Implementation detail for `TESTEE` and `VARARGS_TESTEE`: defines the state
/// type `<testee>_state` and the actor type `<testee>_actor`.
#define TESTEE_SCAFFOLD(testee)                                                \
  struct testee##_state : testee_state_base<testee##_state> {                  \
    static inline const char* name = #testee;                                  \
  };                                                                           \
  using testee##_actor = stateful_actor<testee##_state>

/// Convenience macro for defining an actor named `testee`. Must be followed
/// by the function body.
#define TESTEE(testee)                                                         \
  TESTEE_SCAFFOLD(testee);                                                     \
  behavior testee(testee##_actor* self)

/// Convenience macro for defining an actor named `testee` with any number of
/// initialization arguments. Must be followed by the function body.
#define VARARGS_TESTEE(testee, ...)                                            \
  TESTEE_SCAFFOLD(testee);                                                     \
  behavior testee(testee##_actor* self, __VA_ARGS__)
953 
954 CAF_POP_WARNINGS
955