%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_mgmt_external_stats).

%% Transitional step until we can require Erlang/OTP 21 and
%% use the now recommended try/catch syntax for obtaining the stack trace.
%% NOTE(review): this module already uses `Class:Reason:Stacktrace` catch
%% patterns, so this attribute may no longer be required - confirm that no
%% deprecation warnings resurface before removing it.
-compile(nowarn_deprecated_function).

-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-export([list_registry_plugins/1]).

-import(rabbit_misc, [pget/2]).

-include_lib("rabbit_common/include/rabbit.hrl").

%% Keys reported as `coarse_metrics`.
-define(METRICS_KEYS, [fd_used, sockets_used, mem_used, disk_free, proc_used, gc_num,
                       gc_bytes_reclaimed, context_switches]).

%% Keys reported as `persister_metrics`.
-define(PERSISTER_KEYS, [persister_stats]).

%% Keys reported as `node_metrics`. `name` must stay first: emit_update/1
%% strips the head of the resulting list before storing it.
-define(OTHER_KEYS, [name, partitions, os_pid, fd_total, sockets_total, mem_limit,
                     mem_alarm, disk_free_limit, disk_free_alarm, proc_total,
                     rates_mode, uptime, run_queue, processors, exchange_types,
                     auth_mechanisms, applications, contexts, log_files,
                     db_dir, config_files, net_ticktime, enabled_plugins,
                     mem_calculation_strategy, ra_open_file_metrics]).

%% Minimum delay between two logged file descriptor errors.
-define(TEN_MINUTES_AS_SECONDS, 600).

%%--------------------------------------------------------------------

-record(state, {
    %% Result of file_handle_cache:ulimit(), captured once in init/1.
    fd_total,
    %% Result of get_fhc_stats/0, refreshed on every emit_update/1.
    fhc_stats,
    %% Set of {Node, Owner} pairs seen by the last emit_node_node_stats/1.
    node_owners,
    %% NOTE(review): neither read nor written in the code visible here -
    %% confirm whether this field is still needed.
    last_ts,
    %% Value of the rabbit `collect_statistics_interval` application env;
    %% used as the erlang:send_after/3 delay between updates.
    interval,
    %% erlang:monotonic_time(second) of the last logged fd error, or
    %% `undefined` if none has been logged yet.
    error_logged_time
}).

%%--------------------------------------------------------------------

%% Starts the stats collector, registered locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------

%% Returns {State, UsedFd} for the current OS process. Any exception raised
%% while probing is logged (rate limited by log_fd_error/3) and reported
%% as 0 used descriptors.
get_used_fd(State0) ->
    try
        normalise_used_fd(get_used_fd(os:type(), State0))
    catch
        _:Error ->
            State2 = log_fd_error("Could not infer the number of file handles used: ~p", [Error], State0),
            {State2, 0}
    end.

%% Linux: count the entries of /proc/<pid>/fd, falling back to the generic
%% unix strategy when the directory cannot be listed.
get_used_fd({unix, linux}, State0) ->
    FdDir = "/proc/" ++ os:getpid() ++ "/fd",
    case file:list_dir(FdDir) of
        {ok, Entries} ->
            {State0, length(Entries)};
        {error, _Reason} ->
            get_used_fd({unix, generic}, State0)
    end;

%% BSDs: count the lines of `fstat -p <pid>` whose fourth column consists
%% only of digits and '*'.
get_used_fd({unix, BSD}, State0)
  when BSD =:= openbsd; BSD =:= freebsd; BSD =:= netbsd ->
    Output = os:cmd("fstat -p " ++ os:getpid()),
    try
        FdLines = [Line || Line <- string:tokens(Output, "\n"),
                           is_fstat_fd_column(
                             lists:nth(4, string:tokens(Line, " ")))],
        {State0, length(FdLines)}
    catch _:Error:Stacktrace ->
            State1 = log_fd_error("Could not parse fstat output:~n~s~n~p",
                                  [Output, {Error, Stacktrace}], State0),
            {State1, 0}
    end;

