%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_channel_tracking).

%% Abstracts away how tracked channel records are stored
%% and queried.
%%
%% See also:
%%
%%  * rabbit_channel_tracking_handler
%%  * rabbit_reader
%%  * rabbit_event
-behaviour(rabbit_tracking).

-export([boot/0,
         update_tracked/1,
         handle_cast/1,
         register_tracked/1,
         unregister_tracked/1,
         count_tracked_items_in/1,
         clear_tracking_tables/0,
         shutdown_tracked_items/2]).

-export([list/0, list_of_user/1, list_on_node/1,
         tracked_channel_table_name_for/1,
         tracked_channel_per_user_table_name_for/1,
         get_all_tracked_channel_table_names_for_node/1,
         delete_tracked_channel_user_entry/1]).

-include_lib("rabbit_common/include/rabbit.hrl").

-import(rabbit_misc, [pget/2]).

%%
%% API
%%

%% Sets up and resets channel tracking tables for this node.
%%
%% Two tables are ensured for the local node: the tracked channel table
%% and the per-user channel table. Both are then cleared via
%% clear_tracking_tables/0. Always returns ok.
-spec boot() -> ok.

boot() ->
    ensure_tracked_channels_table_for_this_node(),
    rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
                    [tracked_channel_table_name_for(node())]),
    ensure_per_user_tracked_channels_table_for_node(),
    %% This message concerns the per-user table, so it must not repeat the
    %% text used for the tracked channel table above.
    rabbit_log:info("Setting up a table for per-user channel counting on this node: ~p",
                    [tracked_channel_per_user_table_name_for(node())]),
    clear_tracking_tables(),
    ok.

%% Hands Event over to handle_cast/1 in a freshly spawned process and
%% returns ok right away. The process is neither linked nor monitored,
%% so the caller never observes the outcome of the handler.
-spec update_tracked(term()) -> ok.

update_tracked(Event) ->
    spawn(?MODULE, handle_cast, [Event]),
    ok.

%% Asynchronously handle update events
-spec handle_cast(term()) -> ok.
%% One clause per tracking event; each event carries a property list that
%% is read with pget/2. Every clause returns ok.

%% channel_created: register the channel, but only when its pid lives on
%% this node. Registration failures are logged and swallowed.
handle_cast({channel_created, Details}) ->
    ThisNode = node(),
    case node(pget(pid, Details)) of
        ThisNode ->
            TrackedCh = #tracked_channel{id = TrackedChId} =
                tracked_channel_from_channel_created_event(Details),
            try
                register_tracked(TrackedCh)
            catch
                error:{no_exists, _} ->
                    warn_channel_table_not_ready(TrackedChId);
                %% This is the shape list/0 and list_on_node/1 catch from
                %% mnesia dirty operations when a table is missing.
                exit:{aborted, {no_exists, _}} ->
                    warn_channel_table_not_ready(TrackedChId);
                error:Err ->
                    rabbit_log_channel:warning(
                      "Could not register channel ~p for tracking: ~p",
                      [TrackedChId, Err]),
                    ok
            end;
        _OtherNode ->
            %% ignore
            ok
    end;
%% channel_closed: the channel has terminated, unregister if local.
%% Anything other than exactly one matching record is ignored.
handle_cast({channel_closed, Details}) ->
    case get_tracked_channel_by_pid(pget(pid, Details)) of
        [#tracked_channel{name = Name}] ->
            unregister_tracked(rabbit_tracking:id(node(), Name));
        _Other -> ok
    end;
%% connection_closed: when the connection belonged to this node, shut down
%% every tracked channel that references its pid.
handle_cast({connection_closed, ConnDetails}) ->
    ThisNode = node(),
    ConnPid = pget(pid, ConnDetails),

    case pget(node, ConnDetails) of
        ThisNode ->
            TrackedChs = get_tracked_channels_by_connection_pid(ConnPid),
            rabbit_log_channel:debug(
                "Closing all channels from connection '~s' "
                "because it has been closed", [pget(name, ConnDetails)]),
            %% Shutting down channels will take care of unregistering the
            %% corresponding tracking.
            shutdown_tracked_items(TrackedChs, undefined),
            ok;
        _DifferentNode ->
            ok
    end;
%% user_deleted: defer removal of the user's per-user entry.
handle_cast({user_deleted, Details}) ->
    Username = pget(name, Details),
    %% Schedule user entry deletion, allowing time for connections to close
    _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
            delete_tracked_channel_user_entry, [Username]),
    ok;
%% node_deleted: drop both tracking tables that belonged to the removed node.
handle_cast({node_deleted, Details}) ->
    Node = pget(node, Details),
    rabbit_log_channel:info(
        "Node '~s' was removed from the cluster, deleting"
        " its channel tracking tables...", [Node]),
    delete_tracked_channels_table_for_node(Node),
    delete_per_user_tracked_channels_table_for_node(Node),
    %% Return ok explicitly so the result matches the spec regardless of
    %% what the table deletion helper returns.
    ok.

