1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_policy).
9
%% Policies are a way to apply optional arguments ("x-args")
11%% to exchanges and queues in bulk, using name matching.
12%%
13%% Only one policy can apply to a given queue or exchange
14%% at a time. Priorities help determine what policy should
15%% take precedence.
16%%
17%% Policies build on runtime parameters. Policy-driven parameters
18%% are well known and therefore validated.
19%%
20%% See also:
21%%
22%%  * rabbit_runtime_parameters
23%%  * rabbit_policies
24%%  * rabbit_registry
25
26%% TODO specs
27
28-behaviour(rabbit_runtime_parameter).
29
30-include_lib("rabbit_common/include/rabbit.hrl").
31-include("amqqueue.hrl").
32
33-import(rabbit_misc, [pget/2, pget/3]).
34
35-export([register/0]).
36-export([invalidate/0, recover/0]).
37-export([name/1, name_op/1, effective_definition/1, merge_operator_definitions/2, get/2, get_arg/3, set/1]).
38-export([validate/5, notify/5, notify_clear/4]).
39-export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1,
40         list_formatted/1, list_formatted/3, info_keys/0]).
41-export([parse_set_op/7, set_op/7, delete_op/3, lookup_op/2, list_op/0, list_op/1, list_op/2,
42         list_formatted_op/1, list_formatted_op/3,
43         match_all/2, match_as_map/1, match_op_as_map/1, definition_keys/1,
44         list_in/1, list_in/2, list_as_maps/0, list_as_maps/1, list_op_as_maps/1
45        ]).
46-export([sort_by_priority/1]).
47
48-rabbit_boot_step({?MODULE,
49                   [{description, "policy parameters"},
50                    {mfa, {rabbit_policy, register, []}},
51                    {requires, rabbit_registry},
52                    {enables, recovery}]}).
53
%% Registers this module with rabbit_registry as the runtime_parameter
%% handler for the <<"policy">> and <<"operator_policy">> components, in
%% that order. Returns the result of the last registration call.
register() ->
    Components = [<<"policy">>, <<"operator_policy">>],
    lists:foldl(
      fun (Component, _Previous) ->
              rabbit_registry:register(runtime_parameter, Component, ?MODULE)
      end, ok, Components).
57
%% Returns the name of the policy attached to a queue or an exchange.
%% The lookup of the name inside the policy is delegated to name0/1.
name(Queue) when ?is_amqqueue(Queue) ->
    name0(amqqueue:get_policy(Queue));