%% Any other unix: count the lines printed by lsof, minus one.
get_used_fd({unix, _}, State0) ->
    LsofCmd = rabbit_misc:format(
                "lsof -d \"0-9999999\" -lna -p ~s || echo failed", [os:getpid()]),
    LsofOutput = os:cmd(LsofCmd),
    case string:right(LsofOutput, 7) =:= "failed\n" of
        true ->
            State1 = log_fd_error("Could not obtain lsof output", [], State0),
            {State1, 0};
        false ->
            {State0, string:words(LsofOutput, $\n) - 1}
    end;

%% handle.exe can be obtained from
%% https://technet.microsoft.com/en-us/sysinternals/bb896655.aspx

%% Output looks like:

%% Handle v3.42
%% Copyright (C) 1997-2008 Mark Russinovich
%% Sysinternals - www.sysinternals.com
%%
%% Handle type summary:
%% ALPC Port : 2
%% Desktop : 1
%% Directory : 1
%% Event : 108
%% File : 25
%% IoCompletion : 3
%% Key : 7
%% KeyedEvent : 1
%% Mutant : 1
%% Process : 3
%% Process : 38
%% Thread : 41
%% Timer : 3
%% TpWorkerFactory : 2
%% WindowStation : 2
%% Total handles: 238

%% Nthandle v4.22 - Handle viewer
%% Copyright (C) 1997-2019 Mark Russinovich
%% Sysinternals - www.sysinternals.com
%%
%% Handle type summary:
%% <Unknown type> : 1
%% <Unknown type> : 166
%% ALPC Port : 11
%% Desktop : 1
%% Directory : 2
%% Event : 226
%% File : 122
%% IoCompletion : 8
%% IRTimer : 6
%% Key : 42
%% Mutant : 7
%% Process : 3
%% Section : 2
%% Semaphore : 43
%% Thread : 36
%% TpWorkerFactory : 3
%% WaitCompletionPacket: 25
%% WindowStation : 2
%% Total handles: 706

%% Note that the "File" number appears to include network sockets too; I assume
%% that's the number we care about. Note also that if you omit "-s" you will
%% see a list of file handles *without* network sockets. If you then add "-a"
%% you will see a list of handles of various types, including network sockets
%% shown as file handles to \Device\Afd.

get_used_fd({win32, _}, State0) ->
    HandleOutput = rabbit_misc:os_cmd(
                     "handle.exe /accepteula -s -p " ++ os:getpid() ++ " 2> nul"),
    used_fd_from_handle_output(HandleOutput, State0).

%% Keeps a numeric reading as is; anything else is reported as 0.
normalise_used_fd({_State, UsedFd} = Result) when is_number(UsedFd) ->
    Result;
normalise_used_fd({State, _Other}) ->
    %% Defaults to 0 if data is not available
    {State, 0}.

%% True when every character of an fstat column is a digit or '*'.
is_fstat_fd_column(Column) ->
    lists:all(fun (Char) -> lists:member(Char, "0123456789*") end, Column).

%% Turns the raw handle.exe output into {State, UsedFd}. Empty output is
%% treated as handle.exe being unavailable.
used_fd_from_handle_output([], State0) ->
    State1 = log_fd_error("Could not find handle.exe, please install from sysinternals", [], State0),
    {State1, 0};
used_fd_from_handle_output(Output, State0) ->
    case find_files_line(string:tokens(Output, "\r\n")) of
        unknown ->
            State1 = log_fd_error("handle.exe output did not contain "
                                  "a line beginning with ' File ', unable "
                                  "to determine used file descriptor "
                                  "count: ~p", [Output], State0),
            {State1, 0};
        UsedFd ->
            {State0, UsedFd}
    end.

%% Returns the integer found on the first line starting with " File ",
%% or `unknown` when no such line exists.
find_files_line(Lines) ->
    IsNotFileLine = fun (Line) -> not lists:prefix(" File ", Line) end,
    case lists:dropwhile(IsNotFileLine, Lines) of
        [] ->
            unknown;
        [" File " ++ Rest | _Others] ->
            [Count] = string:tokens(Rest, ": "),
            list_to_integer(Count)
    end.
