%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%


%% This module handles the node-wide memory statistics.
%% Registered processes (queues) report a "RAM duration": a queue length
%% expressed in seconds, or 'infinity'. The server keeps a running sum and
%% count of the finite reports. It combines their average with the node's
%% memory-use ratio to compute a desired duration, and sends that value back
%% to the queues. The value goes out as the reply to report_ram_duration/2
%% and, when it moves in the direction that needs prompt action, through
%% each queue's registered callback.

-module(rabbit_memory_monitor).

-behaviour(gen_server2).

-export([start_link/0, register/2, deregister/1,
         report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% One entry per registered process, kept in the server's private ETS
%% table (keypos = #process.pid).
-record(process, {pid,        %% the registered process; the ETS key
                  reported,   %% last duration it reported ('infinity' until
                              %% its first report)
                  sent,       %% last desired duration sent to it, via reply
                              %% or callback ('infinity' initially)
                  callback,   %% {M, F, A}; invoked as M:F(A ++ [Duration])
                  monitor}).  %% reference returned by erlang:monitor/2

%% NOTE: code_change/3 passes the state through unchanged, so the field
%% order of this record must stay stable across hot code upgrades.
-record(state, {timer,                %% timer:send_interval/2 reference; each
                                      %% tick sends 'update' (internal_update)
                queue_durations,      %% private ets table of #process
                queue_duration_sum,   %% sum of all finite reported durations
                queue_duration_count, %% number of finite reports in the sum
                desired_duration,     %% last computed desired duration
                                      %% (a number or 'infinity')
                disk_alarm            %% true while a disk alarm for this node
                                      %% is active; paging is then disabled
               }).

-define(SERVER, ?MODULE).
-define(TABLE_NAME, ?MODULE).

%% If all queues are pushed to disk (duration 0), then the sum of
%% their reported lengths will be 0. If memory then becomes available,
%% unless we manually intervene, the sum will remain 0, and the queues
%% will never get a non-zero duration. Thus when the memory-use ratio is
%% below SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).

%% zero_clamp/1 clamps sums below this value to 0.0. Presumably this absorbs
%% floating-point residue left by repeated add/subtract (TODO confirm).
-define(EPSILON, 0.000001).

%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------

-spec start_link() -> rabbit_types:ok_pid_or_error().
%% Start the monitor as a locally registered singleton.
start_link() ->
    gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).

%% Register Pid for desired-duration updates. MFA is the callback: it is
%% invoked as M:F(A ++ [DesiredDuration]) and must return 'ok'. The server
%% monitors Pid and drops it automatically when Pid exits.
-spec register(pid(), {atom(),atom(),[any()]}) -> 'ok'.

register(Pid, MFA = {_M, _F, _A}) ->
    gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).

%% Asynchronously stop tracking Pid (its monitor is removed too).
-spec deregister(pid()) -> 'ok'.

deregister(Pid) ->
    gen_server2:cast(?SERVER, {deregister, Pid}).

%% Report Pid's current RAM duration. The reply is the desired duration
%% currently held by the server. Pid must already be registered; an
%% unregistered Pid crashes the server with a badmatch.
-spec report_ram_duration
        (pid(), float() | 'infinity') -> number() | 'infinity'.

report_ram_duration(Pid, QueueDuration) ->
    gen_server2:call(?SERVER,
                     {report_ram_duration, Pid, QueueDuration}, infinity).

%% Asynchronously stop the server with reason 'normal'.
-spec stop() -> 'ok'.

stop() ->
    gen_server2:cast(?SERVER, stop).

%% rabbit_alarm callback (registered in init/1 as
%% {?MODULE, conserve_resources, []}).
%% Paging should be enabled/disabled only in response to disk resource alarms
%% for the current node. All other alarms, including disk alarms raised for
%% other nodes, are ignored. The alert is matched as {_, Conserve, Node}.
-spec conserve_resources(pid(), atom(), term()) -> 'ok'.

conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node ->
    gen_server2:cast(Pid, {disk_alarm, Conserve});
conserve_resources(_Pid, _Source, _Conserve) ->
    ok.

%% Thin wrapper around vm_memory_monitor:get_memory_use/1. With 'ratio' this
%% module compares the result against 'infinity' and divides by it, so it is
%% a number or 'infinity'. The 'bytes' shape is assumed from
%% vm_memory_monitor (TODO confirm).
-spec memory_use('bytes') -> {non_neg_integer(), number() | 'infinity'};
                ('ratio') -> number() | 'infinity'.

memory_use(Type) ->
    vm_memory_monitor:get_memory_use(Type).

%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------

