1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4 
5 #pragma once
6 
7 #include <vector>
8 
9 #include "caf/default_downstream_manager.hpp"
10 #include "caf/detail/stream_stage_driver_impl.hpp"
11 #include "caf/detail/stream_stage_impl.hpp"
12 #include "caf/downstream_manager.hpp"
13 #include "caf/fwd.hpp"
14 #include "caf/make_stage_result.hpp"
15 #include "caf/policy/arg.hpp"
16 #include "caf/stream.hpp"
17 #include "caf/stream_stage.hpp"
18 #include "caf/unit.hpp"
19 
20 namespace caf {
21 
22 /// Attaches a new stream stage to `self` by creating a default stream stage
23 /// manager with `Driver`.
24 /// @param self Points to the hosting actor.
25 /// @param in Stream handshake from upstream path.
26 /// @param xs User-defined arguments for the downstream handshake.
27 /// @param ys Additional constructor arguments for `Driver`.
28 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
29 template <class Driver, class In, class... Ts, class... Us>
30 make_stage_result_t<In, typename Driver::downstream_manager_type, Ts...>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,std::tuple<Ts...> xs,Us &&...ys)31 attach_stream_stage(scheduled_actor* self, const stream<In>& in,
32                     std::tuple<Ts...> xs, Us&&... ys) {
33   using detail::make_stream_stage;
34   auto mgr = make_stream_stage<Driver>(self, std::forward<Us>(ys)...);
35   auto islot = mgr->add_inbound_path(in);
36   auto oslot = mgr->add_outbound_path(std::move(xs));
37   return {islot, oslot, std::move(mgr)};
38 }
39 
40 /// Attaches a new stream stage to `self` by creating a default stream stage
41 /// manager from given callbacks.
42 /// @param self Points to the hosting actor.
43 /// @param in Stream handshake from upstream path.
44 /// @param xs User-defined arguments for the downstream handshake.
45 /// @param init Function object for initializing the state of the stage.
46 /// @param fun Processing function.
47 /// @param fin Optional cleanup handler.
48 /// @param token Policy token for selecting a downstream manager
49 ///              implementation.
50 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
51 template <class In, class... Ts, class Init, class Fun, class Finalize = unit_t,
52           class DownstreamManager = default_downstream_manager_t<Fun>,
53           class Trait = stream_stage_trait_t<Fun>>
54 make_stage_result_t<In, DownstreamManager, Ts...>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,std::tuple<Ts...> xs,Init init,Fun fun,Finalize fin={},policy::arg<DownstreamManager> token={})55 attach_stream_stage(scheduled_actor* self, const stream<In>& in,
56                     std::tuple<Ts...> xs, Init init, Fun fun, Finalize fin = {},
57                     policy::arg<DownstreamManager> token = {}) {
58   CAF_IGNORE_UNUSED(token);
59   using output_type = typename stream_stage_trait_t<Fun>::output;
60   using state_type = typename stream_stage_trait_t<Fun>::state;
61   static_assert(
62     std::is_same<void(state_type&),
63                  typename detail::get_callable_trait<Init>::fun_sig>::value,
64     "Expected signature `void (State&)` for init function");
65   using consume_one = void(state_type&, downstream<output_type>&, In);
66   using consume_all
67     = void(state_type&, downstream<output_type>&, std::vector<In>&);
68   using fun_sig = typename detail::get_callable_trait<Fun>::fun_sig;
69   static_assert(std::is_same<fun_sig, consume_one>::value
70                   || std::is_same<fun_sig, consume_all>::value,
71                 "Expected signature `void (State&, downstream<Out>&, In)` "
72                 "or `void (State&, downstream<Out>&, std::vector<In>&)` "
73                 "for consume function");
74   using driver
75     = detail::stream_stage_driver_impl<typename Trait::input, DownstreamManager,
76                                        Fun, Finalize>;
77   return attach_stream_stage<driver>(self, in, std::move(xs), std::move(init),
78                                      std::move(fun), std::move(fin));
79 }
80 
81 /// Attaches a new stream stage to `self` by creating a default stream stage
82 /// manager from given callbacks.
83 /// @param self Points to the hosting actor.
84 /// @param in Stream handshake from upstream path.
85 /// @param init Function object for initializing the state of the stage.
86 /// @param fun Processing function.
87 /// @param fin Optional cleanup handler.
88 /// @param token Policy token for selecting a downstream manager
89 ///              implementation.
90 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
91 template <class In, class Init, class Fun, class Finalize = unit_t,
92           class DownstreamManager = default_downstream_manager_t<Fun>,
93           class Trait = stream_stage_trait_t<Fun>>
94 make_stage_result_t<In, DownstreamManager>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,Init init,Fun fun,Finalize fin={},policy::arg<DownstreamManager> token={})95 attach_stream_stage(scheduled_actor* self, const stream<In>& in, Init init,
96                     Fun fun, Finalize fin = {},
97                     policy::arg<DownstreamManager> token = {}) {
98   return attach_stream_stage(self, in, std::make_tuple(), std::move(init),
99                              std::move(fun), std::move(fin), token);
100 }
101 
102 } // namespace caf
103