1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_exchange).
9-include_lib("rabbit_common/include/rabbit.hrl").
10-include_lib("rabbit_common/include/rabbit_framing.hrl").
11
12-export([recover/1, policy_changed/2, callback/4, declare/7,
13         assert_equivalence/6, assert_args_equivalence/2, check_type/1,
14         lookup/1, lookup_many/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
15         update_scratch/3, update_decorators/1, immutable/1,
16         info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
17         route/2, delete/3, validate_binding/2, count/0]).
18-export([list_names/0, is_amq_prefixed/1]).
19%% these must be run inside a mnesia tx
20-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
21
22%%----------------------------------------------------------------------------
23
24-export_type([name/0, type/0]).
25
26-type name() :: rabbit_types:r('exchange').
27-type type() :: atom().
28-type fun_name() :: atom().
29
30%%----------------------------------------------------------------------------
31
32-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
33                    policy, user_who_performed_action]).
34
35-spec recover(rabbit_types:vhost()) -> [name()].
36
%% Recovers durable exchanges belonging to VHost that have no entry in
%% the rabbit_exchange table, and returns the names of the exchange
%% records handed back by rabbit_misc:table_filter/3.
recover(VHost) ->
    %% Select records of this vhost that are missing from rabbit_exchange.
    NeedsRecovery =
        fun (#exchange{name = Candidate}) ->
                Candidate#resource.virtual_host =:= VHost andalso
                    mnesia:read({rabbit_exchange, Candidate}) =:= []
        end,
    %% The boolean decides whether the record is written to the RAM table
    %% (true) or merely decorated (false) before the create callback runs.
    Restore =
        fun (DurableX, InTx) ->
                RestoredX = case InTx of
                                true  -> store_ram(DurableX);
                                false -> rabbit_exchange_decorator:set(DurableX)
                            end,
                callback(RestoredX, create, map_create_tx(InTx), [RestoredX])
        end,
    RecoveredXs = rabbit_misc:table_filter(NeedsRecovery, Restore,
                                           rabbit_durable_exchange),
    [RecoveredName || #exchange{name = RecoveredName} <- RecoveredXs].
52
53-spec callback
54        (rabbit_types:exchange(), fun_name(),
55         fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
56
%% Calls the callback function Fun on every decorator module selected
%% for X, then on the exchange type module, returning the result of the
%% latter. Serial0 is either a fun taking the module's serialise_events
%% flag, or an atom that is passed on unchanged as the first argument.
callback(X = #exchange{type = XType, decorators = Decorators},
         Fun, Serial0, Args) ->
    SerialFun = if is_function(Serial0) -> Serial0;
                   is_atom(Serial0)     -> fun (_Bool) -> Serial0 end
                end,
    %% Every decorator callback must return ok.
    lists:foreach(
      fun (Decorator) ->
              Serial = SerialFun(Decorator:serialise_events(X)),
              ok = apply(Decorator, Fun, [Serial | Args])
      end,
      rabbit_exchange_decorator:select(all, Decorators)),
    TypeModule = type_to_module(XType),
    apply(TypeModule, Fun,
          [SerialFun(TypeModule:serialise_events()) | Args]).
66
67-spec policy_changed
68        (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
69
%% Notifies the exchange type module and every decorator module selected
%% for either the old exchange (X) or the new one (X1) that the policy
%% changed. Each module must return ok.
policy_changed(X  = #exchange{type = XType, decorators = OldDecorators},
               X1 = #exchange{decorators = NewDecorators}) ->
    OldModules = rabbit_exchange_decorator:select(all, OldDecorators),
    NewModules = rabbit_exchange_decorator:select(all, NewDecorators),
    %% usort/1 removes modules present on both sides.
    DecoratorModules = lists:usort(OldModules ++ NewModules),
    Modules = [type_to_module(XType) | DecoratorModules],
    lists:foreach(fun (M) -> ok = M:policy_changed(X, X1) end, Modules).
78
%% True when any decorator module selected for X asks for serialised
%% events; otherwise defers to the exchange type module.
serialise_events(X = #exchange{type = Type, decorators = Decorators}) ->
    Modules = rabbit_exchange_decorator:select(all, Decorators),
    case lists:any(fun (M) -> M:serialise_events(X) end, Modules) of
        true  -> true;
        false -> (type_to_module(Type)):serialise_events()
    end.
83
84-spec serial(rabbit_types:exchange()) ->
85                       fun((boolean()) -> 'none' | pos_integer()).
86
%% Returns a fun mapping a "serialise events" flag to a serial number.
%% A fresh serial is reserved via next_serial/1 only when the exchange
%% serialises events; the fun yields it for true and none for false.
serial(#exchange{name = XName} = X) ->
    case serialise_events(X) of
        true ->
            Next = next_serial(XName),
            fun (true)  -> Next;
                (false) -> none
            end;
        false ->
            fun (true)  -> none;
                (false) -> none
            end
    end.
95
96-spec is_amq_prefixed(rabbit_types:exchange() | binary()) -> boolean().
97
%% Returns true when the exchange name starts with the literal prefix
%% "amq." and false otherwise. Accepts either the name as a binary or an
%% #exchange{} record, whose resource name is inspected.
%%
%% The prefix is matched with a binary pattern rather than a regular
%% expression: in an Erlang string literal "\." is just ".", so a regex
%% written as "^amq\." also matches names such as "amqp-events".
is_amq_prefixed(<<"amq.", _/binary>>) ->
    true;
is_amq_prefixed(Name) when is_binary(Name) ->
    false;
is_amq_prefixed(#exchange{name = #resource{name = Name}}) ->
    is_amq_prefixed(Name).
107
108-spec declare
109        (name(), type(), boolean(), boolean(), boolean(),
110         rabbit_framing:amqp_table(), rabbit_types:username())
111        -> rabbit_types:exchange().
112
%% Declares an exchange.
%%
%% Builds the #exchange{} record (with policy and decorators applied and
%% the declaring user stored under the `user' option), validates it with
%% the exchange type module and stores it in a Mnesia transaction.
%% Returns the newly stored exchange, or the already existing record if
%% one is present under XName.
%%
%% If a delete of the same exchange is flagged as in progress through
%% runtime parameters, nothing is stored: a warning is logged and the
%% freshly built record is returned.
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
    X = rabbit_exchange_decorator:set(
          rabbit_policy:set(#exchange{name        = XName,
                                      type        = Type,
                                      durable     = Durable,
                                      auto_delete = AutoDelete,
                                      internal    = Internal,
                                      arguments   = Args,
                                      options     = #{user => Username}})),
    XT = type_to_module(Type),
    %% Crash deliberately if the exchange type rejects the definition.
    ok = XT:validate(X),
    %% Avoid a channel exception if there's a race condition
    %% with an exchange.delete operation.
    %%
    %% See rabbitmq/rabbitmq-federation#7.
    case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
                                          ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                          XName#resource.name) of
        not_found ->
            rabbit_misc:execute_mnesia_transaction(
              fun () ->
                      case mnesia:wread({rabbit_exchange, XName}) of
                          [] ->
                              {new, store(X)};
                          [ExistingX] ->
                              {existing, ExistingX}
                      end
              end,
              fun ({new, Exchange}, Tx) ->
                      ok = callback(X, create, map_create_tx(Tx), [Exchange]),
                      %% The creation event is emitted only when Tx is false.
                      rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
                      Exchange;
                  ({existing, Exchange}, _Tx) ->
                      Exchange;
                  (Err, _Tx) ->
                      Err
              end);
        _ ->
            %% Adjacent literals keep the message on a single line; a
            %% string broken across source lines would embed the newline
            %% and the indentation in the log output.
            rabbit_log:warning("ignoring exchange.declare for exchange ~p, "
                               "exchange.delete in progress", [XName]),
            X
    end.
156
%% Translates the boolean transaction flag into the atom handed to the
%% exchange callbacks.
map_create_tx(false) -> none;
map_create_tx(true)  -> transaction.
159
160
%% Writes the exchange to the RAM table and, for durable exchanges, also
%% to rabbit_durable_exchange (with the decorators field reset to
%% undefined). Returns the record produced by store_ram/1.
store(X = #exchange{durable = Durable}) when is_boolean(Durable) ->
    case Durable of
        true ->
            DurableCopy = X#exchange{decorators = undefined},
            mnesia:write(rabbit_durable_exchange, DurableCopy, write);
        false ->
            ok
    end,
    store_ram(X).
167
%% Applies the decorators to X, writes the result to rabbit_exchange and
%% returns the decorated record.
store_ram(X) ->
    Decorated = rabbit_exchange_decorator:set(X),
    %% NOTE(review): decorators are set a second time on the record that
    %% is written; presumably set/1 is idempotent so the stored and the
    %% returned records agree -- confirm.
    ToWrite = rabbit_exchange_decorator:set(Decorated),
    ok = mnesia:write(rabbit_exchange, ToWrite, write),
    Decorated.
173
174%% Used with binaries sent over the wire; the type may not exist.
175
176-spec check_type
177        (binary()) -> atom() | rabbit_types:connection_exit().
178
%% Resolves an exchange type name to the registered type and checks that
%% an exchange module is registered for it. Raises a command_invalid
%% protocol error when either lookup fails; returns the type otherwise.
check_type(TypeBin) ->
    TypeName = rabbit_data_coercion:to_binary(TypeBin),
    case rabbit_registry:binary_to_type(TypeName) of
        {error, not_found} ->
            rabbit_misc:protocol_error(
              command_invalid, "unknown exchange type '~s'", [TypeBin]);
        Type ->
            Registered = rabbit_registry:lookup_module(exchange, Type),
            case Registered of
                {ok, _Module} ->
                    Type;
                {error, not_found} ->
                    rabbit_misc:protocol_error(
                      command_invalid, "invalid exchange type '~s'", [Type])
            end
    end.
192
193-spec assert_equivalence
194        (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(),
195         rabbit_framing:amqp_table())
196        -> 'ok' | rabbit_types:connection_exit().
197
%% Asserts that the requested properties match those of the existing
%% exchange X. Type, durable, auto_delete and internal are checked in
%% that order with rabbit_misc:assert_field_equivalence/4; the arguments
%% are then checked by the exchange type module.
assert_equivalence(X = #exchange{ name        = XName,
                                  durable     = Durable,
                                  auto_delete = AutoDelete,
                                  internal    = Internal,
                                  type        = Type},
                   ReqType, ReqDurable, ReqAutoDelete, ReqInternal, ReqArgs) ->
    FieldChecks = [{Type,       ReqType,       type},
                   {Durable,    ReqDurable,    durable},
                   {AutoDelete, ReqAutoDelete, auto_delete},
                   {Internal,   ReqInternal,   internal}],
    lists:foreach(
      fun ({Actual, Requested, Field}) ->
              rabbit_misc:assert_field_equivalence(
                Actual, Requested, XName, Field)
      end,
      FieldChecks),
    (type_to_module(Type)):assert_args_equivalence(X, ReqArgs).
210
211-spec assert_args_equivalence
212        (rabbit_types:exchange(), rabbit_framing:amqp_table())
213        -> 'ok' | rabbit_types:connection_exit().
214
%% Compares the exchange's arguments with the requested ones.
%% The spec says "Arguments are compared for semantic equivalence";
%% the only key compared here is "alternate-exchange".
assert_args_equivalence(#exchange{name = Name, arguments = Args},
                        RequiredArgs) ->
    ComparedKeys = [<<"alternate-exchange">>],
    rabbit_misc:assert_args_equivalence(Args, RequiredArgs, Name,
                                        ComparedKeys).
222
223-spec lookup
224        (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
225                    rabbit_types:error('not_found').
226
%% Looks the exchange up in the rabbit_exchange table with a dirty read.
lookup(Name) ->
    Key = {rabbit_exchange, Name},
    rabbit_misc:dirty_read(Key).
229
230
231-spec lookup_many([name()]) -> [rabbit_types:exchange()].
232
%% Returns the exchange records found for the given names, in the order
%% of Names; names without an entry contribute nothing.
%%
%% Reads the rabbit_exchange ETS table directly: normally we'd call
%% mnesia:dirty_read/1 here, but that is quite expensive for reasons
%% explained in rabbit_misc:dirty_read/1.
lookup_many([]) ->
    [];
lookup_many([Name]) ->
    ets:lookup(rabbit_exchange, Name);
lookup_many(Names) when is_list(Names) ->
    lists:flatmap(fun (Name) -> ets:lookup(rabbit_exchange, Name) end,
                  Names).
239
240
241-spec lookup_or_die
242        (name()) -> rabbit_types:exchange() |
243                    rabbit_types:channel_exit().
244
%% Returns the exchange record, or delegates to
%% rabbit_amqqueue:not_found/1 when no exchange with that name exists.
lookup_or_die(Name) ->
    Result = lookup(Name),
    case Result of
        {error, not_found} -> rabbit_amqqueue:not_found(Name);
        {ok, X}            -> X
    end.
250
251-spec list() -> [rabbit_types:exchange()].
252
%% Lists every record in the rabbit_exchange table (dirty read).
list() ->
    MatchAll = #exchange{_ = '_'},
    mnesia:dirty_match_object(rabbit_exchange, MatchAll).
254
255-spec count() -> non_neg_integer().
256
%% Number of entries in the rabbit_exchange table.
count() ->
    Table = rabbit_exchange,
    mnesia:table_info(Table, size).
259
260-spec list_names() -> [rabbit_exchange:name()].
261
%% Returns all keys of the rabbit_exchange table (dirty read).
list_names() ->
    mnesia:dirty_all_keys(rabbit_exchange).
263
264%% Not dirty_match_object since that would not be transactional when used in a
265%% tx context
266
267-spec list(rabbit_types:vhost()) -> [rabbit_types:exchange()].
268
%% Lists the exchanges whose name matches the resource pattern built by
%% rabbit_misc:r(VHostPath, exchange). The match runs inside
%% mnesia:async_dirty/1 using mnesia:match_object/3.
list(VHostPath) ->
    MatchInVHost =
        fun () ->
                Pattern = #exchange{name = rabbit_misc:r(VHostPath, exchange),
                                    _    = '_'},
                mnesia:match_object(rabbit_exchange, Pattern, read)
        end,
    mnesia:async_dirty(MatchInVHost).
277
278-spec lookup_scratch(name(), atom()) ->
279                               rabbit_types:ok(term()) |
280                               rabbit_types:error('not_found').
281
%% Fetches the scratch value stored for App on the named exchange.
%% Returns {error, not_found} when the exchange does not exist, has no
%% scratches at all, or has none for App.
lookup_scratch(Name, App) ->
    case lookup(Name) of
        {error, not_found} ->
            {error, not_found};
        {ok, #exchange{scratches = undefined}} ->
            {error, not_found};
        {ok, #exchange{scratches = Scratches}} ->
            case orddict:find(App, Scratches) of
                {ok, _Value} = Found -> Found;
                error                -> {error, not_found}
            end
    end.
294
295-spec update_scratch(name(), atom(), fun((any()) -> any())) -> 'ok'.
296
%% Replaces App's scratch value on the named exchange with Fun(Old),
%% inside a Mnesia transaction. Old is undefined when the exchange has no
%% scratch for App yet. Always returns ok; the result of update/2 is
%% discarded.
update_scratch(Name, App, Fun) ->
    Rewrite =
        fun (X = #exchange{scratches = Existing}) ->
                %% An exchange without any scratches starts from an empty
                %% orddict.
                Scratches = case Existing of
                                undefined -> orddict:new();
                                _         -> Existing
                            end,
                Previous = case orddict:find(App, Scratches) of
                               {ok, Value} -> Value;
                               error       -> undefined
                           end,
                X#exchange{scratches = orddict:store(App, Fun(Previous),
                                                     Scratches)}
        end,
    rabbit_misc:execute_mnesia_transaction(
      fun () ->
              update(Name, Rewrite),
              ok
      end).
316
317-spec update_decorators(name()) -> 'ok'.
318
%% Re-stores the named exchange through store_ram/1 (which re-applies
%% its decorators) inside a Mnesia transaction. Does nothing when the
%% exchange is absent.
update_decorators(Name) ->
    Refresh =
        fun () ->
                case mnesia:wread({rabbit_exchange, Name}) of
                    []  -> ok;
                    [X] -> store_ram(X),
                           ok
                end
        end,
    rabbit_misc:execute_mnesia_transaction(Refresh).
328
329-spec update
330        (name(),
331         fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
332         -> not_found | rabbit_types:exchange().
333
%% Reads the named exchange with a write lock, applies Fun to it and
%% stores the result. Returns the stored exchange, or not_found when no
%% such exchange exists.
update(Name, Fun) ->
    case mnesia:wread({rabbit_exchange, Name}) of
        []  -> not_found;
        [X] -> store(Fun(X))
    end.
340
341-spec immutable(rabbit_types:exchange()) -> rabbit_types:exchange().
342
%% Returns a copy of X with the scratches, policy and decorators fields
%% all replaced by the atom none.
immutable(X) ->
    X#exchange{decorators = none,
               policy     = none,
               scratches  = none}.
346
347-spec info_keys() -> rabbit_types:info_keys().
348
%% The info items reported by info/1 for every exchange.
info_keys() ->
    ?INFO_KEYS.
350
%% Applies F to every exchange returned by list/1 for VHostPath.
%% TODO: there is scope for optimisation here, e.g. using a
%% cursor, parallelising the function invocation
map(VHostPath, F) ->
    Exchanges = list(VHostPath),
    lists:map(F, Exchanges).
355
%% Builds a list of {Item, Value} pairs for the requested info items.
infos(Items, X) ->
    [{Key, i(Key, X)} || Key <- Items].
357
%% Resolves a single info item for an exchange. Items not handled here
%% are forwarded to the exchange type module; an empty answer from it
%% throws {bad_argument, Item}.
i(name,        #exchange{name        = XName})   -> XName;
i(type,        #exchange{type        = XType})   -> XType;
i(durable,     #exchange{durable     = Flag})    -> Flag;
i(auto_delete, #exchange{auto_delete = Flag})    -> Flag;
i(internal,    #exchange{internal    = Flag})    -> Flag;
i(arguments,   #exchange{arguments   = Args})    -> Args;
i(policy, X) ->
    %% A missing policy is reported as the empty atom.
    case rabbit_policy:name(X) of
        none       -> '';
        PolicyName -> PolicyName
    end;
i(user_who_performed_action, #exchange{options = Opts}) ->
    maps:get(user, Opts, ?UNKNOWN_USER);
i(Item, #exchange{type = Type} = X) ->
    TypeModule = type_to_module(Type),
    case TypeModule:info(X, [Item]) of
        []              -> throw({bad_argument, Item});
        [{Item, Value}] -> Value
    end.
375
376-spec info(rabbit_types:exchange()) -> rabbit_types:infos().
377
%% All standard info items for X followed by whatever the exchange type
%% module reports.
info(X = #exchange{type = Type}) ->
    Standard     = infos(?INFO_KEYS, X),
    TypeSpecific = (type_to_module(Type)):info(X),
    Standard ++ TypeSpecific.
380
381-spec info
382        (rabbit_types:exchange(), rabbit_types:info_keys())
383        -> rabbit_types:infos().
384
%% Only the requested info items for X.
info(X = #exchange{}, Items) ->
    infos(Items, X).
387
388-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
389
%% Full info for every exchange listed for VHostPath.
info_all(VHostPath) ->
    map(VHostPath, fun info/1).
391
392-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys())
393                   -> [rabbit_types:infos()].
394
%% The requested info items for every exchange listed for VHostPath.
info_all(VHostPath, Items) ->
    SelectItems = fun (X) -> info(X, Items) end,
    map(VHostPath, SelectItems).
396
397-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
398                    reference(), pid())
399                   -> 'ok'.
400
%% Passes the requested info items of every exchange listed for
%% VHostPath to rabbit_control_misc:emitting_map/4, together with
%% AggregatorPid and Ref.
info_all(VHostPath, Items, Ref, AggregatorPid) ->
    Exchanges = list(VHostPath),
    SelectItems = fun (X) -> info(X, Items) end,
    rabbit_control_misc:emitting_map(AggregatorPid, Ref, SelectItems,
                                     Exchanges).
404
405-spec route(rabbit_types:exchange(), rabbit_types:delivery())
406                 -> [rabbit_amqqueue:name()].
407
%% Routes a delivery through exchange X and returns the destination
%% queue names.
%%
%% For the exchange with the empty name, each distinct routing key is
%% either handed to rabbit_channel:deliver_reply/2 (keys recognised by
%% virtual_reply_queue/1) or turned into a queue name in the exchange's
%% vhost. Any other exchange is routed via route1/3 with the decorators
%% selected for routing, and the result is sorted and de-duplicated.
route(#exchange{name = #resource{virtual_host = VHost, name = <<>>}},
      #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
    Keys = lists:usort(RKs),
    {ReplyKeys, QueueKeys} =
        lists:partition(fun virtual_reply_queue/1, Keys),
    lists:foreach(
      fun (RK) -> rabbit_channel:deliver_reply(RK, Delivery) end,
      ReplyKeys),
    [rabbit_misc:r(VHost, queue, RK) || RK <- QueueKeys];
route(#exchange{name = #resource{} = XName, decorators = Decorators} = X,
      #delivery{message = #basic_message{}} = Delivery) ->
    RouteDecorators = rabbit_exchange_decorator:select(route, Decorators),
    %% The accumulator starts with X on the work list and its own name as
    %% the only exchange seen so far.
    lists:usort(route1(Delivery, RouteDecorators, {[X], XName, []})).
422
%% True for routing keys that start with "amq.rabbitmq.reply-to.".
virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _Rest/binary>>) ->
    true;
virtual_reply_queue(_Other) ->
    false.
425
%% Work-list loop behind route/2. The accumulator is
%% {ExchangesStillToRoute, SeenExchanges, QueueNamesSoFar}; the queue
%% names are returned once the work list is empty.
route1(_Delivery, _Decorators, {[], _SeenXs, QNames}) ->
    QNames;
route1(Delivery, Decorators,
       {[X = #exchange{type = Type} | Remaining], SeenXs, QNames}) ->
    TypeDests      = (type_to_module(Type)):route(X, Delivery),
    DecoratorDests = process_decorators(X, Decorators, Delivery),
    AlternateDests = process_alternate(X, TypeDests),
    Destinations   = AlternateDests ++ DecoratorDests ++ TypeDests,
    NextAcc = lists:foldl(fun process_route/2,
                          {Remaining, SeenXs, QNames},
                          Destinations),
    route1(Delivery, Decorators, NextAcc).
436
%% When the exchange produced no destinations, returns the configured
%% "alternate-exchange" (as looked up by rabbit_policy:get_arg/3) as a
%% single-element list of exchange resources, or [] if none is set.
%% With a non-empty result list there is nothing to add.
process_alternate(X = #exchange{name = XName}, []) ->
    Alternate = rabbit_policy:get_arg(
                  <<"alternate-exchange">>, <<"alternate-exchange">>, X),
    case Alternate of
        undefined -> [];
        _         -> [rabbit_misc:r(XName, exchange, Alternate)]
    end;
process_alternate(_X, _Results) ->
    [].
445
%% Concatenates the destinations returned by each decorator's route/2.
%% The empty-list clause is an optimisation that skips the traversal.
process_decorators(_X, [], _Delivery) ->
    [];
process_decorators(X, Decorators, Delivery) ->
    PerDecorator = [Module:route(X, Delivery) || Module <- Decorators],
    lists:append(PerDecorator).
450
%% Fold step used by route1/3: merges one destination into the
%% {WorkList, SeenXs, QNames} accumulator.
%%
%% Queue destinations are prepended to QNames. Exchange destinations are
%% added to the work list (if they exist) unless already seen. SeenXs is
%% either a single exchange name or, once a second exchange has been
%% seen, a gb_sets set of names.
process_route(#resource{kind = queue} = QName,
              {WorkList, SeenXs, QNames}) ->
    {WorkList, SeenXs, [QName | QNames]};
process_route(#resource{kind = exchange} = XName,
              {_WorkList, XName, _QNames} = Acc) ->
    %% The destination is the only exchange seen so far.
    Acc;
process_route(#resource{kind = exchange} = XName,
              {WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
    %% Second distinct exchange: switch from a single name to a set.
    WorkList1 = cons_if_present(XName, WorkList),
    {WorkList1, gb_sets:from_list([SeenX, XName]), QNames};
process_route(#resource{kind = exchange} = XName,
              {WorkList, SeenXs, QNames} = Acc) ->
    case gb_sets:is_element(XName, SeenXs) of
        false ->
            WorkList1 = cons_if_present(XName, WorkList),
            {WorkList1, gb_sets:add_element(XName, SeenXs), QNames};
        true ->
            Acc
    end.
468
%% Prepends the named exchange to L when lookup/1 finds it; otherwise
%% returns L unchanged.
cons_if_present(XName, L) ->
    case lookup(XName) of
        {error, not_found} -> L;
        {ok, X}            -> [X | L]
    end.
474
%% Runs Fun on the named exchange via
%% rabbit_misc:execute_mnesia_tx_with_tail/1. When the exchange is
%% missing, the transaction fun yields rabbit_misc:const({error, not_found})
%% instead.
call_with_exchange(XName, Fun) ->
    TxFun =
        fun () ->
                case mnesia:read({rabbit_exchange, XName}) of
                    [X] -> Fun(X);
                    []  -> rabbit_misc:const({error, not_found})
                end
        end,
    rabbit_misc:execute_mnesia_tx_with_tail(TxFun).
482
483-spec delete
484        (name(),  'true', rabbit_types:username()) ->
485                    'ok'| rabbit_types:error('not_found' | 'in_use');
486        (name(), 'false', rabbit_types:username()) ->
487                    'ok' | rabbit_types:error('not_found').
488
%% Deletes the named exchange. With IfUnused = true the deletion is
%% conditional (see conditional_delete/2); otherwise it is unconditional.
%%
%% While the deletion runs, a runtime parameter marks the exchange as
%% "delete in progress"; it is always cleared again in the after clause.
delete(XName, IfUnused, Username) ->
    DeleteFun = case IfUnused of
                    false -> fun unconditional_delete/2;
                    true  -> fun conditional_delete/2
                end,
    OnFound =
        fun (X) ->
                case DeleteFun(X, false) of
                    {error, _InUseOrNotFound} = Error ->
                        rabbit_misc:const(Error);
                    {deleted, X, Bindings, Deletions} ->
                        AllDeletions = rabbit_binding:add_deletion(
                                         XName, {X, deleted, Bindings},
                                         Deletions),
                        rabbit_binding:process_deletions(AllDeletions,
                                                         Username)
                end
        end,
    try
        %% guard exchange.declare operations from failing when there's
        %% a race condition between it and an exchange.delete.
        %%
        %% see rabbitmq/rabbitmq-federation#7
        rabbit_runtime_parameters:set(XName#resource.virtual_host,
                                      ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                      XName#resource.name, true, Username),
        call_with_exchange(XName, OnFound)
    after
        rabbit_runtime_parameters:clear(XName#resource.virtual_host,
                                        ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
                                        XName#resource.name, Username)
    end.
519
520-spec validate_binding
521        (rabbit_types:exchange(), rabbit_types:binding())
522        -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
523
%% Delegates binding validation to the exchange type module.
validate_binding(X = #exchange{type = XType}, Binding) ->
    (type_to_module(XType)):validate_binding(X, Binding).
527
528-spec maybe_auto_delete
529        (rabbit_types:exchange(), boolean())
530        -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}.
531
%% Deletes an auto-delete exchange if conditional_delete/2 allows it.
%% Returns not_deleted for exchanges that are not auto-delete or are
%% still in use, and {deleted, Deletions} otherwise.
maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
    case conditional_delete(X, OnlyDurable) of
        {deleted, X, [], Deletions} -> {deleted, Deletions};
        {error, in_use}             -> not_deleted
    end;
maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
    not_deleted.
539
%% Deletes the exchange only when no binding has it as source;
%% otherwise returns {error, in_use}.
conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
    case rabbit_binding:has_for_source(XName) of
        true  -> {error, in_use};
        false -> internal_delete(X, OnlyDurable, false)
    end.
545
%% Deletes the exchange and also removes the bindings that have it as
%% source.
unconditional_delete(X, OnlyDurable) ->
    RemoveBindingsForSource = true,
    internal_delete(X, OnlyDurable, RemoveBindingsForSource).
548
%% Removes the exchange from the rabbit_exchange, rabbit_exchange_serial
%% and rabbit_durable_exchange tables, then removes its bindings.
%% Returns {deleted, X, RemovedSourceBindings, Deletions}, where the
%% source bindings are removed only when RemoveBindingsForSource is true
%% and Deletions comes from rabbit_binding:remove_for_destination/2.
internal_delete(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
    lists:foreach(fun (Table) -> ok = mnesia:delete({Table, XName}) end,
                  [rabbit_exchange, rabbit_exchange_serial]),
    %% The result of the durable-table delete is not asserted.
    mnesia:delete({rabbit_durable_exchange, XName}),
    SourceBindings = case RemoveBindingsForSource of
                         false -> [];
                         true  -> rabbit_binding:remove_for_source(XName)
                     end,
    Deletions = rabbit_binding:remove_for_destination(XName, OnlyDurable),
    {deleted, X, SourceBindings, Deletions}.
559
%% Returns the current serial of the exchange and stores serial + 1 as
%% the next one. The current value is read with a write lock.
next_serial(XName) ->
    Current = peek_serial(XName, write),
    Advanced = #exchange_serial{name = XName, next = Current + 1},
    ok = mnesia:write(rabbit_exchange_serial, Advanced, write),
    Current.
565
566-spec peek_serial(name()) -> pos_integer() | 'undefined'.
567
%% Reads the next serial of the exchange using a read lock.
peek_serial(XName) ->
    peek_serial(XName, read).
569
%% Reads the next serial of the exchange with the given lock type.
%% Any read result other than a single serial record yields 1.
peek_serial(XName, LockType) ->
    Records = mnesia:read(rabbit_exchange_serial, XName, LockType),
    case Records of
        [#exchange_serial{next = Next}] -> Next;
        _                               -> 1
    end.
575
%% Logs a warning for an unknown exchange type, caches
%% rabbit_exchange_type_invalid for it in the process dictionary and
%% returns that module.
invalid_module(T) ->
    Fallback = rabbit_exchange_type_invalid,
    rabbit_log:warning("Could not find exchange type ~s.", [T]),
    put({xtype_to_module, T}, Fallback),
    Fallback.

%% Used with atoms from records; e.g., the type is expected to exist.
%%
%% Maps an exchange type to its implementing module. Results are cached
%% in the process dictionary under {xtype_to_module, Type}; on a miss
%% the registry is consulted and the answer (or the invalid-type
%% fallback) is cached.
type_to_module(T) ->
    CacheKey = {xtype_to_module, T},
    case get(CacheKey) of
        undefined ->
            case rabbit_registry:lookup_module(exchange, T) of
                {error, not_found} ->
                    invalid_module(T);
                {ok, Module} ->
                    put(CacheKey, Module),
                    Module
            end;
        Cached ->
            Cached
    end.
593