1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_channel_tracking).
9
10%% Abstracts away how tracked connection records are stored
11%% and queried.
12%%
13%% See also:
14%%
15%%  * rabbit_channel_tracking_handler
16%%  * rabbit_reader
17%%  * rabbit_event
18-behaviour(rabbit_tracking).
19
20-export([boot/0,
21         update_tracked/1,
22         handle_cast/1,
23         register_tracked/1,
24         unregister_tracked/1,
25         count_tracked_items_in/1,
26         clear_tracking_tables/0,
27         shutdown_tracked_items/2]).
28
29-export([list/0, list_of_user/1, list_on_node/1,
30         tracked_channel_table_name_for/1,
31         tracked_channel_per_user_table_name_for/1,
32         get_all_tracked_channel_table_names_for_node/1,
33         delete_tracked_channel_user_entry/1]).
34
35-include_lib("rabbit_common/include/rabbit.hrl").
36
37-import(rabbit_misc, [pget/2]).
38
39%%
40%% API
41%%
42
43%% Sets up and resets channel tracking tables for this node.
44-spec boot() -> ok.
45
46boot() ->
47    ensure_tracked_channels_table_for_this_node(),
48    rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
49                    [tracked_channel_table_name_for(node())]),
50    ensure_per_user_tracked_channels_table_for_node(),
51    rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
52                    [tracked_channel_per_user_table_name_for(node())]),
53    clear_tracking_tables(),
54    ok.
55
56-spec update_tracked(term()) -> ok.
57
58update_tracked(Event) ->
59    spawn(?MODULE, handle_cast, [Event]),
60    ok.
61
62%% Asynchronously handle update events
63-spec handle_cast(term()) -> ok.
64
65handle_cast({channel_created, Details}) ->
66    ThisNode = node(),
67    case node(pget(pid, Details)) of
68        ThisNode ->
69            TrackedCh = #tracked_channel{id = TrackedChId} =
70                tracked_channel_from_channel_created_event(Details),
71            try
72                register_tracked(TrackedCh)
73            catch
74                error:{no_exists, _} ->
75                    Msg = "Could not register channel ~p for tracking, "
76                          "its table is not ready yet or the channel terminated prematurely",
77                    rabbit_log_connection:warning(Msg, [TrackedChId]),
78                    ok;
79                error:Err ->
80                    Msg = "Could not register channel ~p for tracking: ~p",
81                    rabbit_log_connection:warning(Msg, [TrackedChId, Err]),
82                    ok
83            end;
84        _OtherNode ->
85            %% ignore
86            ok
87    end;
88handle_cast({channel_closed, Details}) ->
89    %% channel has terminated, unregister if local
90    case get_tracked_channel_by_pid(pget(pid, Details)) of
91        [#tracked_channel{name = Name}] ->
92            unregister_tracked(rabbit_tracking:id(node(), Name));
93        _Other -> ok
94    end;
95handle_cast({connection_closed, ConnDetails}) ->
96    ThisNode= node(),
97    ConnPid = pget(pid, ConnDetails),
98
99    case pget(node, ConnDetails) of
100        ThisNode ->
101            TrackedChs = get_tracked_channels_by_connection_pid(ConnPid),
102            rabbit_log_channel:debug(
103                "Closing all channels from connection '~s' "
104                "because it has been closed", [pget(name, ConnDetails)]),
105            %% Shutting down channels will take care of unregistering the
106            %% corresponding tracking.
107            shutdown_tracked_items(TrackedChs, undefined),
108            ok;
109        _DifferentNode ->
110            ok
111    end;
112handle_cast({user_deleted, Details}) ->
113    Username = pget(name, Details),
114    %% Schedule user entry deletion, allowing time for connections to close
115    _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
116            delete_tracked_channel_user_entry, [Username]),
117    ok;
118handle_cast({node_deleted, Details}) ->
119    Node = pget(node, Details),
120    rabbit_log_channel:info(
121        "Node '~s' was removed from the cluster, deleting"
122        " its channel tracking tables...", [Node]),
123    delete_tracked_channels_table_for_node(Node),
124    delete_per_user_tracked_channels_table_for_node(Node).
125
126-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
127-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
128
129register_tracked(TrackedCh =
130  #tracked_channel{node = Node, name = Name, username = Username}) ->
131    ChId = rabbit_tracking:id(Node, Name),
132    TableName = tracked_channel_table_name_for(Node),
133    PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
134    %% upsert
135    case mnesia:dirty_read(TableName, ChId) of
136      []    ->
137          mnesia:dirty_write(TableName, TrackedCh),
138          mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
139      [#tracked_channel{}] ->
140          ok
141    end,
142    ok.
143
144-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
145
146unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
147    TableName = tracked_channel_table_name_for(Node),
148    PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
149    case mnesia:dirty_read(TableName, ChId) of
150        []     -> ok;
151        [#tracked_channel{username = Username}] ->
152            mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
153            mnesia:dirty_delete(TableName, ChId)
154    end.
155
156-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
157
158count_tracked_items_in({user, Username}) ->
159    rabbit_tracking:count_tracked_items(
160        fun tracked_channel_per_user_table_name_for/1,
161        #tracked_channel_per_user.channel_count, Username,
162        "channels in vhost").
163
164-spec clear_tracking_tables() -> ok.
165
166clear_tracking_tables() ->
167    clear_tracked_channel_tables_for_this_node(),
168    ok.
169
170-spec shutdown_tracked_items(list(), term()) -> ok.
171
172shutdown_tracked_items(TrackedItems, _Args) ->
173    close_channels(TrackedItems).
174
175%% helper functions
176-spec list() -> [rabbit_types:tracked_channel()].
177
178list() ->
179    lists:foldl(
180      fun (Node, Acc) ->
181              Tab = tracked_channel_table_name_for(Node),
182              try
183                  Acc ++
184                  mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
185              catch
186                  exit:{aborted, {no_exists, [Tab, _]}} ->
187                      %% The table might not exist yet (or is already gone)
188                      %% between the time rabbit_nodes:all_running() runs and
189                      %% returns a specific node, and
190                      %% mnesia:dirty_match_object() is called for that node's
191                      %% table.
192                      Acc
193              end
194      end, [], rabbit_nodes:all_running()).
195
196-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
197
198list_of_user(Username) ->
199    rabbit_tracking:match_tracked_items(
200        fun tracked_channel_table_name_for/1,
201        #tracked_channel{username = Username, _ = '_'}).
202
203-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
204
205list_on_node(Node) ->
206    try mnesia:dirty_match_object(
207          tracked_channel_table_name_for(Node),
208          #tracked_channel{_ = '_'})
209    catch exit:{aborted, {no_exists, _}} -> []
210    end.
211
212-spec tracked_channel_table_name_for(node()) -> atom().
213
214tracked_channel_table_name_for(Node) ->
215    list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).
216
217-spec tracked_channel_per_user_table_name_for(node()) -> atom().
218
219tracked_channel_per_user_table_name_for(Node) ->
220    list_to_atom(rabbit_misc:format(
221        "tracked_channel_table_per_user_on_node_~s", [Node])).
222
223%% internal
224ensure_tracked_channels_table_for_this_node() ->
225    ensure_tracked_channels_table_for_node(node()).
226
227ensure_per_user_tracked_channels_table_for_node() ->
228    ensure_per_user_tracked_channels_table_for_node(node()).
229
230%% Create tables
231ensure_tracked_channels_table_for_node(Node) ->
232    TableName = tracked_channel_table_name_for(Node),
233    case mnesia:create_table(TableName, [{record_name, tracked_channel},
234                                         {attributes, record_info(fields, tracked_channel)}]) of
235        {atomic, ok}                   -> ok;
236        {aborted, {already_exists, _}} -> ok;
237        {aborted, Error}               ->
238            rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]),
239            ok
240    end.
241
242ensure_per_user_tracked_channels_table_for_node(Node) ->
243    TableName = tracked_channel_per_user_table_name_for(Node),
244    case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
245                                         {attributes, record_info(fields, tracked_channel_per_user)}]) of
246        {atomic, ok}                   -> ok;
247        {aborted, {already_exists, _}} -> ok;
248        {aborted, Error}               ->
249            rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]),
250            ok
251    end.
252
253clear_tracked_channel_tables_for_this_node() ->
254    [rabbit_tracking:clear_tracking_table(T)
255        || T <- get_all_tracked_channel_table_names_for_node(node())].
256
257delete_tracked_channels_table_for_node(Node) ->
258    TableName = tracked_channel_table_name_for(Node),
259    rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").
260
261delete_per_user_tracked_channels_table_for_node(Node) ->
262    TableName = tracked_channel_per_user_table_name_for(Node),
263    rabbit_tracking:delete_tracking_table(TableName, Node,
264        "per-user tracked channels").
265
266get_all_tracked_channel_table_names_for_node(Node) ->
267    [tracked_channel_table_name_for(Node),
268        tracked_channel_per_user_table_name_for(Node)].
269
270get_tracked_channels_by_connection_pid(ConnPid) ->
271    rabbit_tracking:match_tracked_items(
272        fun tracked_channel_table_name_for/1,
273        #tracked_channel{connection = ConnPid, _ = '_'}).
274
275get_tracked_channel_by_pid(ChPid) ->
276    rabbit_tracking:match_tracked_items(
277        fun tracked_channel_table_name_for/1,
278        #tracked_channel{pid = ChPid, _ = '_'}).
279
280delete_tracked_channel_user_entry(Username) ->
281    rabbit_tracking:delete_tracked_entry(
282        {rabbit_auth_backend_internal, exists, [Username]},
283        fun tracked_channel_per_user_table_name_for/1,
284        Username).
285
286tracked_channel_from_channel_created_event(ChannelDetails) ->
287    Node = node(ChPid = pget(pid, ChannelDetails)),
288    Name = pget(name, ChannelDetails),
289    #tracked_channel{
290        id    = rabbit_tracking:id(Node, Name),
291        name  = Name,
292        node  = Node,
293        vhost = pget(vhost, ChannelDetails),
294        pid   = ChPid,
295        connection = pget(connection, ChannelDetails),
296        username   = pget(user, ChannelDetails)}.
297
298close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
299    [rabbit_channel:shutdown(ChPid)
300        || #tracked_channel{pid = ChPid} <- TrackedChannels],
301    ok;
302close_channels(_TrackedChannels = []) -> ok.
303