1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8
%% This module tracks node-wide memory statistics. It collects the RAM
%% durations reported by all registered queues, computes the desired
%% queue duration (in seconds) from them, and sends that target back to
%% the queues.
13
14-module(rabbit_memory_monitor).
15
16-behaviour(gen_server2).
17
18-export([start_link/0, register/2, deregister/1,
19         report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]).
20
21-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
22         terminate/2, code_change/3]).
23
%% One entry per registered process, kept in the private ETS table created
%% in init/1 and keyed on the 'pid' field.
-record(process, {
          pid,       %% the registered process; ETS key (keypos #process.pid)
          reported,  %% last duration it reported ('infinity' initially)
          sent,      %% last desired duration delivered to it ('infinity' initially)
          callback,  %% {M, F, A}; apply(M, F, A ++ [Duration]) must return ok
          monitor    %% reference returned by erlang:monitor/2
         }).

-record(state, {
          timer,                %% interval timer sending 'update' messages
          queue_durations,      %% ETS table of #process records
          queue_duration_sum,   %% sum of all finite reported durations
          queue_duration_count, %% number of finite durations in that sum
          desired_duration,     %% last computed target duration
          disk_alarm            %% true while a local disk alarm is active;
                                %% paging is disabled in that case
         }).

-define(SERVER, ?MODULE).
-define(TABLE_NAME, ?MODULE).

%% If every queue has been paged out (duration 0), the reported durations
%% sum to 0. Without intervention the average would stay 0 even after
%% memory frees up, so queues would never be given a non-zero target again.
%% To break out of that, while the memory ratio is below SUM_INC_THRESHOLD
%% the sum is artificially increased by SUM_INC_AMOUNT before averaging.
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).

%% Sums below this value are clamped to 0.0 (presumably to absorb
%% floating-point drift from repeated add/subtract updates).
-define(EPSILON, 1.0e-6).
46
47%%----------------------------------------------------------------------------
48%% Public API
49%%----------------------------------------------------------------------------
50
%% Starts the memory monitor and registers it locally under ?SERVER.
-spec start_link() -> rabbit_types:ok_pid_or_error().
start_link() ->
    LocalName = {local, ?SERVER},
    gen_server2:start_link(LocalName, ?MODULE, [], []).
55
%% Registers Pid with the monitor. When a new desired duration is pushed
%% out, the monitor calls apply(M, F, A ++ [Duration]); that call must
%% return ok. Synchronous: returns once the registration is recorded.
-spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'.
register(Pid, {_M, _F, _A} = MFA) ->
    Request = {register, Pid, MFA},
    gen_server2:call(?SERVER, Request, infinity).
60
%% Asynchronously removes Pid from the monitor. Returns immediately,
%% without waiting for the server to process the request.
-spec deregister(pid()) -> 'ok'.
deregister(Pid) ->
    Request = {deregister, Pid},
    gen_server2:cast(?SERVER, Request).
65
%% Reports Pid's current RAM duration ('infinity' when it has no finite
%% duration) and returns the desired duration currently in force. The
%% returned value is the one computed before this report is accounted for.
-spec report_ram_duration
        (pid(), float() | 'infinity') -> number() | 'infinity'.
report_ram_duration(Pid, QueueDuration) ->
    Request = {report_ram_duration, Pid, QueueDuration},
    gen_server2:call(?SERVER, Request, infinity).
72
%% Asks the monitor to stop with reason 'normal'. Asynchronous: it does not
%% wait for the server to terminate.
-spec stop() -> 'ok'.
stop() ->
    Request = stop,
    gen_server2:cast(?SERVER, Request).
77
%% Alarm callback handed to rabbit_alarm:register/2 in init/1.
%% Paging is toggled only by disk alarms raised or cleared for the node
%% this monitor runs on. Those are forwarded to Pid as {disk_alarm, Conserve};
%% every other notification is ignored and yields ok.
conserve_resources(Pid, Source, Alert) ->
    case {Source, Alert} of
        {disk, {_, Conserve, Node}} when node(Pid) =:= Node ->
            gen_server2:cast(Pid, {disk_alarm, Conserve});
        _Other ->
            ok
    end.
84
%% Thin pass-through to vm_memory_monitor:get_memory_use/1. Within this
%% module it is queried with 'ratio' by desired_duration_average/1.
memory_use(Kind) ->
    vm_memory_monitor:get_memory_use(Kind).
87
88%%----------------------------------------------------------------------------
89%% Gen_server callbacks
90%%----------------------------------------------------------------------------
91
%% Starts the periodic 'update' timer, creates the private registration
%% table and subscribes to resource alarms. The disk_alarm flag starts out
%% set if a disk alarm is already active. The initial desired duration is
%% then computed once before the server starts serving requests.
init([]) ->
    {ok, IntervalMs} = application:get_env(rabbit, memory_monitor_interval),
    {ok, UpdateTimer} = timer:send_interval(IntervalMs, update),
    Table = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
    ActiveAlarms =
        rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
    Initial = #state { timer                = UpdateTimer,
                       queue_durations      = Table,
                       queue_duration_sum   = 0.0,
                       queue_duration_count = 0,
                       desired_duration     = infinity,
                       disk_alarm           = lists:member(disk, ActiveAlarms) },
    {ok, internal_update(Initial)}.
