1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_shovel_dyn_worker_sup_sup).
9-behaviour(mirrored_supervisor).
10
11-export([start_link/0, init/1, adjust/2, stop_child/1, cleanup_specs/0]).
12
13-import(rabbit_misc, [pget/2]).
14-import(rabbit_data_coercion, [to_map/1, to_list/1]).
15
16-include("rabbit_shovel.hrl").
17-include_lib("rabbit_common/include/rabbit.hrl").
18-define(SUPERVISOR, ?MODULE).
19
%% Starts this mirrored supervisor (or reuses the instance that is already
%% registered locally) and then asks it to start one child for every
%% runtime parameter of the <<"shovel">> component.
%%
%% Returns {ok, Pid} of the supervisor. Any start result other than
%% {ok, _} or {error, {already_started, _}} crashes the caller.
start_link() ->
    StartResult = mirrored_supervisor:start_link(
                    {local, ?SUPERVISOR}, ?SUPERVISOR,
                    fun rabbit_misc:execute_mnesia_transaction/1, ?MODULE, []),
    SupPid = case StartResult of
                 {error, {already_started, Existing}} -> Existing;
                 {ok, Started}                        -> Started
             end,
    %% Each parameter is a proplist; its vhost and name form the child id
    %% and its value is the Shovel definition.
    lists:foreach(
      fun (Param) ->
              ShovelId = {pget(vhost, Param), pget(name, Param)},
              start_child(ShovelId, pget(value, Param))
      end,
      rabbit_runtime_parameters:list_component(<<"shovel">>)),
    {ok, SupPid}.
31
%% (Re)starts the dynamic Shovel identified by Name ({VHost, ShovelName})
%% with definition Def: a child currently known to the supervisor under
%% that name is stopped first, then a fresh child is started.
%%
%% Returns the result of start_child/2.
adjust(Name, Def) ->
    %% child_exists/1 always yields a boolean, so stop_child/1 only runs
    %% when the child is present; its result is deliberately discarded.
    _ = child_exists(Name) andalso stop_child(Name),
    start_child(Name, Def).