%% Initialisation steps:
%% - Start a periodic 'update' tick (interval taken from the rabbit
%%   application env).
%% - Create the private process table.
%% - Subscribe to resource alarms.
%% - Compute the initial desired duration. Paging starts disabled if a disk
%%   alarm is already active.
init([]) ->
    {ok, Interval} = application:get_env(rabbit, memory_monitor_interval),
    {ok, TRef} = timer:send_interval(Interval, update),

    Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
    Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
    {ok, internal_update(
           #state { timer = TRef,
                    queue_durations = Ets,
                    queue_duration_sum = 0.0,
                    queue_duration_count = 0,
                    desired_duration = infinity,
                    disk_alarm = lists:member(disk, Alarms)})}.
%% Synchronous requests.
%%
%% {register, Pid, MFA}: monitor Pid and store it with
%% reported = sent = infinity. Until its first finite report it contributes
%% nothing to the running sum/count.
%%
%% {report_ram_duration, Pid, Reported}: the caller is answered at once with
%% the current desired duration. Afterwards its old report in the running
%% sum/count is swapped for the new one, and that answer is recorded as the
%% value last sent to it. The lookup crashes with badmatch if Pid is not
%% registered.
%%
%% Any other request is silently dropped and never answered.
handle_call({register, Pid, MFA}, _From,
            State = #state{queue_durations = Table}) ->
    Ref = erlang:monitor(process, Pid),
    Entry = #process{pid      = Pid,
                     reported = infinity,
                     sent     = infinity,
                     callback = MFA,
                     monitor  = Ref},
    true = ets:insert(Table, Entry),
    {reply, ok, State};
handle_call({report_ram_duration, Pid, Reported}, From,
            State = #state{queue_durations      = Table,
                           queue_duration_sum   = Total,
                           queue_duration_count = Counted,
                           desired_duration     = Desired}) ->
    [Entry = #process{reported = Previous}] = ets:lookup(Table, Pid),
    %% Reply first; the bookkeeping below does not influence the answer.
    gen_server2:reply(From, Desired),
    {NewTotal, NewCounted} =
        replace_contribution(Previous, Reported, Total, Counted),
    true = ets:insert(Table, Entry#process{reported = Reported,
                                           sent     = Desired}),
    {noreply, State#state{queue_duration_sum   = zero_clamp(NewTotal),
                          queue_duration_count = NewCounted}};
handle_call(_Request, _From, State) ->
    {noreply, State}.

