1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_shovel_config). 9 10-export([parse/2, 11 ensure_defaults/2]). 12 13-include_lib("amqp_client/include/amqp_client.hrl"). 14-include("rabbit_shovel.hrl"). 15 16resolve_module(amqp091) -> rabbit_amqp091_shovel; 17resolve_module(amqp10) -> rabbit_amqp10_shovel. 18 19is_legacy(Config) -> 20 not proplists:is_defined(source, Config). 21 22get_brokers(Props) -> 23 case proplists:get_value(brokers, Props) of 24 undefined -> 25 [get_value(broker, Props)]; 26 Brokers -> 27 Brokers 28 end. 29 30convert_from_legacy(Config) -> 31 S = get_value(sources, Config), 32 validate(S), 33 SUris = get_brokers(S), 34 validate_uris(brokers, SUris), 35 D = get_value(destinations, Config), 36 validate(D), 37 DUris = get_brokers(D), 38 validate_uris(brokers, DUris), 39 Q = get_value(queue, Config), 40 DA = proplists:get_value(delete_after, Config, never), 41 Pref = proplists:get_value(prefetch_count, Config, ?DEFAULT_PREFETCH), 42 RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY), 43 AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE), 44 validate_ack_mode(AckMode), 45 PubFields = proplists:get_value(publish_fields, Config, []), 46 PubProps = proplists:get_value(publish_properties, Config, []), 47 AFH = proplists:get_value(add_forward_headers, Config, false), 48 ATH = proplists:get_value(add_timestamp_header, Config, false), 49 SourceDecls = proplists:get_value(declarations, S, []), 50 validate_list(SourceDecls), 51 DestDecls = proplists:get_value(declarations, D, []), 52 validate_list(DestDecls), 53 [{source, [{protocol, amqp091}, 54 {uris, SUris}, 55 {declarations, SourceDecls}, 56 {queue, Q}, 57 {delete_after, DA}, 58 {prefetch_count, Pref}]}, 59 {destination, [{protocol, amqp091}, 60 {uris, DUris}, 61 {declarations, DestDecls}, 62 {publish_properties, PubProps}, 63 {publish_fields, PubFields}, 64 {add_forward_headers, AFH}, 65 {add_timestamp_header, ATH}]}, 66 {ack_mode, AckMode}, 67 {reconnect_delay, RD}]. 68 69parse(ShovelName, Config0) -> 70 try 71 validate(Config0), 72 case is_legacy(Config0) of 73 true -> 74 Config = convert_from_legacy(Config0), 75 parse_current(ShovelName, Config); 76 false -> 77 parse_current(ShovelName, Config0) 78 end 79 catch throw:{error, Reason} -> 80 {error, {invalid_shovel_configuration, ShovelName, Reason}}; 81 throw:Reason -> 82 {error, {invalid_shovel_configuration, ShovelName, Reason}} 83 end. 84 85validate(Props) -> 86 validate_proplist(Props), 87 validate_duplicates(Props). 88 89validate_proplist(Props) when is_list (Props) -> 90 case lists:filter(fun ({_, _}) -> false; 91 (_) -> true 92 end, Props) of 93 [] -> ok; 94 Invalid -> 95 throw({invalid_parameters, Invalid}) 96 end; 97validate_proplist(X) -> 98 throw({require_list, X}). 99 100validate_duplicates(Props) -> 101 case duplicate_keys(Props) of 102 [] -> ok; 103 Invalid -> 104 throw({duplicate_parameters, Invalid}) 105 end. 106 107validate_list(L) when is_list(L) -> ok; 108validate_list(L) -> 109 throw({require_list, L}). 110 111validate_uris(Key, L) when not is_list(L) -> 112 throw({require_list, Key, L}); 113validate_uris(Key, []) -> 114 throw({expected_non_empty_list, Key}); 115validate_uris(_Key, L) -> 116 validate_uris0(L). 117 118validate_uris0([Uri | Uris]) -> 119 case amqp_uri:parse(Uri) of 120 {ok, _Params} -> 121 validate_uris0(Uris); 122 {error, _} = Err -> 123 throw(Err) 124 end; 125validate_uris0([]) -> ok. 126 127parse_current(ShovelName, Config) -> 128 {source, Source} = proplists:lookup(source, Config), 129 validate(Source), 130 SrcMod = resolve_module(proplists:get_value(protocol, Source, amqp091)), 131 {destination, Destination} = proplists:lookup(destination, Config), 132 validate(Destination), 133 DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)), 134 AckMode = proplists:get_value(ack_mode, Config, no_ack), 135 validate_ack_mode(AckMode), 136 {ok, #{name => ShovelName, 137 shovel_type => static, 138 ack_mode => AckMode, 139 reconnect_delay => proplists:get_value(reconnect_delay, Config, 140 ?DEFAULT_RECONNECT_DELAY), 141 source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName, 142 {source, Source}), 143 dest => rabbit_shovel_behaviour:parse(DstMod, ShovelName, 144 {destination, Destination})}}. 145 146%% ensures that any defaults that have been applied to a parsed 147%% shovel, are written back to the original proplist 148ensure_defaults(ShovelConfig, ParsedShovel) -> 149 lists:keystore(reconnect_delay, 1, 150 ShovelConfig, 151 {reconnect_delay, 152 ParsedShovel#shovel.reconnect_delay}). 153 154-spec fail(term()) -> no_return(). 155fail(Reason) -> throw({error, Reason}). 156 157validate_ack_mode(Val) when Val =:= no_ack orelse 158 Val =:= on_publish orelse 159 Val =:= on_confirm -> 160 ok; 161validate_ack_mode(WrongVal) -> 162 fail({invalid_parameter_value, ack_mode, 163 {ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm}, 164 WrongVal}}). 165 166duplicate_keys(PropList) when is_list(PropList) -> 167 proplists:get_keys( 168 lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList, 169 proplists:get_keys(PropList))). 170 171get_value(Key, Props) -> 172 case proplists:get_value(Key, Props) of 173 undefined -> 174 throw({missing_parameter, Key}); 175 V -> V 176 end. 177