1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_core_metrics).
9
10-include("rabbit_core_metrics.hrl").
11
12-export([create_table/1]).
13-export([init/0]).
14-export([terminate/0]).
15
16-export([connection_created/2,
17         connection_closed/1,
18         connection_stats/2,
19         connection_stats/4]).
20
21-export([channel_created/2,
22         channel_closed/1,
23         channel_stats/2,
24         channel_stats/3,
25         channel_stats/4,
26         channel_queue_down/1,
27         channel_queue_exchange_down/1,
28         channel_exchange_down/1]).
29
30-export([consumer_created/9,
31         consumer_updated/9,
32         consumer_deleted/3]).
33
34-export([queue_stats/2,
35         queue_stats/5,
36         queue_declared/1,
37         queue_created/1,
38         queue_deleted/1,
39         queues_deleted/1]).
40
41-export([node_stats/2]).
42
43-export([node_node_stats/2]).
44
45-export([gen_server2_stats/2,
46         gen_server2_deleted/1,
47         get_gen_server2_stats/1]).
48
49-export([delete/2]).
50
51-export([auth_attempt_failed/3,
52         auth_attempt_succeeded/3,
53         reset_auth_attempt_metrics/0,
54         get_auth_attempts/0,
55         get_auth_attempts_by_source/0]).
56
57%%----------------------------------------------------------------------------
58%% Types
59%%----------------------------------------------------------------------------
%% Key of a channel-scoped metrics row: either the channel pid alone, or the
%% pid paired with a queue name, an exchange name, or a {queue, exchange} pair.
-type(channel_stats_id() :: pid() |
			    {pid(),
			     {rabbit_amqqueue:name(), rabbit_exchange:name()}} |
			    {pid(), rabbit_amqqueue:name()} |
			    {pid(), rabbit_exchange:name()}).

%% Category selector accepted as the first argument of channel_stats/3.
-type(channel_stats_type() :: queue_exchange_stats | queue_stats |
			      exchange_stats | reductions).

%% Activity status value stored with each consumer row.
-type(activity_status() :: up | single_active | waiting | suspected_down).
70%%----------------------------------------------------------------------------
71%% Specs
72%%----------------------------------------------------------------------------
%% Table lifecycle.
-spec init() -> ok.
%% Connection metrics.
-spec connection_created(pid(), rabbit_types:infos()) -> ok.
-spec connection_closed(pid()) -> ok.
-spec connection_stats(pid(), rabbit_types:infos()) -> ok.
-spec connection_stats(pid(), integer(), integer(), integer()) -> ok.
%% Channel metrics.
-spec channel_created(pid(), rabbit_types:infos()) -> ok.
-spec channel_closed(pid()) -> ok.
-spec channel_stats(pid(), rabbit_types:infos()) -> ok.
-spec channel_stats(channel_stats_type(), channel_stats_id(),
                    rabbit_types:infos() | integer()) -> ok.
-spec channel_queue_down({pid(), rabbit_amqqueue:name()}) -> ok.
-spec channel_queue_exchange_down({pid(), {rabbit_amqqueue:name(),
                                   rabbit_exchange:name()}}) -> ok.
-spec channel_exchange_down({pid(), rabbit_exchange:name()}) -> ok.
%% Consumer metrics.
-spec consumer_created(pid(), binary(), boolean(), boolean(),
                       rabbit_amqqueue:name(), integer(), boolean(), activity_status(), list()) -> ok.
-spec consumer_updated(pid(), binary(), boolean(), boolean(),
                       rabbit_amqqueue:name(), integer(), boolean(), activity_status(), list()) -> ok.
-spec consumer_deleted(pid(), binary(), rabbit_amqqueue:name()) -> ok.
%% Queue metrics.
-spec queue_stats(rabbit_amqqueue:name(), rabbit_types:infos()) -> ok.
-spec queue_stats(rabbit_amqqueue:name(), integer(), integer(), integer(),
                  integer()) -> ok.
%% Node metrics.
-spec node_stats(atom(), rabbit_types:infos()) -> ok.
-spec node_node_stats({node(), node()}, rabbit_types:infos()) -> ok.
%% gen_server2 buffer metrics.
-spec gen_server2_stats(pid(), integer()) -> ok.
-spec gen_server2_deleted(pid()) -> ok.
-spec get_gen_server2_stats(pid()) -> integer() | 'not_found'.
%% Generic row removal.
-spec delete(atom(), any()) -> ok.
101%%----------------------------------------------------------------------------
102%% Storage of the raw metrics in RabbitMQ core. All the processing of stats
103%% is done by the management plugin.
104%%----------------------------------------------------------------------------
105%%----------------------------------------------------------------------------
106%% API
107%%----------------------------------------------------------------------------
108
%% Creates a public, named ETS table `Table' of kind `Type' with both read
%% and write concurrency enabled. Returns whatever ets:new/2 returns.
create_table({Table, Type}) ->
    Options = [Type,
               public,
               named_table,
               {write_concurrency, true},
               {read_concurrency, true}],
    ets:new(Table, Options).