%% Asynchronous messages:
%% - {disk_alarm, Alarm}: recompute only when the flag actually changes.
%% - {deregister, Pid}: drop Pid and remove its monitor.
%% - stop: terminate normally.
%% - Anything else is ignored.
handle_cast({disk_alarm, Alarm}, State) ->
    case State#state.disk_alarm of
        Alarm -> {noreply, State};
        _     -> {noreply, internal_update(State#state{disk_alarm = Alarm})}
    end;
handle_cast({deregister, Pid}, State) ->
    {noreply, internal_deregister(Pid, true, State)};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(_Request, State) ->
    {noreply, State}.

%% Raw messages:
%% - 'DOWN' from a monitored process: forget it. The monitor is already gone,
%%   so no demonitor is needed.
%% - 'update': the periodic tick from the interval timer.
%% - Anything else is ignored.
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
    {noreply, internal_deregister(Pid, false, State)};
handle_info(update, State) ->
    {noreply, internal_update(State)};
handle_info(_Info, State) ->
    {noreply, State}.

%% Swap a process's previous contribution to the running duration sum and
%% count for its new one. 'infinity' reports are excluded from both totals.
replace_contribution(infinity, infinity, Sum, Count) -> {Sum, Count};
replace_contribution(infinity, New, Sum, Count)      -> {Sum + New, Count + 1};
replace_contribution(Old, infinity, Sum, Count)      -> {Sum - Old, Count - 1};
replace_contribution(Old, New, Sum, Count)           -> {Sum - Old + New, Count}.
%% Cancel the periodic 'update' timer on shutdown.
terminate(_Reason, #state{timer = TRef}) ->
    _ = timer:cancel(TRef),
    ok.

%% The state is carried over untouched on hot code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------

%% Snap any value below ?EPSILON (including negatives) to 0.0; anything else
%% is returned unchanged.
zero_clamp(Sum) ->
    case Sum < ?EPSILON of
        true  -> 0.0;
        false -> Sum
    end.

%% Remove Pid from the table, subtracting any finite report it still
%% contributes to the running sum/count. When Demonitor is true the monitor
%% is released first. Unknown pids leave the state unchanged.
internal_deregister(Pid, Demonitor,
                    State = #state{queue_durations      = Table,
                                   queue_duration_sum   = Total,
                                   queue_duration_count = Counted}) ->
    case ets:lookup(Table, Pid) of
        [] ->
            State;
        [#process{reported = Reported, monitor = Ref}] ->
            true = case Demonitor of
                       false -> true;
                       true  -> erlang:demonitor(Ref)
                   end,
            {NewTotal, NewCounted} =
                if Reported =:= infinity ->
                       {Total, Counted};
                   true ->
                       {zero_clamp(Total - Reported), Counted - 1}
                end,
            true = ets:delete(Table, Pid),
            State#state{queue_duration_sum   = NewTotal,
                        queue_duration_count = NewCounted}
    end.

%% Recompute the desired duration. If it moved in the direction that calls
%% for prompt action (see should_inform_predicate/1), push it to the
%% registered processes right away. Either way, the new value is stored.
internal_update(State = #state{queue_durations  = Table,
                               desired_duration = Previous,
                               disk_alarm       = DiskAlarm}) ->
    Desired = desired_duration_average(State),
    ShouldInform = should_inform_predicate(DiskAlarm),
    case ShouldInform(Previous, Desired) of
        false -> ok;
        true  -> inform_queues(ShouldInform, Desired, Table)
    end,
    State#state{desired_duration = Desired}.
%% Compute the desired duration.
%% - Under a disk alarm it is always 'infinity', so paging is disabled.
%% - Otherwise it is derived from the average finite report and the node's
%%   memory-use ratio. The ratio is compared against the
%%   vm_memory_high_watermark_paging_ratio setting of the rabbit application.
desired_duration_average(#state{disk_alarm = true}) ->
    infinity;
desired_duration_average(#state{disk_alarm           = false,
                                queue_duration_sum   = Total,
                                queue_duration_count = Counted}) ->
    {ok, PagingRatio} =
        application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
    scale_average(memory_use(ratio), PagingRatio, Total, Counted).

%% Map the memory-use ratio and the running sum/count to a desired duration:
%% - ratio 'infinity'                          -> 0.0
%% - ratio below the paging threshold, or no
%%   finite reports at all                     -> 'infinity' (no paging)
%% - ratio below ?SUM_INC_THRESHOLD            -> average of the sum bumped
%%   by ?SUM_INC_AMOUNT, divided by the ratio
%% - otherwise                                 -> plain average / ratio
scale_average(infinity, _PagingRatio, _Total, _Counted) ->
    0.0;
scale_average(Ratio, PagingRatio, _Total, Counted)
  when Ratio < PagingRatio orelse Counted == 0 ->
    infinity;
scale_average(Ratio, _PagingRatio, Total, Counted)
  when Ratio < ?SUM_INC_THRESHOLD ->
    ((Total + ?SUM_INC_AMOUNT) / Counted) / Ratio;
scale_average(Ratio, _PagingRatio, Total, Counted) ->
    (Total / Counted) / Ratio.

%% Walk the table and call each process's callback with DesiredDurationAvg.
%% A process is notified only when ShouldInform holds both for the value
%% last sent to it and for its own latest report. After a notification,
%% its 'sent' field is updated.
inform_queues(ShouldInform, DesiredDurationAvg, Durations) ->
    Notify =
        fun (Proc = #process{reported = Reported,
                             sent     = LastSent,
                             callback = {M, F, A}}, true) ->
                case ShouldInform(LastSent, DesiredDurationAvg) andalso
                     ShouldInform(Reported, DesiredDurationAvg) of
                    false ->
                        true;
                    true ->
                        ok = erlang:apply(M, F, A ++ [DesiredDurationAvg]),
                        ets:insert(Durations,
                                   Proc#process{sent = DesiredDurationAvg})
                end
        end,
    true = ets:foldl(Notify, true, Durations).

%% Build the "inform immediately?" predicate, applied as Pred(Old, New).
%% Normally it is true when New is strictly below Old, so queues learn
%% promptly about a decrease and start paging in time.
should_inform_predicate(false) ->
    fun greater_than/2;
%% Under a disk alarm the comparison is reversed. It is true when New is
%% strictly above Old, so queues learn promptly about an increase and stop
%% paging in time.
should_inform_predicate(true) ->
    fun (Old, New) -> greater_than(New, Old) end.

%% Strict ">" over durations, where 'infinity' exceeds every number and is
%% equal only to itself.
greater_than(infinity, Other) -> Other =/= infinity;
greater_than(_Finite, infinity) -> false;
greater_than(A, B) -> A > B.