1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_policy). 9 10%% Policies is a way to apply optional arguments ("x-args") 11%% to exchanges and queues in bulk, using name matching. 12%% 13%% Only one policy can apply to a given queue or exchange 14%% at a time. Priorities help determine what policy should 15%% take precedence. 16%% 17%% Policies build on runtime parameters. Policy-driven parameters 18%% are well known and therefore validated. 19%% 20%% See also: 21%% 22%% * rabbit_runtime_parameters 23%% * rabbit_policies 24%% * rabbit_registry 25 26%% TODO specs 27 28-behaviour(rabbit_runtime_parameter). 29 30-include_lib("rabbit_common/include/rabbit.hrl"). 31-include("amqqueue.hrl"). 32 33-import(rabbit_misc, [pget/2, pget/3]). 34 35-export([register/0]). 36-export([invalidate/0, recover/0]). 37-export([name/1, name_op/1, effective_definition/1, merge_operator_definitions/2, get/2, get_arg/3, set/1]). 38-export([validate/5, notify/5, notify_clear/4]). 39-export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1, 40 list_formatted/1, list_formatted/3, info_keys/0]). 41-export([parse_set_op/7, set_op/7, delete_op/3, lookup_op/2, list_op/0, list_op/1, list_op/2, 42 list_formatted_op/1, list_formatted_op/3, 43 match_all/2, match_as_map/1, match_op_as_map/1, definition_keys/1, 44 list_in/1, list_in/2, list_as_maps/0, list_as_maps/1, list_op_as_maps/1 45 ]). 46-export([sort_by_priority/1]). 47 48-rabbit_boot_step({?MODULE, 49 [{description, "policy parameters"}, 50 {mfa, {rabbit_policy, register, []}}, 51 {requires, rabbit_registry}, 52 {enables, recovery}]}). 
%% Registers this module as the runtime parameter handler for both the
%% <<"policy">> and the <<"operator_policy">> components.
register() ->
    rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE),
    rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE).

%% Name of the policy stored on a queue or an exchange, or 'none' when the
%% entity carries no policy.
name(Q) when ?is_amqqueue(Q) ->
    Policy = amqqueue:get_policy(Q),
    name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).

%% Same as name/1, but for the operator policy of the entity.
name_op(Q) when ?is_amqqueue(Q) ->
    OpPolicy = amqqueue:get_operator_policy(Q),
    name0(OpPolicy);
name_op(#exchange{operator_policy = Policy}) -> name0(Policy).

name0(undefined) -> none;
name0(Policy)    -> pget(name, Policy).

%% Definition that results from combining the policy and the operator
%% policy stored on a queue or an exchange.
effective_definition(Q) when ?is_amqqueue(Q) ->
    Policy = amqqueue:get_policy(Q),
    OpPolicy = amqqueue:get_operator_policy(Q),
    merge_operator_definitions(Policy, OpPolicy);
effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) ->
    merge_operator_definitions(Policy, OpPolicy).

%% Merges the definitions of a policy and an operator policy.
%%
%% Returns 'undefined' when both are undefined, the stored definition when
%% only one of them is set, and otherwise a [{Key, Value}] list with one
%% entry per key present in either definition. Keys present in both are
%% resolved through merge_policy_value/3.
merge_operator_definitions(undefined, undefined) -> undefined;
merge_operator_definitions(Policy, undefined)    -> pget(definition, Policy);
merge_operator_definitions(undefined, OpPolicy)  -> pget(definition, OpPolicy);
merge_operator_definitions(Policy, OpPolicy) ->
    OpDefinition = rabbit_data_coercion:to_map(pget(definition, OpPolicy, [])),
    Definition   = rabbit_data_coercion:to_map(pget(definition, Policy, [])),
    %% maps:keys/1 promises no particular order while lists:umerge/2 requires
    %% both inputs to be sorted, so build the key set with lists:usort/1.
    AllKeys = lists:usort(maps:keys(Definition) ++ maps:keys(OpDefinition)),
    lists:map(fun(Key) ->
                  case {maps:get(Key, Definition, undefined),
                        maps:get(Key, OpDefinition, undefined)} of
                      {Val, undefined}   -> {Key, Val};
                      {undefined, OpVal} -> {Key, OpVal};
                      {Val, OpVal}       -> {Key, merge_policy_value(Key, Val, OpVal)}
                  end
              end,
              AllKeys).