112
%% Creates every table listed in ?CORE_TABLES and ?CORE_EXTRA_TABLES.
%% Entries that are not {Table, Type} pairs are skipped.
init() ->
    CreateIfPair = fun({_Table, _Type} = TableSpec) -> create_table(TableSpec);
                      (_Other) -> skipped
                   end,
    lists:foreach(CreateIfPair, ?CORE_TABLES ++ ?CORE_EXTRA_TABLES),
    ok.
117
%% Deletes every table listed in ?CORE_TABLES and ?CORE_EXTRA_TABLES.
%% Entries that are not {Table, Type} pairs are skipped.
terminate() ->
    DropIfPair = fun({Table, _Type}) -> ets:delete(Table);
                    (_Other) -> skipped
                 end,
    lists:foreach(DropIfPair, ?CORE_TABLES ++ ?CORE_EXTRA_TABLES),
    ok.
122
%% Stores the creation infos of connection `Pid' and bumps element 2 of this
%% node's row in connection_churn_metrics.
connection_created(Pid, Infos) ->
    Row = {Pid, Infos},
    ets:insert(connection_created, Row),
    _ = ets:update_counter(connection_churn_metrics, node(), {2, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
128
%% Removes the connection's rows from connection_created and
%% connection_metrics, sets the delete marker (element 5) on its
%% connection_coarse_metrics row and bumps element 3 of this node's row in
%% connection_churn_metrics.
connection_closed(Pid) ->
    ets:delete(connection_created, Pid),
    ets:delete(connection_metrics, Pid),
    DeleteMarker = {5, 1},
    _ = ets:update_element(connection_coarse_metrics, Pid, DeleteMarker),
    _ = ets:update_counter(connection_churn_metrics, node(), {3, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
137
%% Stores (or overwrites) the detailed infos of connection `Pid'.
connection_stats(Pid, Infos) ->
    Row = {Pid, Infos},
    _ = ets:insert(connection_metrics, Row),
    ok.