%% Evaluates Fun and yields NoProcFailResult instead when the callee exits
%% with {noproc, _}.
-define(SAFE_CALL(Fun, NoProcFailResult),
    try
        Fun
    catch exit:{noproc, _} -> NoProcFailResult
    end).

%% Disk free limit as reported by rabbit_disk_monitor, or
%% `disk_free_monitoring_disabled` when the monitor call exits with noproc.
get_disk_free_limit() ->
    ?SAFE_CALL(rabbit_disk_monitor:get_disk_free_limit(),
               disk_free_monitoring_disabled).

%% Free disk space as reported by rabbit_disk_monitor, or
%% `disk_free_monitoring_disabled` when the monitor call exits with noproc.
get_disk_free() ->
    ?SAFE_CALL(rabbit_disk_monitor:get_disk_free(),
               disk_free_monitoring_disabled).

%% Logs a file descriptor error at most once every ten minutes and returns
%% the state with the time of the last logged error.
log_fd_error(Fmt, Args, #state{error_logged_time = LastLogged} = State) ->
    Now = erlang:monotonic_time(second),
    case fd_error_log_due(LastLogged, Now) of
        true ->
            % rabbitmq/rabbitmq-management#90
            % either nothing has been logged yet or it has been longer
            % than 10 minutes: log the error and make a note of when
            ok = rabbit_log:error(Fmt, Args),
            State#state{error_logged_time = Now};
        false ->
            % 10 minutes have not yet passed
            State
    end.

%% True when no error has been logged yet, or when the last one is at
%% least ten minutes old.
fd_error_log_due(undefined, _Now) ->
    true;
fd_error_log_due(LastLogged, Now) ->
    Now >= LastLogged + ?TEN_MINUTES_AS_SECONDS.

%%--------------------------------------------------------------------

%% Collects {Item, Value} for every item, threading the state through i/2.
%% Entries already in Acc (most recent first) end up, reversed, in front of
%% the newly collected ones.
infos(Items, Acc, State0) ->
    Collect = fun (Item, StateIn) ->
                      {StateOut, Value} = i(Item, StateIn),
                      {{Item, Value}, StateOut}
              end,
    {Collected, State1} = lists:mapfoldl(Collect, State0, Items),
    {State1, lists:reverse(Acc) ++ Collected}.
%% Returns {State, Value} for a single stats key. Only `fd_used` can change
%% the state (through the error log rate limiting); `fd_total` and
%% `persister_stats` read it; every other key is stateless.
i(fd_used, State) ->
    get_used_fd(State);
i(fd_total, #state{fd_total = FdTotal} = State) ->
    {State, FdTotal};
i(persister_stats, State) ->
    {State, persister_stats(State)};
i(Item, State) ->
    {State, stateless_info(Item)}.

%% Value of a stats key that does not depend on the server state.
stateless_info(name) ->
    node();
stateless_info(partitions) ->
    rabbit_node_monitor:partitions();
stateless_info(sockets_used) ->
    Info = file_handle_cache:info([sockets_used]),
    proplists:get_value(sockets_used, Info);
stateless_info(sockets_total) ->
    Info = file_handle_cache:info([sockets_limit]),
    proplists:get_value(sockets_limit, Info);
stateless_info(os_pid) ->
    list_to_binary(os:getpid());
stateless_info(mem_used) ->
    vm_memory_monitor:get_process_memory();
stateless_info(mem_calculation_strategy) ->
    vm_memory_monitor:get_memory_calculation_strategy();
stateless_info(mem_limit) ->
    vm_memory_monitor:get_memory_limit();
stateless_info(mem_alarm) ->
    resource_alarm_set(memory);
stateless_info(proc_used) ->
    erlang:system_info(process_count);
stateless_info(proc_total) ->
    erlang:system_info(process_limit);
stateless_info(run_queue) ->
    erlang:statistics(run_queue);
stateless_info(processors) ->
    erlang:system_info(logical_processors);
stateless_info(disk_free_limit) ->
    get_disk_free_limit();
stateless_info(disk_free) ->
    get_disk_free();
stateless_info(disk_free_alarm) ->
    resource_alarm_set(disk);
stateless_info(contexts) ->
    rabbit_web_dispatch_contexts();
stateless_info(uptime) ->
    {Total, _} = erlang:statistics(wall_clock),
    Total;
stateless_info(rates_mode) ->
    rabbit_mgmt_db_handler:rates_mode();
stateless_info(exchange_types) ->
    list_registry_plugins(exchange);
stateless_info(log_files) ->
    [list_to_binary(File) || File <- rabbit:log_locations()];
stateless_info(db_dir) ->
    list_to_binary(rabbit_mnesia:dir());
stateless_info(config_files) ->
    [list_to_binary(File) || File <- rabbit:config_files()];
stateless_info(net_ticktime) ->
    net_kernel:get_net_ticktime();
stateless_info(enabled_plugins) ->
    {ok, Dir} = application:get_env(rabbit, enabled_plugins_file),
    rabbit_plugins:read_enabled(Dir);
stateless_info(auth_mechanisms) ->
    {ok, Mechanisms} = application:get_env(rabbit, auth_mechanisms),
    IsConfigured = fun (Name) ->
                           lists:member(list_to_atom(binary_to_list(Name)),
                                        Mechanisms)
                   end,
    list_registry_plugins(auth_mechanism, IsConfigured);
stateless_info(applications) ->
    Sorted = lists:keysort(1, rabbit_misc:which_applications()),
    [format_application(App) || App <- Sorted];
stateless_info(gc_num) ->
    {GCs, _, _} = erlang:statistics(garbage_collection),
    GCs;
stateless_info(gc_bytes_reclaimed) ->
    {_, Words, _} = erlang:statistics(garbage_collection),
    Words * erlang:system_info(wordsize);
stateless_info(context_switches) ->
    {Switches, 0} = erlang:statistics(context_switches),
    Switches;
stateless_info(ra_open_file_metrics) ->
    [{ra_log_wal, ra_metrics(ra_log_wal)},
     {ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}].

%% Looks up the ra_open_file_metrics entry keyed by the pid registered as
%% Name; 0 when there is no entry or the lookup raises badarg.
ra_metrics(Name) ->
    try ets:lookup(ra_open_file_metrics, whereis(Name)) of
        [] -> 0;
        [{_, Count}] -> Count
    catch
        error:badarg ->
            %% On startup the mgmt might start before ra does
            0
    end.

%% True when a resource alarm for Source is currently set on this node.
resource_alarm_set(Source) ->
    Alarm = {{resource_limit, Source, node()}, []},
    lists:member(Alarm, rabbit_alarm:get_alarms()).

%% Descriptions of all registry plugins of the given type, each marked
%% as enabled.
list_registry_plugins(Type) ->
    AlwaysEnabled = fun (_) -> true end,
    list_registry_plugins(Type, AlwaysEnabled).

%% As list_registry_plugins/1, with the `enabled` flag computed by calling
%% Fun on each plugin name.
list_registry_plugins(Type, Fun) ->
    Registered = rabbit_registry:lookup_all(Type),
    [registry_plugin_enabled(set_plugin_name(Name, Module), Fun) ||
        {Name, Module} <- Registered].

%% Appends {enabled, Fun(Name)} to a plugin description.
registry_plugin_enabled(Desc, Fun) ->
    Name = proplists:get_value(name, Desc),
    Desc ++ [{enabled, Fun(Name)}].
%% Converts an {Application, Description, Version} triple into a proplist
%% with binary description and version.
format_application({Application, Description, Version}) ->
    [{name, Application},
     {description, list_to_binary(Description)},
     {version, list_to_binary(Version)}].

%% Returns Module:description() with its `name` entry replaced by the
%% registry name as a binary.
set_plugin_name(Name, Module) ->
    [{name, list_to_binary(atom_to_list(Name))} |
     proplists:delete(name, Module:description())].

%% Flattens the {Op, Type} keys of the stored file handle cache stats into
%% single atoms. Entries whose key is not a 2-tuple are skipped.
persister_stats(#state{fhc_stats = FHC}) ->
    [{flatten_key(K), V} || {{_Op, _Type} = K, V} <- FHC].

%% {a, b} -> a_b
flatten_key({A, B}) ->
    list_to_atom(atom_to_list(A) ++ "_" ++ atom_to_list(B)).

%% Returns one {Node, Owner, Stats} triple per entry of
%% net_kernel:nodes_info() for which format_nodes_info/1 yields a link.
cluster_links() ->
    {ok, Items} = net_kernel:nodes_info(),
    [Link || Item <- Items,
             Link <- [format_nodes_info(Item)], Link =/= undefined].

%% Returns {Node, Owner, Stats} when the owner process of the connection is
%% linked to exactly one port, `undefined` otherwise.
%%
%% Only the process_info/2 call is protected: it raises badarg when Owner
%% is not a local pid (for instance when Info has no `owner` entry). Errors
%% raised while formatting the port still propagate.
format_nodes_info({Node, Info}) ->
    Owner = proplists:get_value(owner, Info),
    try process_info(Owner, links) of
        {links, Links} ->
            case [Link || Link <- Links, is_port(Link)] of
                [Port] ->
                    {Node, Owner, format_nodes_info1(Port)};
                _ ->
                    undefined
            end;
        _ ->
            %% e.g. `undefined` when the owner process is no longer alive
            undefined
    catch
        error:badarg ->
            undefined
    end.

%% Socket endpoints and byte counters for a distribution port, or [] when
%% either lookup fails.
format_nodes_info1(Port) ->
    case {rabbit_net:socket_ends(Port, inbound),
          rabbit_net:getstat(Port, [recv_oct, send_oct])} of
        {{ok, {PeerAddr, PeerPort, SockAddr, SockPort}}, {ok, Stats}} ->
            [{peer_addr, maybe_ntoab(PeerAddr)},
             {peer_port, PeerPort},
             {sock_addr, maybe_ntoab(SockAddr)},
             {sock_port, SockPort},
             {recv_bytes, rabbit_misc:pget(recv_oct, Stats)},
             {send_bytes, rabbit_misc:pget(send_oct, Stats)}];
        _ ->
            []
    end.

%% Tuple addresses are rendered to a binary via rabbit_misc:ntoab/1;
%% anything else is passed through unchanged.
maybe_ntoab(A) when is_tuple(A) -> list_to_binary(rabbit_misc:ntoab(A));
maybe_ntoab(H) -> H.

%%--------------------------------------------------------------------

%% This is slightly icky in that we introduce knowledge of
%% rabbit_web_dispatch, which is not a dependency. But the last thing I
%% want to do is create a rabbitmq_mochiweb_management_agent plugin.
rabbit_web_dispatch_contexts() ->
    [format_context(C) || C <- rabbit_web_dispatch_registry_list_all()].
%% For similar reasons we don't declare a dependency on
%% rabbitmq_mochiweb - so at startup there's no guarantee it will be
%% running. So we have to catch this noproc.
rabbit_web_dispatch_registry_list_all() ->
    Registry = rabbit_web_dispatch_registry, %% Fool xref
    list_all_if_loaded(code:is_loaded(Registry), Registry).

%% Asks the registry for its contexts only when its module is loaded; a
%% noproc exit from the call is reported as no contexts.
list_all_if_loaded(false, _Registry) ->
    [];
list_all_if_loaded(_Loaded, Registry) ->
    try
        Registry:list_all()
    catch exit:{noproc, _} ->
            []
    end.

%% Renders a {Path, Description, Options} context as a proplist with
%% binary description and path, followed by the formatted options.
format_context({Path, Description, Rest}) ->
    Fixed = [{description, list_to_binary(Description)},
             {path, list_to_binary("/" ++ Path)}],
    Fixed ++ format_mochiweb_option_list(Rest).

%% Formats every {Key, Value} option; entries that are not pairs are
%% skipped.
format_mochiweb_option_list(Options) ->
    [{Key, format_mochiweb_option(Key, Value)} || {Key, Value} <- Options].

%% ssl_opts is itself an option list and is formatted recursively; other
%% values become binaries, using "~w" when they are not printable lists.
format_mochiweb_option(ssl_opts, SslOpts) ->
    format_mochiweb_option_list(SslOpts);
format_mochiweb_option(_Key, Value) ->
    Text = case io_lib:printable_list(Value) of
               true  -> Value;
               false -> rabbit_misc:format("~w", [Value])
           end,
    list_to_binary(Text).

%%--------------------------------------------------------------------

%% gen_server callback: builds the initial state and emits the first
%% update immediately.
init([]) ->
    {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
    FdTotal = file_handle_cache:ulimit(),
    FhcStats = get_fhc_stats(),
    InitialState = #state{fd_total    = FdTotal,
                          fhc_stats   = FhcStats,
                          node_owners = sets:new(),
                          interval    = Interval},
    %% We can update stats straight away as they need to be available
    %% when the mgmt plugin starts a collector
    {ok, emit_update(InitialState)}.

%% No synchronous requests are supported.
handle_call(_Request, _From, State) ->
    {reply, unknown_request, State}.

%% Casts are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% `emit_update` is the periodic timer message scheduled by emit_update/1;
%% any other message is ignored.
handle_info(emit_update, State) ->
    {noreply, emit_update(State)};
handle_info(_Other, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%--------------------------------------------------------------------

%% Collects all stats, publishes them to rabbit_core_metrics and
%% rabbit_event, schedules the next `emit_update` message and finally
%% emits the node-to-node link stats. Returns the updated state.
emit_update(State0) ->
    State1 = update_state(State0),
    {State2, MStats} = infos(?METRICS_KEYS, [], State1),
    {State3, PStats} = infos(?PERSISTER_KEYS, [], State2),
    {State4, OStats} = infos(?OTHER_KEYS, [], State3),
    [{persister_stats, PersisterStats}] = PStats,
    %% The node name is not stored with the node metrics.
    [{name, _Name} | NodeStats] = OStats,
    lists:foreach(
      fun ({Table, Stats}) ->
              rabbit_core_metrics:node_stats(Table, Stats)
      end,
      [{persister_metrics, PersisterStats},
       {coarse_metrics, MStats},
       {node_metrics, NodeStats}]),
    rabbit_event:notify(node_stats, PStats ++ MStats ++ OStats),
    erlang:send_after(State4#state.interval, self(), emit_update),
    emit_node_node_stats(State4).

%% Publishes stats for every current cluster link and a `node_node_deleted`
%% event, in both directions, for every {Node, Owner} pair that was present
%% in the previous pass but is not any more.
emit_node_node_stats(State = #state{node_owners = Owners}) ->
    Links = cluster_links(),
    NewOwners = sets:from_list([{Node, Owner} || {Node, Owner, _} <- Links]),
    Dead = sets:to_list(sets:subtract(Owners, NewOwners)),
    lists:foreach(
      fun ({Node, _Owner}) ->
              lists:foreach(
                fun (Route) ->
                        rabbit_event:notify(node_node_deleted,
                                            [{route, Route}])
                end,
                [{node(), Node}, {Node, node()}])
      end,
      Dead),
    lists:foreach(
      fun ({Node, _Owner, Stats}) ->
              Route = {node(), Node},
              rabbit_core_metrics:node_node_stats(Route, Stats),
              rabbit_event:notify(node_node_stats, [{route, Route} | Stats])
      end,
      Links),
    State#state{node_owners = NewOwners}.

%% Refreshes the stored file handle cache stats.
update_state(State0) ->
    %% Store raw data, the average operation time is calculated during querying
    %% from the accumulated total
    State0#state{fhc_stats = get_fhc_stats()}.

%% Merges the file_handle_cache_stats entries with the ra_io_metrics
%% entries, adding up the values of keys present in both.
get_fhc_stats() ->
    Sum = fun (_Key, V1, V2) -> V1 + V2 end,
    FhcStats = dict:from_list(file_handle_cache_stats:get()),
    RaStats = dict:from_list(get_ra_io_metrics()),
    dict:to_list(dict:merge(Sum, FhcStats, RaStats)).

%% Sorted contents of the ra_io_metrics ETS table.
get_ra_io_metrics() ->
    Entries = ets:tab2list(ra_io_metrics),
    lists:sort(Entries).