name(#exchange{policy = ExchangePolicy}) ->
    name0(ExchangePolicy).
62
%% Returns the name of the operator policy attached to a queue or an
%% exchange. The lookup of the name itself is delegated to name0/1.
name_op(Queue) when ?is_amqqueue(Queue) ->
    name0(amqqueue:get_operator_policy(Queue));
name_op(#exchange{operator_policy = ExchangeOpPolicy}) ->
    name0(ExchangeOpPolicy).
67
%% Extracts the 'name' property from a policy proplist.
%% Returns the atom 'none' when there is no policy (undefined).
name0(Policy) ->
    case Policy of
        undefined -> none;
        _         -> pget(name, Policy)
    end.
70
%% Returns the definition that results from combining the policy and the
%% operator policy attached to a queue or an exchange
%% (see merge_operator_definitions/2 for the combination rules).
effective_definition(Queue) when ?is_amqqueue(Queue) ->
    merge_operator_definitions(amqqueue:get_policy(Queue),
                               amqqueue:get_operator_policy(Queue));
effective_definition(#exchange{} = Exchange) ->
    #exchange{policy = Policy, operator_policy = OpPolicy} = Exchange,
    merge_operator_definitions(Policy, OpPolicy).
77
%% Combines the definition of a regular policy with the definition of an
%% operator policy.
%%
%%  * both undefined          -> undefined
%%  * only one policy present -> that policy's 'definition' property as is
%%  * both present            -> a list of {Key, Value} pairs, sorted by key,
%%                               covering every key found in either
%%                               definition. Keys present in both are
%%                               resolved with merge_policy_value/3.
merge_operator_definitions(undefined, undefined) -> undefined;
merge_operator_definitions(Policy, undefined)    -> pget(definition, Policy);
merge_operator_definitions(undefined, OpPolicy)  -> pget(definition, OpPolicy);
merge_operator_definitions(Policy, OpPolicy) ->
    OpDefinition = rabbit_data_coercion:to_map(pget(definition, OpPolicy, [])),
    Definition   = rabbit_data_coercion:to_map(pget(definition, Policy, [])),
    %% maps:keys/1 makes no promise about key order, while lists:umerge/2
    %% is only correct for sorted inputs. Sort and de-duplicate explicitly
    %% so that every key is visited exactly once.
    AllKeys = lists:usort(maps:keys(Definition) ++ maps:keys(OpDefinition)),
    lists:map(fun(Key) ->
        case {maps:get(Key, Definition, undefined), maps:get(Key, OpDefinition, undefined)} of
            {Val, undefined}   -> {Key, Val};
            {undefined, OpVal} -> {Key, OpVal};
            {Val, OpVal}       -> {Key, merge_policy_value(Key, Val, OpVal)}
        end
    end,
    AllKeys).
94
%% Attaches the currently matching policy and operator policy to a queue
%% or an exchange and returns the updated entity. The match is computed
%% from the entity's resource name via match/1 and match_op/1.
set(Q0) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    MatchedPolicy = match(QName),
    MatchedOpPolicy = match_op(QName),
    amqqueue:set_operator_policy(amqqueue:set_policy(Q0, MatchedPolicy),
                                 MatchedOpPolicy);
set(#exchange{name = XName} = Exchange) ->
    MatchedPolicy = match(XName),
    MatchedOpPolicy = match_op(XName),
    Exchange#exchange{policy = MatchedPolicy,
                      operator_policy = MatchedOpPolicy}.
104
105
%% Lists policies using the wildcard '_' in place of a virtual host name.
list() ->
    AnyVHost = '_',
    list(AnyVHost).
108
%% Lists the policies of VHost as proplists, leaving each definition
%% untouched (the definition is passed through ident/1).
list(VHost) ->
    KeepDefinition = fun ident/1,
    list0(VHost, KeepDefinition).
111
%% Alias of list/1: returns the policies of VHost.
list_in(VHost) ->
    Policies = list(VHost),
    Policies.
114
%% Returns the policies of VHost whose definition contains at least one
%% of the keys in DefinitionKeys.
list_in(VHost, DefinitionKeys) ->
    UsesAnyKey = fun (Policy) ->
                         keys_overlap(definition_keys(Policy), DefinitionKeys)
                 end,
    lists:filter(UsesAnyKey, list_in(VHost)).
117
%% Lists policies as maps using the wildcard '_' in place of a virtual
%% host name.
list_as_maps() ->
    AnyVHost = '_',
    list_as_maps(AnyVHost).
120
%% Returns the policies of VHost as maps, ordered by sort_by_priority/1.
%% Each policy's definition is itself converted with maps:from_list/1.
list_as_maps(VHost) ->
    Policies = list0(VHost, fun maps:from_list/1),
    Sorted = sort_by_priority(Policies),
    lists:map(fun maps:from_list/1, Sorted).
123
%% Returns the operator policies of VHost as maps, ordered by
%% sort_by_priority/1. Each definition is converted with maps:from_list/1.
list_op_as_maps(VHost) ->
    OpPolicies = list0_op(VHost, fun maps:from_list/1),
    Sorted = sort_by_priority(OpPolicies),
    lists:map(fun maps:from_list/1, Sorted).
126
%% Returns the policies of VHost with each definition encoded by
%% rabbit_json:encode/1, ordered by sort_by_priority/1.
list_formatted(VHost) ->
    Encoded = list0(VHost, fun rabbit_json:encode/1),
    sort_by_priority(Encoded).
129
%% Emits the formatted policies of VHost, unchanged, to AggregatorPid via
%% rabbit_control_misc:emitting_map/4, tagged with Ref.
list_formatted(VHost, Ref, AggregatorPid) ->
    Unchanged = fun (Policy) -> Policy end,
    Formatted = list_formatted(VHost),
    rabbit_control_misc:emitting_map(AggregatorPid, Ref, Unchanged, Formatted).
133
%% Lists operator policies using the wildcard '_' in place of a virtual
%% host name.
list_op() ->
    AnyVHost = '_',
    list_op(AnyVHost).
136
%% Lists the operator policies of VHost as proplists, leaving each
%% definition untouched (the definition is passed through ident/1).
list_op(VHost) ->
    KeepDefinition = fun ident/1,
    list0_op(VHost, KeepDefinition).
139
%% Returns the operator policies of VHost whose definition contains at
%% least one of the keys in DefinitionKeys.
list_op(VHost, DefinitionKeys) ->
    UsesAnyKey = fun (OpPolicy) ->
                         keys_overlap(definition_keys(OpPolicy), DefinitionKeys)
                 end,
    lists:filter(UsesAnyKey, list_op(VHost)).
142
%% Returns the operator policies of VHost with each definition encoded by
%% rabbit_json:encode/1, ordered by sort_by_priority/1.
list_formatted_op(VHost) ->
    Encoded = list0_op(VHost, fun rabbit_json:encode/1),
    sort_by_priority(Encoded).
145
%% Emits the formatted operator policies of VHost, unchanged, to
%% AggregatorPid via rabbit_control_misc:emitting_map/4, tagged with Ref.
list_formatted_op(VHost, Ref, AggregatorPid) ->
    Unchanged = fun (OpPolicy) -> OpPolicy end,
    Formatted = list_formatted_op(VHost),
    rabbit_control_misc:emitting_map(AggregatorPid, Ref, Unchanged, Formatted).
149
%% Returns the policy that applies to the given resource name, chosen
%% among the policies of the resource's own virtual host (see match/2).
match(#resource{virtual_host = VHost} = ResourceName) ->
    VHostPolicies = list(VHost),
    match(ResourceName, VHostPolicies).
152
%% Returns the operator policy that applies to the given resource name,
%% chosen among the operator policies of the resource's own virtual host
%% (see match/2).
match_op(#resource{virtual_host = VHost} = ResourceName) ->
    VHostOpPolicies = list_op(VHost),
    match(ResourceName, VHostOpPolicies).
155
%% Returns every policy of the resource's virtual host that matches Name,
%% each converted to a map, in the order produced by match_all/2 (highest
%% priority first). Returns [] when nothing matches.
%%
%% This uses match_all/2 rather than match/2: match/2 returns a single
%% policy proplist (or 'undefined'), so iterating over its result would
%% feed individual {Key, Value} tuples to maps:from_list/1 and crash.
%% NOTE(review): assumes callers want all matching policies - confirm.
match_as_map(Name = #resource{virtual_host = VHost}) ->
    [maps:from_list(PL) || PL <- match_all(Name, list(VHost))].
158
%% Returns every operator policy of the resource's virtual host that
%% matches Name, each converted to a map, in the order produced by
%% match_all/2 (highest priority first). Returns [] when nothing matches.
%%
%% This uses match_all/2 rather than match/2: match/2 returns a single
%% policy proplist (or 'undefined'), so iterating over its result would
%% feed individual {Key, Value} tuples to maps:from_list/1 and crash.
%% NOTE(review): assumes callers want all matching policies - confirm.
match_op_as_map(Name = #resource{virtual_host = VHost}) ->
    [maps:from_list(PL) || PL <- match_all(Name, list_op(VHost))].
161
%% Returns the effective value of the policy key Name for a queue, an
%% exchange or a resource name, taking both the policy and the operator
%% policy into account (see get0/3).
get(Name, Queue) when ?is_amqqueue(Queue) ->
    get0(Name,
         amqqueue:get_policy(Queue),
         amqqueue:get_operator_policy(Queue));
get(Name, #exchange{} = Exchange) ->
    #exchange{policy = Policy, operator_policy = OpPolicy} = Exchange,
    get0(Name, Policy, OpPolicy);

%% Caution - SLOW: for a bare resource name the policies of the virtual
%% host are listed and matched on every call.
get(Name, #resource{virtual_host = VHost} = EntityName) ->
    Policy = match(EntityName, list(VHost)),
    OpPolicy = match(EntityName, list_op(VHost)),
    get0(Name, Policy, OpPolicy).
174
%% Picks the first policy returned by match_all/2 for Name, or
%% 'undefined' when no policy in Policies matches.
match(Name, Policies) ->
    Candidates = match_all(Name, Policies),
    case Candidates of
        [Best | _] -> Best;
        []         -> undefined
    end.
180
%% Returns all policies in Policies that match the resource Name, sorted
%% with priority_comparator/2.
match_all(Name, Policies) ->
    Matching = lists:filter(fun (Policy) -> matches(Name, Policy) end,
                            Policies),
    lists:sort(fun priority_comparator/2, Matching).
183
%% Decides whether Policy applies to Resource. All of the following must
%% hold, checked in this order with short-circuiting:
%%  1. the policy's 'apply-to' value covers the resource kind,
%%  2. is_applicable/2 accepts the policy definition for the resource,
%%  3. the resource name matches the policy's regular expression pattern,
%%  4. the policy belongs to the resource's virtual host.
matches(Resource = #resource{name = Name, kind = Kind, virtual_host = VHost},
        Policy) ->
    matches_type(Kind, pget('apply-to', Policy))
        andalso is_applicable(Resource, pget(definition, Policy))
        andalso match =:= re:run(Name, pget(pattern, Policy), [{capture, none}])
        andalso VHost =:= pget(vhost, Policy).
189
%% Resolves the value of key Name given a policy and an operator policy,
%% either of which may be 'undefined'.
%%  * neither policy present, or the key is in neither definition
%%    -> undefined
%%  * the key is present in only one definition -> that value
%%  * the key is present in both -> merge_policy_value/3 decides
get0(_Name, undefined, undefined) ->
    undefined;
get0(Name, undefined, OpPolicy) ->
    OpDefinition = pget(definition, OpPolicy, []),
    pget(Name, OpDefinition);
get0(Name, Policy, undefined) ->
    Definition = pget(definition, Policy, []),
    pget(Name, Definition);
get0(Name, Policy, OpPolicy) ->
    OpVal = pget(Name, pget(definition, OpPolicy, [])),
    PolicyVal = pget(Name, pget(definition, Policy, [])),
    if
        %% also covers the case where both values are undefined
        OpVal =:= undefined     -> PolicyVal;
        PolicyVal =:= undefined -> OpVal;
        true                    -> merge_policy_value(Name, PolicyVal, OpVal)
    end.
202
%% Merges a policy value with an operator policy value for key Name.
%% The merge is delegated to the module found by policy_merge_strategy/1;
%% when none is found, rabbit_policies:merge_policy_value/3 is used.
merge_policy_value(Name, PolicyVal, OpVal) ->
    Strategy = case policy_merge_strategy(Name) of
                   {ok, Module}       -> Module;
                   {error, not_found} -> rabbit_policies
               end,
    Strategy:merge_policy_value(Name, PolicyVal, OpVal).
208
%% Looks up the module registered under the 'policy_merge_strategy' class
%% for the policy key Name. Returns {error, not_found} when the key does
%% not map to a registry type; otherwise returns whatever
%% rabbit_registry:lookup_module/2 returns.
policy_merge_strategy(Name) ->
    BinName = rabbit_data_coercion:to_binary(Name),
    case rabbit_registry:binary_to_type(BinName) of
        {error, not_found} = NotFound ->
            NotFound;
        Type ->
            rabbit_registry:lookup_module(policy_merge_strategy, Type)
    end.
216
%% Returns the value of an exchange setting that can come either from the
%% exchange's own arguments (key AName) or from policies (key PName).
%% An explicit argument wins; otherwise the value is resolved via get/2,
%% which takes both the policy and the operator policy into account.
%%
%% Many heads for optimisation. The first head is a fast path that may
%% only be taken when there is nothing to consult at all: no arguments,
%% no policy and no operator policy. Without the operator_policy check an
%% exchange governed only by an operator policy would wrongly yield
%% 'undefined'.
get_arg(_AName, _PName,     #exchange{arguments = [],
                                      policy = undefined,
                                      operator_policy = undefined}) ->
    undefined;
get_arg(_AName,  PName, X = #exchange{arguments = []}) ->
    get(PName, X);
get_arg(AName,   PName, X = #exchange{arguments = Args}) ->
    case rabbit_misc:table_lookup(Args, AName) of
        undefined    -> get(PName, X);
        {_Type, Arg} -> Arg
    end.
227
228%%----------------------------------------------------------------------------
229
%% Gets called during upgrades - therefore must not assume anything about the
%% state of Mnesia.
%% Writes an empty marker file at invalid_file(); recover/0 checks for it.
invalidate() ->
    MarkerPath = invalid_file(),
    rabbit_file:write_file(MarkerPath, <<"">>).
234
%% If the marker file written by invalidate/0 exists, re-applies policies
%% via recover0/0 and then deletes the marker. Otherwise does nothing and
%% returns ok.
recover() ->
    MarkerExists = rabbit_file:is_file(invalid_file()),
    case MarkerExists of
        false ->
            ok;
        true ->
            recover0(),
            rabbit_file:delete(invalid_file())
    end.
241
%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
%% the first node starting. So we can rewrite the whole database.  Note that
%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
%% variants.
%%
%% Re-computes the matching policy and operator policy for every durable
%% exchange and durable queue and writes the updated records back, one
%% Mnesia transaction per record. Always returns ok.
recover0() ->
    %% Read all durable exchanges and queues with dirty reads.
    Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
    Qs = rabbit_amqqueue:list_with_possible_retry(
           fun() ->
                   mnesia:dirty_match_object(
                     rabbit_durable_queue, amqqueue:pattern_match_all())
           end),
    %% Policies are listed once up front and reused for every entity.
    Policies = list(),
    OpPolicies = list_op(),
    %% Exchanges: set both policy fields, re-apply decorators, write back.
    [rabbit_misc:execute_mnesia_transaction(
       fun () ->
               mnesia:write(
                 rabbit_durable_exchange,
                 rabbit_exchange_decorator:set(
                   X#exchange{policy = match(Name, Policies),
                              operator_policy = match(Name, OpPolicies)}),
                 write)
       end) || X = #exchange{name = Name} <- Xs],
    %% Queues: same as above. The write goes through the
    %% ?try_mnesia_tx_or_upgrade_amqqueue_and_retry macro, which is given
    %% a second expression that upgrades the record before writing it.
    %% NOTE(review): the macro is defined elsewhere; presumably the second
    %% expression is the retry path - confirm against its definition.
    [begin
         QName = amqqueue:get_name(Q0),
         Policy1 = match(QName, Policies),
         Q1 = amqqueue:set_policy(Q0, Policy1),
         OpPolicy1 = match(QName, OpPolicies),
         Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
         Q3 = rabbit_queue_decorator:set(Q2),
         ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
            rabbit_misc:execute_mnesia_transaction(
              fun () ->
                      mnesia:write(rabbit_durable_queue, Q3, write)
              end),
            begin
                Q4 = amqqueue:upgrade(Q3),
                rabbit_misc:execute_mnesia_transaction(
                  fun () ->
                          mnesia:write(rabbit_durable_queue, Q4, write)
                  end)
            end)
     end || Q0 <- Qs],
    ok.
285
%% Path of the marker file used by invalidate/0 and recover/0: a file
%% named "policies_are_invalid" inside the directory returned by
%% rabbit_mnesia:dir/0.
invalid_file() ->
    MnesiaDir = rabbit_mnesia:dir(),
    filename:join(MnesiaDir, "policies_are_invalid").
288
289%%----------------------------------------------------------------------------
290
%% Same as parse_set/7 but stores the result as an operator policy
%% (runtime parameter component <<"operator_policy">>).
parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"operator_policy">>,
    parse_set(Component, VHost, Name, Pattern, Definition, Priority,
              ApplyTo, ActingUser).
294
%% Parses the priority and the JSON definition and stores the result as
%% a regular policy (runtime parameter component <<"policy">>).
parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"policy">>,
    parse_set(Component, VHost, Name, Pattern, Definition, Priority,
              ApplyTo, ActingUser).
298
%% Coerces Priority to an integer and hands over to parse_set0/8.
%% When the coercion fails with error:badarg, returns an
%% {error, Format, Args} triple instead. Only the coercion is protected;
%% exceptions raised by parse_set0/8 propagate to the caller.
parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Coerced = try
                  {ok, rabbit_data_coercion:to_integer(Priority)}
              catch
                  error:badarg -> not_a_number
              end,
    case Coerced of
        {ok, Num} ->
            parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo,
                       ActingUser);
        not_a_number ->
            {error, "~p priority must be a number", [Priority]}
    end.
306
%% Decodes the JSON policy definition Defn and stores the policy via
%% set0/5. Returns the result of set0/5, or {error_string, Message} when
%% Defn is not valid JSON or does not decode to a JSON object.
%%
%% The success message is logged only when set0/5 returned ok, so a
%% rejected policy is no longer reported as successfully set.
%% NOTE(review): assumes 'ok' is the only success value of
%% rabbit_runtime_parameters:set_any/5 - confirm.
parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo, ActingUser) ->
    case rabbit_json:try_decode(Defn) of
        {ok, Term} when is_map(Term) ->
            R = set0(Type, VHost, Name,
                     [{<<"pattern">>,    Pattern},
                      {<<"definition">>, maps:to_list(Term)},
                      {<<"priority">>,   Priority},
                      {<<"apply-to">>,   ApplyTo}],
                     ActingUser),
            case R of
                ok ->
                    rabbit_log:info("Successfully set policy '~s' matching ~s names in virtual host '~s' using pattern '~s'",
                                    [Name, ApplyTo, VHost, Pattern]);
                _ ->
                    ok
            end,
            R;
        {ok, Term} ->
            %% Valid JSON that is not an object (e.g. an array or a number)
            %% cannot be turned into a definition; report it rather than
            %% crashing in maps:to_list/1.
            {error_string,
                rabbit_misc:format("policy definition must be a JSON object, got: ~tp", [Term])};
        {error, Reason} ->
            {error_string,
                rabbit_misc:format("JSON decoding error. Reason: ~ts", [Reason])}
    end.
323
%% Same as set/7 but stores an operator policy (runtime parameter
%% component <<"operator_policy">>).
set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"operator_policy">>,
    set(Component, VHost, Name, Pattern, Definition, Priority, ApplyTo,
        ActingUser).
326
%% Stores a regular policy (runtime parameter component <<"policy">>)
%% from already-parsed values; see set/8 for the defaults applied.
set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"policy">>,
    set(Component, VHost, Name, Pattern, Definition, Priority, ApplyTo,
        ActingUser).
329
%% Builds the policy property list and stores it via set0/5.
%% An 'undefined' Priority defaults to 0 and an 'undefined' ApplyTo
%% defaults to <<"all">>; Pattern and Definition are used as given.
set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    EffectivePriority = case Priority of
                            undefined -> 0;
                            _         -> Priority
                        end,
    EffectiveApplyTo = case ApplyTo of
                           undefined -> <<"all">>;
                           _         -> ApplyTo
                       end,
    set0(Type, VHost, Name,
         [{<<"pattern">>,    Pattern},
          {<<"definition">>, Definition},
          {<<"priority">>,   EffectivePriority},
          {<<"apply-to">>,   EffectiveApplyTo}],
         ActingUser).
342
%% Stores Term as the runtime parameter Name of component Type in VHost
%% by calling rabbit_runtime_parameters:set_any/5 (note the argument
%% order: the virtual host comes first there).
set0(Type, VHost, Name, Term, ActingUser) ->
    Component = Type,
    rabbit_runtime_parameters:set_any(VHost, Component, Name, Term, ActingUser).
345
%% Clears the operator policy Name in VHost through
%% rabbit_runtime_parameters:clear_any/4.
delete_op(VHost, Name, ActingUser) ->
    Component = <<"operator_policy">>,
    rabbit_runtime_parameters:clear_any(VHost, Component, Name, ActingUser).
348
%% Clears the policy Name in VHost through
%% rabbit_runtime_parameters:clear_any/4.
delete(VHost, Name, ActingUser) ->
    Component = <<"policy">>,
    rabbit_runtime_parameters:clear_any(VHost, Component, Name, ActingUser).
351
%% Looks up the operator policy Name in VHost. Returns 'not_found' or
%% the policy as a proplist in the shape produced by p/2.
lookup_op(VHost, Name) ->
    case rabbit_runtime_parameters:lookup(VHost, <<"operator_policy">>, Name) of
        Parameter when Parameter =/= not_found ->
            p(Parameter, fun ident/1);
        not_found ->
            not_found
    end.
357
%% Looks up the policy Name in VHost. Returns 'not_found' or the policy
%% as a proplist in the shape produced by p/2.
lookup(VHost, Name) ->
    case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
        Parameter when Parameter =/= not_found ->
            p(Parameter, fun ident/1);
        not_found ->
            not_found
    end.
363
%% Lists the <<"operator_policy">> runtime parameters of VHost and
%% converts each one with p/2, applying DefnFun to its definition.
list0_op(VHost, DefnFun) ->
    Parameters = rabbit_runtime_parameters:list(VHost, <<"operator_policy">>),
    lists:map(fun (Parameter) -> p(Parameter, DefnFun) end, Parameters).
367
%% Lists the <<"policy">> runtime parameters of VHost and converts each
%% one with p/2, applying DefnFun to its definition.
list0(VHost, DefnFun) ->
    Parameters = rabbit_runtime_parameters:list(VHost, <<"policy">>),
    lists:map(fun (Parameter) -> p(Parameter, DefnFun) end, Parameters).
370
%% Sorts policies using the negation of priority_comparator/2 as the
%% ordering function, i.e. the reverse of the order used by match_all/2.
sort_by_priority(PropList) ->
    Reversed = fun (Left, Right) ->
                       not priority_comparator(Left, Right)
               end,
    lists:sort(Reversed, PropList).
373
%% Converts a runtime parameter proplist into a policy proplist with the
%% keys vhost, name, pattern, 'apply-to', definition and priority.
%% vhost and name come from the parameter itself, the rest from its
%% 'value' property; the definition is passed through DefnFun.
p(Parameter, DefnFun) ->
    Value = pget(value, Parameter),
    FromValue = fun (Key) -> pget(Key, Value) end,
    [{vhost,      pget(vhost, Parameter)},
     {name,       pget(name, Parameter)},
     {pattern,    FromValue(<<"pattern">>)},
     {'apply-to', FromValue(<<"apply-to">>)},
     {definition, DefnFun(FromValue(<<"definition">>))},
     {priority,   FromValue(<<"priority">>)}].
382
%% Identity function: returns its argument unchanged.
ident(Term) ->
    Term.
384
%% Returns the list of policy info keys, in a fixed order.
info_keys() ->
    [vhost,
     name,
     'apply-to',
     pattern,
     definition,
     priority].
386
%% Returns the keys of a policy's definition. The policy is first coerced
%% with rabbit_data_coercion:to_map/1; when the result has no
%% 'definition' key, [] is returned.
definition_keys(Policy) ->
    PolicyMap = rabbit_data_coercion:to_map(Policy),
    case PolicyMap of
        #{definition := Definition} ->
            DefinitionMap = rabbit_data_coercion:to_map(Definition),
            maps:keys(DefinitionMap);
        _ ->
            []
    end.
393
%% Returns true when at least one element of list A is also a member of
%% list B, false otherwise. Stops at the first common element.
keys_overlap([], _B) ->
    false;
keys_overlap([Key | Rest], B) ->
    lists:member(Key, B) orelse keys_overlap(Rest, B).
396
397%%----------------------------------------------------------------------------
398
%% Validates Term for the <<"policy">> and <<"operator_policy">>
%% components by checking it against the matching list of constraints
%% with rabbit_parameter_validation:proplist/3.
validate(_VHost, <<"policy">>, Name, Term, _User) ->
    Constraints = policy_validation(),
    rabbit_parameter_validation:proplist(Name, Constraints, Term);
validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
    Constraints = operator_policy_validation(),
    rabbit_parameter_validation:proplist(Name, Constraints, Term).
405
%% Called when a policy or an operator policy has been set: re-evaluates
%% the policies of all exchanges and queues in VHost, then emits a
%% 'policy_set' event carrying the name, vhost, acting user and the
%% policy properties (with atomized keys).
%% NOTE(review): operator policies also emit 'policy_set' here, whereas
%% notify_clear/4 uses a distinct 'operator_policy_cleared' event.
notify(VHost, <<"policy">>, Name, Term0, ActingUser) ->
    AtomizedTerm = rabbit_data_coercion:atomize_keys(Term0),
    update_matched_objects(VHost, AtomizedTerm, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser}] ++ AtomizedTerm,
    rabbit_event:notify(policy_set, EventProps);
notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) ->
    AtomizedTerm = rabbit_data_coercion:atomize_keys(Term0),
    update_matched_objects(VHost, AtomizedTerm, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser}] ++ AtomizedTerm,
    rabbit_event:notify(policy_set, EventProps).
416
%% Called when a policy or an operator policy has been cleared:
%% re-evaluates the policies of all exchanges and queues in VHost (with
%% 'undefined' as the policy definition), then emits 'policy_cleared' or
%% 'operator_policy_cleared' respectively.
notify_clear(VHost, <<"policy">>, Name, ActingUser) ->
    update_matched_objects(VHost, undefined, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser}],
    rabbit_event:notify(policy_cleared, EventProps);
notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) ->
    update_matched_objects(VHost, undefined, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser}],
    rabbit_event:notify(operator_policy_cleared, EventProps).
426
427%%----------------------------------------------------------------------------
428
%% Re-evaluates which policy and operator policy apply to every exchange
%% and queue in VHost inside a single Mnesia transaction, then notifies
%% about each change outside of it. Always returns ok; failures while
%% notifying are caught and ignored.
%%
%% [1] We need to prevent this from becoming O(n^2) in a similar
%% manner to rabbit_binding:remove_for_{source,destination}. So see
%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
%% [2] We could be here in a post-tx fun after the vhost has been
%% deleted; in which case it's fine to do nothing.
update_matched_objects(VHost, PolicyDef, ActingUser) ->
    LockedTables = [rabbit_queue,    rabbit_durable_queue,
                    rabbit_exchange, rabbit_durable_exchange],
    UpdateAll =
        fun() ->
            lists:foreach(
              fun(Tab) -> mnesia:lock({table, Tab}, write) end,
              LockedTables), %% [1]
            case catch {list(VHost), list_op(VHost)} of
                {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
                    {[], []}; %% [2]
                {'EXIT', Exit} ->
                    exit(Exit);
                {Policies, OpPolicies} ->
                    XResults = [update_exchange(X, Policies, OpPolicies) ||
                                   X <- rabbit_exchange:list(VHost)],
                    QResults = [update_queue(Q, Policies, OpPolicies) ||
                                   Q <- rabbit_amqqueue:list(VHost)],
                    {XResults, QResults}
            end
        end,
    {XUpdateResults, QUpdateResults} =
        rabbit_misc:execute_mnesia_transaction(UpdateAll),
    Notify = fun(Result) ->
                     catch maybe_notify_of_policy_change(Result, PolicyDef,
                                                         ActingUser)
             end,
    lists:foreach(Notify, XUpdateResults),
    lists:foreach(Notify, QUpdateResults),
    ok.
455
%% Re-matches the policies for exchange X. Returns 'no_change' when both
%% the policy and the operator policy are the ones already attached.
%% Otherwise updates the stored exchange and returns {Old, New}; when the
%% exchange is no longer found, {X, X} is returned.
update_exchange(X = #exchange{name = XName,
                              policy = OldPolicy,
                              operator_policy = OldOpPolicy},
                Policies, OpPolicies) ->
    Matched = {match(XName, Policies), match(XName, OpPolicies)},
    case Matched of
        {OldPolicy, OldOpPolicy} ->
            no_change;
        {NewPolicy, NewOpPolicy} ->
            ApplyPolicies =
                fun(Current) ->
                    rabbit_exchange_decorator:set(
                        Current#exchange{policy = NewPolicy,
                                         operator_policy = NewOpPolicy})
                end,
            case rabbit_exchange:update(XName, ApplyPolicies) of
                #exchange{} = Updated -> {X, Updated};
                not_found             -> {X, X}
            end
    end.
