1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_binding). 9-include_lib("rabbit_common/include/rabbit.hrl"). 10-include("amqqueue.hrl"). 11 12-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/2, remove/3, remove/4]). 13-export([list/1, list_for_source/1, list_for_destination/1, 14 list_for_source_and_destination/2, list_explicit/0]). 15-export([new_deletions/0, combine_deletions/2, add_deletion/3, 16 process_deletions/2, binding_action/3]). 17-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]). 18%% these must all be run inside a mnesia tx 19-export([has_for_source/1, remove_for_source/1, 20 remove_for_destination/2, remove_transient_for_destination/1, 21 remove_default_exchange_binding_rows_of/1]). 22 23-export([implicit_for_destination/1, reverse_binding/1]). 24-export([new/4]). 25 26-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, 27 kind = exchange, 28 name = <<>>}). 29 30%%---------------------------------------------------------------------------- 31 32-export_type([key/0, deletions/0]). 33 34-type key() :: binary(). 35 36-type bind_errors() :: rabbit_types:error( 37 {'resources_missing', 38 [{'not_found', (rabbit_types:binding_source() | 39 rabbit_types:binding_destination())} | 40 {'absent', amqqueue:amqqueue()}]}). 41 42-type bind_ok_or_error() :: 'ok' | bind_errors() | 43 rabbit_types:error( 44 {'binding_invalid', string(), [any()]}). 45-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). 46-type inner_fun() :: 47 fun((rabbit_types:exchange(), 48 rabbit_types:exchange() | amqqueue:amqqueue()) -> 49 rabbit_types:ok_or_error(rabbit_types:amqp_error())). 50-type bindings() :: [rabbit_types:binding()]. 51 52%% TODO this should really be opaque but that seems to confuse 17.1's 53%% dialyzer into objecting to everything that uses it. 54-type deletions() :: dict:dict(). 55 56%%---------------------------------------------------------------------------- 57 58-spec new(rabbit_types:exchange(), 59 key(), 60 rabbit_types:exchange() | amqqueue:amqqueue(), 61 rabbit_framing:amqp_table()) -> 62 rabbit_types:binding(). 63 64new(Src, RoutingKey, Dst, #{}) -> 65 new(Src, RoutingKey, Dst, []); 66new(Src, RoutingKey, Dst, Arguments) when is_map(Arguments) -> 67 new(Src, RoutingKey, Dst, maps:to_list(Arguments)); 68new(Src, RoutingKey, Dst, Arguments) -> 69 #binding{source = Src, key = RoutingKey, destination = Dst, args = Arguments}. 70 71 72-define(INFO_KEYS, [source_name, source_kind, 73 destination_name, destination_kind, 74 routing_key, arguments, 75 vhost]). 76 77%% Global table recovery 78 79recover() -> 80 rabbit_misc:execute_mnesia_transaction( 81 fun () -> 82 mnesia:lock({table, rabbit_durable_route}, read), 83 mnesia:lock({table, rabbit_semi_durable_route}, write), 84 Routes = rabbit_misc:dirty_read_all(rabbit_durable_route), 85 Fun = fun(Route) -> 86 mnesia:dirty_write(rabbit_semi_durable_route, Route) 87 end, 88 lists:foreach(Fun, Routes) 89 end). 90 91%% Virtual host-specific recovery 92 93-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) -> 94 'ok'. 95recover(XNames, QNames) -> 96 XNameSet = sets:from_list(XNames), 97 QNameSet = sets:from_list(QNames), 98 SelectSet = fun (#resource{kind = exchange}) -> XNameSet; 99 (#resource{kind = queue}) -> QNameSet 100 end, 101 {ok, Gatherer} = gatherer:start_link(), 102 [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) || 103 R = #route{binding = #binding{destination = Dst}} <- 104 rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], 105 empty = gatherer:out(Gatherer), 106 ok = gatherer:stop(Gatherer), 107 ok. 108 109recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) -> 110 #binding{source = Src, destination = Dst} = B, 111 case sets:is_element(Dst, ToRecover) of 112 true -> {ok, X} = rabbit_exchange:lookup(Src), 113 ok = gatherer:fork(Gatherer), 114 ok = worker_pool:submit_async( 115 fun () -> 116 recover_semi_durable_route_txn(R, X), 117 gatherer:finish(Gatherer) 118 end); 119 false -> ok 120 end. 121 122recover_semi_durable_route_txn(R = #route{binding = B}, X) -> 123 rabbit_misc:execute_mnesia_transaction( 124 fun () -> 125 case mnesia:read(rabbit_semi_durable_route, B, read) of 126 [] -> no_recover; 127 _ -> ok = sync_transient_route(R, fun mnesia:write/3), 128 rabbit_exchange:serial(X) 129 end 130 end, 131 fun (no_recover, _) -> ok; 132 (_Serial, true) -> x_callback(transaction, X, add_binding, B); 133 (Serial, false) -> x_callback(Serial, X, add_binding, B) 134 end). 135 136-spec exists(rabbit_types:binding()) -> boolean() | bind_errors(). 137 138exists(#binding{source = ?DEFAULT_EXCHANGE(_), 139 destination = #resource{kind = queue, name = QName} = Queue, 140 key = QName, 141 args = []}) -> 142 case rabbit_amqqueue:lookup(Queue) of 143 {ok, _} -> true; 144 {error, not_found} -> false 145 end; 146exists(Binding) -> 147 binding_action( 148 Binding, fun (_Src, _Dst, B) -> 149 rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) 150 end, fun not_found_or_absent_errs/1). 151 152-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res(). 153 154add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser). 155 156-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). 157 158add(Binding, InnerFun, ActingUser) -> 159 binding_action( 160 Binding, 161 fun (Src, Dst, B) -> 162 case rabbit_exchange:validate_binding(Src, B) of 163 ok -> 164 lock_resource(Src, read), 165 lock_resource(Dst, read), 166 %% this argument is used to check queue exclusivity; 167 %% in general, we want to fail on that in preference to 168 %% anything else 169 case InnerFun(Src, Dst) of 170 ok -> 171 case mnesia:read({rabbit_route, B}) of 172 [] -> add(Src, Dst, B, ActingUser); 173 [_] -> fun () -> ok end 174 end; 175 {error, _} = Err -> 176 rabbit_misc:const(Err) 177 end; 178 {error, _} = Err -> 179 rabbit_misc:const(Err) 180 end 181 end, fun not_found_or_absent_errs/1). 182 183add(Src, Dst, B, ActingUser) -> 184 [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], 185 ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, 186 fun mnesia:write/3), 187 x_callback(transaction, Src, add_binding, B), 188 Serial = rabbit_exchange:serial(Src), 189 fun () -> 190 x_callback(Serial, Src, add_binding, B), 191 ok = rabbit_event:notify( 192 binding_created, 193 info(B) ++ [{user_who_performed_action, ActingUser}]) 194 end. 195 196-spec remove(rabbit_types:binding()) -> bind_res(). 197remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER). 198 199-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res(). 200remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser). 201 202 203-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res(). 204remove(Binding, InnerFun, ActingUser) -> 205 binding_action( 206 Binding, 207 fun (Src, Dst, B) -> 208 lock_resource(Src, read), 209 lock_resource(Dst, read), 210 case mnesia:read(rabbit_route, B, write) of 211 [] -> case mnesia:read(rabbit_durable_route, B, write) of 212 [] -> rabbit_misc:const(ok); 213 %% We still delete the binding and run 214 %% all post-delete functions if there is only 215 %% a durable route in the database 216 _ -> remove(Src, Dst, B, ActingUser) 217 end; 218 _ -> case InnerFun(Src, Dst) of 219 ok -> remove(Src, Dst, B, ActingUser); 220 {error, _} = Err -> rabbit_misc:const(Err) 221 end 222 end 223 end, fun absent_errs_only/1). 224 225remove(Src, Dst, B, ActingUser) -> 226 ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), 227 fun delete/3), 228 Deletions = maybe_auto_delete( 229 B#binding.source, [B], new_deletions(), false), 230 process_deletions(Deletions, ActingUser). 231 232%% Implicit bindings are implicit as of rabbitmq/rabbitmq-server#1721. 233remove_default_exchange_binding_rows_of(Dst = #resource{}) -> 234 case implicit_for_destination(Dst) of 235 [Binding] -> 236 mnesia:dirty_delete(rabbit_durable_route, Binding), 237 mnesia:dirty_delete(rabbit_semi_durable_route, Binding), 238 mnesia:dirty_delete(rabbit_reverse_route, 239 reverse_binding(Binding)), 240 mnesia:dirty_delete(rabbit_route, Binding); 241 _ -> 242 %% no binding to remove or 243 %% a competing tx has beaten us to it? 244 ok 245 end, 246 ok. 247 248-spec list_explicit() -> bindings(). 249 250list_explicit() -> 251 mnesia:async_dirty( 252 fun () -> 253 AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}), 254 %% if there are any default exchange bindings left after an upgrade 255 %% of a pre-3.8 database, filter them out 256 AllBindings = [B || #route{binding = B} <- AllRoutes], 257 lists:filter(fun(#binding{source = S}) -> 258 not (S#resource.kind =:= exchange andalso S#resource.name =:= <<>>) 259 end, AllBindings) 260 end). 261 262-spec list(rabbit_types:vhost()) -> bindings(). 263 264list(VHostPath) -> 265 VHostResource = rabbit_misc:r(VHostPath, '_'), 266 Route = #route{binding = #binding{source = VHostResource, 267 destination = VHostResource, 268 _ = '_'}, 269 _ = '_'}, 270 %% if there are any default exchange bindings left after an upgrade 271 %% of a pre-3.8 database, filter them out 272 AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, 273 Route)], 274 Filtered = lists:filter(fun(#binding{source = S}) -> 275 S =/= ?DEFAULT_EXCHANGE(VHostPath) 276 end, AllBindings), 277 implicit_bindings(VHostPath) ++ Filtered. 278 279-spec list_for_source 280 (rabbit_types:binding_source()) -> bindings(). 281 282list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> 283 implicit_bindings(VHostPath); 284list_for_source(SrcName) -> 285 mnesia:async_dirty( 286 fun() -> 287 Route = #route{binding = #binding{source = SrcName, _ = '_'}}, 288 [B || #route{binding = B} 289 <- mnesia:match_object(rabbit_route, Route, read)] 290 end). 291 292-spec list_for_destination 293 (rabbit_types:binding_destination()) -> bindings(). 294 295list_for_destination(DstName = #resource{virtual_host = VHostPath}) -> 296 AllBindings = mnesia:async_dirty( 297 fun() -> 298 Route = #route{binding = #binding{destination = DstName, 299 _ = '_'}}, 300 [reverse_binding(B) || 301 #reverse_route{reverse_binding = B} <- 302 mnesia:match_object(rabbit_reverse_route, 303 reverse_route(Route), read)] 304 end), 305 Filtered = lists:filter(fun(#binding{source = S}) -> 306 S =/= ?DEFAULT_EXCHANGE(VHostPath) 307 end, AllBindings), 308 implicit_for_destination(DstName) ++ Filtered. 309 310implicit_bindings(VHostPath) -> 311 DstQueues = rabbit_amqqueue:list_names(VHostPath), 312 [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath), 313 destination = DstQueue, 314 key = QName, 315 args = []} 316 || DstQueue = #resource{name = QName} <- DstQueues ]. 317 318implicit_for_destination(DstQueue = #resource{kind = queue, 319 virtual_host = VHostPath, 320 name = QName}) -> 321 [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), 322 destination = DstQueue, 323 key = QName, 324 args = []}]; 325implicit_for_destination(_) -> 326 []. 327 328-spec list_for_source_and_destination 329 (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> 330 bindings(). 331 332list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), 333 #resource{kind = queue, 334 virtual_host = VHostPath, 335 name = QName} = DstQueue) -> 336 [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), 337 destination = DstQueue, 338 key = QName, 339 args = []}]; 340list_for_source_and_destination(SrcName, DstName) -> 341 mnesia:async_dirty( 342 fun() -> 343 Route = #route{binding = #binding{source = SrcName, 344 destination = DstName, 345 _ = '_'}}, 346 [B || #route{binding = B} <- mnesia:match_object(rabbit_route, 347 Route, read)] 348 end). 349 350-spec info_keys() -> rabbit_types:info_keys(). 351 352info_keys() -> ?INFO_KEYS. 353 354map(VHostPath, F) -> 355 %% TODO: there is scope for optimisation here, e.g. using a 356 %% cursor, parallelising the function invocation 357 lists:map(F, list(VHostPath)). 358 359infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. 360 361i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; 362i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; 363i(vhost, #binding{source = SrcName}) -> SrcName#resource.virtual_host; 364i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; 365i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; 366i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; 367i(arguments, #binding{args = Arguments}) -> Arguments; 368i(Item, _) -> throw({bad_argument, Item}). 369 370-spec info(rabbit_types:binding()) -> rabbit_types:infos(). 371 372info(B = #binding{}) -> infos(?INFO_KEYS, B). 373 374-spec info(rabbit_types:binding(), rabbit_types:info_keys()) -> 375 rabbit_types:infos(). 376 377info(B = #binding{}, Items) -> infos(Items, B). 378 379-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. 380 381info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). 382 383-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> 384 [rabbit_types:infos()]. 385 386info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). 387 388-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(), 389 reference(), pid()) -> 'ok'. 390 391info_all(VHostPath, Items, Ref, AggregatorPid) -> 392 rabbit_control_misc:emitting_map( 393 AggregatorPid, Ref, fun(B) -> info(B, Items) end, list(VHostPath)). 394 395-spec has_for_source(rabbit_types:binding_source()) -> boolean(). 396 397has_for_source(SrcName) -> 398 Match = #route{binding = #binding{source = SrcName, _ = '_'}}, 399 %% we need to check for semi-durable routes (which subsumes 400 %% durable routes) here too in case a bunch of routes to durable 401 %% queues have been removed temporarily as a result of a node 402 %% failure 403 contains(rabbit_route, Match) orelse 404 contains(rabbit_semi_durable_route, Match). 405 406-spec remove_for_source(rabbit_types:binding_source()) -> bindings(). 407 408remove_for_source(SrcName) -> 409 lock_resource(SrcName), 410 Match = #route{binding = #binding{source = SrcName, _ = '_'}}, 411 remove_routes( 412 lists:usort( 413 mnesia:dirty_match_object(rabbit_route, Match) ++ 414 mnesia:dirty_match_object(rabbit_semi_durable_route, Match))). 415 416-spec remove_for_destination 417 (rabbit_types:binding_destination(), boolean()) -> deletions(). 418 419remove_for_destination(DstName, OnlyDurable) -> 420 remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). 421 422-spec remove_transient_for_destination 423 (rabbit_types:binding_destination()) -> deletions(). 424 425remove_transient_for_destination(DstName) -> 426 remove_for_destination(DstName, false, fun remove_transient_routes/1). 427 428%%---------------------------------------------------------------------------- 429 430durable(#exchange{durable = D}) -> D; 431durable(Q) when ?is_amqqueue(Q) -> 432 amqqueue:is_durable(Q). 433 434binding_action(Binding = #binding{source = SrcName, 435 destination = DstName, 436 args = Arguments}, Fun, ErrFun) -> 437 call_with_source_and_destination( 438 SrcName, DstName, 439 fun (Src, Dst) -> 440 SortedArgs = rabbit_misc:sort_field_table(Arguments), 441 Fun(Src, Dst, Binding#binding{args = SortedArgs}) 442 end, ErrFun). 443 444sync_route(Route, true, true, Fun) -> 445 ok = Fun(rabbit_durable_route, Route, write), 446 sync_route(Route, false, true, Fun); 447 448sync_route(Route, false, true, Fun) -> 449 ok = Fun(rabbit_semi_durable_route, Route, write), 450 sync_route(Route, false, false, Fun); 451 452sync_route(Route, _SrcDurable, false, Fun) -> 453 sync_transient_route(Route, Fun). 454 455sync_transient_route(Route, Fun) -> 456 ok = Fun(rabbit_route, Route, write), 457 ok = Fun(rabbit_reverse_route, reverse_route(Route), write). 458 459call_with_source_and_destination(SrcName, DstName, Fun, ErrFun) -> 460 SrcTable = table_for_resource(SrcName), 461 DstTable = table_for_resource(DstName), 462 rabbit_misc:execute_mnesia_tx_with_tail( 463 fun () -> 464 case {mnesia:read({SrcTable, SrcName}), 465 mnesia:read({DstTable, DstName})} of 466 {[Src], [Dst]} -> Fun(Src, Dst); 467 {[], [_] } -> ErrFun([SrcName]); 468 {[_], [] } -> ErrFun([DstName]); 469 {[], [] } -> ErrFun([SrcName, DstName]) 470 end 471 end). 472 473not_found_or_absent_errs(Names) -> 474 Errs = [not_found_or_absent(Name) || Name <- Names], 475 rabbit_misc:const({error, {resources_missing, Errs}}). 476 477absent_errs_only(Names) -> 478 Errs = [E || Name <- Names, 479 {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]], 480 rabbit_misc:const(case Errs of 481 [] -> ok; 482 _ -> {error, {resources_missing, Errs}} 483 end). 484 485table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; 486table_for_resource(#resource{kind = queue}) -> rabbit_queue. 487 488not_found_or_absent(#resource{kind = exchange} = Name) -> 489 {not_found, Name}; 490not_found_or_absent(#resource{kind = queue} = Name) -> 491 case rabbit_amqqueue:not_found_or_absent(Name) of 492 not_found -> {not_found, Name}; 493 {absent, _Q, _Reason} = R -> R 494 end. 495 496contains(Table, MatchHead) -> 497 continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). 498 499continue('$end_of_table') -> false; 500continue({[_|_], _}) -> true; 501continue({[], Continuation}) -> continue(mnesia:select(Continuation)). 502 503remove_routes(Routes) -> 504 %% This partitioning allows us to suppress unnecessary delete 505 %% operations on disk tables, which require an fsync. 506 {RamRoutes, DiskRoutes} = 507 lists:partition(fun (R) -> mnesia:read( 508 rabbit_durable_route, R#route.binding, read) == [] end, 509 Routes), 510 {RamOnlyRoutes, SemiDurableRoutes} = 511 lists:partition(fun (R) -> mnesia:read( 512 rabbit_semi_durable_route, R#route.binding, read) == [] end, 513 RamRoutes), 514 %% Of course the destination might not really be durable but it's 515 %% just as easy to try to delete it from the semi-durable table 516 %% than check first 517 [ok = sync_route(R, true, true, fun delete/3) || 518 R <- DiskRoutes], 519 [ok = sync_route(R, false, true, fun delete/3) || 520 R <- SemiDurableRoutes], 521 [ok = sync_route(R, false, false, fun delete/3) || 522 R <- RamOnlyRoutes], 523 [R#route.binding || R <- Routes]. 524 525 526delete(Tab, #route{binding = B}, LockKind) -> 527 mnesia:delete(Tab, B, LockKind); 528delete(Tab, #reverse_route{reverse_binding = B}, LockKind) -> 529 mnesia:delete(Tab, B, LockKind). 530 531remove_transient_routes(Routes) -> 532 [begin 533 ok = sync_transient_route(R, fun delete/3), 534 R#route.binding 535 end || R <- Routes]. 536 537remove_for_destination(DstName, OnlyDurable, Fun) -> 538 lock_resource(DstName), 539 MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}}, 540 MatchRev = reverse_route(MatchFwd), 541 Routes = case OnlyDurable of 542 false -> 543 [reverse_route(R) || 544 R <- mnesia:dirty_match_object( 545 rabbit_reverse_route, MatchRev)]; 546 true -> lists:usort( 547 mnesia:dirty_match_object( 548 rabbit_durable_route, MatchFwd) ++ 549 mnesia:dirty_match_object( 550 rabbit_semi_durable_route, MatchFwd)) 551 end, 552 Bindings = Fun(Routes), 553 group_bindings_fold(fun maybe_auto_delete/4, new_deletions(), 554 lists:keysort(#binding.source, Bindings), OnlyDurable). 555 556%% Instead of locking entire table on remove operations we can lock the 557%% affected resource only. 558lock_resource(Name) -> lock_resource(Name, write). 559 560lock_resource(Name, LockKind) -> 561 mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)}, 562 LockKind). 563 564%% Requires that its input binding list is sorted in exchange-name 565%% order, so that the grouping of bindings (for passing to 566%% group_bindings_and_auto_delete1) works properly. 567group_bindings_fold(_Fun, Acc, [], _OnlyDurable) -> 568 Acc; 569group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs], 570 OnlyDurable) -> 571 group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable). 572 573group_bindings_fold( 574 Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings, 575 OnlyDurable) -> 576 group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable); 577group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) -> 578 %% Either Removed is [], or its head has a non-matching SrcName. 579 group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed, 580 OnlyDurable). 581 582maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) -> 583 {Entry, Deletions1} = 584 case mnesia:read({case OnlyDurable of 585 true -> rabbit_durable_exchange; 586 false -> rabbit_exchange 587 end, XName}) of 588 [] -> {{undefined, not_deleted, Bindings}, Deletions}; 589 [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of 590 not_deleted -> 591 {{X, not_deleted, Bindings}, Deletions}; 592 {deleted, Deletions2} -> 593 {{X, deleted, Bindings}, 594 combine_deletions(Deletions, Deletions2)} 595 end 596 end, 597 add_deletion(XName, Entry, Deletions1). 598 599reverse_route(#route{binding = Binding}) -> 600 #reverse_route{reverse_binding = reverse_binding(Binding)}; 601 602reverse_route(#reverse_route{reverse_binding = Binding}) -> 603 #route{binding = reverse_binding(Binding)}. 604 605reverse_binding(#reverse_binding{source = SrcName, 606 destination = DstName, 607 key = Key, 608 args = Args}) -> 609 #binding{source = SrcName, 610 destination = DstName, 611 key = Key, 612 args = Args}; 613 614reverse_binding(#binding{source = SrcName, 615 destination = DstName, 616 key = Key, 617 args = Args}) -> 618 #reverse_binding{source = SrcName, 619 destination = DstName, 620 key = Key, 621 args = Args}. 622 623%% ---------------------------------------------------------------------------- 624%% Binding / exchange deletion abstraction API 625%% ---------------------------------------------------------------------------- 626 627anything_but( NotThis, NotThis, NotThis) -> NotThis; 628anything_but( NotThis, NotThis, This) -> This; 629anything_but( NotThis, This, NotThis) -> This; 630anything_but(_NotThis, This, This) -> This. 631 632-spec new_deletions() -> deletions(). 633 634new_deletions() -> dict:new(). 635 636-spec add_deletion 637 (rabbit_exchange:name(), 638 {'undefined' | rabbit_types:exchange(), 639 'deleted' | 'not_deleted', 640 bindings()}, 641 deletions()) -> 642 deletions(). 643 644add_deletion(XName, Entry, Deletions) -> 645 dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, 646 Entry, Deletions). 647 648-spec combine_deletions(deletions(), deletions()) -> deletions(). 649 650combine_deletions(Deletions1, Deletions2) -> 651 dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, 652 Deletions1, Deletions2). 653 654merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> 655 {anything_but(undefined, X1, X2), 656 anything_but(not_deleted, Deleted1, Deleted2), 657 [Bindings1 | Bindings2]}. 658 659-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok'). 660 661process_deletions(Deletions, ActingUser) -> 662 AugmentedDeletions = 663 dict:map(fun (_XName, {X, deleted, Bindings}) -> 664 Bs = lists:flatten(Bindings), 665 x_callback(transaction, X, delete, Bs), 666 {X, deleted, Bs, none}; 667 (_XName, {X, not_deleted, Bindings}) -> 668 Bs = lists:flatten(Bindings), 669 x_callback(transaction, X, remove_bindings, Bs), 670 {X, not_deleted, Bs, rabbit_exchange:serial(X)} 671 end, Deletions), 672 fun() -> 673 dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> 674 ok = rabbit_event:notify( 675 exchange_deleted, 676 [{name, XName}, 677 {user_who_performed_action, ActingUser}]), 678 del_notify(Bs, ActingUser), 679 x_callback(Serial, X, delete, Bs); 680 (_XName, {X, not_deleted, Bs, Serial}, ok) -> 681 del_notify(Bs, ActingUser), 682 x_callback(Serial, X, remove_bindings, Bs) 683 end, ok, AugmentedDeletions) 684 end. 685 686del_notify(Bs, ActingUser) -> [rabbit_event:notify( 687 binding_deleted, 688 info(B) ++ [{user_who_performed_action, ActingUser}]) 689 || B <- Bs]. 690 691x_callback(Serial, X, F, Bs) -> 692 ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]). 693