105
%% Synchronous requests.
%%
%% {report_ram_duration, Pid, QueueDuration}
%%   Replies straight away with the desired duration currently in force
%%   (computed by the last internal_update/1). It then records QueueDuration
%%   as Pid's latest report and adjusts the running sum/count, so that only
%%   finite reports contribute to the average. A report from a pid that has
%%   no entry in the table (never registered, already deregistered, or
%%   registered with an earlier incarnation of this server) gets the same
%%   reply but is otherwise ignored. Previously such a report crashed this
%%   node-wide server with a badmatch and lost its private registration
%%   table.
%%
%% {register, Pid, MFA}
%%   Starts tracking Pid under a monitor, with no duration reported or sent
%%   yet (both 'infinity'). Registering a pid that is already registered
%%   first drops its old entry, so neither the old monitor nor the old
%%   reported duration leaks into the running sum/count.
%%
%% Any other request is answered with {error, unknown_request}. Leaving it
%% unanswered would block a caller that uses an 'infinity' timeout forever.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
            State = #state { queue_durations  = Durations,
                             desired_duration = SendDuration }) ->
    gen_server2:reply(From, SendDuration),
    case ets:lookup(Durations, Pid) of
        [] ->
            {noreply, State};
        [Proc = #process { reported = PrevQueueDuration }] ->
            #state { queue_duration_sum   = Sum,
                     queue_duration_count = Count } = State,
            %% Only finite durations are part of the sum; the count tracks
            %% how many of them there are.
            {Sum1, Count1} =
                case {PrevQueueDuration, QueueDuration} of
                    {infinity, infinity} -> {Sum, Count};
                    {infinity, _}        -> {Sum + QueueDuration, Count + 1};
                    {_, infinity}        -> {Sum - PrevQueueDuration, Count - 1};
                    {_, _}               -> {Sum - PrevQueueDuration + QueueDuration,
                                             Count}
                end,
            true = ets:insert(Durations,
                              Proc #process { reported = QueueDuration,
                                              sent     = SendDuration }),
            {noreply, State #state { queue_duration_sum   = zero_clamp(Sum1),
                                     queue_duration_count = Count1 }}
    end;