475
%% Re-matches the policies for queue Q0. Returns 'no_change' when both
%% the policy and the operator policy are the ones already attached.
%% Otherwise updates the stored queue (new policies, policy version
%% incremented by one, decorators re-applied) and returns {Old, New};
%% when the queue is no longer found, {Q0, Q0} is returned.
update_queue(Q0, Policies, OpPolicies) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    OldPolicy = amqqueue:get_policy(Q0),
    OldOpPolicy = amqqueue:get_operator_policy(Q0),
    Matched = {match(QName, Policies), match(QName, OpPolicies)},
    case Matched of
        {OldPolicy, OldOpPolicy} ->
            no_change;
        {NewPolicy, NewOpPolicy} ->
            ApplyPolicies =
                fun (Current) ->
                    WithPolicies = amqqueue:set_operator_policy(
                                     amqqueue:set_policy(Current, NewPolicy),
                                     NewOpPolicy),
                    NextVersion = amqqueue:get_policy_version(WithPolicies) + 1,
                    rabbit_queue_decorator:set(
                      amqqueue:set_policy_version(WithPolicies, NextVersion))
                end,
            case rabbit_amqqueue:update(QName, ApplyPolicies) of
                Updated when ?is_amqqueue(Updated) ->
                    {Q0, Updated};
                not_found ->
                    {Q0, Q0}
            end
    end.