38
%% Starts a dynamic Shovel worker supervisor as a transient child of this
%% mirrored supervisor.
%%
%% Name is {VHost, ShovelName} and doubles as the child id. Def is the
%% Shovel definition (a map or a proplist); it is passed through
%% obfuscated_uris_parameters/1 before being stored in the child spec.
%%
%% Returns ok when the child was started or was already running. Any other
%% result from mirrored_supervisor:start_child/2 crashes the caller with a
%% case_clause error.
start_child({VHost, ShovelName} = Name, Def) ->
    rabbit_log_shovel:debug("Asked to start a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
    LockId = rabbit_shovel_locks:lock(Name),
    %% Everything done while holding the lock runs inside try ... after so
    %% that the lock is released even when one of the calls below raises.
    try
        cleanup_specs(),
        rabbit_log_shovel:debug("Starting a mirrored supervisor named '~s' in virtual host '~s'", [ShovelName, VHost]),
        ChildSpec = {Name,
                     {rabbit_shovel_dyn_worker_sup, start_link,
                      [Name, obfuscated_uris_parameters(Def)]},
                     transient, ?WORKER_WAIT, worker, [rabbit_shovel_dyn_worker_sup]},
        case mirrored_supervisor:start_child(?SUPERVISOR, ChildSpec) of
            {ok,                      _Pid}  -> ok;
            {error, {already_started, _Pid}} -> ok
        end
    after
        %% release the lock if we managed to acquire one
        rabbit_shovel_locks:unlock(LockId)
    end.
54
%% Runs a Shovel definition through
%% rabbit_shovel_parameters:obfuscate_uris_in_definition/1 and returns the
%% result in the same container type as the input: a proplist stays a
%% proplist, a map is converted to a list for the call and back to a map
%% afterwards. Any other input fails with function_clause.
obfuscated_uris_parameters(Def) when is_list(Def) ->
    rabbit_shovel_parameters:obfuscate_uris_in_definition(Def);
obfuscated_uris_parameters(Def) when is_map(Def) ->
    AsList = to_list(Def),
    Obfuscated = rabbit_shovel_parameters:obfuscate_uris_in_definition(AsList),
    to_map(Obfuscated).
59
%% Returns true when the supervisor currently lists a child whose id is
%% exactly Name, false otherwise.
child_exists(Name) ->
    Children = mirrored_supervisor:which_children(?SUPERVISOR),
    HasName = fun ({Id, _Child, _Type, _Modules}) -> Id =:= Name end,
    lists:any(HasName, Children).
63
%% Stops the dynamic Shovel identified by Name ({VHost, ShovelName}).
%%
%% Unless the calling process has flagged itself as an autodeleting worker
%% for this Shovel (process dictionary key {shovel_worker_autodelete, Name},
%% see note [1] below), the child is terminated, its spec is deleted and
%% its status entry is removed.
%%
%% Returns ok. Crashes with badmatch when terminating or deleting the child
%% does not return ok.
stop_child({VHost, ShovelName} = Name) ->
    rabbit_log_shovel:debug("Asked to stop a dynamic Shovel named '~s' in virtual host '~s'", [ShovelName, VHost]),
    LockId = rabbit_shovel_locks:lock(Name),
    %% try ... after guarantees the lock is released even when one of the
    %% assertive matches below fails (e.g. the child is already gone).
    try
        case get({shovel_worker_autodelete, Name}) of
            true -> ok; %% [1]
            _ ->
                ok = mirrored_supervisor:terminate_child(?SUPERVISOR, Name),
                ok = mirrored_supervisor:delete_child(?SUPERVISOR, Name),
                rabbit_shovel_status:remove(Name)
        end
    after
        rabbit_shovel_locks:unlock(LockId)
    end,
    ok.
76
77%% [1] An autodeleting worker removes its own parameter, and thus ends
78%% up here via the parameter callback. It is a transient worker that
79%% is just about to terminate normally - so we don't need to tell the
80%% supervisor to stop us - and as usual if we call into our own
81%% supervisor we risk deadlock.
82%%
83%% See rabbit_shovel_worker:terminate/2
84
%% Deletes child specs that no longer have a matching <<"shovel">> runtime
%% parameter.
%%
%% Child ids are {VHost, Name} tuples (see start_child/2), whereas
%% rabbit_runtime_parameters:list_component/1 returns one proplist per
%% parameter (start_link/0 reads vhost, name and value from them). Each
%% parameter is therefore reduced to its {VHost, Name} pair before the set
%% difference is taken; comparing ids against whole proplists would never
%% match and would treat every child as stale.
%%
%% Deletion is best-effort: the result of delete_child/2 is ignored.
%% Returns ok.
cleanup_specs() ->
    SpecsSet = sets:from_list([element(1, S) || S <- mirrored_supervisor:which_children(?SUPERVISOR)]),
    ParamsSet = sets:from_list([{pget(vhost, P), pget(name, P)}
                                || P <- rabbit_runtime_parameters:list_component(<<"shovel">>)]),
    F = fun(Spec, ok) ->
            _ = mirrored_supervisor:delete_child(?SUPERVISOR, Spec),
            ok
        end,
    ok = sets:fold(F, ok, sets:subtract(SpecsSet, ParamsSet)).
93
94%%----------------------------------------------------------------------------
95
%% Supervisor callback. Declares a one_for_one supervisor with the restart
%% limits 3 and 10 (intensity and period in the classic supervisor flags
%% tuple) and no static children; children are added dynamically via
%% start_child/2.
init([]) ->
    RestartStrategy = one_for_one,
    MaxRestarts = 3,
    MaxSeconds = 10,
    SupFlags = {RestartStrategy, MaxRestarts, MaxSeconds},
    {ok, {SupFlags, []}}.
98