1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_shovel_config).
9
10-export([parse/2,
11         ensure_defaults/2]).
12
13-include_lib("amqp_client/include/amqp_client.hrl").
14-include("rabbit_shovel.hrl").
15
16resolve_module(amqp091) -> rabbit_amqp091_shovel;
17resolve_module(amqp10) -> rabbit_amqp10_shovel.
18
19is_legacy(Config) ->
20    not proplists:is_defined(source, Config).
21
22get_brokers(Props) ->
23    case proplists:get_value(brokers, Props) of
24        undefined ->
25            [get_value(broker, Props)];
26        Brokers ->
27            Brokers
28    end.
29
30convert_from_legacy(Config) ->
31    S = get_value(sources, Config),
32    validate(S),
33    SUris = get_brokers(S),
34    validate_uris(brokers, SUris),
35    D = get_value(destinations, Config),
36    validate(D),
37    DUris = get_brokers(D),
38    validate_uris(brokers, DUris),
39    Q = get_value(queue, Config),
40    DA = proplists:get_value(delete_after, Config, never),
41    Pref = proplists:get_value(prefetch_count, Config, ?DEFAULT_PREFETCH),
42    RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY),
43    AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE),
44    validate_ack_mode(AckMode),
45    PubFields = proplists:get_value(publish_fields, Config, []),
46    PubProps = proplists:get_value(publish_properties, Config, []),
47    AFH = proplists:get_value(add_forward_headers, Config, false),
48    ATH = proplists:get_value(add_timestamp_header, Config, false),
49    SourceDecls = proplists:get_value(declarations, S, []),
50    validate_list(SourceDecls),
51    DestDecls = proplists:get_value(declarations, D, []),
52    validate_list(DestDecls),
53    [{source, [{protocol, amqp091},
54               {uris, SUris},
55               {declarations, SourceDecls},
56               {queue, Q},
57               {delete_after, DA},
58               {prefetch_count, Pref}]},
59     {destination, [{protocol, amqp091},
60                    {uris, DUris},
61                    {declarations, DestDecls},
62                    {publish_properties, PubProps},
63                    {publish_fields, PubFields},
64                    {add_forward_headers, AFH},
65                    {add_timestamp_header, ATH}]},
66     {ack_mode, AckMode},
67     {reconnect_delay, RD}].
68
69parse(ShovelName, Config0) ->
70    try
71        validate(Config0),
72        case is_legacy(Config0) of
73            true ->
74                Config = convert_from_legacy(Config0),
75                parse_current(ShovelName, Config);
76            false ->
77                parse_current(ShovelName, Config0)
78        end
79    catch throw:{error, Reason} ->
80              {error, {invalid_shovel_configuration, ShovelName, Reason}};
81          throw:Reason ->
82              {error, {invalid_shovel_configuration, ShovelName, Reason}}
83    end.
84
85validate(Props) ->
86    validate_proplist(Props),
87    validate_duplicates(Props).
88
89validate_proplist(Props) when is_list (Props) ->
90    case lists:filter(fun ({_, _}) -> false;
91                          (_) -> true
92                      end, Props) of
93        [] -> ok;
94        Invalid ->
95            throw({invalid_parameters, Invalid})
96    end;
97validate_proplist(X) ->
98    throw({require_list, X}).
99
100validate_duplicates(Props) ->
101    case duplicate_keys(Props) of
102        [] -> ok;
103        Invalid ->
104            throw({duplicate_parameters, Invalid})
105    end.
106
107validate_list(L) when is_list(L) -> ok;
108validate_list(L) ->
109    throw({require_list, L}).
110
111validate_uris(Key, L) when not is_list(L) ->
112    throw({require_list, Key, L});
113validate_uris(Key, []) ->
114    throw({expected_non_empty_list, Key});
115validate_uris(_Key, L) ->
116    validate_uris0(L).
117
118validate_uris0([Uri | Uris]) ->
119    case amqp_uri:parse(Uri) of
120        {ok, _Params} ->
121            validate_uris0(Uris);
122        {error, _} = Err ->
123            throw(Err)
124    end;
125validate_uris0([]) -> ok.
126
127parse_current(ShovelName, Config) ->
128    {source, Source} = proplists:lookup(source, Config),
129    validate(Source),
130    SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)),
131    {destination, Destination} = proplists:lookup(destination, Config),
132    validate(Destination),
133    DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)),
134    AckMode = proplists:get_value(ack_mode, Config, no_ack),
135    validate_ack_mode(AckMode),
136    {ok, #{name => ShovelName,
137           shovel_type => static,
138           ack_mode => AckMode,
139           reconnect_delay => proplists:get_value(reconnect_delay, Config,
140                                                  ?DEFAULT_RECONNECT_DELAY),
141           source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName,
142                                                   {source, Source}),
143           dest => rabbit_shovel_behaviour:parse(DstMod, ShovelName,
144                                                 {destination, Destination})}}.
145
146%% ensures that any defaults that have been applied to a parsed
147%% shovel, are written back to the original proplist
148ensure_defaults(ShovelConfig, ParsedShovel) ->
149    lists:keystore(reconnect_delay, 1,
150                   ShovelConfig,
151                   {reconnect_delay,
152                    ParsedShovel#shovel.reconnect_delay}).
153
154-spec fail(term()) -> no_return().
155fail(Reason) -> throw({error, Reason}).
156
157validate_ack_mode(Val) when Val =:= no_ack orelse
158                                Val =:= on_publish orelse
159                                Val =:= on_confirm ->
160    ok;
161validate_ack_mode(WrongVal) ->
162    fail({invalid_parameter_value, ack_mode,
163          {ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm},
164          WrongVal}}).
165
166duplicate_keys(PropList) when is_list(PropList) ->
167    proplists:get_keys(
168      lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList,
169                  proplists:get_keys(PropList))).
170
171get_value(Key, Props) ->
172    case proplists:get_value(Key, Props) of
173        undefined ->
174            throw({missing_parameter, Key});
175        V -> V
176    end.
177