1 // This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
2 // the main distribution directory for license terms and copyright or visit
3 // https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
4
5 #pragma once
6
7 #include <vector>
8
9 #include "caf/default_downstream_manager.hpp"
10 #include "caf/detail/stream_stage_driver_impl.hpp"
11 #include "caf/detail/stream_stage_impl.hpp"
12 #include "caf/downstream_manager.hpp"
13 #include "caf/fwd.hpp"
14 #include "caf/make_stage_result.hpp"
15 #include "caf/policy/arg.hpp"
16 #include "caf/stream.hpp"
17 #include "caf/stream_stage.hpp"
18 #include "caf/unit.hpp"
19
20 namespace caf {
21
22 /// Attaches a new stream stage to `self` by creating a default stream stage
23 /// manager with `Driver`.
24 /// @param self Points to the hosting actor.
25 /// @param in Stream handshake from upstream path.
26 /// @param xs User-defined arguments for the downstream handshake.
27 /// @param ys Additional constructor arguments for `Driver`.
28 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
29 template <class Driver, class In, class... Ts, class... Us>
30 make_stage_result_t<In, typename Driver::downstream_manager_type, Ts...>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,std::tuple<Ts...> xs,Us &&...ys)31 attach_stream_stage(scheduled_actor* self, const stream<In>& in,
32 std::tuple<Ts...> xs, Us&&... ys) {
33 using detail::make_stream_stage;
34 auto mgr = make_stream_stage<Driver>(self, std::forward<Us>(ys)...);
35 auto islot = mgr->add_inbound_path(in);
36 auto oslot = mgr->add_outbound_path(std::move(xs));
37 return {islot, oslot, std::move(mgr)};
38 }
39
40 /// Attaches a new stream stage to `self` by creating a default stream stage
41 /// manager from given callbacks.
42 /// @param self Points to the hosting actor.
43 /// @param in Stream handshake from upstream path.
44 /// @param xs User-defined arguments for the downstream handshake.
45 /// @param init Function object for initializing the state of the stage.
46 /// @param fun Processing function.
47 /// @param fin Optional cleanup handler.
48 /// @param token Policy token for selecting a downstream manager
49 /// implementation.
50 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
51 template <class In, class... Ts, class Init, class Fun, class Finalize = unit_t,
52 class DownstreamManager = default_downstream_manager_t<Fun>,
53 class Trait = stream_stage_trait_t<Fun>>
54 make_stage_result_t<In, DownstreamManager, Ts...>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,std::tuple<Ts...> xs,Init init,Fun fun,Finalize fin={},policy::arg<DownstreamManager> token={})55 attach_stream_stage(scheduled_actor* self, const stream<In>& in,
56 std::tuple<Ts...> xs, Init init, Fun fun, Finalize fin = {},
57 policy::arg<DownstreamManager> token = {}) {
58 CAF_IGNORE_UNUSED(token);
59 using output_type = typename stream_stage_trait_t<Fun>::output;
60 using state_type = typename stream_stage_trait_t<Fun>::state;
61 static_assert(
62 std::is_same<void(state_type&),
63 typename detail::get_callable_trait<Init>::fun_sig>::value,
64 "Expected signature `void (State&)` for init function");
65 using consume_one = void(state_type&, downstream<output_type>&, In);
66 using consume_all
67 = void(state_type&, downstream<output_type>&, std::vector<In>&);
68 using fun_sig = typename detail::get_callable_trait<Fun>::fun_sig;
69 static_assert(std::is_same<fun_sig, consume_one>::value
70 || std::is_same<fun_sig, consume_all>::value,
71 "Expected signature `void (State&, downstream<Out>&, In)` "
72 "or `void (State&, downstream<Out>&, std::vector<In>&)` "
73 "for consume function");
74 using driver
75 = detail::stream_stage_driver_impl<typename Trait::input, DownstreamManager,
76 Fun, Finalize>;
77 return attach_stream_stage<driver>(self, in, std::move(xs), std::move(init),
78 std::move(fun), std::move(fin));
79 }
80
81 /// Attaches a new stream stage to `self` by creating a default stream stage
82 /// manager from given callbacks.
83 /// @param self Points to the hosting actor.
84 /// @param in Stream handshake from upstream path.
85 /// @param init Function object for initializing the state of the stage.
86 /// @param fun Processing function.
87 /// @param fin Optional cleanup handler.
88 /// @param token Policy token for selecting a downstream manager
89 /// implementation.
90 /// @returns The new `stream_manager`, an inbound slot, and an outbound slot.
91 template <class In, class Init, class Fun, class Finalize = unit_t,
92 class DownstreamManager = default_downstream_manager_t<Fun>,
93 class Trait = stream_stage_trait_t<Fun>>
94 make_stage_result_t<In, DownstreamManager>
attach_stream_stage(scheduled_actor * self,const stream<In> & in,Init init,Fun fun,Finalize fin={},policy::arg<DownstreamManager> token={})95 attach_stream_stage(scheduled_actor* self, const stream<In>& in, Init init,
96 Fun fun, Finalize fin = {},
97 policy::arg<DownstreamManager> token = {}) {
98 return attach_stream_stage(self, in, std::make_tuple(), std::move(init),
99 std::move(fun), std::move(fin), token);
100 }
101
102 } // namespace caf
103