1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_mgmt_external_stats).
9
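%% Transitional step until we can require Erlang/OTP 21 and
%% use the now recommended try/catch syntax for obtaining the stack trace.
%% NOTE: this module already uses that syntax (Class:Reason:Stacktrace), so
%% this warning suppression may no longer be needed; review before removing.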
12-compile(nowarn_deprecated_function).
13
14-behaviour(gen_server).
15
16-export([start_link/0]).
17-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
18         code_change/3]).
19
20-export([list_registry_plugins/1]).
21
22-import(rabbit_misc, [pget/2]).
23
24-include_lib("rabbit_common/include/rabbit.hrl").
25
%% Statistics collected on every tick by emit_update/1 and stored as the
%% node's coarse_metrics.
-define(METRICS_KEYS, [fd_used, sockets_used, mem_used, disk_free, proc_used, gc_num,
                       gc_bytes_reclaimed, context_switches]).

%% Collected by emit_update/1 and stored as the node's persister_metrics.
-define(PERSISTER_KEYS, [persister_stats]).

%% Collected by emit_update/1 and stored as the node's node_metrics.
%% emit_update/1 pattern-matches `name` as the FIRST entry and strips it
%% before storing, so `name` must stay at the head of this list.
-define(OTHER_KEYS, [name, partitions, os_pid, fd_total, sockets_total, mem_limit,
                     mem_alarm, disk_free_limit, disk_free_alarm, proc_total,
                     rates_mode, uptime, run_queue, processors, exchange_types,
                     auth_mechanisms, applications, contexts, log_files,
                     db_dir, config_files, net_ticktime, enabled_plugins,
                     mem_calculation_strategy, ra_open_file_metrics]).

%% Minimum gap between two error messages emitted by log_fd_error/3,
%% compared against erlang:monotonic_time(second).
-define(TEN_MINUTES_AS_SECONDS, 600).
39
40%%--------------------------------------------------------------------
41
%% gen_server state.
-record(state, {
    %% Result of file_handle_cache:ulimit/0, read once in init/1 and reported
    %% as fd_total.
    fd_total,
    %% Latest result of get_fhc_stats/0 (a list of {Key, Value} pairs),
    %% refreshed by update_state/1 before each emission.
    fhc_stats,
    %% Set of {Node, OwnerPid} pairs for the inter-node links seen on the
    %% previous emit_node_node_stats/1 call; used to detect vanished links.
    node_owners,
    %% NOTE(review): not read or written anywhere else in this file.
    last_ts,
    %% rabbit's collect_statistics_interval, used as the erlang:send_after/3
    %% delay between emit_update ticks.
    interval,
    %% erlang:monotonic_time(second) when log_fd_error/3 last logged, or
    %% undefined if it has not logged yet.
    error_logged_time
}).
50
51%%--------------------------------------------------------------------
52
%% Starts the stats emitter, registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
55
56%%--------------------------------------------------------------------
57
%% Returns {State, UsedFd}: the number of file descriptors/handles this OS
%% process currently has open, or 0 when it cannot be determined.
%% Any exception raised by the platform-specific get_used_fd/2 is caught and
%% logged (rate-limited by log_fd_error/3) together with its stack trace, so a
%% failure here never propagates into the stats emission.
get_used_fd(State0) ->
    try
        case get_used_fd(os:type(), State0) of
            {State1, UsedFd} when is_number(UsedFd) ->
                {State1, UsedFd};
            {State1, _Other} ->
                %% Defaults to 0 if data is not available
                {State1, 0}
        end
    catch
        _:Error:Stacktrace ->
            %% Log the stack trace too: a bare function_clause/badarg reason
            %% does not say which platform clause or parse step failed.
            State2 = log_fd_error("Could not infer the number of file handles used: ~p",
                                  [{Error, Stacktrace}], State0),
            {State2, 0}
    end.
72
%% get_used_fd(OsType, State) -> {State, UsedFd}
%% Platform-specific count of the file descriptors/handles open in this OS
%% process; OsType is the value of os:type(). Failures a clause detects itself
%% are logged via log_fd_error/3 and reported as 0. Anything else (including
%% an OsType with no matching clause) raises, and is caught by get_used_fd/1.

%% Linux: count the entries of /proc/<pid>/fd. If that directory cannot be
%% listed, fall back to the generic Unix (lsof) clause below.
get_used_fd({unix, linux}, State0) ->
    case file:list_dir("/proc/" ++ os:getpid() ++ "/fd") of
        {ok, Files} ->
            {State0, length(Files)};
        {error, _}  ->
            get_used_fd({unix, generic}, State0)
    end;

%% OpenBSD/FreeBSD/NetBSD: run `fstat -p <pid>` and count the output lines
%% whose 4th space-separated column consists only of digits and '*'
%% (presumably the numeric-descriptor rows; the header row and other rows are
%% assumed not to match - TODO confirm against fstat's output format).
%% Any parse failure - e.g. a line with fewer than 4 columns makes
%% lists:nth/2 raise - is logged with the raw output and yields 0.
get_used_fd({unix, BSD}, State0)
  when BSD == openbsd; BSD == freebsd; BSD == netbsd ->
    IsDigit = fun (D) -> lists:member(D, "0123456789*") end,
    Output = os:cmd("fstat -p " ++ os:getpid()),
    try
        F = fun (Line) ->
                    lists:all(IsDigit, lists:nth(4, string:tokens(Line, " ")))
            end,
        UsedFd = length(lists:filter(F, string:tokens(Output, "\n"))),
        {State0, UsedFd}
    catch _:Error:Stacktrace ->
              State1 = log_fd_error("Could not parse fstat output:~n~s~n~p",
                                    [Output, {Error, Stacktrace}], State0),
              {State1, 0}
    end;

%% Any other Unix (and the Linux fallback): run lsof for this pid and count
%% the non-empty output lines minus one (presumably lsof's header line).
%% The shell appends "failed" to the output when lsof exits non-zero, so an
%% output ending in "failed\n" is treated as an error and reported as 0.
get_used_fd({unix, _}, State0) ->
    Cmd = rabbit_misc:format(
            "lsof -d \"0-9999999\" -lna -p ~s || echo failed", [os:getpid()]),
    Res = os:cmd(Cmd),
    case string:right(Res, 7) of
        "failed\n" ->
            State1 = log_fd_error("Could not obtain lsof output", [], State0),
            {State1, 0};
        _ ->
            UsedFd = string:words(Res, $\n) - 1,
            {State0, UsedFd}
    end;

%% handle.exe can be obtained from
%% https://technet.microsoft.com/en-us/sysinternals/bb896655.aspx
%% NOTE(review): the Sysinternals pages appear to have moved to
%% https://learn.microsoft.com/en-us/sysinternals/downloads/handle - verify.

%% Output looks like:

%% Handle v3.42
%% Copyright (C) 1997-2008 Mark Russinovich
%% Sysinternals - www.sysinternals.com
%%
%% Handle type summary:
%%   ALPC Port       : 2
%%   Desktop         : 1
%%   Directory       : 1
%%   Event           : 108
%%   File            : 25
%%   IoCompletion    : 3
%%   Key             : 7
%%   KeyedEvent      : 1
%%   Mutant          : 1
%%   Process         : 3
%%   Process         : 38
%%   Thread          : 41
%%   Timer           : 3
%%   TpWorkerFactory : 2
%%   WindowStation   : 2
%% Total handles: 238

%% Nthandle v4.22 - Handle viewer
%% Copyright (C) 1997-2019 Mark Russinovich
%% Sysinternals - www.sysinternals.com
%%
%% Handle type summary:
%%   <Unknown type>  : 1
%%   <Unknown type>  : 166
%%   ALPC Port       : 11
%%   Desktop         : 1
%%   Directory       : 2
%%   Event           : 226
%%   File            : 122
%%   IoCompletion    : 8
%%   IRTimer         : 6
%%   Key             : 42
%%   Mutant          : 7
%%   Process         : 3
%%   Section         : 2
%%   Semaphore       : 43
%%   Thread          : 36
%%   TpWorkerFactory : 3
%%   WaitCompletionPacket: 25
%%   WindowStation   : 2
%% Total handles: 706

%% Note that the "File" number appears to include network sockets too; I assume
%% that's the number we care about. Note also that if you omit "-s" you will
%% see a list of file handles *without* network sockets. If you then add "-a"
%% you will see a list of handles of various types, including network sockets
%% shown as file handles to \Device\Afd.

%% Windows: run handle.exe in summary mode (-s) for this pid and read the
%% "  File ..." line via find_files_line/1. An empty command output is
%% treated as handle.exe being unavailable; output without a File line is
%% logged in full. Both cases yield 0.
get_used_fd({win32, _}, State0) ->
    Handle = rabbit_misc:os_cmd(
               "handle.exe /accepteula -s -p " ++ os:getpid() ++ " 2> nul"),
    case Handle of
        [] ->
            State1 = log_fd_error("Could not find handle.exe, please install from sysinternals", [], State0),
            {State1, 0};
        _  ->
            case find_files_line(string:tokens(Handle, "\r\n")) of
                unknown ->
                    State1 = log_fd_error("handle.exe output did not contain "
                                          "a line beginning with '  File ', unable "
                                          "to determine used file descriptor "
                                          "count: ~p", [Handle], State0),
                    {State1, 0};
                UsedFd ->
                    {State0, UsedFd}
            end
    end.
187
%% Scans handle.exe summary lines for the first one starting with "  File "
%% and returns its count as an integer, or `unknown` when no such line exists.
%% A File line whose remainder is not exactly one token crashes (badmatch).
find_files_line(Lines) ->
    IsNotFileLine = fun(Line) -> not lists:prefix("  File ", Line) end,
    case lists:dropwhile(IsNotFileLine, Lines) of
        [] ->
            unknown;
        ["  File " ++ Remainder | _] ->
            [CountStr] = string:tokens(Remainder, ": "),
            list_to_integer(CountStr)
    end.
195
%% Evaluates Expr; if evaluation exits with {noproc, _} (the target process
%% is not running) the macro evaluates to NoProcFailResult instead. Every other
%% exception propagates unchanged.
-define(SAFE_CALL(Expr, NoProcFailResult),
    try
        Expr
    catch
        exit:{noproc, _} ->
            NoProcFailResult
    end).
201
%% Configured disk free limit, or disk_free_monitoring_disabled when the
%% disk monitor process is not running.
get_disk_free_limit() ->
    try
        rabbit_disk_monitor:get_disk_free_limit()
    catch
        exit:{noproc, _} -> disk_free_monitoring_disabled
    end.
204
%% Current free disk space as reported by the disk monitor, or
%% disk_free_monitoring_disabled when that process is not running.
get_disk_free() ->
    try
        rabbit_disk_monitor:get_disk_free()
    catch
        exit:{noproc, _} -> disk_free_monitoring_disabled
    end.
207
%% Logs an error about file-descriptor counting at most once every
%% ?TEN_MINUTES_AS_SECONDS (rabbitmq/rabbitmq-management#90), because the
%% same failure would otherwise be logged on every stats tick.
%% Returns the state, with error_logged_time updated whenever a message is
%% actually written.
log_fd_error(Fmt, Args, #state{error_logged_time = LastLogged} = State) ->
    Now = erlang:monotonic_time(second),
    ShouldLog = LastLogged =:= undefined
        orelse Now >= LastLogged + ?TEN_MINUTES_AS_SECONDS,
    case ShouldLog of
        true ->
            ok = rabbit_log:error(Fmt, Args),
            State#state{error_logged_time = Now};
        false ->
            State
    end.
227%%--------------------------------------------------------------------
228
%% Evaluates i/2 for each key in Items, in order, threading the state through.
%% Returns {FinalState, Result}, where Result is lists:reverse(Acc) followed
%% by one {Key, Value} pair per item.
infos(Items, Acc, State0) ->
    Collect = fun(Item, StateIn) ->
                      {StateOut, Value} = i(Item, StateIn),
                      {{Item, Value}, StateOut}
              end,
    {Pairs, FinalState} = lists:mapfoldl(Collect, State0, Items),
    {FinalState, lists:reverse(Acc, Pairs)}.
235
%% i(Key, State) -> {State, Value}
%% Computes a single node statistic. Only fd_used may change the state (via
%% get_used_fd/1); fd_total and persister_stats read it. All other keys are
%% computed by info_value/1, which does not touch the state. Unknown keys
%% crash with function_clause.
i(fd_used, State) ->
    get_used_fd(State);
i(fd_total, #state{fd_total = FdTotal} = State) ->
    {State, FdTotal};
i(persister_stats, State) ->
    {State, persister_stats(State)};
i(Key, State) ->
    {State, info_value(Key)}.

%% Value of a statistic that does not depend on the server state.
info_value(name) ->
    node();
info_value(partitions) ->
    rabbit_node_monitor:partitions();
info_value(sockets_used) ->
    proplists:get_value(sockets_used, file_handle_cache:info([sockets_used]));
info_value(sockets_total) ->
    proplists:get_value(sockets_limit, file_handle_cache:info([sockets_limit]));
info_value(os_pid) ->
    list_to_binary(os:getpid());
info_value(mem_used) ->
    vm_memory_monitor:get_process_memory();
info_value(mem_calculation_strategy) ->
    vm_memory_monitor:get_memory_calculation_strategy();
info_value(mem_limit) ->
    vm_memory_monitor:get_memory_limit();
info_value(mem_alarm) ->
    resource_alarm_set(memory);
info_value(proc_used) ->
    erlang:system_info(process_count);
info_value(proc_total) ->
    erlang:system_info(process_limit);
info_value(run_queue) ->
    erlang:statistics(run_queue);
info_value(processors) ->
    erlang:system_info(logical_processors);
info_value(disk_free_limit) ->
    get_disk_free_limit();
info_value(disk_free) ->
    get_disk_free();
info_value(disk_free_alarm) ->
    resource_alarm_set(disk);
info_value(contexts) ->
    rabbit_web_dispatch_contexts();
info_value(uptime) ->
    {TotalWallClock, _SinceLastCall} = erlang:statistics(wall_clock),
    TotalWallClock;
info_value(rates_mode) ->
    rabbit_mgmt_db_handler:rates_mode();
info_value(exchange_types) ->
    list_registry_plugins(exchange);
info_value(log_files) ->
    [list_to_binary(Path) || Path <- rabbit:log_locations()];
info_value(db_dir) ->
    list_to_binary(rabbit_mnesia:dir());
info_value(config_files) ->
    [list_to_binary(Path) || Path <- rabbit:config_files()];
info_value(net_ticktime) ->
    net_kernel:get_net_ticktime();
info_value(enabled_plugins) ->
    {ok, EnabledPluginsFile} = application:get_env(rabbit, enabled_plugins_file),
    rabbit_plugins:read_enabled(EnabledPluginsFile);
info_value(auth_mechanisms) ->
    {ok, Configured} = application:get_env(rabbit, auth_mechanisms),
    IsConfigured = fun (Name) ->
                           lists:member(list_to_atom(binary_to_list(Name)), Configured)
                   end,
    list_registry_plugins(auth_mechanism, IsConfigured);
info_value(applications) ->
    Sorted = lists:keysort(1, rabbit_misc:which_applications()),
    [format_application(App) || App <- Sorted];
info_value(gc_num) ->
    {NumGCs, _, _} = erlang:statistics(garbage_collection),
    NumGCs;
info_value(gc_bytes_reclaimed) ->
    {_, WordsReclaimed, _} = erlang:statistics(garbage_collection),
    WordsReclaimed * erlang:system_info(wordsize);
info_value(context_switches) ->
    {Switches, 0} = erlang:statistics(context_switches),
    Switches;
info_value(ra_open_file_metrics) ->
    [{ra_log_wal, ra_metrics(ra_log_wal)},
     {ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}].
314
%% Open-file count recorded in the ra_open_file_metrics ETS table for the
%% process registered as Name (keyed by the result of whereis/1).
%% Returns 0 when there is no row, or when the table does not exist yet.
ra_metrics(Name) ->
    try
        case ets:lookup(ra_open_file_metrics, whereis(Name)) of
            [{_Pid, OpenFiles}] -> OpenFiles;
            []                  -> 0
        end
    catch
        error:badarg ->
            %% On startup the mgmt might start before ra does
            0
    end.
326
%% True when rabbit_alarm reports a resource_limit alarm for Source on this
%% node.
resource_alarm_set(Source) ->
    LocalAlarm = {{resource_limit, Source, node()}, []},
    ActiveAlarms = rabbit_alarm:get_alarms(),
    lists:member(LocalAlarm, ActiveAlarms).
330
%% Describes every registered plugin of Type, each marked {enabled, true}.
list_registry_plugins(Type) ->
    AlwaysEnabled = fun(_Name) -> true end,
    list_registry_plugins(Type, AlwaysEnabled).
333
%% Describes every plugin registered under Type in rabbit_registry. Each entry
%% is the module's description proplist with `name` replaced by the registered
%% name (binary) and {enabled, Fun(Name)} appended.
list_registry_plugins(Type, Fun) ->
    [begin
         Desc = set_plugin_name(Name, Module),
         registry_plugin_enabled(Desc, Fun)
     end || {Name, Module} <- rabbit_registry:lookup_all(Type)].
337
%% Appends {enabled, Fun(Name)} to a plugin description, where Name is its
%% `name` entry (undefined if absent).
registry_plugin_enabled(Desc, Fun) ->
    Name = proplists:get_value(name, Desc),
    lists:append(Desc, [{enabled, Fun(Name)}]).
340
%% Formats one {Application, Description, Version} entry (as returned by
%% rabbit_misc:which_applications/0) for the management API.
%% Description and Version are encoded as UTF-8 binaries. The previous
%% list_to_binary/1 raised badarg on any character above 255 (e.g. a
%% non-Latin-1 description in a plugin's .app file), which propagated out
%% of emit_update/1. It also produced Latin-1 bytes that are not valid UTF-8.
format_application({Application, Description, Version}) ->
    [{name,        Application},
     {description, unicode:characters_to_binary(Description)},
     {version,     unicode:characters_to_binary(Version)}].
345
%% Returns Module:description() with its `name` entry replaced by the
%% registry name Name, converted to a binary and placed first.
set_plugin_name(Name, Module) ->
    NameBin = atom_to_binary(Name, latin1),
    Rest = proplists:delete(name, Module:description()),
    [{name, NameBin} | Rest].
349
%% Turns the stored fhc_stats pairs whose key is a 2-tuple {Op, Type} into
%% {op_type, Value} pairs; entries with any other key shape are dropped.
persister_stats(#state{fhc_stats = FhcStats}) ->
    lists:filtermap(
      fun({{_Op, _Type} = OpType, Value}) -> {true, {flatten_key(OpType), Value}};
         (_Other)                         -> false
      end, FhcStats).
352
%% Joins a pair of atoms into a single atom: {io, read} -> io_read.
flatten_key({A, B}) ->
    list_to_atom(lists:append([atom_to_list(A), "_", atom_to_list(B)])).
355
%% Formats every distribution connection reported by net_kernel:nodes_info/0,
%% dropping entries that format_nodes_info/1 could not describe.
cluster_links() ->
    {ok, NodesInfo} = net_kernel:nodes_info(),
    Formatted = [format_nodes_info(Entry) || Entry <- NodesInfo],
    [Link || Link <- Formatted, Link =/= undefined].
360
%% Given one {Node, Info} entry from net_kernel:nodes_info/0, returns
%% {Node, OwnerPid, Stats} when the connection's owner process is linked to
%% exactly one port; otherwise undefined.
%% process_info/2 raises badarg for a non-pid owner (e.g. a missing `owner`
%% entry) and returns undefined for a dead process; both mean "no link".
format_nodes_info({Node, Info}) ->
    Owner = proplists:get_value(owner, Info),
    try process_info(Owner, links) of
        {links, Links} ->
            case [Link || Link <- Links, is_port(Link)] of
                [Port] ->
                    {Node, Owner, format_nodes_info1(Port)};
                _ ->
                    undefined
            end;
        _ ->
            undefined
    catch
        error:badarg ->
            undefined
    end.
374
%% Describes an inter-node connection socket: peer/local address and port,
%% plus bytes received and sent. Returns [] if either the socket ends or the
%% byte counters cannot be read.
format_nodes_info1(Port) ->
    SocketEnds = rabbit_net:socket_ends(Port, inbound),
    ByteCounters = rabbit_net:getstat(Port, [recv_oct, send_oct]),
    case {SocketEnds, ByteCounters} of
        {{ok, {PeerAddr, PeerPort, SockAddr, SockPort}}, {ok, Counters}} ->
            [{peer_addr,  maybe_ntoab(PeerAddr)},
             {peer_port,  PeerPort},
             {sock_addr,  maybe_ntoab(SockAddr)},
             {sock_port,  SockPort},
             {recv_bytes, pget(recv_oct, Counters)},
             {send_bytes, pget(send_oct, Counters)}];
        _Unavailable ->
            []
    end.
388
%% Renders an IP address tuple as a binary via rabbit_misc:ntoab/1; any
%% non-tuple value (e.g. an already formatted host) is returned as is.
maybe_ntoab(Addr) ->
    case is_tuple(Addr) of
        true  -> list_to_binary(rabbit_misc:ntoab(Addr));
        false -> Addr
    end.
391
392%%--------------------------------------------------------------------
393
394%% This is slightly icky in that we introduce knowledge of
395%% rabbit_web_dispatch, which is not a dependency. But the last thing I
396%% want to do is create a rabbitmq_mochiweb_management_agent plugin.
%% Formatted list of all contexts known to the web dispatch registry
%% ([] when the registry is unavailable).
rabbit_web_dispatch_contexts() ->
    lists:map(fun format_context/1, rabbit_web_dispatch_registry_list_all()).
399
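%% For similar reasons we don't declare a dependency on the plugin that
%% provides rabbit_web_dispatch_registry (the original comment named it
%% rabbitmq_mochiweb) - so at startup there's no guarantee its module is
%% loaded or its registry process is running. So we check that the module
%% is loaded and catch noproc.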
%% Returns rabbit_web_dispatch_registry:list_all(), or [] when that module is
%% not loaded or its registry process is not running.
rabbit_web_dispatch_registry_list_all() ->
    Registry = rabbit_web_dispatch_registry, %% Fool xref
    case code:is_loaded(Registry) =/= false of
        false ->
            [];
        true ->
            try
                Registry:list_all()
            catch
                exit:{noproc, _} -> []
            end
    end.
413
%% Formats one {Path, Description, Options} registry entry: description and
%% "/"-prefixed path as binaries, followed by the formatted listener options.
format_context({Path, Description, Options}) ->
    Header = [{description, list_to_binary(Description)},
              {path,        list_to_binary([$/ | Path])}],
    Header ++ format_mochiweb_option_list(Options).
418
%% Formats a listener option proplist for the management API. Every value is
%% turned into a binary, except ssl_opts, whose nested proplist is formatted
%% recursively.
format_mochiweb_option_list(C) ->
    [{K, format_mochiweb_option(K, V)} || {K, V} <- C].

%% Printable strings become UTF-8 binaries; any other term is rendered with
%% ~w and returned as a binary.
format_mochiweb_option(ssl_opts, V) ->
    format_mochiweb_option_list(V);
format_mochiweb_option(_K, V) ->
    %% io_lib:printable_list/1 depends on the emulator's +pc flag: under
    %% +pc unicode it accepts code points above 255, which list_to_binary/1
    %% rejects with badarg. Test for a printable Unicode list explicitly and
    %% encode it as UTF-8 so the result does not depend on that flag.
    case io_lib:printable_unicode_list(V) of
        true  -> unicode:characters_to_binary(V);
        false -> list_to_binary(rabbit_misc:format("~w", [V]))
    end.
429
430%%--------------------------------------------------------------------
431
%% gen_server init: reads rabbit's collect_statistics_interval, builds the
%% initial state, and emits the first update right away. emit_update/1 also
%% schedules the periodic emit_update tick.
init([]) ->
    {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
    FdLimit = file_handle_cache:ulimit(),
    FhcStats = get_fhc_stats(),
    Initial = #state{fd_total    = FdLimit,
                     fhc_stats   = FhcStats,
                     node_owners = sets:new(),
                     interval    = Interval},
    %% We can update stats straight away as they need to be available
    %% when the mgmt plugin starts a collector
    {ok, emit_update(Initial)}.
441
%% No synchronous requests are supported; every call is answered with
%% unknown_request.
handle_call(_Request, _From, State) ->
    Reply = unknown_request,
    {reply, Reply, State}.

%% Casts are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
447
%% The emit_update tick (scheduled by emit_update/1) triggers a new round of
%% stats; any other message is ignored.
handle_info(emit_update, State0) ->
    State = emit_update(State0),
    {noreply, State};
handle_info(_Ignored, State) ->
    {noreply, State}.
453
%% Nothing to clean up.
terminate(_Reason, _State) ->
    ok.

%% The state needs no conversion between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
457
458%%--------------------------------------------------------------------
459
%% One stats round: refreshes the stored file/IO stats, collects the coarse,
%% persister and other node stats, and stores them through
%% rabbit_core_metrics:node_stats/2. It then publishes them all in one
%% node_stats event, schedules the next emit_update tick, and finally emits
%% the node-to-node link stats. Returns the updated state.
emit_update(State0) ->
    State1 = update_state(State0),
    {State2, CoarseStats} = infos(?METRICS_KEYS, [], State1),
    {State3, PersisterInfo} = infos(?PERSISTER_KEYS, [], State2),
    {State4, OtherStats} = infos(?OTHER_KEYS, [], State3),
    [{persister_stats, PersisterStats}] = PersisterInfo,
    %% `name` is the first OTHER_KEYS entry: it goes into the event, but not
    %% into the node_metrics table.
    [{name, _NodeName} | NodeMetrics] = OtherStats,
    rabbit_core_metrics:node_stats(persister_metrics, PersisterStats),
    rabbit_core_metrics:node_stats(coarse_metrics, CoarseStats),
    rabbit_core_metrics:node_stats(node_metrics, NodeMetrics),
    rabbit_event:notify(node_stats, PersisterInfo ++ CoarseStats ++ OtherStats),
    Interval = State4#state.interval,
    erlang:send_after(Interval, self(), emit_update),
    emit_node_node_stats(State4).
473
%% Publishes stats for each current inter-node connection. For every
%% {Node, Owner} pair seen on the previous call but not now, it emits
%% node_node_deleted events for both route directions. Returns the state
%% with node_owners set to the current pairs.
emit_node_node_stats(#state{node_owners = PrevOwners} = State) ->
    Links = cluster_links(),
    CurrentOwners = sets:from_list([{Node, Owner} || {Node, Owner, _} <- Links]),
    Gone = sets:to_list(sets:subtract(PrevOwners, CurrentOwners)),
    lists:foreach(
      fun({Node, _Owner}) ->
              rabbit_event:notify(node_node_deleted, [{route, {node(), Node}}]),
              rabbit_event:notify(node_node_deleted, [{route, {Node, node()}}])
      end, Gone),
    lists:foreach(
      fun({Node, _Owner, Stats}) ->
              Route = {node(), Node},
              rabbit_core_metrics:node_node_stats(Route, Stats),
              rabbit_event:notify(node_node_stats, [{route, Route} | Stats])
      end, Links),
    State#state{node_owners = CurrentOwners}.
488
%% Refreshes the cached file/IO stats in the state.
update_state(State0) ->
    %% Store raw data, the average operation time is calculated during querying
    %% from the accumulated total
    State0#state{fhc_stats = get_fhc_stats()}.
494
%% Combined file/IO counters as a list of {Key, Value} pairs: the result of
%% file_handle_cache_stats:get/0 merged with get_ra_io_metrics/0, with values
%% summed when both sources report the same key.
get_fhc_stats() ->
    AddValues = fun(_Key, FhcValue, RaValue) -> FhcValue + RaValue end,
    FhcStats = dict:from_list(file_handle_cache_stats:get()),
    RaStats = dict:from_list(get_ra_io_metrics()),
    dict:to_list(dict:merge(AddValues, FhcStats, RaStats)).
499
%% Sorted contents of ra's ra_io_metrics ETS table.
%% Returns [] when the table does not exist yet. This function is reached
%% from init/1 via get_fhc_stats/0, and the management agent may start
%% before ra has created its tables (ra_metrics/1 tolerates the same race).
get_ra_io_metrics() ->
    try ets:tab2list(ra_io_metrics) of
        Rows -> lists:sort(Rows)
    catch
        error:badarg -> []
    end.
502