1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_shovel_worker). 9-behaviour(gen_server2). 10 11-export([start_link/3]). 12-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 13 code_change/3]). 14 15%% for testing purposes 16-export([get_connection_name/1]). 17 18-include_lib("amqp_client/include/amqp_client.hrl"). 19-include("rabbit_shovel.hrl"). 20 21-record(state, {inbound_conn, inbound_ch, outbound_conn, outbound_ch, 22 name, type, config, inbound_uri, outbound_uri, unacked, 23 remaining, %% [1] 24 remaining_unacked}). %% [2] 25 26%% [1] Counts down until we shut down in all modes 27%% [2] Counts down until we stop publishing in on-confirm mode 28 29start_link(Type, Name, Config) -> 30 ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name), 31 maybe_start_link(ShovelParameter, Type, Name, Config). 32 33maybe_start_link(not_found, dynamic, _Name, _Config) -> 34 %% See rabbitmq/rabbitmq-server#2655. 35 %% All dynamic shovels require that their associated parameter is present. 36 %% If not, this shovel has been deleted and stale child spec information 37 %% may still reside in the supervisor. 38 %% 39 %% We return 'ignore' to ensure that the child is not [re-]added in such case. 40 ignore; 41maybe_start_link(_, Type, Name, Config) -> 42 ok = rabbit_shovel_status:report(Name, Type, starting), 43 gen_server2:start_link(?MODULE, [Type, Name, Config], []). 44 45%%--------------------------- 46%% Gen Server Implementation 47%%--------------------------- 48 49init([Type, Name, Config0]) -> 50 Config = case Type of 51 static -> 52 Config0; 53 dynamic -> 54 ClusterName = rabbit_nodes:cluster_name(), 55 {ok, Conf} = rabbit_shovel_parameters:parse(Name, 56 ClusterName, 57 Config0), 58 Conf 59 end, 60 rabbit_log_shovel:debug("Initialising a Shovel ~s of type '~s'", [human_readable_name(Name), Type]), 61 gen_server2:cast(self(), init), 62 {ok, #state{name = Name, type = Type, config = Config}}. 63 64handle_call(_Msg, _From, State) -> 65 {noreply, State}. 66 67handle_cast(init, State = #state{config = Config0}) -> 68 try rabbit_shovel_behaviour:connect_source(Config0) of 69 Config -> 70 rabbit_log_shovel:debug("Shovel ~s connected to source", [human_readable_name(maps:get(name, Config))]), 71 %% this makes sure that connection pid is updated in case 72 %% any of the subsequent connection/init steps fail. See 73 %% rabbitmq/rabbitmq-shovel#54 for context. 74 gen_server2:cast(self(), connect_dest), 75 {noreply, State#state{config = Config}} 76 catch _:_ -> 77 rabbit_log_shovel:error("Shovel ~s could not connect to source", [human_readable_name(maps:get(name, Config0))]), 78 {stop, shutdown, State} 79 end; 80handle_cast(connect_dest, State = #state{config = Config0}) -> 81 try rabbit_shovel_behaviour:connect_dest(Config0) of 82 Config -> 83 rabbit_log_shovel:debug("Shovel ~s connected to destination", [human_readable_name(maps:get(name, Config))]), 84 gen_server2:cast(self(), init_shovel), 85 {noreply, State#state{config = Config}} 86 catch _:_ -> 87 rabbit_log_shovel:error("Shovel ~s could not connect to destination", [human_readable_name(maps:get(name, Config0))]), 88 {stop, shutdown, State} 89 end; 90handle_cast(init_shovel, State = #state{config = Config}) -> 91 %% Don't trap exits until we have established connections so that 92 %% if we try to shut down while waiting for a connection to be 93 %% established then we don't block 94 process_flag(trap_exit, true), 95 Config1 = rabbit_shovel_behaviour:init_dest(Config), 96 Config2 = rabbit_shovel_behaviour:init_source(Config1), 97 rabbit_log_shovel:debug("Shovel ~s has finished setting up its topology", [human_readable_name(maps:get(name, Config2))]), 98 State1 = State#state{config = Config2}, 99 ok = report_running(State1), 100 {noreply, State1}. 101 102 103handle_info(Msg, State = #state{config = Config, name = Name}) -> 104 case rabbit_shovel_behaviour:handle_source(Msg, Config) of 105 not_handled -> 106 case rabbit_shovel_behaviour:handle_dest(Msg, Config) of 107 not_handled -> 108 rabbit_log_shovel:warning("Shovel ~s could not handle a destination message ~p", [human_readable_name(Name), Msg]), 109 {noreply, State}; 110 {stop, {outbound_conn_died, heartbeat_timeout}} -> 111 rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on destination connection", [human_readable_name(Name)]), 112 {stop, {shutdown, heartbeat_timeout}, State}; 113 {stop, {outbound_conn_died, Reason}} -> 114 rabbit_log_shovel:error("Shovel ~s detected destination connection failure: ~p", [human_readable_name(Name), Reason]), 115 {stop, Reason, State}; 116 {stop, Reason} -> 117 rabbit_log_shovel:debug("Shovel ~s decided to stop due a message from destination: ~p", [human_readable_name(Name), Reason]), 118 {stop, Reason, State}; 119 Config1 -> 120 {noreply, State#state{config = Config1}} 121 end; 122 {stop, {inbound_conn_died, heartbeat_timeout}} -> 123 rabbit_log_shovel:error("Shovel ~s detected missed heartbeats on source connection", [human_readable_name(Name)]), 124 {stop, {shutdown, heartbeat_timeout}, State}; 125 {stop, {inbound_conn_died, Reason}} -> 126 rabbit_log_shovel:error("Shovel ~s detected source connection failure: ~p", [human_readable_name(Name), Reason]), 127 {stop, Reason, State}; 128 {stop, Reason} -> 129 rabbit_log_shovel:error("Shovel ~s decided to stop due a message from source: ~p", [human_readable_name(Name), Reason]), 130 {stop, Reason, State}; 131 Config1 -> 132 {noreply, State#state{config = Config1}} 133 end. 134 135terminate({shutdown, autodelete}, State = #state{name = Name, 136 type = dynamic}) -> 137 {VHost, ShovelName} = Name, 138 rabbit_log_shovel:info("Shovel '~s' is stopping (it was configured to autodelete and transfer is completed)", 139 [human_readable_name(Name)]), 140 close_connections(State), 141 %% See rabbit_shovel_dyn_worker_sup_sup:stop_child/1 142 put({shovel_worker_autodelete, Name}, true), 143 _ = rabbit_runtime_parameters:clear(VHost, <<"shovel">>, ShovelName, ?SHOVEL_USER), 144 rabbit_shovel_status:remove(Name), 145 ok; 146terminate(shutdown, State) -> 147 close_connections(State), 148 ok; 149terminate(socket_closed_unexpectedly, State) -> 150 close_connections(State), 151 ok; 152terminate({'EXIT', heartbeat_timeout}, State = #state{name = Name}) -> 153 rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]), 154 rabbit_shovel_status:report(State#state.name, State#state.type, 155 {terminated, "heartbeat timeout"}), 156 close_connections(State), 157 ok; 158terminate({'EXIT', outbound_conn_died}, State = #state{name = Name}) -> 159 rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]), 160 rabbit_shovel_status:report(State#state.name, State#state.type, 161 {terminated, "destination connection failed"}), 162 close_connections(State), 163 ok; 164terminate({'EXIT', inbound_conn_died}, State = #state{name = Name}) -> 165 rabbit_log_shovel:error("Shovel ~s is stopping because destination connection failed", [human_readable_name(Name)]), 166 rabbit_shovel_status:report(State#state.name, State#state.type, 167 {terminated, "source connection failed"}), 168 close_connections(State), 169 ok; 170terminate({shutdown, heartbeat_timeout}, State = #state{name = Name}) -> 171 rabbit_log_shovel:error("Shovel ~s is stopping because of a heartbeat timeout", [human_readable_name(Name)]), 172 rabbit_shovel_status:report(State#state.name, State#state.type, 173 {terminated, "heartbeat timeout"}), 174 close_connections(State), 175 ok; 176terminate({shutdown, restart}, State = #state{name = Name}) -> 177 rabbit_log_shovel:error("Shovel ~s is stopping to restart", [human_readable_name(Name)]), 178 rabbit_shovel_status:report(State#state.name, State#state.type, 179 {terminated, "needed a restart"}), 180 close_connections(State), 181 ok; 182terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) -> 183 rabbit_log_shovel:error("Shovel ~s is stopping: one of its connections closed " 184 "with code ~b, reason: ~s", 185 [human_readable_name(Name), Code, Reason]), 186 rabbit_shovel_status:report(State#state.name, State#state.type, 187 {terminated, "needed a restart"}), 188 close_connections(State), 189 ok; 190terminate(Reason, State = #state{name = Name}) -> 191 rabbit_log_shovel:error("Shovel ~s is stopping, reason: ~p", [human_readable_name(Name), Reason]), 192 rabbit_shovel_status:report(State#state.name, State#state.type, 193 {terminated, Reason}), 194 close_connections(State), 195 ok. 196 197code_change(_OldVsn, State, _Extra) -> 198 {ok, State}. 199 200%%--------------------------- 201%% Helpers 202%%--------------------------- 203 204human_readable_name(Name) -> 205 case Name of 206 {VHost, ShovelName} -> rabbit_misc:format("'~s' in virtual host '~s'", [ShovelName, VHost]); 207 ShovelName -> rabbit_misc:format("'~s'", [ShovelName]) 208 end. 209 210report_running(#state{config = Config} = State) -> 211 InUri = rabbit_shovel_behaviour:source_uri(Config), 212 OutUri = rabbit_shovel_behaviour:dest_uri(Config), 213 InProto = rabbit_shovel_behaviour:source_protocol(Config), 214 OutProto = rabbit_shovel_behaviour:dest_protocol(Config), 215 InEndpoint = rabbit_shovel_behaviour:source_endpoint(Config), 216 OutEndpoint = rabbit_shovel_behaviour:dest_endpoint(Config), 217 rabbit_shovel_status:report(State#state.name, State#state.type, 218 {running, [{src_uri, rabbit_data_coercion:to_binary(InUri)}, 219 {src_protocol, rabbit_data_coercion:to_binary(InProto)}, 220 {dest_protocol, rabbit_data_coercion:to_binary(OutProto)}, 221 {dest_uri, rabbit_data_coercion:to_binary(OutUri)}] 222 ++ props_to_binary(InEndpoint) ++ props_to_binary(OutEndpoint) 223 }). 224 225props_to_binary(Props) -> 226 [{K, rabbit_data_coercion:to_binary(V)} || {K, V} <- Props]. 227 228%% for static shovels, name is an atom from the configuration file 229get_connection_name(ShovelName) when is_atom(ShovelName) -> 230 Prefix = <<"Shovel ">>, 231 ShovelNameAsBinary = atom_to_binary(ShovelName, utf8), 232 <<Prefix/binary, ShovelNameAsBinary/binary>>; 233 234%% for dynamic shovels, name is a tuple with a binary 235get_connection_name({_, Name}) when is_binary(Name) -> 236 Prefix = <<"Shovel ">>, 237 <<Prefix/binary, Name/binary>>; 238 239%% fallback 240get_connection_name(_) -> 241 <<"Shovel">>. 242 243close_connections(#state{config = Conf}) -> 244 ok = rabbit_shovel_behaviour:close_source(Conf), 245 ok = rabbit_shovel_behaviour:close_dest(Conf). 246