141
%% Stores the coarse counters of connection `Pid'. The trailing 0 is the
%% delete marker, written as "not deleted".
connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
    NotDeleted = 0,
    Row = {Pid, Recv_oct, Send_oct, Reductions, NotDeleted},
    _ = ets:insert(connection_coarse_metrics, Row),
    ok.
146
%% Stores the creation infos of channel `Pid' and bumps element 4 of this
%% node's row in connection_churn_metrics.
channel_created(Pid, Infos) ->
    Row = {Pid, Infos},
    ets:insert(channel_created, Row),
    _ = ets:update_counter(connection_churn_metrics, node(), {4, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
152
%% Removes the channel's rows from channel_created, channel_metrics and
%% channel_process_metrics (in that order), then bumps element 5 of this
%% node's row in connection_churn_metrics.
channel_closed(Pid) ->
    lists:foreach(fun(Table) -> ets:delete(Table, Pid) end,
                  [channel_created, channel_metrics, channel_process_metrics]),
    _ = ets:update_counter(connection_churn_metrics, node(), {5, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
160
%% Stores (or overwrites) the detailed infos of channel `Pid'.
channel_stats(Pid, Infos) ->
    Row = {Pid, Infos},
    _ = ets:insert(channel_metrics, Row),
    ok.
164
%% Stores the reductions value for the channel identified by `Id'.
%% Only the `reductions' category is accepted.
channel_stats(reductions, Id, Value) ->
    Row = {Id, Value},
    _ = ets:insert(channel_process_metrics, Row),
    ok.
168
%% Increments one counter of a channel-scoped row, creating the row with all
%% counters at zero when it does not exist yet. The last element of every
%% default row is the delete marker, initialised to 0.
%%
%%   exchange_stats       -> channel_exchange_metrics       (elements 2..5)
%%   queue_exchange_stats -> channel_queue_exchange_metrics (plain increment)
%%   queue_stats          -> channel_queue_metrics          (elements 2..8)
%%
%% Any other category/counter combination fails with function_clause.
channel_stats(exchange_stats, Counter, Id, Value)
  when Counter =:= publish;
       Counter =:= confirm;
       Counter =:= return_unroutable;
       Counter =:= drop_unroutable ->
    Default = {Id, 0, 0, 0, 0, 0},
    UpdateOp = {channel_exchange_counter_pos(Counter), Value},
    _ = ets:update_counter(channel_exchange_metrics, Id, UpdateOp, Default),
    ok;
channel_stats(queue_exchange_stats, publish, Id, Value) ->
    Default = {Id, 0, 0},
    _ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, Default),
    ok;
channel_stats(queue_stats, Counter, Id, Value)
  when Counter =:= get;
       Counter =:= get_no_ack;
       Counter =:= deliver;
       Counter =:= deliver_no_ack;
       Counter =:= redeliver;
       Counter =:= ack;
       Counter =:= get_empty ->
    Default = {Id, 0, 0, 0, 0, 0, 0, 0, 0},
    UpdateOp = {channel_queue_counter_pos(Counter), Value},
    _ = ets:update_counter(channel_queue_metrics, Id, UpdateOp, Default),
    ok.

%% Tuple position of each counter in a channel_exchange_metrics row.
channel_exchange_counter_pos(publish)           -> 2;
channel_exchange_counter_pos(confirm)           -> 3;
channel_exchange_counter_pos(return_unroutable) -> 4;
channel_exchange_counter_pos(drop_unroutable)   -> 5.

%% Tuple position of each counter in a channel_queue_metrics row.
channel_queue_counter_pos(get)            -> 2;
channel_queue_counter_pos(get_no_ack)     -> 3;
channel_queue_counter_pos(deliver)        -> 4;
channel_queue_counter_pos(deliver_no_ack) -> 5;
channel_queue_counter_pos(redeliver)      -> 6;
channel_queue_counter_pos(ack)            -> 7;
channel_queue_counter_pos(get_empty)      -> 8.
217
%% Removes the row stored under `Key' from `Table'. Always returns ok.
delete(Table, Key) ->
    _ = ets:delete(Table, Key),
    ok.
221
%% Sets the delete marker (element 9) on the channel_queue_metrics row `Id'.
%% A missing row is left untouched.
channel_queue_down(Id) ->
    DeleteMarker = {9, 1},
    _ = ets:update_element(channel_queue_metrics, Id, DeleteMarker),
    ok.
226
%% Sets the delete marker (element 3) on the channel_queue_exchange_metrics
%% row `Id'. A missing row is left untouched.
channel_queue_exchange_down(Id) ->
    DeleteMarker = {3, 1},
    _ = ets:update_element(channel_queue_exchange_metrics, Id, DeleteMarker),
    ok.
231
%% Sets the delete marker (element 6) on the channel_exchange_metrics row
%% `Id'. A missing row is left untouched.
channel_exchange_down(Id) ->
    DeleteMarker = {6, 1},
    _ = ets:update_element(channel_exchange_metrics, Id, DeleteMarker),
    ok.
236
%% Stores a consumer row keyed by {QName, ChPid, ConsumerTag}; an existing
%% row under the same key is overwritten.
consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
                 PrefetchCount, Active, ActivityStatus, Args) ->
    Key = {QName, ChPid, ConsumerTag},
    Row = {Key, ExclusiveConsume, AckRequired, PrefetchCount, Active,
           ActivityStatus, Args},
    _ = ets:insert(consumer_created, Row),
    ok.
242
%% Overwrites (or creates) the consumer row keyed by
%% {QName, ChPid, ConsumerTag} with the supplied values.
consumer_updated(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
                 PrefetchCount, Active, ActivityStatus, Args) ->
    Key = {QName, ChPid, ConsumerTag},
    Row = {Key, ExclusiveConsume, AckRequired, PrefetchCount, Active,
           ActivityStatus, Args},
    _ = ets:insert(consumer_created, Row),
    ok.
248
%% Removes the consumer row keyed by {QName, ChPid, ConsumerTag}.
consumer_deleted(ChPid, ConsumerTag, QName) ->
    Key = {QName, ChPid, ConsumerTag},
    _ = ets:delete(consumer_created, Key),
    ok.
252
%% Stores the detailed infos of queue `Name'. The trailing 0 is the delete
%% marker, written as "not deleted".
queue_stats(Name, Infos) ->
    NotDeleted = 0,
    _ = ets:insert(queue_metrics, {Name, Infos, NotDeleted}),
    ok.
257
%% Stores the coarse counters of queue `Name' in queue_coarse_metrics.
queue_stats(Name, MessagesReady, MessagesUnacknowledge, Messages, Reductions) ->
    Row = {Name, MessagesReady, MessagesUnacknowledge, Messages, Reductions},
    _ = ets:insert(queue_coarse_metrics, Row),
    ok.
262
%% Bumps element 6 of this node's row in connection_churn_metrics.
%% The queue name is accepted but currently unused.
queue_declared(_Name) ->
    _ = ets:update_counter(connection_churn_metrics, node(), {6, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
268
%% Bumps element 7 of this node's row in connection_churn_metrics.
%% The queue name is accepted but currently unused.
queue_created(_Name) ->
    _ = ets:update_counter(connection_churn_metrics, node(), {7, 1},
                           ?CONNECTION_CHURN_METRICS),
    ok.
274
%% Handles deletion of queue `Name':
%%   1. removes its queue_coarse_metrics row,
%%   2. bumps element 8 of this node's row in connection_churn_metrics,
%%   3. sets the delete marker on its queue_metrics row (element 3) and on
%%      every channel_queue_exchange_metrics (element 3) and
%%      channel_queue_metrics (element 9) row selected for this queue.
queue_deleted(Name) ->
    ets:delete(queue_coarse_metrics, Name),
    _ = ets:update_counter(connection_churn_metrics, node(), {8, 1},
                           ?CONNECTION_CHURN_METRICS),
    _ = ets:update_element(queue_metrics, Name, {3, 1}),
    MarkSelected =
        fun(Table, MatchSpec, DeleteMarker) ->
                Keys = ets:select(Table, MatchSpec),
                _ = [ets:update_element(Table, Key, DeleteMarker) || Key <- Keys],
                ok
        end,
    ok = MarkSelected(channel_queue_exchange_metrics, match_spec_cqx(Name), {3, 1}),
    ok = MarkSelected(channel_queue_metrics, match_spec_cq(Name), {9, 1}),
    ok.
289
%% Bulk variant of queue deletion: adds the number of queues to element 8 of
%% this node's row in connection_churn_metrics, clears/marks the per-queue
%% rows, then marks the channel/queue rows partition by partition using one
%% match-spec condition per partition.
queues_deleted(Queues) ->
    DeletedCount = length(Queues),
    _ = ets:update_counter(connection_churn_metrics, node(), {8, DeletedCount},
                           ?CONNECTION_CHURN_METRICS),
    lists:foreach(fun delete_queue_metrics/1, Queues),
    MarkPartition =
        fun(QueuesPartition) ->
                Condition =
                    build_match_spec_conditions_to_delete_all_queues(QueuesPartition),
                delete_channel_queue_exchange_metrics(Condition),
                delete_channel_queue_metrics(Condition)
        end,
    lists:foreach(MarkPartition, partition_queues(Queues)),
    ok.
302
%% Splits `Queues' into consecutive partitions of at most 1000 elements,
%% preserving order. An empty input yields no partitions, and an input whose
%% length is an exact multiple of 1000 yields no trailing empty partition, so
%% callers never run a table scan for a partition that contains no queues.
partition_queues(Queues) ->
    %% length/1 is O(n); compute it once and carry the remainder along
    %% instead of re-measuring the list on every step.
    partition_queues(Queues, length(Queues)).

partition_queues([], 0) ->
    [];
partition_queues(Queues, Remaining) when Remaining > 1000 ->
    {Partition, Rest} = lists:split(1000, Queues),
    [Partition | partition_queues(Rest, Remaining - 1000)];
partition_queues(Queues, _Remaining) ->
    [Queues].
308
%% Removes the queue's queue_coarse_metrics row and sets the delete marker
%% (element 3) on its queue_metrics row.
delete_queue_metrics(Queue) ->
    _ = ets:delete(queue_coarse_metrics, Queue),
    DeleteMarker = {3, 1},
    _ = ets:update_element(queue_metrics, Queue, DeleteMarker),
    ok.
313
%% Sets the delete marker (element 3) on every channel_queue_exchange_metrics
%% row whose key {Channel, {Queue, Exchange}} satisfies `MatchSpecCondition'.
%% Inside the condition '$1' is bound to the queue part of the key.
delete_channel_queue_exchange_metrics(MatchSpecCondition) ->
    RowPattern = {{'$2', {'$1', '$3'}}, '_', '_'},
    %% The result expression rebuilds the full row key.
    RebuildKey = [{{'$2', {{'$1', '$3'}}}}],
    MatchSpec = [{RowPattern, [MatchSpecCondition], RebuildKey}],
    Keys = ets:select(channel_queue_exchange_metrics, MatchSpec),
    _ = [ets:update_element(channel_queue_exchange_metrics, Key, {3, 1})
         || Key <- Keys],
    ok.
328
%% Sets the delete marker (element 9) on every channel_queue_metrics row
%% whose key {Channel, Queue} satisfies `MatchSpecCondition'. Inside the
%% condition '$1' is bound to the queue part of the key.
delete_channel_queue_metrics(MatchSpecCondition) ->
    RowPattern = {{'$2', '$1'}, '_', '_', '_', '_', '_', '_', '_', '_'},
    %% The result expression rebuilds the full row key.
    RebuildKey = [{{'$2', '$1'}}],
    MatchSpec = [{RowPattern, [MatchSpecCondition], RebuildKey}],
    Keys = ets:select(channel_queue_metrics, MatchSpec),
    _ = [ets:update_element(channel_queue_metrics, Key, {9, 1}) || Key <- Keys],
    ok.
343
%% Builds a match-spec guard that is true when '$1' equals any queue in the
%% given list. Each queue is wrapped as {Queue} so that a tuple-shaped queue
%% name is treated as a literal tuple inside the match spec.
%%
%% The result has this shape:
%%
%%   {'orelse',
%%      {'==', {Queue1}, '$1'},
%%      {'orelse',
%%          {'==', {Queue2}, '$1'},
%%          ...
%%              {'orelse',
%%                  {'==', {QueueN}, '$1'},
%%                  false
%%              }
%%      }
%%   }
%%
%% The chain must end in `false' (the identity of `orelse'). Ending it in
%% `true' would make the guard always succeed, so every row in the table
%% would be selected regardless of which queues were passed. An empty list
%% therefore yields `false', i.e. a guard that matches nothing.
build_match_spec_conditions_to_delete_all_queues([Queue | Queues]) ->
    {'orelse',
     {'==', {Queue}, '$1'},
     build_match_spec_conditions_to_delete_all_queues(Queues)};
build_match_spec_conditions_to_delete_all_queues([]) ->
    false.
362
%% Stores `Infos' for the local node in the table that belongs to the given
%% metrics kind:
%%   persister_metrics -> node_persister_metrics
%%   coarse_metrics    -> node_coarse_metrics
%%   node_metrics      -> node_metrics
%% Any other kind fails with function_clause.
node_stats(Kind, Infos)
  when Kind =:= persister_metrics;
       Kind =:= coarse_metrics;
       Kind =:= node_metrics ->
    Table = case Kind of
                persister_metrics -> node_persister_metrics;
                coarse_metrics    -> node_coarse_metrics;
                node_metrics      -> node_metrics
            end,
    _ = ets:insert(Table, {node(), Infos}),
    ok.
372
%% Stores (or overwrites) the infos for the node pair `Id'.
node_node_stats(Id, Infos) ->
    Row = {Id, Infos},
    _ = ets:insert(node_node_metrics, Row),
    ok.
376
%% Match spec selecting the keys {Channel, {Queue, Exchange}} of all
%% three-element rows whose queue part equals `Id'. `Id' is wrapped as {Id}
%% so a tuple-shaped id is treated as a literal in the guard.
match_spec_cqx(Id) ->
    RowPattern = {{'$2', {'$1', '$3'}}, '_', '_'},
    Guards = [{'==', {Id}, '$1'}],
    RebuildKey = [{{'$2', {{'$1', '$3'}}}}],
    [{RowPattern, Guards, RebuildKey}].
379
%% Match spec selecting the keys {Channel, Queue} of all nine-element rows
%% whose queue part equals `Id'. `Id' is wrapped as {Id} so a tuple-shaped id
%% is treated as a literal in the guard.
match_spec_cq(Id) ->
    RowPattern = {{'$2', '$1'}, '_', '_', '_', '_', '_', '_', '_', '_'},
    Guards = [{'==', {Id}, '$1'}],
    RebuildKey = [{{'$2', '$1'}}],
    [{RowPattern, Guards, RebuildKey}].
382
%% Stores (or overwrites) the buffer length reported for process `Pid'.
gen_server2_stats(Pid, BufferLength) ->
    Row = {Pid, BufferLength},
    _ = ets:insert(gen_server2_metrics, Row),
    ok.
386
%% Removes the gen_server2_metrics row of process `Pid', if any.
gen_server2_deleted(Pid) ->
    _ = ets:delete(gen_server2_metrics, Pid),
    ok.
390
%% Returns the buffer length stored for `Pid', or `not_found' when there is
%% no row for it in gen_server2_metrics.
get_gen_server2_stats(Pid) ->
    Rows = ets:lookup(gen_server2_metrics, Pid),
    case Rows of
        [] ->
            not_found;
        [{Pid, BufferLength}] ->
            BufferLength
    end.
398
%% Records a successful authentication attempt.
%% Rows are {Key, Total, Succeeded, Failed}: element 2 (total) and element 3
%% (succeeded) are each incremented by one. The key is chosen by
%% update_auth_attempt/4.
auth_attempt_succeeded(RemoteAddress, Username, Protocol) ->
    Increments = [{2, 1}, {3, 1}],
    update_auth_attempt(RemoteAddress, Username, Protocol, Increments).
402
%% Records a failed authentication attempt.
%% Rows are {Key, Total, Succeeded, Failed}: element 2 (total) and element 4
%% (failed) are each incremented by one. The key is chosen by
%% update_auth_attempt/4.
auth_attempt_failed(RemoteAddress, Username, Protocol) ->
    Increments = [{2, 1}, {4, 1}],
    update_auth_attempt(RemoteAddress, Username, Protocol, Increments).
406
%% Applies the counter increments `Incr' to the authentication-attempt rows.
%%
%% The per-protocol row in auth_attempt_metrics (key: Protocol) is always
%% updated. The per-source row in auth_attempt_detailed_metrics
%% (key: {RemoteAddress, Username, Protocol}) is updated only when the
%% `track_auth_attempt_source' setting of the `rabbit' application is true
%% and the address and username are not both empty binaries. Missing rows
%% are created as {Key, 0, 0, 0} before the increments are applied.
%%
%% Always returns ok.
update_auth_attempt(RemoteAddress, Username, Protocol, Incr) ->
    %% Per-source metrics can keep growing indefinitely, so they are opt-in:
    %% it is up to the operator to enable them, and to reset them if
    %% required. An unset setting is treated as false rather than crashing
    %% the caller.
    case application:get_env(rabbit, track_auth_attempt_source, false) of
        true ->
            case {RemoteAddress, Username} of
                {<<>>, <<>>} ->
                    ok;
                _ ->
                    Key = {RemoteAddress, Username, Protocol},
                    _ = ets:update_counter(auth_attempt_detailed_metrics, Key,
                                           Incr, {Key, 0, 0, 0}),
                    ok
            end;
        false ->
            ok
    end,
    _ = ets:update_counter(auth_attempt_metrics, Protocol, Incr, {Protocol, 0, 0, 0}),
    ok.
424
%% Empties auth_attempt_metrics and then auth_attempt_detailed_metrics.
reset_auth_attempt_metrics() ->
    lists:foreach(fun ets:delete_all_objects/1,
                  [auth_attempt_metrics, auth_attempt_detailed_metrics]),
    ok.
429
%% Returns every auth_attempt_metrics row formatted as a property list.
get_auth_attempts() ->
    Rows = ets:tab2list(auth_attempt_metrics),
    lists:map(fun format_auth_attempt/1, Rows).
432
%% Returns every auth_attempt_detailed_metrics row formatted as a property
%% list.
get_auth_attempts_by_source() ->
    Rows = ets:tab2list(auth_attempt_detailed_metrics),
    lists:map(fun format_auth_attempt/1, Rows).
435
%% Turns an authentication-attempt row into a property list.
%% A row keyed by {RemoteAddress, Username, Protocol} also yields the
%% remote_address and username properties; a row keyed by the protocol alone
%% yields only the protocol and the three counters. The protocol atom is
%% rendered as a UTF-8 binary.
format_auth_attempt({{RemoteAddress, Username, Protocol}, Total, Succeeded, Failed}) ->
    ProtocolBin = atom_to_binary(Protocol, utf8),
    [{remote_address, RemoteAddress},
     {username, Username},
     {protocol, ProtocolBin},
     {auth_attempts, Total},
     {auth_attempts_failed, Failed},
     {auth_attempts_succeeded, Succeeded}];
format_auth_attempt({Protocol, Total, Succeeded, Failed}) ->
    ProtocolBin = atom_to_binary(Protocol, utf8),
    [{protocol, ProtocolBin},
     {auth_attempts, Total},
     {auth_attempts_failed, Failed},
     {auth_attempts_succeeded, Succeeded}].
443