%% Logs that a channel could not be registered because its tracking table
%% is missing. Returns ok.
warn_channel_table_not_ready(TrackedChId) ->
    rabbit_log_channel:warning(
      "Could not register channel ~p for tracking, "
      "its table is not ready yet or the channel terminated prematurely",
      [TrackedChId]),
    ok.

%% Stores TrackedCh in its node's channel table and bumps the per-user
%% counter by one. A channel whose id is already present is left untouched,
%% so the counter is not incremented twice. The read and the write are
%% separate dirty operations (hence the dialyzer race_conditions exemption).
-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).

register_tracked(TrackedCh =
                   #tracked_channel{node = Node, name = Name, username = Username}) ->
    ChId = rabbit_tracking:id(Node, Name),
    TableName = tracked_channel_table_name_for(Node),
    PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
    %% upsert
    case mnesia:dirty_read(TableName, ChId) of
        [] ->
            mnesia:dirty_write(TableName, TrackedCh),
            mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
        [#tracked_channel{}] ->
            ok
    end,
    ok.

%% Removes the channel identified by ChId from this node's channel table
%% and decrements its user's counter. Unknown ids are a no-op. Only ids
%% whose node component is the local node are accepted; anything else
%% fails with function_clause.
-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.

unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
    TableName = tracked_channel_table_name_for(Node),
    PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
    case mnesia:dirty_read(TableName, ChId) of
        [] -> ok;
        [#tracked_channel{username = Username}] ->
            mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
            mnesia:dirty_delete(TableName, ChId)
    end.

-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
%% Counts the channels tracked for Username by delegating to
%% rabbit_tracking:count_tracked_items/4 with the per-user table name
%% function and the channel_count field position. The last argument is a
%% free-form description of what is being counted (presumably used by
%% rabbit_tracking in diagnostics -- confirm there); it names the per-user
%% scope this clause actually counts.
count_tracked_items_in({user, Username}) ->
    rabbit_tracking:count_tracked_items(
        fun tracked_channel_per_user_table_name_for/1,
        #tracked_channel_per_user.channel_count, Username,
        "channels for user").

%% Clears both channel tracking tables of the local node. Always returns ok.
-spec clear_tracking_tables() -> ok.

clear_tracking_tables() ->
    clear_tracked_channel_tables_for_this_node(),
    ok.

%% Shuts down the channels behind the given tracked channel records.
%% The second argument is ignored.
-spec shutdown_tracked_items(list(), term()) -> ok.

shutdown_tracked_items(TrackedItems, _Args) ->
    close_channels(TrackedItems).

%% helper functions

%% Returns the tracked channels of every running node, in the order of
%% rabbit_nodes:all_running(). Nodes whose table does not exist contribute
%% nothing.
-spec list() -> [rabbit_types:tracked_channel()].

list() ->
    %% flatmap keeps the per-node order while avoiding repeated appends to
    %% a growing accumulator.
    lists:flatmap(
      fun (Node) ->
              Tab = tracked_channel_table_name_for(Node),
              try
                  mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
              catch
                  exit:{aborted, {no_exists, [Tab, _]}} ->
                      %% The table might not exist yet (or is already gone)
                      %% between the time rabbit_nodes:all_running() runs and
                      %% returns a specific node, and
                      %% mnesia:dirty_match_object() is called for that node's
                      %% table.
                      []
              end
      end, rabbit_nodes:all_running()).

%% Returns the tracked channels whose username field equals Username, as
%% matched by rabbit_tracking:match_tracked_items/2.
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].

list_of_user(Username) ->
    rabbit_tracking:match_tracked_items(
        fun tracked_channel_table_name_for/1,
        #tracked_channel{username = Username, _ = '_'}).

%% Returns all channels tracked in Node's table, or [] when that table
%% does not exist.
-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].

list_on_node(Node) ->
    try mnesia:dirty_match_object(
          tracked_channel_table_name_for(Node),
          #tracked_channel{_ = '_'})
    catch exit:{aborted, {no_exists, _}} -> []
    end.

%% Name of the tracked channel table for Node:
%% the atom tracked_channel_on_node_<Node>.
-spec tracked_channel_table_name_for(node()) -> atom().

tracked_channel_table_name_for(Node) ->
    list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).

-spec tracked_channel_per_user_table_name_for(node()) -> atom().
%% Name of the per-user channel table for Node:
%% the atom tracked_channel_table_per_user_on_node_<Node>.
tracked_channel_per_user_table_name_for(Node) ->
    NameStr = rabbit_misc:format(
                "tracked_channel_table_per_user_on_node_~s", [Node]),
    list_to_atom(NameStr).

%% internal

%% Local-node shorthand for ensure_tracked_channels_table_for_node/1.
ensure_tracked_channels_table_for_this_node() ->
    ThisNode = node(),
    ensure_tracked_channels_table_for_node(ThisNode).

%% Local-node shorthand for ensure_per_user_tracked_channels_table_for_node/1.
ensure_per_user_tracked_channels_table_for_node() ->
    ThisNode = node(),
    ensure_per_user_tracked_channels_table_for_node(ThisNode).

%% Create tables

%% Creates the tracked channel table of Node. An already existing table is
%% fine; any other abort is logged and otherwise ignored. Returns ok.
ensure_tracked_channels_table_for_node(Node) ->
    Tab = tracked_channel_table_name_for(Node),
    TabDef = [{record_name, tracked_channel},
              {attributes, record_info(fields, tracked_channel)}],
    case mnesia:create_table(Tab, TabDef) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, _}} ->
            ok;
        {aborted, Reason} ->
            rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Reason]),
            ok
    end.

