%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_shovel_dyn_worker_sup_sup).
-behaviour(mirrored_supervisor).

-export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]).

-import(rabbit_misc, [pget/2]).
-import(rabbit_data_coercion, [to_map/1, to_list/1]).

-include("rabbit_shovel.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SUPERVISOR, ?MODULE).

%% Starts the mirrored supervisor (or reuses the one already registered
%% under ?SUPERVISOR) and then starts one child per "shovel" runtime
%% parameter. Each child is named {VHost, Name}, taken from the
%% parameter's `vhost' and `name' properties; its definition is the
%% parameter's `value'.
-spec start_link() -> {ok, pid()}.
start_link() ->
    Pid = case mirrored_supervisor:start_link(
                 {local, ?SUPERVISOR}, ?SUPERVISOR,
                 fun rabbit_misc:execute_mnesia_transaction/1, ?MODULE, []) of
              {ok, Pid0}                       -> Pid0;
              {error, {already_started, Pid0}} -> Pid0
          end,
    Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>),
    %% Run purely for side effects, so no result list is built.
    lists:foreach(
      fun (Shovel) ->
              start_child({pget(vhost, Shovel), pget(name, Shovel)},
                          pget(value, Shovel))
      end, Shovels),
    {ok, Pid}.

%% (Re)starts the shovel called Name with definition Def: a child already
%% registered under that name is stopped and removed first, then a fresh
%% one is started.
%%
%% Name is a {VHost, ShovelName} tuple; Def is a map or a list (see
%% obfuscated_uris_parameters/1).
-spec adjust({term(), term()}, map() | list()) -> ok.
adjust(Name, Def) ->
    case child_exists(Name) of
        true  -> stop_child(Name);
        false -> ok
    end,
    start_child(Name, Def).

%% Starts the worker supervisor for one dynamic shovel while holding the
%% per-shovel lock. Returns ok both when the child was started and when it
%% was already running; any other supervisor reply is a case_clause error.
%%
%% Everything after lock acquisition runs inside try ... after so the lock
%% is released on every exit path, including an unexpected reply from
%% mirrored_supervisor:start_child/2 or an exception raised by
%% cleanup_specs/0 or the URI obfuscation.
start_child({VHost, ShovelName} = Name, Def) ->
    rabbit_log_shovel:debug("Asked to start a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
    LockId = rabbit_shovel_locks:lock(Name),
    try
        %% Drop stale child specs before registering a new one.
        cleanup_specs(),
        rabbit_log_shovel:debug("Starting a mirrored supervisor named '~s' in virtual host '~s'", [ShovelName, VHost]),
        case mirrored_supervisor:start_child(
               ?SUPERVISOR,
               {Name, {rabbit_shovel_dyn_worker_sup, start_link, [Name, obfuscated_uris_parameters(Def)]},
                transient, ?WORKER_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}) of
            {ok, _Pid}                       -> ok;
            {error, {already_started, _Pid}} -> ok
        end
    after
        %% release the lock if we managed to acquire one
        rabbit_shovel_locks:unlock(LockId)
    end.
%% Returns Def with its URIs obfuscated by
%% rabbit_shovel_parameters:obfuscate_uris_in_definition/1. The result has
%% the same container type as the input: a map is converted to a list for
%% the call and back to a map afterwards; a list is passed straight through.
obfuscated_uris_parameters(Def) when is_map(Def) ->
    to_map(rabbit_shovel_parameters:obfuscate_uris_in_definition(to_list(Def)));
obfuscated_uris_parameters(Def) when is_list(Def) ->
    rabbit_shovel_parameters:obfuscate_uris_in_definition(Def).

%% True when the supervisor currently has a child registered under Name.
child_exists(Name) ->
    lists:any(fun ({N, _, _, _}) -> N =:= Name end,
              mirrored_supervisor:which_children(?SUPERVISOR)).

%% Stops and removes the shovel registered under Name ({VHost, ShovelName})
%% and clears its status entry, all while holding the per-shovel lock.
%% Always returns ok.
%%
%% The body runs inside try ... after so the lock is released even when a
%% supervisor call fails. A child that has already disappeared
%% ({error, not_found} from terminate_child) counts as stopped: adjust/2
%% checks child_exists/1 outside the lock, so the child can vanish in
%% between.
stop_child({VHost, ShovelName} = Name) ->
    rabbit_log_shovel:debug("Asked to stop a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
    LockId = rabbit_shovel_locks:lock(Name),
    try
        case get({shovel_worker_autodelete, Name}) of
            true -> ok; %% [1]
            _ ->
                case mirrored_supervisor:terminate_child(?SUPERVISOR, Name) of
                    ok ->
                        ok = mirrored_supervisor:delete_child(?SUPERVISOR, Name);
                    {error, not_found} ->
                        ok
                end,
                rabbit_shovel_status:remove(Name)
        end
    after
        rabbit_shovel_locks:unlock(LockId)
    end,
    ok.

%% [1] An autodeleting worker removes its own parameter, and thus ends
%% up here via the parameter callback. It is a transient worker that
%% is just about to terminate normally - so we don't need to tell the
%% supervisor to stop us - and as usual if we call into our own
%% supervisor we risk deadlock.
%%
%% See rabbit_shovel_worker:terminate/2

%% Deletes child specs that no longer have a matching "shovel" runtime
%% parameter.
%%
%% Child ids are {VHost, Name} tuples (see start_link/0), whereas
%% list_component/1 returns whole parameter proplists. Each parameter is
%% therefore projected to {VHost, Name} before the set difference is taken;
%% comparing ids against raw proplists would treat every spec as stale.
%%
%% Deletion is best effort: the result of delete_child/2 is deliberately
%% ignored (presumably it fails for a child that is still running - confirm
%% against mirrored_supervisor).
cleanup_specs() ->
    Children = mirrored_supervisor:which_children(?SUPERVISOR),
    SpecsSet = sets:from_list([element(1, Child) || Child <- Children]),
    ParamsSet = sets:from_list(
                  [{proplists:get_value(vhost, Param),
                    proplists:get_value(name, Param)}
                   || Param <- rabbit_runtime_parameters:list_component(<<"shovel">>)]),
    F = fun(Spec, ok) ->
                _ = mirrored_supervisor:delete_child(?SUPERVISOR, Spec),
                ok
        end,
    ok = sets:fold(F, ok, sets:subtract(SpecsSet, ParamsSet)).

%%----------------------------------------------------------------------------

%% Supervisor callback: one_for_one strategy with restart intensity 3 and
%% period 10, and no static children. Shovel children are added at runtime
%% by start_child/2.
init([]) ->
    {ok, {{one_for_one, 3, 10}, []}}.