%% Stores the currently matching policy and operator policy on a queue or
%% an exchange and returns the updated entity.
set(Q0) when ?is_amqqueue(Q0) ->
    Name = amqqueue:get_name(Q0),
    Policy = match(Name),
    OpPolicy = match_op(Name),
    Q1 = amqqueue:set_policy(Q0, Policy),
    Q2 = amqqueue:set_operator_policy(Q1, OpPolicy),
    Q2;
set(X = #exchange{name = Name}) ->
    X#exchange{policy = match(Name), operator_policy = match_op(Name)}.

%% Lists policies of all virtual hosts ('_' is passed as the vhost).
list() ->
    list('_').

%% Lists the policies of VHost with their definitions left untouched.
list(VHost) ->
    list0(VHost, fun ident/1).

list_in(VHost) ->
    list(VHost).

%% Policies of VHost whose definition has at least one key in DefinitionKeys.
list_in(VHost, DefinitionKeys) ->
    [P || P <- list_in(VHost), keys_overlap(definition_keys(P), DefinitionKeys)].

list_as_maps() ->
    list_as_maps('_').

%% Policies of VHost as maps (definitions converted to maps as well),
%% ordered by sort_by_priority/1.
list_as_maps(VHost) ->
    [maps:from_list(PL) || PL <- sort_by_priority(list0(VHost, fun maps:from_list/1))].

%% Operator policy counterpart of list_as_maps/1.
list_op_as_maps(VHost) ->
    [maps:from_list(PL) || PL <- sort_by_priority(list0_op(VHost, fun maps:from_list/1))].

%% Policies of VHost with JSON-encoded definitions, ordered by
%% sort_by_priority/1.
list_formatted(VHost) ->
    sort_by_priority(list0(VHost, fun rabbit_json:encode/1)).

%% Hands the result of list_formatted/1 to rabbit_control_misc:emitting_map/4.
list_formatted(VHost, Ref, AggregatorPid) ->
    rabbit_control_misc:emitting_map(AggregatorPid, Ref,
                                     fun(P) -> P end, list_formatted(VHost)).

list_op() ->
    list_op('_').

%% Lists the operator policies of VHost with their definitions left untouched.
list_op(VHost) ->
    list0_op(VHost, fun ident/1).

%% Operator policies of VHost whose definition has at least one key in
%% DefinitionKeys.
list_op(VHost, DefinitionKeys) ->
    [P || P <- list_op(VHost), keys_overlap(definition_keys(P), DefinitionKeys)].

list_formatted_op(VHost) ->
    sort_by_priority(list0_op(VHost, fun rabbit_json:encode/1)).

list_formatted_op(VHost, Ref, AggregatorPid) ->
    rabbit_control_misc:emitting_map(AggregatorPid, Ref,
                                     fun(P) -> P end, list_formatted_op(VHost)).

%% Policy to apply to the named resource (see match/2), or 'undefined'.
match(Name = #resource{virtual_host = VHost}) ->
    match(Name, list(VHost)).

%% Operator policy to apply to the named resource, or 'undefined'.
match_op(Name = #resource{virtual_host = VHost}) ->
    match(Name, list_op(VHost)).

%% All policies matching the named resource, each converted to a map, in the
%% order produced by match_all/2.
%%
%% This used to iterate over the result of match/2, which is a single policy
%% proplist or 'undefined'; the comprehension therefore either hit a bad
%% generator or passed a {Key, Value} tuple to maps:from_list/1 and could
%% never return. match_all/2 yields the list of policies the comprehension
%% expects.
match_as_map(Name = #resource{virtual_host = VHost}) ->
    [maps:from_list(PL) || PL <- match_all(Name, list(VHost))].
%% All operator policies matching the named resource, each converted to a
%% map, in the order produced by match_all/2.
%%
%% This used to iterate over the result of match/2, which is a single policy
%% proplist or 'undefined', so the comprehension could never return a value.
%% match_all/2 yields the list of policies the comprehension expects.
match_op_as_map(Name = #resource{virtual_host = VHost}) ->
    [maps:from_list(PL) || PL <- match_all(Name, list_op(VHost))].

%% Looks up the definition key Name for a queue, an exchange or a resource
%% name, combining policy and operator policy values (see get0/3).
get(Name, Q) when ?is_amqqueue(Q) ->
    Policy = amqqueue:get_policy(Q),
    OpPolicy = amqqueue:get_operator_policy(Q),
    get0(Name, Policy, OpPolicy);
get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) ->
    get0(Name, Policy, OpPolicy);

%% Caution - SLOW.
get(Name, EntityName = #resource{virtual_host = VHost}) ->
    get0(Name,
         match(EntityName, list(VHost)),
         match(EntityName, list_op(VHost))).

%% First policy of match_all/2 for the resource, or 'undefined' if none match.
match(Name, Policies) ->
    case match_all(Name, Policies) of
        []           -> undefined;
        [Policy | _] -> Policy
    end.

%% Policies matching the resource, sorted with priority_comparator/2.
match_all(Name, Policies) ->
   lists:sort(fun priority_comparator/2, [P || P <- Policies, matches(Name, P)]).

%% A policy matches when its 'apply-to' covers the resource kind, its
%% definition is applicable to the resource, its pattern matches the resource
%% name and it belongs to the same virtual host.
matches(#resource{name = Name, kind = Kind, virtual_host = VHost} = Resource, Policy) ->
    matches_type(Kind, pget('apply-to', Policy)) andalso
        is_applicable(Resource, pget(definition, Policy)) andalso
        match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
        VHost =:= pget(vhost, Policy).

%% Value of Name taken from the policy and/or operator policy definitions.
%% When both define it, the result comes from merge_policy_value/3.
get0(_Name, undefined, undefined) -> undefined;
get0(Name, undefined, OpPolicy) -> pget(Name, pget(definition, OpPolicy, []));
get0(Name, Policy, undefined) -> pget(Name, pget(definition, Policy, []));
get0(Name, Policy, OpPolicy) ->
    OpDefinition = pget(definition, OpPolicy, []),
    Definition = pget(definition, Policy, []),
    case {pget(Name, Definition), pget(Name, OpDefinition)} of
        {undefined, undefined} -> undefined;
        {Val, undefined}       -> Val;
        {undefined, Val}       -> Val;
        {Val, OpVal}           -> merge_policy_value(Name, Val, OpVal)
    end.

%% Delegates to the module registered as merge strategy for Name, falling
%% back to rabbit_policies when none is found.
merge_policy_value(Name, PolicyVal, OpVal) ->
    case policy_merge_strategy(Name) of
        {ok, Module}       -> Module:merge_policy_value(Name, PolicyVal, OpVal);
        {error, not_found} -> rabbit_policies:merge_policy_value(Name, PolicyVal, OpVal)
    end.
%% Looks up the module registered under the policy_merge_strategy class for
%% the given definition key. Returns whatever rabbit_registry:lookup_module/2
%% returns, or {error, not_found} when the key has no registry type.
policy_merge_strategy(Name) ->
    case rabbit_registry:binary_to_type(rabbit_data_coercion:to_binary(Name)) of
        {error, not_found} ->
            {error, not_found};
        T ->
            rabbit_registry:lookup_module(policy_merge_strategy, T)
    end.

%% Value of the exchange argument AName if it is set, otherwise the value of
%% the policy key PName as returned by get/2.
%%
%% Many heads for optimisation. The first head may only short-circuit when
%% neither a policy nor an operator policy is set: get/2 consults both, so
%% an exchange that only carries an operator policy must not be answered
%% with 'undefined' here.
get_arg(_AName, _PName, #exchange{arguments       = [],
                                   policy          = undefined,
                                   operator_policy = undefined}) ->
    undefined;
get_arg(_AName,  PName, X = #exchange{arguments = []}) ->
    get(PName, X);
get_arg(AName,   PName, X = #exchange{arguments = Args}) ->
    case rabbit_misc:table_lookup(Args, AName) of
        undefined    -> get(PName, X);
        {_Type, Arg} -> Arg
    end.

%%----------------------------------------------------------------------------

%% Gets called during upgrades - therefore must not assume anything about the
%% state of Mnesia
invalidate() ->
    rabbit_file:write_file(invalid_file(), <<"">>).

%% Runs recover0/0 and then deletes the marker file, but only when the marker
%% file written by invalidate/0 exists.
recover() ->
    case rabbit_file:is_file(invalid_file()) of
        true  -> recover0(),
                 rabbit_file:delete(invalid_file());
        false -> ok
    end.

%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
%% the first node starting. So we can rewrite the whole database.  Note that
%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
%% variants.
%% Re-applies the matching policy and operator policy to every record in the
%% durable exchange and durable queue tables, one transaction per record.
recover0() ->
    Exchanges = mnesia:dirty_match_object(rabbit_durable_exchange,
                                          #exchange{_ = '_'}),
    Queues = rabbit_amqqueue:list_with_possible_retry(
               fun() ->
                       mnesia:dirty_match_object(
                         rabbit_durable_queue, amqqueue:pattern_match_all())
               end),
    Policies = list(),
    OpPolicies = list_op(),
    [recover_exchange(X, Policies, OpPolicies) || X = #exchange{} <- Exchanges],
    [recover_queue(Q, Policies, OpPolicies) || Q <- Queues],
    ok.

%% Writes one durable exchange back with freshly matched policies and
%% decorators.
recover_exchange(X = #exchange{name = XName}, Policies, OpPolicies) ->
    rabbit_misc:execute_mnesia_transaction(
      fun () ->
              Matched = X#exchange{policy          = match(XName, Policies),
                                   operator_policy = match(XName, OpPolicies)},
              mnesia:write(rabbit_durable_exchange,
                           rabbit_exchange_decorator:set(Matched),
                           write)
      end).

%% Writes one durable queue back with freshly matched policies and
%% decorators. The write goes through the amqqueue retry macro, whose second
%% argument upgrades the record before writing it again.
recover_queue(Q0, Policies, OpPolicies) ->
    QName = amqqueue:get_name(Q0),
    Policy1 = match(QName, Policies),
    Q1 = amqqueue:set_policy(Q0, Policy1),
    OpPolicy1 = match(QName, OpPolicies),
    Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
    Q3 = rabbit_queue_decorator:set(Q2),
    ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
       rabbit_misc:execute_mnesia_transaction(
         fun () ->
                 mnesia:write(rabbit_durable_queue, Q3, write)
         end),
       begin
           Q4 = amqqueue:upgrade(Q3),
           rabbit_misc:execute_mnesia_transaction(
             fun () ->
                     mnesia:write(rabbit_durable_queue, Q4, write)
             end)
       end).

%% Path of the marker file used by invalidate/0 and recover/0.
invalid_file() ->
    MnesiaDir = rabbit_mnesia:dir(),
    filename:join(MnesiaDir, "policies_are_invalid").

%%----------------------------------------------------------------------------

%% Operator policy variant of parse_set/7.
parse_set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"operator_policy">>,
    parse_set(Component, VHost, Name, Pattern, Definition, Priority,
              ApplyTo, ActingUser).

%% Sets a policy through parse_set/8 using the <<"policy">> component.
parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    Component = <<"policy">>,
    parse_set(Component, VHost, Name, Pattern, Definition, Priority,
              ApplyTo, ActingUser).
%% Coerces Priority to an integer and continues with parse_set0/8.
%% Returns {error, Format, Args} when the coercion raises badarg.
parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    try rabbit_data_coercion:to_integer(Priority) of
        Num -> parse_set0(Type, VHost, Name, Pattern, Definition, Num, ApplyTo,
                          ActingUser)
    catch
        error:badarg -> {error, "~p priority must be a number", [Priority]}
    end.

%% Decodes the JSON definition Defn and stores the policy via set0/5.
%%
%% Returns the result of set0/5, or {error_string, Message} when Defn is not
%% valid JSON or does not decode to a map (JSON object).
parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo, ActingUser) ->
    case rabbit_json:try_decode(Defn) of
        {ok, Term} when is_map(Term) ->
            R = set0(Type, VHost, Name,
                     [{<<"pattern">>,    Pattern},
                      {<<"definition">>, maps:to_list(Term)},
                      {<<"priority">>,   Priority},
                      {<<"apply-to">>,   ApplyTo}],
                     ActingUser),
            %% Only report success when the parameter was actually stored;
            %% any other result is passed back to the caller unlogged.
            case R of
                ok ->
                    rabbit_log:info("Successfully set policy '~ts' matching ~ts names in virtual host '~ts' using pattern '~ts'",
                                    [Name, ApplyTo, VHost, Pattern]);
                _ ->
                    ok
            end,
            R;
        {ok, _NotAnObject} ->
            %% maps:to_list/1 would raise badmap on anything but a map
            %% (e.g. a JSON array or number), so reject it explicitly.
            {error_string,
             rabbit_misc:format("policy definition must be a JSON object", [])};
        {error, Reason} ->
            {error_string,
                rabbit_misc:format("JSON decoding error. Reason: ~ts", [Reason])}
    end.

%% Operator policy variant of set/7.
set_op(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    set(<<"operator_policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).

%% Sets a policy from an already decoded definition.
set(VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    set(<<"policy">>, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser).

%% Builds the parameter value, defaulting an undefined Priority to 0 and an
%% undefined ApplyTo to <<"all">>, and stores it via set0/5.
set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo, ActingUser) ->
    PolicyProps = [{<<"pattern">>,    Pattern},
                   {<<"definition">>, Definition},
                   {<<"priority">>,   case Priority of
                                          undefined -> 0;
                                          _         -> Priority
                                      end},
                   {<<"apply-to">>,   case ApplyTo of
                                          undefined -> <<"all">>;
                                          _         -> ApplyTo
                                      end}],
    set0(Type, VHost, Name, PolicyProps, ActingUser).

set0(Type, VHost, Name, Term, ActingUser) ->
    rabbit_runtime_parameters:set_any(VHost, Type, Name, Term, ActingUser).

%% Clears the named operator policy in VHost.
delete_op(VHost, Name, ActingUser) ->
    rabbit_runtime_parameters:clear_any(VHost, <<"operator_policy">>, Name, ActingUser).
%% Clears the named policy in VHost.
delete(VHost, Name, ActingUser) ->
    Component = <<"policy">>,
    rabbit_runtime_parameters:clear_any(VHost, Component, Name, ActingUser).

%% Operator policy with the given name, or 'not_found'.
lookup_op(VHost, Name) ->
    lookup_component(VHost, <<"operator_policy">>, Name).

%% Policy with the given name, or 'not_found'.
lookup(VHost, Name) ->
    lookup_component(VHost, <<"policy">>, Name).

%% Shared body of lookup/2 and lookup_op/2.
lookup_component(VHost, Component, Name) ->
    case rabbit_runtime_parameters:lookup(VHost, Component, Name) of
        not_found -> not_found;
        Parameter -> p(Parameter, fun ident/1)
    end.

%% Operator policies of VHost, definitions passed through DefnFun.
list0_op(VHost, DefnFun) ->
    list_component(VHost, <<"operator_policy">>, DefnFun).

%% Policies of VHost, definitions passed through DefnFun.
list0(VHost, DefnFun) ->
    list_component(VHost, <<"policy">>, DefnFun).

%% Shared body of list0/2 and list0_op/2.
list_component(VHost, Component, DefnFun) ->
    Parameters = rabbit_runtime_parameters:list(VHost, Component),
    lists:map(fun(Parameter) -> p(Parameter, DefnFun) end, Parameters).

%% Sorts policies using the negation of priority_comparator/2 as the
%% ordering function.
sort_by_priority(PropList) ->
    Negated = fun(Left, Right) -> not priority_comparator(Left, Right) end,
    lists:sort(Negated, PropList).

%% Converts a runtime parameter into the policy proplist used throughout this
%% module; the definition is passed through DefnFun.
p(Parameter, DefnFun) ->
    Value = pget(value, Parameter),
    VHost = pget(vhost, Parameter),
    Name = pget(name, Parameter),
    Pattern = pget(<<"pattern">>, Value),
    ApplyTo = pget(<<"apply-to">>, Value),
    Definition = DefnFun(pget(<<"definition">>, Value)),
    Priority = pget(<<"priority">>, Value),
    [{vhost, VHost},
     {name, Name},
     {pattern, Pattern},
     {'apply-to', ApplyTo},
     {definition, Definition},
     {priority, Priority}].

%% Identity function.
ident(Term) -> Term.

info_keys() ->
    [vhost, name, 'apply-to', pattern, definition, priority].

%% Keys of the policy's definition, or [] when the policy has no
%% 'definition' entry.
definition_keys(Policy) ->
    case rabbit_data_coercion:to_map(Policy) of
        #{definition := Definition} ->
            DefinitionMap = rabbit_data_coercion:to_map(Definition),
            maps:keys(DefinitionMap);
        _ ->
            []
    end.

%% True when at least one element of A is also a member of B.
keys_overlap(A, B) ->
    IsInB = fun(Key) -> lists:member(Key, B) end,
    lists:any(IsInB, A).

%%----------------------------------------------------------------------------

%% Validates a policy or operator policy term against the matching
%% validation specification.
validate(_VHost, <<"policy">>, Name, Term, _User) ->
    Spec = policy_validation(),
    rabbit_parameter_validation:proplist(Name, Spec, Term);
validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
    Spec = operator_policy_validation(),
    rabbit_parameter_validation:proplist(Name, Spec, Term).
%% Called after a policy or operator policy has been set: re-evaluates the
%% entities of the virtual host and emits a policy_set event.
%% Note that both components emit the same policy_set event.
notify(VHost, Component, Name, Term0, ActingUser)
  when Component =:= <<"policy">>; Component =:= <<"operator_policy">> ->
    Term = rabbit_data_coercion:atomize_keys(Term0),
    update_matched_objects(VHost, Term, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser} | Term],
    rabbit_event:notify(policy_set, EventProps).

%% Called after a policy or operator policy has been cleared.
notify_clear(VHost, <<"policy">>, Name, ActingUser) ->
    notify_cleared(policy_cleared, VHost, Name, ActingUser);
notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) ->
    notify_cleared(operator_policy_cleared, VHost, Name, ActingUser).

%% Re-evaluates the entities of the virtual host, then emits Event.
notify_cleared(Event, VHost, Name, ActingUser) ->
    update_matched_objects(VHost, undefined, ActingUser),
    EventProps = [{name, Name},
                  {vhost, VHost},
                  {user_who_performed_action, ActingUser}],
    rabbit_event:notify(Event, EventProps).

%%----------------------------------------------------------------------------

%% [1] We need to prevent this from becoming O(n^2) in a similar
%% manner to rabbit_binding:remove_for_{source,destination}. So see
%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
%% [2] We could be here in a post-tx fun after the vhost has been
%% deleted; in which case it's fine to do nothing.
%% Re-matches every exchange and queue of VHost against the current policies
%% inside one transaction, then notifies about each entity whose policies
%% changed. Failures of individual notifications are caught and ignored.
%% NOTE(review): a plain `catch` returns a thrown term unwrapped rather than
%% as {'EXIT', {throw, _}}; confirm which shape a missing vhost produces here.
update_matched_objects(VHost, PolicyDef, ActingUser) ->
    LockedTables = [rabbit_queue, rabbit_durable_queue,
                    rabbit_exchange, rabbit_durable_exchange],
    TxFun =
        fun() ->
                [mnesia:lock({table, Tab}, write) || Tab <- LockedTables], %% [1]
                case catch {list(VHost), list_op(VHost)} of
                    {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
                        {[], []}; %% [2]
                    {'EXIT', Exit} ->
                        exit(Exit);
                    {Policies, OpPolicies} ->
                        {[update_exchange(X, Policies, OpPolicies) ||
                             X <- rabbit_exchange:list(VHost)],
                         [update_queue(Q, Policies, OpPolicies) ||
                             Q <- rabbit_amqqueue:list(VHost)]}
                end
        end,
    {ExchangeResults, QueueResults} =
        rabbit_misc:execute_mnesia_transaction(TxFun),
    [catch maybe_notify_of_policy_change(Result, PolicyDef, ActingUser)
     || Result <- ExchangeResults],
    [catch maybe_notify_of_policy_change(Result, PolicyDef, ActingUser)
     || Result <- QueueResults],
    ok.

%% Re-matches one exchange. Returns 'no_change' when both matches equal what
%% the exchange already carries, otherwise {Old, New}; New is Old itself when
%% rabbit_exchange:update/2 reports not_found.
update_exchange(#exchange{name            = XName,
                          policy          = OldPolicy,
                          operator_policy = OldOpPolicy} = Old,
                Policies, OpPolicies) ->
    NewPolicy = match(XName, Policies),
    NewOpPolicy = match(XName, OpPolicies),
    case {NewPolicy, NewOpPolicy} of
        {OldPolicy, OldOpPolicy} ->
            no_change;
        _Changed ->
            Updater = fun(X0) ->
                              rabbit_exchange_decorator:set(
                                X0#exchange{policy          = NewPolicy,
                                            operator_policy = NewOpPolicy})
                      end,
            case rabbit_exchange:update(XName, Updater) of
                #exchange{} = Updated -> {Old, Updated};
                not_found             -> {Old, Old}
            end
    end.
%% Re-matches one queue. Returns 'no_change' when both matches equal what the
%% queue already carries, otherwise {Old, New}; New is Old itself when
%% rabbit_amqqueue:update/2 reports not_found. The update also increments
%% the queue's policy version by one.
update_queue(Q0, Policies, OpPolicies) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    OldPolicy = amqqueue:get_policy(Q0),
    OldOpPolicy = amqqueue:get_operator_policy(Q0),
    NewPolicy = match(QName, Policies),
    NewOpPolicy = match(QName, OpPolicies),
    case {NewPolicy, NewOpPolicy} of
        {OldPolicy, OldOpPolicy} ->
            no_change;
        _Changed ->
            Updater =
                fun (Current) ->
                        WithPolicy = amqqueue:set_policy(Current, NewPolicy),
                        WithOpPolicy = amqqueue:set_operator_policy(WithPolicy,
                                                                    NewOpPolicy),
                        NextVersion = amqqueue:get_policy_version(WithOpPolicy) + 1,
                        Versioned = amqqueue:set_policy_version(WithOpPolicy,
                                                                NextVersion),
                        rabbit_queue_decorator:set(Versioned)
                end,
            case rabbit_amqqueue:update(QName, Updater) of
                Q1 when ?is_amqqueue(Q1) -> {Q0, Q1};
                not_found                -> {Q0, Q0}
            end
    end.

%% Turns the result of update_exchange/3 or update_queue/3 into
%% notifications. 'no_change' is ignored. Exchanges only get
%% rabbit_exchange:policy_changed/2. Queues additionally emit
%% queue_policy_cleared when PolicyDef is 'undefined', or
%% queue_policy_updated (with PolicyDef appended to the event properties)
%% otherwise.
maybe_notify_of_policy_change(no_change, _PolicyDef, _ActingUser) ->
    ok;
maybe_notify_of_policy_change({X1 = #exchange{}, X2 = #exchange{}}, _PolicyDef, _ActingUser) ->
    rabbit_exchange:policy_changed(X1, X2);
maybe_notify_of_policy_change({Q1, Q2}, PolicyDef, ActingUser)
  when ?is_amqqueue(Q1), ?is_amqqueue(Q2) ->
    {Event, Extra} = case PolicyDef of
                         %% policy has been cleared
                         undefined -> {queue_policy_cleared, []};
                         %% policy has been added or updated
                         _         -> {queue_policy_updated, PolicyDef}
                     end,
    rabbit_event:notify(Event, [
        {name, amqqueue:get_name(Q2)},
        {vhost, amqqueue:get_vhost(Q2)},
        {type, amqqueue:get_type(Q2)},
        {user_who_performed_action, ActingUser} | Extra
    ]),
    rabbit_amqqueue:policy_changed(Q1, Q2).
%% True when a policy's 'apply-to' value covers the given resource kind.
matches_type(Kind, <<"all">>) when Kind =:= exchange; Kind =:= queue -> true;
matches_type(exchange, <<"exchanges">>) -> true;
matches_type(queue,    <<"queues">>)    -> true;
matches_type(_Kind,    _ApplyTo)        -> false.

%% Ordering function: true when the priority of A is greater than or equal
%% to the priority of B.
priority_comparator(A, B) ->
    PriorityA = pget(priority, A),
    PriorityB = pget(priority, B),
    PriorityA >= PriorityB.

%% Queues are checked through rabbit_amqqueue:is_policy_applicable/2; any
%% other resource accepts every definition.
is_applicable(#resource{kind = queue} = Resource, Policy) ->
    Definition = rabbit_data_coercion:to_list(Policy),
    rabbit_amqqueue:is_policy_applicable(Resource, Definition);
is_applicable(_Resource, _Policy) ->
    true.

%%----------------------------------------------------------------------------

%% Validation specification for operator policies.
operator_policy_validation() ->
    validation_spec(fun validation_op/2).

%% Validation specification for policies.
policy_validation() ->
    validation_spec(fun validation/2).

%% Specification shared by both policy kinds; only the definition validator
%% differs.
validation_spec(DefinitionValidator) ->
    [{<<"priority">>,   fun rabbit_parameter_validation:number/2, mandatory},
     {<<"pattern">>,    fun rabbit_parameter_validation:regex/2,  mandatory},
     {<<"apply-to">>,   fun apply_to_validation/2,                optional},
     {<<"definition">>, DefinitionValidator,                      mandatory}].

%% Validates an operator policy definition.
validation_op(Name, Terms) ->
    Validator = operator_policy_validator,
    validation(Name, Terms, Validator).

%% Validates a policy definition.
validation(Name, Terms) ->
    Validator = policy_validator,
    validation(Name, Terms, Validator).
%% Validates a policy definition against the modules registered under the
%% registry class Validator.
%%
%% Returns ok or {error, Format, Args}. Maps are converted to lists first;
%% an empty definition, a list that is not a proplist, duplicate keys and
%% any other term are rejected.
validation(_Name, [], _Validator) ->
    {error, "no policy provided", []};
validation(Name, Terms0, Validator) when is_map(Terms0) ->
    Terms = maps:to_list(Terms0),
    validation(Name, Terms, Validator);
validation(_Name, Terms, Validator) when is_list(Terms) ->
    {Keys, Modules} = lists:unzip(
                        rabbit_registry:lookup_all(Validator)),
    [] = dups(Keys), %% ASSERTION
    Validators = lists:zipwith(fun (M, K) ->  {M, a2b(K)} end, Modules, Keys),
    case is_proplist(Terms) of
        true  -> {TermKeys, _} = lists:unzip(Terms),
                 case dups(TermKeys) of
                     []   -> validation0(Validators, Terms);
                     Dup  -> {error, "~p duplicate keys not allowed", [Dup]}
                 end;
        false -> {error, "definition must be a dictionary: ~p", [Terms]}
    end;
validation(Name, Term, Validator) ->
    {error, "parse error while reading policy ~s: ~p. Validator: ~p.",
     [Name, Term, Validator]}.

%% Validators is a list of {Module, KeyBinary} pairs. Each distinct module is
%% handed the terms whose keys it is registered for; the first result other
%% than ok stops further validation and is returned. Terms that no module
%% claims are reported as unrecognised.
validation0(Validators, Terms) ->
    case lists:foldl(
           fun (Mod, {ok, TermsLeft}) ->
                   ModKeys = proplists:get_all_values(Mod, Validators),
                   case [T || {Key, _} = T <- TermsLeft,
                              lists:member(Key, ModKeys)] of
                       []    -> {ok, TermsLeft};
                       Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
                   end;
               (_, Acc) ->
                   %% a previous module already failed; keep its result
                   Acc
           end, {ok, Terms}, proplists:get_keys(Validators)) of
         {ok, []} ->
             ok;
         {ok, Unvalidated} ->
             {error, "~p are not recognised policy settings", [Unvalidated]};
         {Error, _} ->
             Error
    end.

%% Atom to binary, via its character list.
a2b(A) -> list_to_binary(atom_to_list(A)).

%% Elements that occur more than once in L (empty when all are unique).
dups(L) -> L -- lists:usort(L).

%% True when every element of L is a 2-tuple.
is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).

%% Accepts the three supported 'apply-to' values.
%%
%% The error for a binary keeps the original ~s message. Any other term
%% (for instance a number coming from a JSON document) is printed with ~p,
%% because formatting such a term with ~s raises badarg when the error is
%% rendered.
apply_to_validation(_Name, <<"all">>)       -> ok;
apply_to_validation(_Name, <<"exchanges">>) -> ok;
apply_to_validation(_Name, <<"queues">>)    -> ok;
apply_to_validation(_Name, Term) when is_binary(Term) ->
    {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
     "or 'all'", [Term]};
apply_to_validation(_Name, Term) ->
    {error, "apply-to '~p' unrecognised; should be 'queues', 'exchanges' "
     "or 'all'", [Term]}.