1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_exchange_type_topic).
9
10-include_lib("rabbit_common/include/rabbit.hrl").
11
12-behaviour(rabbit_exchange_type).
13
14-export([description/0, serialise_events/0, route/2]).
15-export([validate/1, validate_binding/2,
16         create/2, delete/3, policy_changed/2, add_binding/3,
17         remove_bindings/3, assert_args_equivalence/2]).
18-export([info/1, info/2]).
19
20-rabbit_boot_step({?MODULE,
21                   [{description, "exchange type topic"},
22                    {mfa,         {rabbit_registry, register,
23                                   [exchange, <<"topic">>, ?MODULE]}},
24                    {requires,    rabbit_registry},
25                    {enables,     kernel_ready}]}).
26
27%%----------------------------------------------------------------------------
28
29info(_X) -> [].
30info(_X, _) -> [].
31
32description() ->
33    [{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
34
35serialise_events() -> false.
36
37%% NB: This may return duplicate results in some situations (that's ok)
38route(#exchange{name = X},
39      #delivery{message = #basic_message{routing_keys = Routes}}) ->
40    lists:append([begin
41                      Words = split_topic_key(RKey),
42                      mnesia:async_dirty(fun trie_match/2, [X, Words])
43                  end || RKey <- Routes]).
44
45validate(_X) -> ok.
46validate_binding(_X, _B) -> ok.
47create(_Tx, _X) -> ok.
48
49delete(transaction, #exchange{name = X}, _Bs) ->
50    trie_remove_all_nodes(X),
51    trie_remove_all_edges(X),
52    trie_remove_all_bindings(X),
53    ok;
54delete(none, _Exchange, _Bs) ->
55    ok.
56
57policy_changed(_X1, _X2) -> ok.
58
59add_binding(transaction, _Exchange, Binding) ->
60    internal_add_binding(Binding);
61add_binding(none, _Exchange, _Binding) ->
62    ok.
63
64remove_bindings(transaction, _X, Bs) ->
65    %% See rabbit_binding:lock_route_tables for the rationale for
66    %% taking table locks.
67    case Bs of
68        [_] -> ok;
69        _   -> [mnesia:lock({table, T}, write) ||
70                   T <- [rabbit_topic_trie_node,
71                         rabbit_topic_trie_edge,
72                         rabbit_topic_trie_binding]]
73    end,
74    [case follow_down_get_path(X, split_topic_key(K)) of
75         {ok, Path = [{FinalNode, _} | _]} ->
76             trie_remove_binding(X, FinalNode, D, Args),
77             remove_path_if_empty(X, Path);
78         {error, _Node, _RestW} ->
79             %% We're trying to remove a binding that no longer exists.
80             %% That's unexpected, but shouldn't be a problem.
81             ok
82     end ||  #binding{source = X, key = K, destination = D, args = Args} <- Bs],
83    ok;
84remove_bindings(none, _X, _Bs) ->
85    ok.
86
87assert_args_equivalence(X, Args) ->
88    rabbit_exchange:assert_args_equivalence(X, Args).
89
90%%----------------------------------------------------------------------------
91
92internal_add_binding(#binding{source = X, key = K, destination = D,
93                              args = Args}) ->
94    FinalNode = follow_down_create(X, split_topic_key(K)),
95    trie_add_binding(X, FinalNode, D, Args),
96    ok.
97
98trie_match(X, Words) ->
99    trie_match(X, root, Words, []).
100
101trie_match(X, Node, [], ResAcc) ->
102    trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [],
103                    trie_bindings(X, Node) ++ ResAcc);
104trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
105    lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
106                        trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
107                end, ResAcc, [{W, fun trie_match/4, RestW},
108                              {"*", fun trie_match/4, RestW},
109                              {"#", fun trie_match_skip_any/4, Words}]).
110
111trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
112    case trie_child(X, Node, Search) of
113        {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
114        error          -> ResAcc
115    end.
116
117trie_match_skip_any(X, Node, [], ResAcc) ->
118    trie_match(X, Node, [], ResAcc);
119trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
120    trie_match_skip_any(X, Node, RestW,
121                        trie_match(X, Node, Words, ResAcc)).
122
123follow_down_create(X, Words) ->
124    case follow_down_last_node(X, Words) of
125        {ok, FinalNode}      -> FinalNode;
126        {error, Node, RestW} -> lists:foldl(
127                                  fun (W, CurNode) ->
128                                          NewNode = new_node_id(),
129                                          trie_add_edge(X, CurNode, NewNode, W),
130                                          NewNode
131                                  end, Node, RestW)
132    end.
133
134follow_down_last_node(X, Words) ->
135    follow_down(X, fun (_, Node, _) -> Node end, root, Words).
136
137follow_down_get_path(X, Words) ->
138    follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
139                [{root, none}], Words).
140
141follow_down(X, AccFun, Acc0, Words) ->
142    follow_down(X, root, AccFun, Acc0, Words).
143
144follow_down(_X, _CurNode, _AccFun, Acc, []) ->
145    {ok, Acc};
146follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
147    case trie_child(X, CurNode, W) of
148        {ok, NextNode} -> follow_down(X, NextNode, AccFun,
149                                      AccFun(W, NextNode, Acc), RestW);
150        error          -> {error, Acc, Words}
151    end.
152
153remove_path_if_empty(_, [{root, none}]) ->
154    ok;
155remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
156    case mnesia:read(rabbit_topic_trie_node,
157                     #trie_node{exchange_name = X, node_id = Node}, write) of
158        [] -> trie_remove_edge(X, Parent, Node, W),
159              remove_path_if_empty(X, RestPath);
160        _  -> ok
161    end.
162
163trie_child(X, Node, Word) ->
164    case mnesia:read({rabbit_topic_trie_edge,
165                      #trie_edge{exchange_name = X,
166                                 node_id       = Node,
167                                 word          = Word}}) of
168        [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
169        []                                     -> error
170    end.
171
172trie_bindings(X, Node) ->
173    MatchHead = #topic_trie_binding{
174      trie_binding = #trie_binding{exchange_name = X,
175                                   node_id       = Node,
176                                   destination   = '$1',
177                                   arguments     = '_'}},
178    mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
179
180trie_update_node_counts(X, Node, Field, Delta) ->
181    E = case mnesia:read(rabbit_topic_trie_node,
182                         #trie_node{exchange_name = X,
183                                    node_id       = Node}, write) of
184            []   -> #topic_trie_node{trie_node = #trie_node{
185                                       exchange_name = X,
186                                       node_id       = Node},
187                                     edge_count    = 0,
188                                     binding_count = 0};
189            [E0] -> E0
190        end,
191    case setelement(Field, E, element(Field, E) + Delta) of
192        #topic_trie_node{edge_count = 0, binding_count = 0} ->
193            ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
194        EN ->
195            ok = mnesia:write(rabbit_topic_trie_node, EN, write)
196    end.
197
198trie_add_edge(X, FromNode, ToNode, W) ->
199    trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
200    trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
201
202trie_remove_edge(X, FromNode, ToNode, W) ->
203    trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
204    trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
205
206trie_edge_op(X, FromNode, ToNode, W, Op) ->
207    ok = Op(rabbit_topic_trie_edge,
208            #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
209                                                    node_id       = FromNode,
210                                                    word          = W},
211                             node_id   = ToNode},
212            write).
213
214trie_add_binding(X, Node, D, Args) ->
215    trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
216    trie_binding_op(X, Node, D, Args, fun mnesia:write/3).
217
218trie_remove_binding(X, Node, D, Args) ->
219    trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
220    trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
221
222trie_binding_op(X, Node, D, Args, Op) ->
223    ok = Op(rabbit_topic_trie_binding,
224            #topic_trie_binding{
225              trie_binding = #trie_binding{exchange_name = X,
226                                           node_id       = Node,
227                                           destination   = D,
228                                           arguments     = Args}},
229            write).
230
231trie_remove_all_nodes(X) ->
232    remove_all(rabbit_topic_trie_node,
233               #topic_trie_node{trie_node = #trie_node{exchange_name = X,
234                                                       _             = '_'},
235                                _         = '_'}).
236
237trie_remove_all_edges(X) ->
238    remove_all(rabbit_topic_trie_edge,
239               #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
240                                                       _             = '_'},
241                                _         = '_'}).
242
243trie_remove_all_bindings(X) ->
244    remove_all(rabbit_topic_trie_binding,
245               #topic_trie_binding{
246                 trie_binding = #trie_binding{exchange_name = X, _ = '_'},
247                 _            = '_'}).
248
249remove_all(Table, Pattern) ->
250    lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
251                  mnesia:match_object(Table, Pattern, write)).
252
253new_node_id() ->
254    rabbit_guid:gen().
255
256split_topic_key(Key) ->
257    split_topic_key(Key, [], []).
258
259split_topic_key(<<>>, [], []) ->
260    [];
261split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
262    lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
263split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
264    split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
265split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
266    split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
267