1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_shovel_worker).
9-behaviour(gen_server2).
10
11-export([start_link/3]).
12-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
13         code_change/3]).
14
15%% for testing purposes
16-export([get_connection_name/1]).
17
18-include_lib("amqp_client/include/amqp_client.hrl").
19-include("rabbit_shovel.hrl").
20
21-record(state, {inbound_conn, inbound_ch, outbound_conn, outbound_ch,
22                name, type, config, inbound_uri, outbound_uri, unacked,
23                remaining, %% [1]
24                remaining_unacked}). %% [2]
25
26%% [1] Counts down until we shut down in all modes
27%% [2] Counts down until we stop publishing in on-confirm mode
28
29start_link(Type, Name, Config) ->
30    ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
31    maybe_start_link(ShovelParameter, Type, Name, Config).
32
33maybe_start_link(not_found, dynamic, _Name, _Config) ->
34    %% See rabbitmq/rabbitmq-server#2655.
35    %% All dynamic shovels require that their associated parameter is present.
36    %% If not, this shovel has been deleted and stale child spec information
37    %% may still reside in the supervisor.
38    %%
39    %% We return 'ignore' to ensure that the child is not [re-]added in such case.
40    ignore;
41maybe_start_link(_, Type, Name, Config) ->
42    ok = rabbit_shovel_status:report(Name, Type, starting),
43    gen_server2:start_link(?MODULE, [Type, Name, Config], []).
44
45%%---------------------------
46%% Gen Server Implementation
47%%---------------------------
48
49init([Type, Name, Config0]) ->
50    Config = case Type of
51                static ->
52                     Config0;
53                dynamic ->
54                    ClusterName = rabbit_nodes:cluster_name(),
55                    {ok, Conf} = rabbit_shovel_parameters:parse(Name,
56                                                                ClusterName,
57                                                                Config0),
58                    Conf
59            end,
60    rabbit_log_shovel:debug("Initialising a Shovel ~s of type '~s'", [human_readable_name(Name), Type]),
61    gen_server2:cast(self(), init),
62    {ok, #state{name = Name, type = Type, config = Config}}.
63
64handle_call(_Msg, _From, State) ->
65    {noreply, State}.
66
67handle_cast(init, State = #state{config = Config0}) ->
68    try rabbit_shovel_behaviour:connect_source(Config0) of
69      Config ->
70        rabbit_log_shovel:debug("Shovel ~s connected to source", [human_readable_name(maps:get(name, Config))]),
71        %% this makes sure that connection pid is updated in case
72        %% any of the subsequent connection/init steps fail. See
73        %% rabbitmq/rabbitmq-shovel#54 for context.
74        gen_server2:cast(self(), connect_dest),
75        {noreply, State#state{config = Config}}
76    catch _:_ ->
77      rabbit_log_shovel:error("Shovel ~s could not connect to source", [human_readable_name(maps:get(name, Config0))]),
78      {stop, shutdown, State}
79    end;
80handle_cast(connect_dest, State = #state{config = Config0}) ->
81    try rabbit_shovel_behaviour:connect_dest(Config0) of
82      Config ->
83        rabbit_log_shovel:debug("Shovel ~s connected to destination", [human_readable_name(maps:get(name, Config))]),
84        gen_server2:cast(self(), init_shovel),
85        {noreply, State#state{config = Config}}
86    catch _:_ ->
87      rabbit_log_shovel:error("Shovel ~s could not connect to destination", [human_readable_name(maps:get(name, Config0))]),
88      {stop, shutdown, State}
89    end;
90handle_cast(init_shovel, State = #state{config = Config}) ->
91    %% Don't trap exits until we have established connections so that
92    %% if we try to shut down while waiting for a connection to be
93    %% established then we don't block
94    process_flag(trap_exit, true),
95    Config1 = rabbit_shovel_behaviour:init_dest(Config),
96    Config2 = rabbit_shovel_behaviour:init_source(Config1),
97    rabbit_log_shovel:debug("Shovel ~s has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]),
98    State1 = State#state{config = Config2},
99    ok = report_running(State1),
100    {noreply, State1}.
101
102
103handle_info(Msg, State = #state{config = Config, name = Name}) ->
104    case rabbit_shovel_behaviour:handle_source(Msg, Config) of
105        not_handled ->
106            case rabbit_shovel_behaviour:handle_dest(Msg, Config) of
107                not_handled ->
108                    rabbit_log_shovel:warning("Shovel ~s could not handle a destination message ~p", [human_readable_name(Name), Msg]),
109                    {noreply, State};
110                {stop, {outbound_conn_died, heartbeat_timeout}} ->
111                    rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on destination connection", [human_readable_name(Name)]),
112                    {stop, {shutdown, heartbeat_timeout}, State};
113                {stop, {outbound_conn_died, Reason}} ->
114                    rabbit_log_shovel:error("Shovel ~s detected destination connection failure: ~p", [human_readable_name(Name), Reason]),
115                    {stop, Reason, State};
116                {stop, Reason} ->
117                    rabbit_log_shovel:debug("Shovel ~s decided to stop due a message from destination: ~p", [human_readable_name(Name), Reason]),
118                    {stop, Reason, State};
119                Config1 ->
120                    {noreply, State#state{config = Config1}}
121            end;
122        {stop, {inbound_conn_died, heartbeat_timeout}} ->
123            rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on source connection", [human_readable_name(Name)]),
124            {stop, {shutdown, heartbeat_timeout}, State};
125        {stop, {inbound_conn_died, Reason}} ->
126            rabbit_log_shovel:error("Shovel ~s detected source connection failure: ~p", [human_readable_name(Name), Reason]),
127            {stop, Reason, State};
128        {stop, Reason} ->
129            rabbit_log_shovel:error("Shovel ~s decided to stop due a message from source: ~p", [human_readable_name(Name), Reason]),
130            {stop, Reason, State};
131        Config1 ->
132            {noreply, State#state{config = Config1}}
133    end.
134
135terminate({shutdown, autodelete}, State = #state{name = Name,
136                                                 type = dynamic}) ->
137    {VHost, ShovelName} = Name,
138    rabbit_log_shovel:info("Shovel '~s' is stopping (it was configured to autodelete and transfer is completed)",
139                           [human_readable_name(Name)]),
140    close_connections(State),
141    %% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1
142    put({shovel_worker_autodelete, Name}, true),
143    _ = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, ShovelName, ?SHOVEL_USER),
144    rabbit_shovel_status:remove(Name),
145    ok;
146terminate(shutdown, State) ->
147    close_connections(State),
148    ok;
149terminate(socket_closed_unexpectedly, State) ->
150    close_connections(State),
151    ok;
152terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) ->
153    rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
154    rabbit_shovel_status:report(State#state.name, State#state.type,
155                                {terminated, "heartbeat timeout"}),
156    close_connections(State),
157    ok;
158terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) ->
159    rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]),
160    rabbit_shovel_status:report(State#state.name, State#state.type,
161                                {terminated, "destination connection failed"}),
162    close_connections(State),
163    ok;
164terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) ->
165    rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]),
166    rabbit_shovel_status:report(State#state.name, State#state.type,
167                                {terminated, "source connection failed"}),
168    close_connections(State),
169    ok;
170terminate({shutdown, heartbeat_timeout}, State = #state{name = Name}) ->
171    rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]),
172    rabbit_shovel_status:report(State#state.name, State#state.type,
173                                {terminated, "heartbeat timeout"}),
174    close_connections(State),
175    ok;
176terminate({shutdown, restart}, State = #state{name = Name}) ->
177    rabbit_log_shovel:error("Shovel ~s is stopping to restart", [human_readable_name(Name)]),
178    rabbit_shovel_status:report(State#state.name, State#state.type,
179                                {terminated, "needed a restart"}),
180    close_connections(State),
181    ok;
182terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) ->
183    rabbit_log_shovel:error("Shovel ~s is stopping: one of its connections closed "
184                            "with code ~b, reason: ~s",
185                            [human_readable_name(Name), Code, Reason]),
186    rabbit_shovel_status:report(State#state.name, State#state.type,
187                                {terminated, "needed a restart"}),
188    close_connections(State),
189    ok;
190terminate(Reason, State = #state{name = Name}) ->
191    rabbit_log_shovel:error("Shovel ~s is stopping, reason: ~p", [human_readable_name(Name), Reason]),
192    rabbit_shovel_status:report(State#state.name, State#state.type,
193                                {terminated, Reason}),
194    close_connections(State),
195    ok.
196
197code_change(_OldVsn, State, _Extra) ->
198    {ok, State}.
199
200%%---------------------------
201%% Helpers
202%%---------------------------
203
204human_readable_name(Name) ->
205  case Name of
206    {VHost, ShovelName} -> rabbit_misc:format("'~s' in virtual host '~s'", [ShovelName, VHost]);
207    ShovelName          -> rabbit_misc:format("'~s'", [ShovelName])
208  end.
209
210report_running(#state{config = Config} = State) ->
211    InUri = rabbit_shovel_behaviour:source_uri(Config),
212    OutUri = rabbit_shovel_behaviour:dest_uri(Config),
213    InProto = rabbit_shovel_behaviour:source_protocol(Config),
214    OutProto = rabbit_shovel_behaviour:dest_protocol(Config),
215    InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config),
216    OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config),
217    rabbit_shovel_status:report(State#state.name, State#state.type,
218                                {running, [{src_uri,  rabbit_data_coercion:to_binary(InUri)},
219                                           {src_protocol, rabbit_data_coercion:to_binary(InProto)},
220                                           {dest_protocol, rabbit_data_coercion:to_binary(OutProto)},
221                                           {dest_uri, rabbit_data_coercion:to_binary(OutUri)}]
222                                 ++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint)
223                                }).
224
225props_to_binary(Props) ->
226    [{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props].
227
228%% for static shovels, name is an atom from the configuration file
229get_connection_name(ShovelName) when is_atom(ShovelName) ->
230    Prefix = <<"Shovel ">>,
231    ShovelNameAsBinary = atom_to_binary(ShovelName, utf8),
232    <<Prefix/binary, ShovelNameAsBinary/binary>>;
233
234%% for dynamic shovels, name is a tuple with a binary
235get_connection_name({_, Name}) when is_binary(Name) ->
236    Prefix = <<"Shovel ">>,
237    <<Prefix/binary, Name/binary>>;
238
239%% fallback
240get_connection_name(_) ->
241    <<"Shovel">>.
242
243close_connections(#state{config = Conf}) ->
244    ok = rabbit_shovel_behaviour:close_source(Conf),
245    ok = rabbit_shovel_behaviour:close_dest(Conf).
246