%% Creates the per-user channel table of Node. An already existing table is
%% fine; any other abort is logged and otherwise ignored. Returns ok.
ensure_per_user_tracked_channels_table_for_node(Node) ->
    Tab = tracked_channel_per_user_table_name_for(Node),
    TabDef = [{record_name, tracked_channel_per_user},
              {attributes, record_info(fields, tracked_channel_per_user)}],
    case mnesia:create_table(Tab, TabDef) of
        {atomic, ok} ->
            ok;
        {aborted, {already_exists, _}} ->
            ok;
        {aborted, Reason} ->
            rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Reason]),
            ok
    end.

%% Clears every channel tracking table of the local node and returns the
%% list of per-table results from rabbit_tracking:clear_tracking_table/1.
clear_tracked_channel_tables_for_this_node() ->
    Tables = get_all_tracked_channel_table_names_for_node(node()),
    lists:map(fun (Tab) -> rabbit_tracking:clear_tracking_table(Tab) end,
              Tables).

%% Deletes the tracked channel table that belonged to Node.
delete_tracked_channels_table_for_node(Node) ->
    rabbit_tracking:delete_tracking_table(
      tracked_channel_table_name_for(Node), Node, "tracked channel").

%% Deletes the per-user channel table that belonged to Node.
delete_per_user_tracked_channels_table_for_node(Node) ->
    rabbit_tracking:delete_tracking_table(
      tracked_channel_per_user_table_name_for(Node), Node,
      "per-user tracked channels").
%% Both channel tracking table names of Node: the tracked channel table
%% first, the per-user table second.
get_all_tracked_channel_table_names_for_node(Node) ->
    ChannelTab = tracked_channel_table_name_for(Node),
    PerUserTab = tracked_channel_per_user_table_name_for(Node),
    [ChannelTab, PerUserTab].

%% Tracked channels whose connection field equals ConnPid, as matched by
%% rabbit_tracking:match_tracked_items/2.
get_tracked_channels_by_connection_pid(ConnPid) ->
    Pattern = #tracked_channel{connection = ConnPid, _ = '_'},
    rabbit_tracking:match_tracked_items(
      fun tracked_channel_table_name_for/1, Pattern).

%% Tracked channels whose pid field equals ChPid, as matched by
%% rabbit_tracking:match_tracked_items/2.
get_tracked_channel_by_pid(ChPid) ->
    Pattern = #tracked_channel{pid = ChPid, _ = '_'},
    rabbit_tracking:match_tracked_items(
      fun tracked_channel_table_name_for/1, Pattern).

%% Asks rabbit_tracking to delete the per-user entry keyed by Username,
%% handing it an {M, F, A} that refers to
%% rabbit_auth_backend_internal:exists(Username).
delete_tracked_channel_user_entry(Username) ->
    ExistsMFA = {rabbit_auth_backend_internal, exists, [Username]},
    rabbit_tracking:delete_tracked_entry(
      ExistsMFA,
      fun tracked_channel_per_user_table_name_for/1,
      Username).

%% Builds a #tracked_channel{} from the property list of a channel_created
%% event. The node is derived from the channel pid; the username comes
%% from the `user` property.
tracked_channel_from_channel_created_event(ChannelDetails) ->
    ChPid = pget(pid, ChannelDetails),
    ChNode = node(ChPid),
    ChName = pget(name, ChannelDetails),
    #tracked_channel{
       id = rabbit_tracking:id(ChNode, ChName),
       name = ChName,
       node = ChNode,
       vhost = pget(vhost, ChannelDetails),
       pid = ChPid,
       connection = pget(connection, ChannelDetails),
       username = pget(user, ChannelDetails)}.

%% Calls rabbit_channel:shutdown/1 on the pid of every #tracked_channel{}
%% in the list and returns ok. An empty list is a no-op. A non-empty list
%% must start with a #tracked_channel{}; later elements that are not such
%% records are skipped.
close_channels([]) ->
    ok;
close_channels([#tracked_channel{} | _] = TrackedChannels) ->
    lists:foreach(
      fun (#tracked_channel{pid = ChPid}) ->
              rabbit_channel:shutdown(ChPid);
          (_NotATrackedChannel) ->
              ok
      end, TrackedChannels).