%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_exchange).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-export([recover/1, policy_changed/2, callback/4, declare/7,
         assert_equivalence/6, assert_args_equivalence/2, check_type/1,
         lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
         update_scratch/3, update_decorators/1, immutable/1,
         info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
         route/2, delete/3, validate_binding/2, count/0]).
-export([list_names/0, is_amq_prefixed/1]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).

%%----------------------------------------------------------------------------

-export_type([name/0, type/0]).

-type name() :: rabbit_types:r('exchange').
-type type() :: atom().
-type fun_name() :: atom().

%%----------------------------------------------------------------------------

-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
                    policy, user_who_performed_action]).

%% Recovers the exchanges of VHost. rabbit_misc:table_filter/3 is handed a
%% selection predicate, a per-record callback and the
%% rabbit_durable_exchange table; the names of the exchange records it
%% returns are the result.

-spec recover(rabbit_types:vhost()) -> [name()].

recover(VHost) ->
    %% Select records that belong to VHost and have no rabbit_exchange row.
    MissingFromRam =
        fun (#exchange{name = Name}) ->
                Name#resource.virtual_host =:= VHost andalso
                    mnesia:read({rabbit_exchange, Name}) =:= []
        end,
    %% With Tx =:= true the record is written via store_ram/1; otherwise
    %% it is only decorated. Either way the 'create' callback is invoked.
    Recreate =
        fun (Durable, Tx) ->
                Recovered = case Tx of
                                true  -> store_ram(Durable);
                                false -> rabbit_exchange_decorator:set(Durable)
                            end,
                callback(Recovered, create, map_create_tx(Tx), [Recovered])
        end,
    Exchanges = rabbit_misc:table_filter(MissingFromRam, Recreate,
                                         rabbit_durable_exchange),
    [XName || #exchange{name = XName} <- Exchanges].
%% Invokes the callback Fun on every decorator module selected with 'all'
%% and then on the exchange type module. Each module receives, as first
%% argument, Serial applied to that module's serialise_events result,
%% followed by Args.
%%
%% Serial0 is either a fun (used as is) or an atom (wrapped in a fun that
%% ignores its argument and returns the atom). Every decorator callback
%% must return 'ok'; the result of the type module call is returned.

-spec callback
        (rabbit_types:exchange(), fun_name(),
         fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.

callback(X = #exchange{type       = XType,
                       decorators = Decorators}, Fun, Serial0, Args) ->
    Serial = if is_function(Serial0) -> Serial0;
                is_atom(Serial0)     -> fun (_Bool) -> Serial0 end
             end,
    [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
        M <- rabbit_exchange_decorator:select(all, Decorators)],
    Module = type_to_module(XType),
    apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).

%% Calls policy_changed/2 on the type module of the old exchange X and on
%% the de-duplicated union of the decorators of X and of the new exchange
%% X1. Every callback must return 'ok'.

-spec policy_changed
        (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.

policy_changed(X  = #exchange{type       = XType,
                              decorators = Decorators},
               X1 = #exchange{decorators = Decorators1}) ->
    D  = rabbit_exchange_decorator:select(all, Decorators),
    D1 = rabbit_exchange_decorator:select(all, Decorators1),
    DAll = lists:usort(D ++ D1),
    [ok = M:policy_changed(X, X1) || M <- [type_to_module(XType) | DAll]],
    ok.

%% True when any decorator of X, or failing that its type module, reports
%% that events must be serialised.
serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
    lists:any(fun (M) -> M:serialise_events(X) end,
              rabbit_exchange_decorator:select(all, Decorators))
        orelse (type_to_module(Type)):serialise_events().

%% Returns a fun mapping 'true' to a serial (or 'none') and 'false' to
%% 'none'. The serial is obtained from next_serial/1 when serial/1 itself
%% is called, not when the returned fun is applied.

-spec serial(rabbit_types:exchange()) ->
          fun((boolean()) -> 'none' | pos_integer()).

serial(#exchange{name = XName} = X) ->
    Serial = case serialise_events(X) of
                 true  -> next_serial(XName);
                 false -> none
             end,
    fun (true)  -> Serial;
        (false) -> none
    end.

%% Tells whether an exchange name (given as a binary, or taken from an
%% exchange record) starts with the literal prefix "amq.".
%%
%% A binary prefix match is used rather than a regular expression written
%% as the string literal "^amq\.": inside an Erlang string "\." is just
%% ".", which as a regex matches any character, so names such as
%% <<"amqp-foo">> would wrongly count as prefixed.

-spec is_amq_prefixed(rabbit_types:exchange() | binary()) -> boolean().

is_amq_prefixed(<<"amq.", _/binary>>) ->
    true;
is_amq_prefixed(Name) when is_binary(Name) ->
    false;
is_amq_prefixed(#exchange{name = #resource{name = Name}}) ->
    is_amq_prefixed(Name).
%% Declares an exchange.
%%
%% The record is built from the arguments, passed through
%% rabbit_policy:set/1 and rabbit_exchange_decorator:set/1, and validated
%% by its type module (anything other than 'ok' crashes the caller).
%%
%% If a runtime parameter marks an exchange.delete as in progress for this
%% name, the declare is ignored: a warning is logged and the freshly built
%% (not stored) record is returned. Otherwise, inside a mnesia transaction,
%% the exchange is stored if absent, or the existing record is returned.
%% For a newly stored exchange the 'create' callback is invoked and, when
%% Tx is false, an exchange_created event is emitted.

-spec declare
        (name(), type(), boolean(), boolean(), boolean(),
         rabbit_framing:amqp_table(), rabbit_types:username())
        -> rabbit_types:exchange().

declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
    X = rabbit_exchange_decorator:set(
          rabbit_policy:set(#exchange{name        = XName,
                                      type        = Type,
                                      durable     = Durable,
                                      auto_delete = AutoDelete,
                                      internal    = Internal,
                                      arguments   = Args,
                                      options     = #{user => Username}})),
    XT = type_to_module(Type),
    %% We want to upset things if it isn't ok
    ok = XT:validate(X),
    %% Avoid a channel exception if there's a race condition
    %% with an exchange.delete operation.
    %%
    %% See rabbitmq/rabbitmq-federation#7.
    case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
                                          ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                          XName#resource.name) of
        not_found ->
            rabbit_misc:execute_mnesia_transaction(
              fun () ->
                      case mnesia:wread({rabbit_exchange, XName}) of
                          [] ->
                              {new, store(X)};
                          [ExistingX] ->
                              {existing, ExistingX}
                      end
              end,
              fun ({new, Exchange}, Tx) ->
                      ok = callback(X, create, map_create_tx(Tx), [Exchange]),
                      rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
                      Exchange;
                  ({existing, Exchange}, _Tx) ->
                      Exchange;
                  (Err, _Tx) ->
                      Err
              end);
        _ ->
            %% Single-line message: a format string broken across source
            %% lines would embed the newline and indentation in the log.
            rabbit_log:warning("ignoring exchange.declare for exchange ~p, "
                               "exchange.delete in progress", [XName]),
            X
    end.

%% Maps the Tx flag onto the value given to 'create' callbacks.
map_create_tx(true)  -> transaction;
map_create_tx(false) -> none.

%% Stores an exchange. A durable exchange is first written to
%% rabbit_durable_exchange with its decorators reset to 'undefined'; every
%% exchange is then written to rabbit_exchange via store_ram/1, whose
%% result is returned.
store(X = #exchange{durable = true}) ->
    mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
                 write),
    store_ram(X);
store(X = #exchange{durable = false}) ->
    store_ram(X).

%% Applies the decorators to X, writes the result to rabbit_exchange and
%% returns the decorated record.
%% NOTE(review): the decorators are applied a second time to the record
%% that is written; presumably rabbit_exchange_decorator:set/1 is
%% idempotent, making the second call redundant -- confirm before removing.
store_ram(X) ->
    X1 = rabbit_exchange_decorator:set(X),
    ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
                      write),
    X1.
%% Used with binaries sent over the wire; the type may not exist.
%%
%% Resolves the type name through rabbit_registry and returns the type
%% atom. Raises a command_invalid protocol error when the name is unknown
%% or when no exchange module is registered for the type.

-spec check_type
        (binary()) -> atom() | rabbit_types:connection_exit().

check_type(TypeBin) ->
    Coerced = rabbit_data_coercion:to_binary(TypeBin),
    case rabbit_registry:binary_to_type(Coerced) of
        {error, not_found} ->
            rabbit_misc:protocol_error(
              command_invalid, "unknown exchange type '~s'", [TypeBin]);
        Type ->
            check_type_module(Type)
    end.

%% Returns Type if an exchange module is registered for it, otherwise
%% raises a command_invalid protocol error.
check_type_module(Type) ->
    case rabbit_registry:lookup_module(exchange, Type) of
        {ok, _Module} ->
            Type;
        {error, not_found} ->
            rabbit_misc:protocol_error(
              command_invalid, "invalid exchange type '~s'", [Type])
    end.

%% Compares an existing exchange with the properties requested in a
%% declare. Type, durable, auto_delete and internal are checked in that
%% order with rabbit_misc:assert_field_equivalence/4; the arguments are
%% then checked by the exchange's type module, whose result is returned.

-spec assert_equivalence
        (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
         rabbit_framing:amqp_table())
        -> 'ok' | rabbit_types:connection_exit().

assert_equivalence(X = #exchange{ name        = XName,
                                  durable     = Durable,
                                  auto_delete = AutoDelete,
                                  internal    = Internal,
                                  type        = Type},
                   ReqType, ReqDurable, ReqAutoDelete, ReqInternal, ReqArgs) ->
    Checks = [{Type,       ReqType,       type},
              {Durable,    ReqDurable,    durable},
              {AutoDelete, ReqAutoDelete, auto_delete},
              {Internal,   ReqInternal,   internal}],
    lists:foreach(
      fun ({Actual, Requested, Field}) ->
              rabbit_misc:assert_field_equivalence(Actual, Requested,
                                                   XName, Field)
      end, Checks),
    (type_to_module(Type)):assert_args_equivalence(X, ReqArgs).

%% Checks the exchange's arguments against the requested ones, restricted
%% to the "alternate-exchange" key.

-spec assert_args_equivalence
        (rabbit_types:exchange(), rabbit_framing:amqp_table())
        -> 'ok' | rabbit_types:connection_exit().

assert_args_equivalence(#exchange{ name = XName, arguments = CurrentArgs },
                        RequestedArgs) ->
    %% The spec says "Arguments are compared for semantic
    %% equivalence".  The only arg we care about is
    %% "alternate-exchange".
    ComparedKeys = [<<"alternate-exchange">>],
    rabbit_misc:assert_args_equivalence(CurrentArgs, RequestedArgs, XName,
                                        ComparedKeys).

%% Dirty-reads the exchange called XName from the rabbit_exchange table.

-spec lookup
        (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
                    rabbit_types:error('not_found').

lookup(XName) ->
    rabbit_misc:dirty_read({rabbit_exchange, XName}).
%% Looks up several exchanges directly in the rabbit_exchange ETS table.
%% Names without a matching record contribute nothing to the result.

-spec lookup_many([name()]) -> [rabbit_types:exchange()].

lookup_many([]) ->
    [];
lookup_many([XName]) ->
    ets:lookup(rabbit_exchange, XName);
lookup_many(XNames) when is_list(XNames) ->
    %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
    %% expensive for reasons explained in rabbit_misc:dirty_read/1.
    PerName = [ets:lookup(rabbit_exchange, XName) || XName <- XNames],
    lists:append(PerName).

%% Like lookup/1, but hands a missing exchange to
%% rabbit_amqqueue:not_found/1 instead of returning an error tuple.

-spec lookup_or_die
        (name()) -> rabbit_types:exchange() |
                    rabbit_types:channel_exit().

lookup_or_die(XName) ->
    case lookup(XName) of
        {error, not_found} -> rabbit_amqqueue:not_found(XName);
        {ok, Exchange}     -> Exchange
    end.

%% All records of the rabbit_exchange table (dirty match).

-spec list() -> [rabbit_types:exchange()].

list() ->
    MatchAll = #exchange{_ = '_'},
    mnesia:dirty_match_object(rabbit_exchange, MatchAll).

%% Size of the rabbit_exchange table.

-spec count() -> non_neg_integer().

count() ->
    mnesia:table_info(rabbit_exchange, size).

%% All keys of the rabbit_exchange table (dirty read).

-spec list_names() -> [rabbit_exchange:name()].

list_names() ->
    mnesia:dirty_all_keys(rabbit_exchange).

%% Not dirty_match_object since that would not be transactional when used in a
%% tx context

-spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()].

list(VHostPath) ->
    Pattern = #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'},
    Match = fun () -> mnesia:match_object(rabbit_exchange, Pattern, read) end,
    mnesia:async_dirty(Match).

%% Fetches the scratch value stored for App on the exchange called Name.
%% Returns {error, not_found} when the exchange is missing, when it has no
%% scratches at all, or when App has no entry.

-spec lookup_scratch(name(), atom()) ->
          rabbit_types:ok(term()) |
          rabbit_types:error('not_found').

lookup_scratch(Name, App) ->
    case lookup(Name) of
        {ok, #exchange{scratches = Scratches}} -> scratch_for(App, Scratches);
        {error, not_found} = NotFound          -> NotFound
    end.

%% Finds App's entry in a scratches orddict; 'undefined' stands for "no
%% scratches".
scratch_for(_App, undefined) ->
    {error, not_found};
scratch_for(App, Scratches) ->
    case orddict:find(App, Scratches) of
        {ok, _Value} = Found -> Found;
        error                -> {error, not_found}
    end.

-spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'.
%% Runs a mnesia transaction that replaces App's scratch value on the
%% exchange called Name with Fun(OldValue). OldValue is 'undefined' when
%% App has no scratch yet. Always returns 'ok', even when the exchange
%% does not exist.
update_scratch(Name, App, Fun) ->
    Updater = fun (X) -> with_updated_scratch(App, Fun, X) end,
    rabbit_misc:execute_mnesia_transaction(
      fun () ->
              update(Name, Updater),
              ok
      end).

%% Returns X with App's scratch entry replaced by Fun applied to the
%% current entry (or to 'undefined' if there is none).
with_updated_scratch(App, Fun, X = #exchange{scratches = Stored}) ->
    Scratches = case Stored of
                    undefined -> orddict:new();
                    _         -> Stored
                end,
    Current = case orddict:find(App, Scratches) of
                  error       -> undefined;
                  {ok, Value} -> Value
              end,
    X#exchange{scratches = orddict:store(App, Fun(Current), Scratches)}.

%% Inside a mnesia transaction, re-stores the exchange called Name through
%% store_ram/1 (which re-applies the decorators). A missing exchange is
%% not an error.

-spec update_decorators(name()) -> 'ok'.

update_decorators(Name) ->
    Refresh = fun () ->
                      case mnesia:wread({rabbit_exchange, Name}) of
                          []         -> ok;
                          [Exchange] -> store_ram(Exchange),
                                        ok
                      end
              end,
    rabbit_misc:execute_mnesia_transaction(Refresh).

%% Reads the exchange called Name with a write lock, applies Fun to it and
%% stores the result. Returns what store/1 returns, or 'not_found' when
%% there is no such exchange.

-spec update
        (name(),
         fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
         -> not_found | rabbit_types:exchange().

update(Name, Fun) ->
    case mnesia:wread({rabbit_exchange, Name}) of
        []         -> not_found;
        [Exchange] -> store(Fun(Exchange))
    end.

%% Returns X with the scratches, policy and decorators fields all set to
%% 'none'.

-spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange().

immutable(X) ->
    X#exchange{scratches  = none,
               policy     = none,
               decorators = none}.

%% The info items reported for every exchange.

-spec info_keys() -> rabbit_types:info_keys().

info_keys() -> ?INFO_KEYS.

%% Applies F to every exchange of VHostPath.
map(VHostPath, F) ->
    %% TODO: there is scope for optimisation here, e.g. using a
    %% cursor, parallelising the function invocation
    Exchanges = list(VHostPath),
    lists:map(F, Exchanges).

%% Builds an {Item, Value} list for the requested info items of X.
infos(Items, X) ->
    [{Key, i(Key, X)} || Key <- Items].
%% Value of a single info item. Items that are not built in are requested
%% from the exchange's type module; if it returns nothing for the item,
%% {bad_argument, Item} is thrown.
i(name, #exchange{name = V}) ->
    V;
i(type, #exchange{type = V}) ->
    V;
i(durable, #exchange{durable = V}) ->
    V;
i(auto_delete, #exchange{auto_delete = V}) ->
    V;
i(internal, #exchange{internal = V}) ->
    V;
i(arguments, #exchange{arguments = V}) ->
    V;
i(policy, X) ->
    %% An exchange without a policy is reported as the empty atom.
    case rabbit_policy:name(X) of
        none       -> '';
        PolicyName -> PolicyName
    end;
i(user_who_performed_action, #exchange{options = Options}) ->
    maps:get(user, Options, ?UNKNOWN_USER);
i(Item, #exchange{type = Type} = X) ->
    TypeModule = type_to_module(Type),
    case TypeModule:info(X, [Item]) of
        []              -> throw({bad_argument, Item});
        [{Item, Value}] -> Value
    end.

%% All built-in info items of X followed by whatever its type module
%% reports.

-spec info(rabbit_types:exchange()) -> rabbit_types:infos().

info(X = #exchange{type = Type}) ->
    TypeModule = type_to_module(Type),
    infos(?INFO_KEYS, X) ++ TypeModule:info(X).

%% Only the requested info items of X.

-spec info
        (rabbit_types:exchange(), rabbit_types:info_keys())
        -> rabbit_types:infos().

info(#exchange{} = X, Items) ->
    infos(Items, X).

%% Full info for every exchange of a vhost.

-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].

info_all(VHostPath) ->
    map(VHostPath, fun info/1).

%% The requested info items for every exchange of a vhost.

-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys())
                   -> [rabbit_types:infos()].

info_all(VHostPath, Items) ->
    PerExchange = fun (X) -> info(X, Items) end,
    map(VHostPath, PerExchange).

%% Like info_all/2, but the per-exchange results are passed, together with
%% AggregatorPid and Ref, to rabbit_control_misc:emitting_map/4.

-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
                    reference(), pid())
                   -> 'ok'.

info_all(VHostPath, Items, Ref, AggregatorPid) ->
    PerExchange = fun (X) -> info(X, Items) end,
    rabbit_control_misc:emitting_map(AggregatorPid, Ref, PerExchange,
                                     list(VHostPath)).

-spec route(rabbit_types:exchange(), rabbit_types:delivery())
                 -> [rabbit_amqqueue:name()].
%% Routes a delivery and returns the destination queue names.
%%
%% For the exchange whose name is <<>> the de-duplicated routing keys are
%% treated as queue names in the same vhost, except for keys naming a
%% virtual reply queue: those are handed to rabbit_channel:deliver_reply/2
%% and do not appear in the result. Any other exchange is routed through
%% route1/3 with the 'route' decorators, and the result is de-duplicated.
route(#exchange{name = #resource{virtual_host = VHost, name = <<>>}},
      #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
    {ReplyRKs, QueueRKs} =
        lists:partition(fun virtual_reply_queue/1, lists:usort(RKs)),
    lists:foreach(
      fun (RK) -> rabbit_channel:deliver_reply(RK, Delivery) end,
      ReplyRKs),
    [rabbit_misc:r(VHost, queue, RK) || RK <- QueueRKs];
route(#exchange{name       = #resource{} = XName,
                decorators = Decorators} = X,
      #delivery{message = #basic_message{}} = Delivery) ->
    RouteDecorators = rabbit_exchange_decorator:select(route, Decorators),
    %% The "seen" slot starts out as the bare name of the first exchange;
    %% process_route/2 turns it into a set once a second exchange shows up.
    Destinations = route1(Delivery, RouteDecorators, {[X], XName, []}),
    lists:usort(Destinations).

%% True for routing keys starting with "amq.rabbitmq.reply-to.".
virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
virtual_reply_queue(_)                                      -> false.

%% Works through the list of exchanges still to be routed. The
%% destinations of each exchange (from its type module, the decorators and
%% the alternate exchange) are folded through process_route/2, which
%% collects queue names and may add further exchanges to the work list.
%% Returns the collected queue names once the work list is empty.
route1(_Delivery, _Decorators, {[], _SeenXs, QNames}) ->
    QNames;
route1(Delivery, Decorators,
       {[X = #exchange{type = Type} | Remaining], SeenXs, QNames}) ->
    FromExchange   = (type_to_module(Type)):route(X, Delivery),
    FromDecorators = process_decorators(X, Decorators, Delivery),
    FromAlternate  = process_alternate(X, FromExchange),
    Destinations   = FromAlternate ++ FromDecorators ++ FromExchange,
    NextState = lists:foldl(fun process_route/2,
                            {Remaining, SeenXs, QNames}, Destinations),
    route1(Delivery, Decorators, NextState).

%% When the exchange itself produced no destinations, returns its
%% "alternate-exchange" (as obtained from rabbit_policy:get_arg/3) as the
%% only destination, if one is set; otherwise returns [].
process_alternate(X = #exchange{name = XName}, []) ->
    Key = <<"alternate-exchange">>,
    case rabbit_policy:get_arg(Key, Key, X) of
        undefined     -> [];
        AlternateName -> [rabbit_misc:r(XName, exchange, AlternateName)]
    end;
process_alternate(_X, _Results) ->
    [].

%% Concatenates the destinations returned by every decorator's route/2.
process_decorators(_X, [], _Delivery) -> %% optimisation
    [];
process_decorators(X, Decorators, Delivery) ->
    lists:flatmap(fun (Decorator) -> Decorator:route(X, Delivery) end,
                  Decorators).
%% Fold step used while routing. The accumulator is
%% {WorkList, SeenXs, QNames}, where SeenXs is either the bare name of the
%% single exchange seen so far or a gb_set of exchange names.
%%
%% An exchange destination that was already seen leaves the accumulator
%% untouched; a new one is recorded as seen and, if the exchange exists,
%% pushed onto the work list. A queue destination is prepended to QNames.
process_route(#resource{kind = exchange} = Dest,
              {_WorkList, Dest, _QNames} = Acc) ->
    %% Same exchange as the only one seen so far.
    Acc;
process_route(#resource{kind = exchange} = Dest,
              {WorkList, #resource{kind = exchange} = FirstSeen, QNames}) ->
    %% A second distinct exchange: switch from a bare name to a set.
    WorkList1 = cons_if_present(Dest, WorkList),
    Seen = gb_sets:from_list([FirstSeen, Dest]),
    {WorkList1, Seen, QNames};
process_route(#resource{kind = exchange} = Dest,
              {WorkList, Seen, QNames} = Acc) ->
    case gb_sets:is_element(Dest, Seen) of
        false ->
            WorkList1 = cons_if_present(Dest, WorkList),
            {WorkList1, gb_sets:add_element(Dest, Seen), QNames};
        true ->
            Acc
    end;
process_route(#resource{kind = queue} = Dest,
              {WorkList, Seen, QNames}) ->
    {WorkList, Seen, [Dest | QNames]}.

%% Prepends the exchange called XName to Xs if it exists; otherwise
%% returns Xs unchanged.
cons_if_present(XName, Xs) ->
    case lookup(XName) of
        {error, not_found} -> Xs;
        {ok, Exchange}     -> [Exchange | Xs]
    end.

%% Runs Fun on the exchange called XName through
%% rabbit_misc:execute_mnesia_tx_with_tail/1. A missing exchange yields
%% rabbit_misc:const({error, not_found}) in place of Fun's result.
call_with_exchange(XName, Fun) ->
    TxFun = fun () ->
                    case mnesia:read({rabbit_exchange, XName}) of
                        [Exchange] -> Fun(Exchange);
                        []         -> rabbit_misc:const({error, not_found})
                    end
            end,
    rabbit_misc:execute_mnesia_tx_with_tail(TxFun).

%% Deletes the exchange called XName. With IfUnused =:= true the delete
%% goes through conditional_delete/2, which reports {error, in_use} when
%% the exchange still has bindings for which it is the source.
%%
%% For the duration of the operation a runtime parameter marks the delete
%% as in progress; it is cleared in the 'after' section whatever the
%% outcome.

-spec delete
        (name(),  'true', rabbit_types:username()) ->
                    'ok'| rabbit_types:error('not_found' | 'in_use');
        (name(), 'false', rabbit_types:username()) ->
                    'ok' | rabbit_types:error('not_found').

delete(XName, IfUnused, Username) ->
    DeleteFun = case IfUnused of
                    false -> fun unconditional_delete/2;
                    true  -> fun conditional_delete/2
                end,
    OnExchange =
        fun (X) ->
                case DeleteFun(X, false) of
                    {error, _InUseOrNotFound} = Err ->
                        rabbit_misc:const(Err);
                    {deleted, X, Bs, Deletions} ->
                        AllDeletions = rabbit_binding:add_deletion(
                                         XName, {X, deleted, Bs}, Deletions),
                        rabbit_binding:process_deletions(AllDeletions,
                                                         Username)
                end
        end,
    try
        %% guard exchange.declare operations from failing when there's
        %% a race condition between it and an exchange.delete.
        %%
        %% see rabbitmq/rabbitmq-federation#7
        rabbit_runtime_parameters:set(XName#resource.virtual_host,
                                      ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                      XName#resource.name, true, Username),
        call_with_exchange(XName, OnExchange)
    after
        rabbit_runtime_parameters:clear(XName#resource.virtual_host,
                                        ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                        XName#resource.name, Username)
    end.

%% Delegates validation of a binding to the exchange's type module.

-spec validate_binding
        (rabbit_types:exchange(), rabbit_types:binding())
        -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).

validate_binding(X = #exchange{type = XType}, Binding) ->
    (type_to_module(XType)):validate_binding(X, Binding).

%% Deletes an auto_delete exchange via conditional_delete/2. Returns
%% 'not_deleted' when the exchange is not auto_delete or is still in use.

-spec maybe_auto_delete
        (rabbit_types:exchange(), boolean())
        -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}.

maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
    not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
    case conditional_delete(X, OnlyDurable) of
        {deleted, X, [], Deletions} -> {deleted, Deletions};
        {error, in_use}             -> not_deleted
    end.

%% Deletes X unless rabbit_binding:has_for_source/1 reports bindings for
%% it, in which case {error, in_use} is returned.
conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
    case rabbit_binding:has_for_source(XName) of
        true  -> {error, in_use};
        false -> internal_delete(X, OnlyDurable, false)
    end.

%% Deletes X, also removing the bindings for which it is the source.
unconditional_delete(X, OnlyDurable) ->
    internal_delete(X, OnlyDurable, true).
%% Removes the exchange from the rabbit_exchange, rabbit_exchange_serial
%% and rabbit_durable_exchange tables, optionally removes the bindings
%% for which it is the source, and removes the bindings for which it is
%% the destination.
%%
%% Returns {deleted, X, SourceBindings, Deletions}, where SourceBindings
%% is [] when RemoveBindingsForSource is false.
internal_delete(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
    ok = mnesia:delete({rabbit_exchange, XName}),
    ok = mnesia:delete({rabbit_exchange_serial, XName}),
    %% Asserted like the two deletes above, for consistency.
    ok = mnesia:delete({rabbit_durable_exchange, XName}),
    Bindings = case RemoveBindingsForSource of
                   true  -> rabbit_binding:remove_for_source(XName);
                   false -> []
               end,
    {deleted, X, Bindings, rabbit_binding:remove_for_destination(
                             XName, OnlyDurable)}.

%% Returns the current serial of the exchange and stores serial + 1 as
%% the next one. The current value is read with a write lock.
next_serial(XName) ->
    Serial = peek_serial(XName, write),
    ok = mnesia:write(rabbit_exchange_serial,
                      #exchange_serial{name = XName, next = Serial + 1}, write),
    Serial.

%% Returns the serial that next_serial/1 would hand out, without
%% advancing it. An exchange with no rabbit_exchange_serial record yields
%% 1, so 'undefined' is never returned.

-spec peek_serial(name()) -> pos_integer().

peek_serial(XName) -> peek_serial(XName, read).

%% Reads the stored serial under the given mnesia lock type, defaulting
%% to 1 when there is no record.
peek_serial(XName, LockType) ->
    case mnesia:read(rabbit_exchange_serial, XName, LockType) of
        [#exchange_serial{next = Serial}]  -> Serial;
        _                                  -> 1
    end.

%% Logs a warning about an unknown exchange type, caches
%% rabbit_exchange_type_invalid for it in the process dictionary and
%% returns that module.
invalid_module(T) ->
    rabbit_log:warning("Could not find exchange type ~s.", [T]),
    put({xtype_to_module, T}, rabbit_exchange_type_invalid),
    rabbit_exchange_type_invalid.

%% Used with atoms from records; e.g., the type is expected to exist.
%%
%% Resolves an exchange type to its module. The answer is cached in the
%% process dictionary under {xtype_to_module, T}, so rabbit_registry is
%% consulted at most once per type and process.
type_to_module(T) ->
    case get({xtype_to_module, T}) of
        undefined ->
            case rabbit_registry:lookup_module(exchange, T) of
                {ok, Module}       -> put({xtype_to_module, T}, Module),
                                      Module;
                {error, not_found} -> invalid_module(T)
            end;
        Module ->
            Module
    end.