1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_core_metrics). 9 10-include("rabbit_core_metrics.hrl"). 11 12-export([create_table/1]). 13-export([init/0]). 14-export([terminate/0]). 15 16-export([connection_created/2, 17 connection_closed/1, 18 connection_stats/2, 19 connection_stats/4]). 20 21-export([channel_created/2, 22 channel_closed/1, 23 channel_stats/2, 24 channel_stats/3, 25 channel_stats/4, 26 channel_queue_down/1, 27 channel_queue_exchange_down/1, 28 channel_exchange_down/1]). 29 30-export([consumer_created/9, 31 consumer_updated/9, 32 consumer_deleted/3]). 33 34-export([queue_stats/2, 35 queue_stats/5, 36 queue_declared/1, 37 queue_created/1, 38 queue_deleted/1, 39 queues_deleted/1]). 40 41-export([node_stats/2]). 42 43-export([node_node_stats/2]). 44 45-export([gen_server2_stats/2, 46 gen_server2_deleted/1, 47 get_gen_server2_stats/1]). 48 49-export([delete/2]). 50 51-export([auth_attempt_failed/3, 52 auth_attempt_succeeded/3, 53 reset_auth_attempt_metrics/0, 54 get_auth_attempts/0, 55 get_auth_attempts_by_source/0]). 56 57%%---------------------------------------------------------------------------- 58%% Types 59%%---------------------------------------------------------------------------- 60-type(channel_stats_id() :: pid() | 61 {pid(), 62 {rabbit_amqqueue:name(), rabbit_exchange:name()}} | 63 {pid(), rabbit_amqqueue:name()} | 64 {pid(), rabbit_exchange:name()}). 65 66-type(channel_stats_type() :: queue_exchange_stats | queue_stats | 67 exchange_stats | reductions). 68 69-type(activity_status() :: up | single_active | waiting | suspected_down). 70%%---------------------------------------------------------------------------- 71%% Specs 72%%---------------------------------------------------------------------------- 73-spec init() -> ok. 74-spec connection_created(pid(), rabbit_types:infos()) -> ok. 75-spec connection_closed(pid()) -> ok. 76-spec connection_stats(pid(), rabbit_types:infos()) -> ok. 77-spec connection_stats(pid(), integer(), integer(), integer()) -> ok. 78-spec channel_created(pid(), rabbit_types:infos()) -> ok. 79-spec channel_closed(pid()) -> ok. 80-spec channel_stats(pid(), rabbit_types:infos()) -> ok. 81-spec channel_stats(channel_stats_type(), channel_stats_id(), 82 rabbit_types:infos() | integer()) -> ok. 83-spec channel_queue_down({pid(), rabbit_amqqueue:name()}) -> ok. 84-spec channel_queue_exchange_down({pid(), {rabbit_amqqueue:name(), 85 rabbit_exchange:name()}}) -> ok. 86-spec channel_exchange_down({pid(), rabbit_exchange:name()}) -> ok. 87-spec consumer_created(pid(), binary(), boolean(), boolean(), 88 rabbit_amqqueue:name(), integer(), boolean(), activity_status(), list()) -> ok. 89-spec consumer_updated(pid(), binary(), boolean(), boolean(), 90 rabbit_amqqueue:name(), integer(), boolean(), activity_status(), list()) -> ok. 91-spec consumer_deleted(pid(), binary(), rabbit_amqqueue:name()) -> ok. 92-spec queue_stats(rabbit_amqqueue:name(), rabbit_types:infos()) -> ok. 93-spec queue_stats(rabbit_amqqueue:name(), integer(), integer(), integer(), 94 integer()) -> ok. 95-spec node_stats(atom(), rabbit_types:infos()) -> ok. 96-spec node_node_stats({node(), node()}, rabbit_types:infos()) -> ok. 97-spec gen_server2_stats(pid(), integer()) -> ok. 98-spec gen_server2_deleted(pid()) -> ok. 99-spec get_gen_server2_stats(pid()) -> integer() | 'not_found'. 100-spec delete(atom(), any()) -> ok. 101%%---------------------------------------------------------------------------- 102%% Storage of the raw metrics in RabbitMQ core. All the processing of stats 103%% is done by the management plugin. 104%%---------------------------------------------------------------------------- 105%%---------------------------------------------------------------------------- 106%% API 107%%---------------------------------------------------------------------------- 108 109create_table({Table, Type}) -> 110 ets:new(Table, [Type, public, named_table, {write_concurrency, true}, 111 {read_concurrency, true}]). 112 113init() -> 114 _ = [create_table({Table, Type}) 115 || {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], 116 ok. 117 118terminate() -> 119 [ets:delete(Table) 120 || {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], 121 ok. 122 123connection_created(Pid, Infos) -> 124 ets:insert(connection_created, {Pid, Infos}), 125 ets:update_counter(connection_churn_metrics, node(), {2, 1}, 126 ?CONNECTION_CHURN_METRICS), 127 ok. 128 129connection_closed(Pid) -> 130 ets:delete(connection_created, Pid), 131 ets:delete(connection_metrics, Pid), 132 %% Delete marker 133 ets:update_element(connection_coarse_metrics, Pid, {5, 1}), 134 ets:update_counter(connection_churn_metrics, node(), {3, 1}, 135 ?CONNECTION_CHURN_METRICS), 136 ok. 137 138connection_stats(Pid, Infos) -> 139 ets:insert(connection_metrics, {Pid, Infos}), 140 ok. 141 142connection_stats(Pid, Recv_oct, Send_oct, Reductions) -> 143 %% Includes delete marker 144 ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}), 145 ok. 146 147channel_created(Pid, Infos) -> 148 ets:insert(channel_created, {Pid, Infos}), 149 ets:update_counter(connection_churn_metrics, node(), {4, 1}, 150 ?CONNECTION_CHURN_METRICS), 151 ok. 152 153channel_closed(Pid) -> 154 ets:delete(channel_created, Pid), 155 ets:delete(channel_metrics, Pid), 156 ets:delete(channel_process_metrics, Pid), 157 ets:update_counter(connection_churn_metrics, node(), {5, 1}, 158 ?CONNECTION_CHURN_METRICS), 159 ok. 160 161channel_stats(Pid, Infos) -> 162 ets:insert(channel_metrics, {Pid, Infos}), 163 ok. 164 165channel_stats(reductions, Id, Value) -> 166 ets:insert(channel_process_metrics, {Id, Value}), 167 ok. 168 169channel_stats(exchange_stats, publish, Id, Value) -> 170 %% Includes delete marker 171 _ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}), 172 ok; 173channel_stats(exchange_stats, confirm, Id, Value) -> 174 %% Includes delete marker 175 _ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}), 176 ok; 177channel_stats(exchange_stats, return_unroutable, Id, Value) -> 178 %% Includes delete marker 179 _ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}), 180 ok; 181channel_stats(exchange_stats, drop_unroutable, Id, Value) -> 182 %% Includes delete marker 183 _ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}), 184 ok; 185channel_stats(queue_exchange_stats, publish, Id, Value) -> 186 %% Includes delete marker 187 _ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}), 188 ok; 189channel_stats(queue_stats, get, Id, Value) -> 190 %% Includes delete marker 191 _ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 192 ok; 193channel_stats(queue_stats, get_no_ack, Id, Value) -> 194 %% Includes delete marker 195 _ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 196 ok; 197channel_stats(queue_stats, deliver, Id, Value) -> 198 %% Includes delete marker 199 _ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 200 ok; 201channel_stats(queue_stats, deliver_no_ack, Id, Value) -> 202 %% Includes delete marker 203 _ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 204 ok; 205channel_stats(queue_stats, redeliver, Id, Value) -> 206 %% Includes delete marker 207 _ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 208 ok; 209channel_stats(queue_stats, ack, Id, Value) -> 210 %% Includes delete marker 211 _ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 212 ok; 213channel_stats(queue_stats, get_empty, Id, Value) -> 214 %% Includes delete marker 215 _ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), 216 ok. 217 218delete(Table, Key) -> 219 ets:delete(Table, Key), 220 ok. 221 222channel_queue_down(Id) -> 223 %% Delete marker 224 ets:update_element(channel_queue_metrics, Id, {9, 1}), 225 ok. 226 227channel_queue_exchange_down(Id) -> 228 %% Delete marker 229 ets:update_element(channel_queue_exchange_metrics, Id, {3, 1}), 230 ok. 231 232channel_exchange_down(Id) -> 233 %% Delete marker 234 ets:update_element(channel_exchange_metrics, Id, {6, 1}), 235 ok. 236 237consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, 238 PrefetchCount, Active, ActivityStatus, Args) -> 239 ets:insert(consumer_created, {{QName, ChPid, ConsumerTag}, ExclusiveConsume, 240 AckRequired, PrefetchCount, Active, ActivityStatus, Args}), 241 ok. 242 243consumer_updated(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, 244 PrefetchCount, Active, ActivityStatus, Args) -> 245 ets:insert(consumer_created, {{QName, ChPid, ConsumerTag}, ExclusiveConsume, 246 AckRequired, PrefetchCount, Active, ActivityStatus, Args}), 247 ok. 248 249consumer_deleted(ChPid, ConsumerTag, QName) -> 250 ets:delete(consumer_created, {QName, ChPid, ConsumerTag}), 251 ok. 252 253queue_stats(Name, Infos) -> 254 %% Includes delete marker 255 ets:insert(queue_metrics, {Name, Infos, 0}), 256 ok. 257 258queue_stats(Name, MessagesReady, MessagesUnacknowledge, Messages, Reductions) -> 259 ets:insert(queue_coarse_metrics, {Name, MessagesReady, MessagesUnacknowledge, 260 Messages, Reductions}), 261 ok. 262 263queue_declared(_Name) -> 264 %% Name is not needed, but might be useful in the future. 265 ets:update_counter(connection_churn_metrics, node(), {6, 1}, 266 ?CONNECTION_CHURN_METRICS), 267 ok. 268 269queue_created(_Name) -> 270 %% Name is not needed, but might be useful in the future. 271 ets:update_counter(connection_churn_metrics, node(), {7, 1}, 272 ?CONNECTION_CHURN_METRICS), 273 ok. 274 275queue_deleted(Name) -> 276 ets:delete(queue_coarse_metrics, Name), 277 ets:update_counter(connection_churn_metrics, node(), {8, 1}, 278 ?CONNECTION_CHURN_METRICS), 279 %% Delete markers 280 ets:update_element(queue_metrics, Name, {3, 1}), 281 CQX = ets:select(channel_queue_exchange_metrics, match_spec_cqx(Name)), 282 lists:foreach(fun(Key) -> 283 ets:update_element(channel_queue_exchange_metrics, Key, {3, 1}) 284 end, CQX), 285 CQ = ets:select(channel_queue_metrics, match_spec_cq(Name)), 286 lists:foreach(fun(Key) -> 287 ets:update_element(channel_queue_metrics, Key, {9, 1}) 288 end, CQ). 289 290queues_deleted(Queues) -> 291 ets:update_counter(connection_churn_metrics, node(), {8, length(Queues)}, 292 ?CONNECTION_CHURN_METRICS), 293 [ delete_queue_metrics(Queue) || Queue <- Queues ], 294 [ 295 begin 296 MatchSpecCondition = build_match_spec_conditions_to_delete_all_queues(QueuesPartition), 297 delete_channel_queue_exchange_metrics(MatchSpecCondition), 298 delete_channel_queue_metrics(MatchSpecCondition) 299 end || QueuesPartition <- partition_queues(Queues) 300 ], 301 ok. 302 303partition_queues(Queues) when length(Queues) >= 1000 -> 304 {Partition, Rest} = lists:split(1000, Queues), 305 [Partition | partition_queues(Rest)]; 306partition_queues(Queues) -> 307 [Queues]. 308 309delete_queue_metrics(Queue) -> 310 ets:delete(queue_coarse_metrics, Queue), 311 ets:update_element(queue_metrics, Queue, {3, 1}), 312 ok. 313 314delete_channel_queue_exchange_metrics(MatchSpecCondition) -> 315 ChannelQueueExchangeMetricsToUpdate = ets:select( 316 channel_queue_exchange_metrics, 317 [ 318 { 319 {{'$2', {'$1', '$3'}}, '_', '_'}, 320 [MatchSpecCondition], 321 [{{'$2', {{'$1', '$3'}}}}] 322 } 323 ] 324 ), 325 lists:foreach(fun(Key) -> 326 ets:update_element(channel_queue_exchange_metrics, Key, {3, 1}) 327 end, ChannelQueueExchangeMetricsToUpdate). 328 329delete_channel_queue_metrics(MatchSpecCondition) -> 330 ChannelQueueMetricsToUpdate = ets:select( 331 channel_queue_metrics, 332 [ 333 { 334 {{'$2', '$1'}, '_', '_', '_', '_', '_', '_', '_', '_'}, 335 [MatchSpecCondition], 336 [{{'$2', '$1'}}] 337 } 338 ] 339 ), 340 lists:foreach(fun(Key) -> 341 ets:update_element(channel_queue_metrics, Key, {9, 1}) 342 end, ChannelQueueMetricsToUpdate). 343 344% [{'orelse', 345% {'==', {Queue}, '$1'}, 346% {'orelse', 347% {'==', {Queue}, '$1'}, 348% % ... 349% {'orelse', 350% {'==', {Queue}, '$1'}, 351% {'==', true, true} 352% } 353% } 354% }], 355build_match_spec_conditions_to_delete_all_queues([Queue|Queues]) -> 356 {'orelse', 357 {'==', {Queue}, '$1'}, 358 build_match_spec_conditions_to_delete_all_queues(Queues) 359 }; 360build_match_spec_conditions_to_delete_all_queues([]) -> 361 true. 362 363node_stats(persister_metrics, Infos) -> 364 ets:insert(node_persister_metrics, {node(), Infos}), 365 ok; 366node_stats(coarse_metrics, Infos) -> 367 ets:insert(node_coarse_metrics, {node(), Infos}), 368 ok; 369node_stats(node_metrics, Infos) -> 370 ets:insert(node_metrics, {node(), Infos}), 371 ok. 372 373node_node_stats(Id, Infos) -> 374 ets:insert(node_node_metrics, {Id, Infos}), 375 ok. 376 377match_spec_cqx(Id) -> 378 [{{{'$2', {'$1', '$3'}}, '_', '_'}, [{'==', {Id}, '$1'}], [{{'$2', {{'$1', '$3'}}}}]}]. 379 380match_spec_cq(Id) -> 381 [{{{'$2', '$1'}, '_', '_', '_', '_', '_', '_', '_', '_'}, [{'==', {Id}, '$1'}], [{{'$2', '$1'}}]}]. 382 383gen_server2_stats(Pid, BufferLength) -> 384 ets:insert(gen_server2_metrics, {Pid, BufferLength}), 385 ok. 386 387gen_server2_deleted(Pid) -> 388 ets:delete(gen_server2_metrics, Pid), 389 ok. 390 391get_gen_server2_stats(Pid) -> 392 case ets:lookup(gen_server2_metrics, Pid) of 393 [{Pid, BufferLength}] -> 394 BufferLength; 395 [] -> 396 not_found 397 end. 398 399auth_attempt_succeeded(RemoteAddress, Username, Protocol) -> 400 %% ETS entry is {Key = {RemoteAddress, Username}, Total, Succeeded, Failed} 401 update_auth_attempt(RemoteAddress, Username, Protocol, [{2, 1}, {3, 1}]). 402 403auth_attempt_failed(RemoteAddress, Username, Protocol) -> 404 %% ETS entry is {Key = {RemoteAddress, Username}, Total, Succeeded, Failed} 405 update_auth_attempt(RemoteAddress, Username, Protocol, [{2, 1}, {4, 1}]). 406 407update_auth_attempt(RemoteAddress, Username, Protocol, Incr) -> 408 %% It should default to false as per ip/user metrics could keep growing indefinitely 409 %% It's up to the operator to enable them, and reset it required 410 case application:get_env(rabbit, track_auth_attempt_source) of 411 {ok, true} -> 412 case {RemoteAddress, Username} of 413 {<<>>, <<>>} -> 414 ok; 415 _ -> 416 Key = {RemoteAddress, Username, Protocol}, 417 _ = ets:update_counter(auth_attempt_detailed_metrics, Key, Incr, {Key, 0, 0, 0}) 418 end; 419 {ok, false} -> 420 ok 421 end, 422 _ = ets:update_counter(auth_attempt_metrics, Protocol, Incr, {Protocol, 0, 0, 0}), 423 ok. 424 425reset_auth_attempt_metrics() -> 426 ets:delete_all_objects(auth_attempt_metrics), 427 ets:delete_all_objects(auth_attempt_detailed_metrics), 428 ok. 429 430get_auth_attempts() -> 431 [format_auth_attempt(A) || A <- ets:tab2list(auth_attempt_metrics)]. 432 433get_auth_attempts_by_source() -> 434 [format_auth_attempt(A) || A <- ets:tab2list(auth_attempt_detailed_metrics)]. 435 436format_auth_attempt({{RemoteAddress, Username, Protocol}, Total, Succeeded, Failed}) -> 437 [{remote_address, RemoteAddress}, {username, Username}, 438 {protocol, atom_to_binary(Protocol, utf8)}, {auth_attempts, Total}, 439 {auth_attempts_failed, Failed}, {auth_attempts_succeeded, Succeeded}]; 440format_auth_attempt({Protocol, Total, Succeeded, Failed}) -> 441 [{protocol, atom_to_binary(Protocol, utf8)}, {auth_attempts, Total}, 442 {auth_attempts_failed, Failed}, {auth_attempts_succeeded, Succeeded}]. 443