498
%% Reacts to the result of update_exchange/3 or update_queue/3.
%%  * no_change           -> nothing to do
%%  * {OldX, NewX}        -> rabbit_exchange:policy_changed/2
%%  * {OldQ, NewQ}        -> emits 'queue_policy_cleared' when PolicyDef is
%%                           'undefined', otherwise 'queue_policy_updated'
%%                           with PolicyDef appended to the event
%%                           properties; then calls
%%                           rabbit_amqqueue:policy_changed/2
maybe_notify_of_policy_change(no_change, _PolicyDef, _ActingUser)->
    ok;
maybe_notify_of_policy_change({OldX = #exchange{}, NewX = #exchange{}},
                              _PolicyDef, _ActingUser) ->
    rabbit_exchange:policy_changed(OldX, NewX);
maybe_notify_of_policy_change({OldQ, NewQ}, PolicyDef, ActingUser)
  when ?is_amqqueue(OldQ), ?is_amqqueue(NewQ) ->
    BaseProps = [{name, amqqueue:get_name(NewQ)},
                 {vhost, amqqueue:get_vhost(NewQ)},
                 {type, amqqueue:get_type(NewQ)},
                 {user_who_performed_action, ActingUser}],
    case PolicyDef of
        undefined ->
            %% policy has been cleared
            rabbit_event:notify(queue_policy_cleared, BaseProps);
        _ ->
            %% policy has been added or updated
            rabbit_event:notify(queue_policy_updated, BaseProps ++ PolicyDef)
    end,
    rabbit_amqqueue:policy_changed(OldQ, NewQ).
521
%% Tells whether a policy's 'apply-to' value covers the given resource
%% kind: <<"exchanges">> covers exchange, <<"queues">> covers queue and
%% <<"all">> covers both. Any other combination yields false.
matches_type(Kind, ApplyTo) ->
    Accepted = [{exchange, <<"exchanges">>},
                {queue,    <<"queues">>},
                {exchange, <<"all">>},
                {queue,    <<"all">>}],
    lists:member({Kind, ApplyTo}, Accepted).
527
%% Ordering function for lists:sort/2: true when the 'priority' of A is
%% greater than or equal to that of B, so higher priorities sort first.
priority_comparator(A, B) ->
    PriorityA = pget(priority, A),
    PriorityB = pget(priority, B),
    PriorityA >= PriorityB.
529
%% For queue resources, asks rabbit_amqqueue:is_policy_applicable/2
%% whether the given term (converted to a list) is applicable to the
%% queue. Every other kind of resource is always considered applicable.
%% Note: matches/2 passes the policy's definition as the second argument.
is_applicable(Resource = #resource{kind = queue}, Policy) ->
    AsList = rabbit_data_coercion:to_list(Policy),
    rabbit_amqqueue:is_policy_applicable(Resource, AsList);
is_applicable(_Resource, _Policy) ->
    true.
534
535%%----------------------------------------------------------------------------
536
%% Constraints used by validate/5 for operator policies: priority,
%% pattern and definition are mandatory, apply-to is optional. The
%% definition is checked with validation_op/2.
operator_policy_validation() ->
    CheckNumber = fun rabbit_parameter_validation:number/2,
    CheckRegex  = fun rabbit_parameter_validation:regex/2,
    [{<<"priority">>,   CheckNumber,               mandatory},
     {<<"pattern">>,    CheckRegex,                mandatory},
     {<<"apply-to">>,   fun apply_to_validation/2, optional},
     {<<"definition">>, fun validation_op/2,       mandatory}].
542
%% Constraints used by validate/5 for regular policies: priority, pattern
%% and definition are mandatory, apply-to is optional. The definition is
%% checked with validation/2.
policy_validation() ->
    CheckNumber = fun rabbit_parameter_validation:number/2,
    CheckRegex  = fun rabbit_parameter_validation:regex/2,
    [{<<"priority">>,   CheckNumber,               mandatory},
     {<<"pattern">>,    CheckRegex,                mandatory},
     {<<"apply-to">>,   fun apply_to_validation/2, optional},
     {<<"definition">>, fun validation/2,          mandatory}].
548
%% Validates an operator policy definition using the modules registered
%% under the 'operator_policy_validator' class.
validation_op(Name, Terms) ->
    ValidatorClass = operator_policy_validator,
    validation(Name, Terms, ValidatorClass).
551
%% Validates a policy definition using the modules registered under the
%% 'policy_validator' class.
validation(Name, Terms) ->
    ValidatorClass = policy_validator,
    validation(Name, Terms, ValidatorClass).
554
%% Validates a policy definition against the modules registered in
%% rabbit_registry under the class Validator.
%%  * an empty definition is rejected
%%  * a map is converted to a list and validated as such
%%  * a list must consist solely of 2-tuples with no duplicate keys; it
%%    is then handed to validation0/2
%%  * anything else is reported as a parse error
%% Returns ok or an {error, Format, Args} triple.
validation(_Name, [], _Validator) ->
    {error, "no policy provided", []};
validation(Name, Terms0, Validator) when is_map(Terms0) ->
    validation(Name, maps:to_list(Terms0), Validator);
validation(_Name, Terms, Validator) when is_list(Terms) ->
    Registered = rabbit_registry:lookup_all(Validator),
    {Keys, _Modules} = lists:unzip(Registered),
    [] = dups(Keys), %% ASSERTION
    %% {Module, BinaryKey} pairs, in registration order
    Validators = [{Module, a2b(Key)} || {Key, Module} <- Registered],
    case is_proplist(Terms) of
        false ->
            {error, "definition must be a dictionary: ~p", [Terms]};
        true ->
            {TermKeys, _} = lists:unzip(Terms),
            case dups(TermKeys) of
                []  -> validation0(Validators, Terms);
                Dup -> {error, "~p duplicate keys not allowed", [Dup]}
            end
    end;
validation(Name, Term, Validator) ->
    {error, "parse error while reading policy ~s: ~p. Validator: ~p.",
     [Name, Term, Validator]}.
576
%% Runs every validator module over the definition terms it owns.
%% Validators is a list of {Module, Key} pairs. For each distinct module
%% the terms whose key belongs to that module are passed to
%% Module:validate_policy/1 and removed from the remaining terms.
%% Processing stops being effective at the first result other than ok,
%% which is then returned. Terms left over after all modules ran are
%% reported as unrecognised.
validation0(Validators, Terms) ->
    Modules = proplists:get_keys(Validators),
    Step = fun (Mod, {ok, Remaining}) ->
                   OwnedKeys = proplists:get_all_values(Mod, Validators),
                   Scope = [Term || {Key, _} = Term <- Remaining,
                                    lists:member(Key, OwnedKeys)],
                   case Scope of
                       [] -> {ok, Remaining};
                       _  -> {Mod:validate_policy(Scope), Remaining -- Scope}
                   end;
               (_Mod, Failed) ->
                   %% an earlier module already failed: keep its result
                   Failed
           end,
    case lists:foldl(Step, {ok, Terms}, Modules) of
        {ok, []} ->
            ok;
        {ok, Unvalidated} ->
            {error, "~p are not recognised policy settings", [Unvalidated]};
        {Error, _} ->
            Error
    end.
596
%% Converts an atom to a UTF-8 encoded binary.
%% atom_to_binary/2 is used instead of list_to_binary(atom_to_list(A)):
%% the latter yields Latin-1 bytes for code points 128..255 and fails
%% with badarg above 255, so non-ASCII keys would never compare equal to
%% UTF-8 binaries. ASCII atoms convert exactly as before.
a2b(A) -> atom_to_binary(A, utf8).
598
%% Returns the surplus occurrences in L: what is left of L after removing
%% one occurrence of each distinct element. [] means no duplicates.
dups(L) ->
    Distinct = lists:usort(L),
    lists:subtract(L, Distinct).
600
%% Returns true when every element of the list L is a 2-tuple
%% (trivially true for the empty list), false otherwise.
is_proplist(L) ->
    IsPair = fun ({_, _}) -> true;
                 (_)      -> false
             end,
    lists:all(IsPair, L).
602
%% Validates the 'apply-to' value of a policy. Accepts <<"all">>,
%% <<"exchanges">> and <<"queues">>; anything else yields an
%% {error, Format, Args} triple.
apply_to_validation(_Name, Term) ->
    Accepted = [<<"all">>, <<"exchanges">>, <<"queues">>],
    case lists:member(Term, Accepted) of
        true ->
            ok;
        false ->
            {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
             "or 'all'", [Term]}
    end.
609