1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_exchange_type_topic). 9 10-include_lib("rabbit_common/include/rabbit.hrl"). 11 12-behaviour(rabbit_exchange_type). 13 14-export([description/0, serialise_events/0, route/2]). 15-export([validate/1, validate_binding/2, 16 create/2, delete/3, policy_changed/2, add_binding/3, 17 remove_bindings/3, assert_args_equivalence/2]). 18-export([info/1, info/2]). 19 20-rabbit_boot_step({?MODULE, 21 [{description, "exchange type topic"}, 22 {mfa, {rabbit_registry, register, 23 [exchange, <<"topic">>, ?MODULE]}}, 24 {requires, rabbit_registry}, 25 {enables, kernel_ready}]}). 26 27%%---------------------------------------------------------------------------- 28 29info(_X) -> []. 30info(_X, _) -> []. 31 32description() -> 33 [{description, <<"AMQP topic exchange, as per the AMQP specification">>}]. 34 35serialise_events() -> false. 36 37%% NB: This may return duplicate results in some situations (that's ok) 38route(#exchange{name = X}, 39 #delivery{message = #basic_message{routing_keys = Routes}}) -> 40 lists:append([begin 41 Words = split_topic_key(RKey), 42 mnesia:async_dirty(fun trie_match/2, [X, Words]) 43 end || RKey <- Routes]). 44 45validate(_X) -> ok. 46validate_binding(_X, _B) -> ok. 47create(_Tx, _X) -> ok. 48 49delete(transaction, #exchange{name = X}, _Bs) -> 50 trie_remove_all_nodes(X), 51 trie_remove_all_edges(X), 52 trie_remove_all_bindings(X), 53 ok; 54delete(none, _Exchange, _Bs) -> 55 ok. 56 57policy_changed(_X1, _X2) -> ok. 58 59add_binding(transaction, _Exchange, Binding) -> 60 internal_add_binding(Binding); 61add_binding(none, _Exchange, _Binding) -> 62 ok. 63 64remove_bindings(transaction, _X, Bs) -> 65 %% See rabbit_binding:lock_route_tables for the rationale for 66 %% taking table locks. 67 case Bs of 68 [_] -> ok; 69 _ -> [mnesia:lock({table, T}, write) || 70 T <- [rabbit_topic_trie_node, 71 rabbit_topic_trie_edge, 72 rabbit_topic_trie_binding]] 73 end, 74 [case follow_down_get_path(X, split_topic_key(K)) of 75 {ok, Path = [{FinalNode, _} | _]} -> 76 trie_remove_binding(X, FinalNode, D, Args), 77 remove_path_if_empty(X, Path); 78 {error, _Node, _RestW} -> 79 %% We're trying to remove a binding that no longer exists. 80 %% That's unexpected, but shouldn't be a problem. 81 ok 82 end || #binding{source = X, key = K, destination = D, args = Args} <- Bs], 83 ok; 84remove_bindings(none, _X, _Bs) -> 85 ok. 86 87assert_args_equivalence(X, Args) -> 88 rabbit_exchange:assert_args_equivalence(X, Args). 89 90%%---------------------------------------------------------------------------- 91 92internal_add_binding(#binding{source = X, key = K, destination = D, 93 args = Args}) -> 94 FinalNode = follow_down_create(X, split_topic_key(K)), 95 trie_add_binding(X, FinalNode, D, Args), 96 ok. 97 98trie_match(X, Words) -> 99 trie_match(X, root, Words, []). 100 101trie_match(X, Node, [], ResAcc) -> 102 trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], 103 trie_bindings(X, Node) ++ ResAcc); 104trie_match(X, Node, [W | RestW] = Words, ResAcc) -> 105 lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> 106 trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) 107 end, ResAcc, [{W, fun trie_match/4, RestW}, 108 {"*", fun trie_match/4, RestW}, 109 {"#", fun trie_match_skip_any/4, Words}]). 110 111trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> 112 case trie_child(X, Node, Search) of 113 {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); 114 error -> ResAcc 115 end. 116 117trie_match_skip_any(X, Node, [], ResAcc) -> 118 trie_match(X, Node, [], ResAcc); 119trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> 120 trie_match_skip_any(X, Node, RestW, 121 trie_match(X, Node, Words, ResAcc)). 122 123follow_down_create(X, Words) -> 124 case follow_down_last_node(X, Words) of 125 {ok, FinalNode} -> FinalNode; 126 {error, Node, RestW} -> lists:foldl( 127 fun (W, CurNode) -> 128 NewNode = new_node_id(), 129 trie_add_edge(X, CurNode, NewNode, W), 130 NewNode 131 end, Node, RestW) 132 end. 133 134follow_down_last_node(X, Words) -> 135 follow_down(X, fun (_, Node, _) -> Node end, root, Words). 136 137follow_down_get_path(X, Words) -> 138 follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, 139 [{root, none}], Words). 140 141follow_down(X, AccFun, Acc0, Words) -> 142 follow_down(X, root, AccFun, Acc0, Words). 143 144follow_down(_X, _CurNode, _AccFun, Acc, []) -> 145 {ok, Acc}; 146follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> 147 case trie_child(X, CurNode, W) of 148 {ok, NextNode} -> follow_down(X, NextNode, AccFun, 149 AccFun(W, NextNode, Acc), RestW); 150 error -> {error, Acc, Words} 151 end. 152 153remove_path_if_empty(_, [{root, none}]) -> 154 ok; 155remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> 156 case mnesia:read(rabbit_topic_trie_node, 157 #trie_node{exchange_name = X, node_id = Node}, write) of 158 [] -> trie_remove_edge(X, Parent, Node, W), 159 remove_path_if_empty(X, RestPath); 160 _ -> ok 161 end. 162 163trie_child(X, Node, Word) -> 164 case mnesia:read({rabbit_topic_trie_edge, 165 #trie_edge{exchange_name = X, 166 node_id = Node, 167 word = Word}}) of 168 [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; 169 [] -> error 170 end. 171 172trie_bindings(X, Node) -> 173 MatchHead = #topic_trie_binding{ 174 trie_binding = #trie_binding{exchange_name = X, 175 node_id = Node, 176 destination = '$1', 177 arguments = '_'}}, 178 mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). 179 180trie_update_node_counts(X, Node, Field, Delta) -> 181 E = case mnesia:read(rabbit_topic_trie_node, 182 #trie_node{exchange_name = X, 183 node_id = Node}, write) of 184 [] -> #topic_trie_node{trie_node = #trie_node{ 185 exchange_name = X, 186 node_id = Node}, 187 edge_count = 0, 188 binding_count = 0}; 189 [E0] -> E0 190 end, 191 case setelement(Field, E, element(Field, E) + Delta) of 192 #topic_trie_node{edge_count = 0, binding_count = 0} -> 193 ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); 194 EN -> 195 ok = mnesia:write(rabbit_topic_trie_node, EN, write) 196 end. 197 198trie_add_edge(X, FromNode, ToNode, W) -> 199 trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), 200 trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). 201 202trie_remove_edge(X, FromNode, ToNode, W) -> 203 trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), 204 trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). 205 206trie_edge_op(X, FromNode, ToNode, W, Op) -> 207 ok = Op(rabbit_topic_trie_edge, 208 #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, 209 node_id = FromNode, 210 word = W}, 211 node_id = ToNode}, 212 write). 213 214trie_add_binding(X, Node, D, Args) -> 215 trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), 216 trie_binding_op(X, Node, D, Args, fun mnesia:write/3). 217 218trie_remove_binding(X, Node, D, Args) -> 219 trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), 220 trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3). 221 222trie_binding_op(X, Node, D, Args, Op) -> 223 ok = Op(rabbit_topic_trie_binding, 224 #topic_trie_binding{ 225 trie_binding = #trie_binding{exchange_name = X, 226 node_id = Node, 227 destination = D, 228 arguments = Args}}, 229 write). 230 231trie_remove_all_nodes(X) -> 232 remove_all(rabbit_topic_trie_node, 233 #topic_trie_node{trie_node = #trie_node{exchange_name = X, 234 _ = '_'}, 235 _ = '_'}). 236 237trie_remove_all_edges(X) -> 238 remove_all(rabbit_topic_trie_edge, 239 #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, 240 _ = '_'}, 241 _ = '_'}). 242 243trie_remove_all_bindings(X) -> 244 remove_all(rabbit_topic_trie_binding, 245 #topic_trie_binding{ 246 trie_binding = #trie_binding{exchange_name = X, _ = '_'}, 247 _ = '_'}). 248 249remove_all(Table, Pattern) -> 250 lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, 251 mnesia:match_object(Table, Pattern, write)). 252 253new_node_id() -> 254 rabbit_guid:gen(). 255 256split_topic_key(Key) -> 257 split_topic_key(Key, [], []). 258 259split_topic_key(<<>>, [], []) -> 260 []; 261split_topic_key(<<>>, RevWordAcc, RevResAcc) -> 262 lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); 263split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> 264 split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); 265split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> 266 split_topic_key(Rest, [C | RevWordAcc], RevResAcc). 267