handle_call({register, Pid, MFA}, _From, State) ->
    State1 = #state { queue_durations = Durations } =
        internal_deregister(Pid, true, State),
    MRef = erlang:monitor(process, Pid),
    true = ets:insert(Durations, #process { pid = Pid, reported = infinity,
                                            sent = infinity, callback = MFA,
                                            monitor = MRef }),
    {reply, ok, State1};

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_request}, State}.
140
%% Asynchronous requests:
%%  - {disk_alarm, Flag}: recompute the desired duration only when the
%%    flag actually changes.
%%  - {deregister, Pid}: forget Pid and drop its monitor.
%%  - stop: terminate normally.
%% Anything else is ignored.
handle_cast({disk_alarm, NewAlarm}, State = #state{disk_alarm = OldAlarm}) ->
    case NewAlarm =:= OldAlarm of
        true  -> {noreply, State};
        false -> {noreply, internal_update(State#state{disk_alarm = NewAlarm})}
    end;
handle_cast({deregister, Pid}, State) ->
    {noreply, internal_deregister(Pid, true, State)};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(_Ignored, State) ->
    {noreply, State}.
155
%% A 'DOWN' from a registered process's monitor removes its entry. The
%% monitor has already fired, so no demonitor is needed. 'update' ticks
%% from the interval timer trigger a recomputation. Anything else is
%% dropped.
handle_info({'DOWN', _Ref, process, DownPid, _Why}, State) ->
    {noreply, internal_deregister(DownPid, false, State)};
handle_info(update, State) ->
    {noreply, internal_update(State)};
handle_info(_Ignored, State) ->
    {noreply, State}.
164
%% Cancels the periodic 'update' timer started in init/1; the result of the
%% cancellation is not inspected.
terminate(_Why, #state { timer = UpdateTimer }) ->
    timer:cancel(UpdateTimer),
    ok.
168
%% Hot code upgrade: the state is carried over unchanged.
code_change(_FromVsn, Unchanged, _UpgradeExtra) ->
    {ok, Unchanged}.
171
172
173%%----------------------------------------------------------------------------
174%% Internal functions
175%%----------------------------------------------------------------------------
176
%% Returns 0.0 for any value below ?EPSILON (negative values included);
%% otherwise returns the value unchanged.
zero_clamp(Value) ->
    case Value < ?EPSILON of
        true  -> 0.0;
        false -> Value
    end.
179
%% Forgets Pid. Its table entry is deleted and, if it last reported a finite
%% duration, that duration is removed from the running sum/count.
%% Demonitor is true when the monitor is still live (explicit deregister)
%% and false when it has already fired ('DOWN'). An unknown Pid leaves the
%% state untouched.
internal_deregister(Pid, Demonitor,
                    State = #state { queue_durations = Durations }) ->
    case ets:lookup(Durations, Pid) of
        [] ->
            State;
        [#process { reported = Reported, monitor = MRef }] ->
            case Demonitor of
                true  -> true = erlang:demonitor(MRef);
                false -> true
            end,
            true = ets:delete(Durations, Pid),
            forget_reported_duration(Reported, State)
    end.

%% Removes one finite reported duration from the running sum/count.
forget_reported_duration(infinity, State) ->
    State;
forget_reported_duration(Reported,
                         State = #state { queue_duration_sum   = Sum,
                                          queue_duration_count = Count }) ->
    State #state { queue_duration_sum   = zero_clamp(Sum - Reported),
                   queue_duration_count = Count - 1 }.
201
%% Recomputes the desired duration and stores it in the state. When the
%% change warrants it (per should_inform_predicate/1 for the current disk
%% alarm state), it also pushes the new value to the registered processes.
internal_update(State = #state{queue_durations  = Durations,
                               desired_duration = Previous,
                               disk_alarm       = DiskAlarm}) ->
    Current = desired_duration_average(State),
    ShouldInform = should_inform_predicate(DiskAlarm),
    case ShouldInform(Previous, Current) of
        false -> ok;
        true  -> inform_queues(ShouldInform, Current, Durations)
    end,
    State#state{desired_duration = Current}.
212
%% Computes the target duration for queues.
%%  - Disk alarm active: 'infinity' (paging disabled).
%%  - Otherwise, based on memory_use(ratio) and the configured
%%    vm_memory_high_watermark_paging_ratio:
%%      * ratio 'infinity'                         -> 0.0
%%        (NOTE(review): confirm what an infinite ratio means upstream)
%%      * ratio below the paging ratio, or nothing
%%        finite reported yet                      -> 'infinity'
%%      * ratio below ?SUM_INC_THRESHOLD           -> bumped average / ratio
%%      * otherwise                                -> average / ratio
desired_duration_average(#state{disk_alarm = true}) ->
    infinity;
desired_duration_average(#state{disk_alarm           = false,
                                queue_duration_sum   = Sum,
                                queue_duration_count = Count}) ->
    {ok, PagingRatio} =
        application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
    duration_for_ratio(memory_use(ratio), PagingRatio, Sum, Count).

%% Maps the current memory ratio to a target duration (see above).
duration_for_ratio(infinity, _PagingRatio, _Sum, _Count) ->
    0.0;
duration_for_ratio(Ratio, PagingRatio, _Sum, Count)
  when Ratio < PagingRatio; Count == 0 ->
    infinity;
duration_for_ratio(Ratio, _PagingRatio, Sum, Count)
  when Ratio < ?SUM_INC_THRESHOLD ->
    ((Sum + ?SUM_INC_AMOUNT) / Count) / Ratio;
duration_for_ratio(Ratio, _PagingRatio, Sum, Count) ->
    (Sum / Count) / Ratio.
230
%% Pushes DesiredDurationAvg to each registered process for which
%% ShouldInform holds against both the duration last sent to it and the
%% duration it last reported. For each such process it applies
%% apply(M, F, A ++ [DesiredDurationAvg]), asserts that the callback
%% returns ok, and records the value in the entry's 'sent' field.
%% Returns true.
inform_queues(ShouldInform, DesiredDurationAvg, Durations) ->
    MaybeNotify =
        fun (Proc = #process{reported = Reported,
                             sent     = LastSent,
                             callback = {M, F, A}}, true) ->
                case ShouldInform(LastSent, DesiredDurationAvg)
                         andalso ShouldInform(Reported, DesiredDurationAvg) of
                    false ->
                        true;
                    true ->
                        ok = erlang:apply(M, F, A ++ [DesiredDurationAvg]),
                        ets:insert(Durations,
                                   Proc#process{sent = DesiredDurationAvg})
                end
        end,
    true = ets:foldl(MaybeNotify, true, Durations).
247
%% Returns the comparison fun(Old, New) that decides whether a new desired
%% duration must be pushed to queues right away.
%%  - Disk alarm active: inform when the duration has *increased*
%%    (New > Old), so that queues stop paging promptly.
%%  - No disk alarm: inform when the duration has decreased (Old > New),
%%    so that paging starts promptly.
should_inform_predicate(true) ->
    fun (Old, New) -> greater_than(New, Old) end;
should_inform_predicate(false) ->
    fun greater_than/2.
255
%% Strict "greater than" over durations. 'infinity' is larger than every
%% other value and not greater than itself.
greater_than(infinity, Other) -> Other =/= infinity;
greater_than(_Finite, infinity) -> false;
greater_than(Left, Right) -> Left > Right.
260