1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_binding).
9-include_lib("rabbit_common/include/rabbit.hrl").
10-include("amqqueue.hrl").
11
12-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/2, remove/3, remove/4]).
13-export([list/1, list_for_source/1, list_for_destination/1,
14         list_for_source_and_destination/2, list_explicit/0]).
15-export([new_deletions/0, combine_deletions/2, add_deletion/3,
16         process_deletions/2, binding_action/3]).
17-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
18%% these must all be run inside a mnesia tx
19-export([has_for_source/1, remove_for_source/1,
20         remove_for_destination/2, remove_transient_for_destination/1,
21         remove_default_exchange_binding_rows_of/1]).
22
23-export([implicit_for_destination/1, reverse_binding/1]).
24-export([new/4]).
25
26-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
27                                              kind = exchange,
28                                              name = <<>>}).
29
30%%----------------------------------------------------------------------------
31
32-export_type([key/0, deletions/0]).
33
34-type key() :: binary().
35
36-type bind_errors() :: rabbit_types:error(
37                         {'resources_missing',
38                          [{'not_found', (rabbit_types:binding_source() |
39                                          rabbit_types:binding_destination())} |
40                           {'absent', amqqueue:amqqueue()}]}).
41
42-type bind_ok_or_error() :: 'ok' | bind_errors() |
43                            rabbit_types:error(
44                              {'binding_invalid', string(), [any()]}).
45-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
46-type inner_fun() ::
47        fun((rabbit_types:exchange(),
48             rabbit_types:exchange() | amqqueue:amqqueue()) ->
49                   rabbit_types:ok_or_error(rabbit_types:amqp_error())).
50-type bindings() :: [rabbit_types:binding()].
51
52%% TODO this should really be opaque but that seems to confuse 17.1's
53%% dialyzer into objecting to everything that uses it.
54-type deletions() :: dict:dict().
55
56%%----------------------------------------------------------------------------
57
58-spec new(rabbit_types:exchange(),
59          key(),
60          rabbit_types:exchange() | amqqueue:amqqueue(),
61          rabbit_framing:amqp_table()) ->
62    rabbit_types:binding().
63
64new(Src, RoutingKey, Dst, #{}) ->
65    new(Src, RoutingKey, Dst, []);
66new(Src, RoutingKey, Dst, Arguments) when is_map(Arguments) ->
67    new(Src, RoutingKey, Dst, maps:to_list(Arguments));
68new(Src, RoutingKey, Dst, Arguments) ->
69    #binding{source = Src, key = RoutingKey, destination = Dst, args = Arguments}.
70
71
72-define(INFO_KEYS, [source_name, source_kind,
73                    destination_name, destination_kind,
74                    routing_key, arguments,
75                    vhost]).
76
77%% Global table recovery
78
79recover() ->
80    rabbit_misc:execute_mnesia_transaction(
81        fun () ->
82            mnesia:lock({table, rabbit_durable_route}, read),
83            mnesia:lock({table, rabbit_semi_durable_route}, write),
84            Routes = rabbit_misc:dirty_read_all(rabbit_durable_route),
85            Fun = fun(Route) ->
86                mnesia:dirty_write(rabbit_semi_durable_route, Route)
87            end,
88        lists:foreach(Fun, Routes)
89    end).
90
91%% Virtual host-specific recovery
92
93-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
94                        'ok'.
95recover(XNames, QNames) ->
96    XNameSet = sets:from_list(XNames),
97    QNameSet = sets:from_list(QNames),
98    SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
99                    (#resource{kind = queue})    -> QNameSet
100                end,
101    {ok, Gatherer} = gatherer:start_link(),
102    [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) ||
103        R = #route{binding = #binding{destination = Dst}} <-
104            rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
105    empty = gatherer:out(Gatherer),
106    ok = gatherer:stop(Gatherer),
107    ok.
108
109recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) ->
110    #binding{source = Src, destination = Dst} = B,
111    case sets:is_element(Dst, ToRecover) of
112        true  -> {ok, X} = rabbit_exchange:lookup(Src),
113                 ok = gatherer:fork(Gatherer),
114                 ok = worker_pool:submit_async(
115                        fun () ->
116                                recover_semi_durable_route_txn(R, X),
117                                gatherer:finish(Gatherer)
118                        end);
119        false -> ok
120    end.
121
122recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
123    rabbit_misc:execute_mnesia_transaction(
124      fun () ->
125              case mnesia:read(rabbit_semi_durable_route, B, read) of
126                  [] -> no_recover;
127                  _  -> ok = sync_transient_route(R, fun mnesia:write/3),
128                        rabbit_exchange:serial(X)
129              end
130      end,
131      fun (no_recover, _)     -> ok;
132          (_Serial,    true)  -> x_callback(transaction, X, add_binding, B);
133          (Serial,     false) -> x_callback(Serial,      X, add_binding, B)
134      end).
135
136-spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
137
138exists(#binding{source = ?DEFAULT_EXCHANGE(_),
139                destination = #resource{kind = queue, name = QName} = Queue,
140                key = QName,
141                args = []}) ->
142    case rabbit_amqqueue:lookup(Queue) of
143        {ok, _} -> true;
144        {error, not_found} -> false
145    end;
146exists(Binding) ->
147    binding_action(
148      Binding, fun (_Src, _Dst, B) ->
149                       rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
150               end, fun not_found_or_absent_errs/1).
151
152-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
153
154add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
155
156-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
157
158add(Binding, InnerFun, ActingUser) ->
159    binding_action(
160      Binding,
161      fun (Src, Dst, B) ->
162              case rabbit_exchange:validate_binding(Src, B) of
163                  ok ->
164                      lock_resource(Src, read),
165                      lock_resource(Dst, read),
166                      %% this argument is used to check queue exclusivity;
167                      %% in general, we want to fail on that in preference to
168                      %% anything else
169                      case InnerFun(Src, Dst) of
170                          ok ->
171                              case mnesia:read({rabbit_route, B}) of
172                                  []  -> add(Src, Dst, B, ActingUser);
173                                  [_] -> fun () -> ok end
174                              end;
175                          {error, _} = Err ->
176                              rabbit_misc:const(Err)
177                      end;
178                  {error, _} = Err ->
179                      rabbit_misc:const(Err)
180              end
181      end, fun not_found_or_absent_errs/1).
182
183add(Src, Dst, B, ActingUser) ->
184    [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
185    ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
186                    fun mnesia:write/3),
187    x_callback(transaction, Src, add_binding, B),
188    Serial = rabbit_exchange:serial(Src),
189    fun () ->
190        x_callback(Serial, Src, add_binding, B),
191        ok = rabbit_event:notify(
192            binding_created,
193            info(B) ++ [{user_who_performed_action, ActingUser}])
194    end.
195
196-spec remove(rabbit_types:binding()) -> bind_res().
197remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER).
198
199-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
200remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
201
202
203-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
204remove(Binding, InnerFun, ActingUser) ->
205    binding_action(
206      Binding,
207      fun (Src, Dst, B) ->
208              lock_resource(Src, read),
209              lock_resource(Dst, read),
210              case mnesia:read(rabbit_route, B, write) of
211                  [] -> case mnesia:read(rabbit_durable_route, B, write) of
212                            [] -> rabbit_misc:const(ok);
213                            %% We still delete the binding and run
214                            %% all post-delete functions if there is only
215                            %% a durable route in the database
216                            _  -> remove(Src, Dst, B, ActingUser)
217                        end;
218                  _  -> case InnerFun(Src, Dst) of
219                            ok               -> remove(Src, Dst, B, ActingUser);
220                            {error, _} = Err -> rabbit_misc:const(Err)
221                        end
222              end
223      end, fun absent_errs_only/1).
224
225remove(Src, Dst, B, ActingUser) ->
226    ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
227                    fun delete/3),
228    Deletions = maybe_auto_delete(
229                  B#binding.source, [B], new_deletions(), false),
230    process_deletions(Deletions, ActingUser).
231
232%% Implicit bindings are implicit as of rabbitmq/rabbitmq-server#1721.
233remove_default_exchange_binding_rows_of(Dst = #resource{}) ->
234    case implicit_for_destination(Dst) of
235        [Binding] ->
236            mnesia:dirty_delete(rabbit_durable_route, Binding),
237            mnesia:dirty_delete(rabbit_semi_durable_route, Binding),
238            mnesia:dirty_delete(rabbit_reverse_route,
239                                reverse_binding(Binding)),
240            mnesia:dirty_delete(rabbit_route, Binding);
241        _ ->
242            %% no binding to remove or
243            %% a competing tx has beaten us to it?
244            ok
245    end,
246    ok.
247
248-spec list_explicit() -> bindings().
249
250list_explicit() ->
251    mnesia:async_dirty(
252        fun () ->
253            AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}),
254            %% if there are any default exchange bindings left after an upgrade
255            %% of a pre-3.8 database, filter them out
256            AllBindings = [B || #route{binding = B} <- AllRoutes],
257            lists:filter(fun(#binding{source = S}) ->
258                            not (S#resource.kind =:= exchange andalso S#resource.name =:= <<>>)
259                         end, AllBindings)
260            end).
261
262-spec list(rabbit_types:vhost()) -> bindings().
263
264list(VHostPath) ->
265    VHostResource = rabbit_misc:r(VHostPath, '_'),
266    Route = #route{binding = #binding{source      = VHostResource,
267                                      destination = VHostResource,
268                                      _           = '_'},
269                   _       = '_'},
270    %% if there are any default exchange bindings left after an upgrade
271    %% of a pre-3.8 database, filter them out
272    AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
273                                                                         Route)],
274    Filtered    = lists:filter(fun(#binding{source = S}) ->
275                                       S =/= ?DEFAULT_EXCHANGE(VHostPath)
276                               end, AllBindings),
277    implicit_bindings(VHostPath) ++ Filtered.
278
279-spec list_for_source
280        (rabbit_types:binding_source()) -> bindings().
281
282list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
283    implicit_bindings(VHostPath);
284list_for_source(SrcName) ->
285    mnesia:async_dirty(
286      fun() ->
287              Route = #route{binding = #binding{source = SrcName, _ = '_'}},
288              [B || #route{binding = B}
289                        <- mnesia:match_object(rabbit_route, Route, read)]
290      end).
291
292-spec list_for_destination
293        (rabbit_types:binding_destination()) -> bindings().
294
295list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
296    AllBindings = mnesia:async_dirty(
297          fun() ->
298                  Route = #route{binding = #binding{destination = DstName,
299                                                    _ = '_'}},
300                  [reverse_binding(B) ||
301                      #reverse_route{reverse_binding = B} <-
302                          mnesia:match_object(rabbit_reverse_route,
303                                              reverse_route(Route), read)]
304          end),
305    Filtered    = lists:filter(fun(#binding{source = S}) ->
306                                       S =/= ?DEFAULT_EXCHANGE(VHostPath)
307                               end, AllBindings),
308    implicit_for_destination(DstName) ++ Filtered.
309
310implicit_bindings(VHostPath) ->
311    DstQueues = rabbit_amqqueue:list_names(VHostPath),
312    [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
313               destination = DstQueue,
314               key = QName,
315               args = []}
316      || DstQueue = #resource{name = QName} <- DstQueues ].
317
318implicit_for_destination(DstQueue = #resource{kind = queue,
319                                              virtual_host = VHostPath,
320                                              name = QName}) ->
321    [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
322              destination = DstQueue,
323              key = QName,
324              args = []}];
325implicit_for_destination(_) ->
326    [].
327
328-spec list_for_source_and_destination
329        (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
330                                                bindings().
331
332list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
333                                #resource{kind = queue,
334                                          virtual_host = VHostPath,
335                                          name = QName} = DstQueue) ->
336    [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
337              destination = DstQueue,
338              key = QName,
339              args = []}];
340list_for_source_and_destination(SrcName, DstName) ->
341    mnesia:async_dirty(
342      fun() ->
343              Route = #route{binding = #binding{source      = SrcName,
344                                                destination = DstName,
345                                                _           = '_'}},
346              [B || #route{binding = B} <- mnesia:match_object(rabbit_route,
347                                                               Route, read)]
348      end).
349
350-spec info_keys() -> rabbit_types:info_keys().
351
352info_keys() -> ?INFO_KEYS.
353
354map(VHostPath, F) ->
355    %% TODO: there is scope for optimisation here, e.g. using a
356    %% cursor, parallelising the function invocation
357    lists:map(F, list(VHostPath)).
358
359infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
360
361i(source_name,      #binding{source      = SrcName})    -> SrcName#resource.name;
362i(source_kind,      #binding{source      = SrcName})    -> SrcName#resource.kind;
363i(vhost,            #binding{source      = SrcName})    -> SrcName#resource.virtual_host;
364i(destination_name, #binding{destination = DstName})    -> DstName#resource.name;
365i(destination_kind, #binding{destination = DstName})    -> DstName#resource.kind;
366i(routing_key,      #binding{key         = RoutingKey}) -> RoutingKey;
367i(arguments,        #binding{args        = Arguments})  -> Arguments;
368i(Item, _) -> throw({bad_argument, Item}).
369
370-spec info(rabbit_types:binding()) -> rabbit_types:infos().
371
372info(B = #binding{}) -> infos(?INFO_KEYS, B).
373
374-spec info(rabbit_types:binding(), rabbit_types:info_keys()) ->
375          rabbit_types:infos().
376
377info(B = #binding{}, Items) -> infos(Items, B).
378
379-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
380
381info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
382
383-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
384          [rabbit_types:infos()].
385
386info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
387
388-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
389                    reference(), pid()) -> 'ok'.
390
391info_all(VHostPath, Items, Ref, AggregatorPid) ->
392    rabbit_control_misc:emitting_map(
393      AggregatorPid, Ref, fun(B) -> info(B, Items) end, list(VHostPath)).
394
395-spec has_for_source(rabbit_types:binding_source()) -> boolean().
396
397has_for_source(SrcName) ->
398    Match = #route{binding = #binding{source = SrcName, _ = '_'}},
399    %% we need to check for semi-durable routes (which subsumes
400    %% durable routes) here too in case a bunch of routes to durable
401    %% queues have been removed temporarily as a result of a node
402    %% failure
403    contains(rabbit_route, Match) orelse
404        contains(rabbit_semi_durable_route, Match).
405
406-spec remove_for_source(rabbit_types:binding_source()) -> bindings().
407
408remove_for_source(SrcName) ->
409    lock_resource(SrcName),
410    Match = #route{binding = #binding{source = SrcName, _ = '_'}},
411    remove_routes(
412      lists:usort(
413        mnesia:dirty_match_object(rabbit_route, Match) ++
414            mnesia:dirty_match_object(rabbit_semi_durable_route, Match))).
415
416-spec remove_for_destination
417        (rabbit_types:binding_destination(), boolean()) -> deletions().
418
419remove_for_destination(DstName, OnlyDurable) ->
420    remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
421
422-spec remove_transient_for_destination
423        (rabbit_types:binding_destination()) -> deletions().
424
425remove_transient_for_destination(DstName) ->
426    remove_for_destination(DstName, false, fun remove_transient_routes/1).
427
428%%----------------------------------------------------------------------------
429
430durable(#exchange{durable = D}) -> D;
431durable(Q) when ?is_amqqueue(Q) ->
432    amqqueue:is_durable(Q).
433
434binding_action(Binding = #binding{source      = SrcName,
435                                  destination = DstName,
436                                  args        = Arguments}, Fun, ErrFun) ->
437    call_with_source_and_destination(
438      SrcName, DstName,
439      fun (Src, Dst) ->
440              SortedArgs = rabbit_misc:sort_field_table(Arguments),
441              Fun(Src, Dst, Binding#binding{args = SortedArgs})
442      end, ErrFun).
443
444sync_route(Route, true, true, Fun) ->
445    ok = Fun(rabbit_durable_route, Route, write),
446    sync_route(Route, false, true, Fun);
447
448sync_route(Route, false, true, Fun) ->
449    ok = Fun(rabbit_semi_durable_route, Route, write),
450    sync_route(Route, false, false, Fun);
451
452sync_route(Route, _SrcDurable, false, Fun) ->
453    sync_transient_route(Route, Fun).
454
455sync_transient_route(Route, Fun) ->
456    ok = Fun(rabbit_route, Route, write),
457    ok = Fun(rabbit_reverse_route, reverse_route(Route), write).
458
459call_with_source_and_destination(SrcName, DstName, Fun, ErrFun) ->
460    SrcTable = table_for_resource(SrcName),
461    DstTable = table_for_resource(DstName),
462    rabbit_misc:execute_mnesia_tx_with_tail(
463      fun () ->
464              case {mnesia:read({SrcTable, SrcName}),
465                    mnesia:read({DstTable, DstName})} of
466                  {[Src], [Dst]} -> Fun(Src, Dst);
467                  {[],    [_]  } -> ErrFun([SrcName]);
468                  {[_],   []   } -> ErrFun([DstName]);
469                  {[],    []   } -> ErrFun([SrcName, DstName])
470              end
471      end).
472
473not_found_or_absent_errs(Names) ->
474    Errs = [not_found_or_absent(Name) || Name <- Names],
475    rabbit_misc:const({error, {resources_missing, Errs}}).
476
477absent_errs_only(Names) ->
478    Errs = [E || Name <- Names,
479                 {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]],
480    rabbit_misc:const(case Errs of
481                          [] -> ok;
482                          _  -> {error, {resources_missing, Errs}}
483                      end).
484
485table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
486table_for_resource(#resource{kind = queue})    -> rabbit_queue.
487
488not_found_or_absent(#resource{kind = exchange} = Name) ->
489    {not_found, Name};
490not_found_or_absent(#resource{kind = queue}    = Name) ->
491    case rabbit_amqqueue:not_found_or_absent(Name) of
492        not_found                 -> {not_found, Name};
493        {absent, _Q, _Reason} = R -> R
494    end.
495
496contains(Table, MatchHead) ->
497    continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
498
499continue('$end_of_table')    -> false;
500continue({[_|_], _})         -> true;
501continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
502
503remove_routes(Routes) ->
504    %% This partitioning allows us to suppress unnecessary delete
505    %% operations on disk tables, which require an fsync.
506    {RamRoutes, DiskRoutes} =
507        lists:partition(fun (R) -> mnesia:read(
508                                     rabbit_durable_route, R#route.binding, read) == [] end,
509                        Routes),
510    {RamOnlyRoutes, SemiDurableRoutes} =
511        lists:partition(fun (R) -> mnesia:read(
512                                     rabbit_semi_durable_route, R#route.binding, read) == [] end,
513                        RamRoutes),
514    %% Of course the destination might not really be durable but it's
515    %% just as easy to try to delete it from the semi-durable table
516    %% than check first
517    [ok = sync_route(R, true,  true, fun delete/3) ||
518        R <- DiskRoutes],
519    [ok = sync_route(R, false, true, fun delete/3) ||
520        R <- SemiDurableRoutes],
521    [ok = sync_route(R, false, false, fun delete/3) ||
522        R <- RamOnlyRoutes],
523    [R#route.binding || R <- Routes].
524
525
526delete(Tab, #route{binding = B}, LockKind) ->
527    mnesia:delete(Tab, B, LockKind);
528delete(Tab, #reverse_route{reverse_binding = B}, LockKind) ->
529    mnesia:delete(Tab, B, LockKind).
530
531remove_transient_routes(Routes) ->
532    [begin
533         ok = sync_transient_route(R, fun delete/3),
534         R#route.binding
535     end || R <- Routes].
536
537remove_for_destination(DstName, OnlyDurable, Fun) ->
538    lock_resource(DstName),
539    MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
540    MatchRev = reverse_route(MatchFwd),
541    Routes = case OnlyDurable of
542                 false ->
543                        [reverse_route(R) ||
544                              R <- mnesia:dirty_match_object(
545                                     rabbit_reverse_route, MatchRev)];
546                 true  -> lists:usort(
547                            mnesia:dirty_match_object(
548                              rabbit_durable_route, MatchFwd) ++
549                                mnesia:dirty_match_object(
550                                  rabbit_semi_durable_route, MatchFwd))
551             end,
552    Bindings = Fun(Routes),
553    group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
554                        lists:keysort(#binding.source, Bindings), OnlyDurable).
555
556%% Instead of locking entire table on remove operations we can lock the
557%% affected resource only.
558lock_resource(Name) -> lock_resource(Name, write).
559
560lock_resource(Name, LockKind) ->
561    mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)},
562                LockKind).
563
564%% Requires that its input binding list is sorted in exchange-name
565%% order, so that the grouping of bindings (for passing to
566%% group_bindings_and_auto_delete1) works properly.
567group_bindings_fold(_Fun, Acc, [], _OnlyDurable) ->
568    Acc;
569group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
570                    OnlyDurable) ->
571    group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
572
573group_bindings_fold(
574  Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
575  OnlyDurable) ->
576    group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable);
577group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) ->
578    %% Either Removed is [], or its head has a non-matching SrcName.
579    group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed,
580                        OnlyDurable).
581
582maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) ->
583    {Entry, Deletions1} =
584        case mnesia:read({case OnlyDurable of
585                              true  -> rabbit_durable_exchange;
586                              false -> rabbit_exchange
587                          end, XName}) of
588            []  -> {{undefined, not_deleted, Bindings}, Deletions};
589            [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of
590                       not_deleted ->
591                           {{X, not_deleted, Bindings}, Deletions};
592                       {deleted, Deletions2} ->
593                           {{X, deleted, Bindings},
594                            combine_deletions(Deletions, Deletions2)}
595                   end
596        end,
597    add_deletion(XName, Entry, Deletions1).
598
599reverse_route(#route{binding = Binding}) ->
600    #reverse_route{reverse_binding = reverse_binding(Binding)};
601
602reverse_route(#reverse_route{reverse_binding = Binding}) ->
603    #route{binding = reverse_binding(Binding)}.
604
605reverse_binding(#reverse_binding{source      = SrcName,
606                                 destination = DstName,
607                                 key         = Key,
608                                 args        = Args}) ->
609    #binding{source      = SrcName,
610             destination = DstName,
611             key         = Key,
612             args        = Args};
613
614reverse_binding(#binding{source      = SrcName,
615                         destination = DstName,
616                         key         = Key,
617                         args        = Args}) ->
618    #reverse_binding{source      = SrcName,
619                     destination = DstName,
620                     key         = Key,
621                     args        = Args}.
622
623%% ----------------------------------------------------------------------------
624%% Binding / exchange deletion abstraction API
625%% ----------------------------------------------------------------------------
626
627anything_but( NotThis, NotThis, NotThis) -> NotThis;
628anything_but( NotThis, NotThis,    This) -> This;
629anything_but( NotThis,    This, NotThis) -> This;
630anything_but(_NotThis,    This,    This) -> This.
631
632-spec new_deletions() -> deletions().
633
634new_deletions() -> dict:new().
635
636-spec add_deletion
637        (rabbit_exchange:name(),
638         {'undefined' | rabbit_types:exchange(),
639          'deleted' | 'not_deleted',
640          bindings()},
641         deletions()) ->
642            deletions().
643
644add_deletion(XName, Entry, Deletions) ->
645    dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
646                Entry, Deletions).
647
648-spec combine_deletions(deletions(), deletions()) -> deletions().
649
650combine_deletions(Deletions1, Deletions2) ->
651    dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
652               Deletions1, Deletions2).
653
654merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
655    {anything_but(undefined, X1, X2),
656     anything_but(not_deleted, Deleted1, Deleted2),
657     [Bindings1 | Bindings2]}.
658
659-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
660
661process_deletions(Deletions, ActingUser) ->
662    AugmentedDeletions =
663        dict:map(fun (_XName, {X, deleted, Bindings}) ->
664                         Bs = lists:flatten(Bindings),
665                         x_callback(transaction, X, delete, Bs),
666                         {X, deleted, Bs, none};
667                     (_XName, {X, not_deleted, Bindings}) ->
668                         Bs = lists:flatten(Bindings),
669                         x_callback(transaction, X, remove_bindings, Bs),
670                         {X, not_deleted, Bs, rabbit_exchange:serial(X)}
671                 end, Deletions),
672    fun() ->
673            dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
674                              ok = rabbit_event:notify(
675                                     exchange_deleted,
676                                     [{name, XName},
677                                      {user_who_performed_action, ActingUser}]),
678                              del_notify(Bs, ActingUser),
679                              x_callback(Serial, X, delete, Bs);
680                          (_XName, {X, not_deleted, Bs, Serial}, ok) ->
681                              del_notify(Bs, ActingUser),
682                              x_callback(Serial, X, remove_bindings, Bs)
683                      end, ok, AugmentedDeletions)
684    end.
685
686del_notify(Bs, ActingUser) -> [rabbit_event:notify(
687                               binding_deleted,
688                               info(B) ++ [{user_who_performed_action, ActingUser}])
689                             || B <- Bs].
690
691x_callback(Serial, X, F, Bs) ->
692    ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
693