%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

%% Heartbeat processes for a socket: a "sender" and a "receiver" are
%% started as children of a supervisor. Each periodically samples a
%% socket byte counter (send_oct / recv_oct) and runs a callback when
%% the counter has not moved for long enough.
-module(rabbit_heartbeat).

%% Public API: start both heartbeaters for a socket.
-export([start/6, start/7]).
%% Child start functions (referenced from the child specs built in this
%% module) and receiver pause/resume control.
-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
         pause_monitor/1, resume_monitor/1]).

%% Callbacks invoked via sys:handle_system_msg/6, which this module
%% calls with ?MODULE as the callback module.
-export([system_continue/3, system_terminate/4, system_code_change/4]).

-include("rabbit.hrl").

%%----------------------------------------------------------------------------

-export_type([heartbeaters/0]).

%% {Sender, Receiver} as returned by start/6,7. An element is 'none'
%% when the corresponding timeout was 0 and no process was started.
%% NOTE(review): 'maybe' is a reserved word once the maybe_expr feature
%% is enabled (default from OTP 27); this unquoted type reference may
%% need attention there -- confirm against the targeted OTP release.
-type heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}.

%% Zero-arity callback; its return value is discarded by this module.
-type heartbeat_callback() :: fun (() -> any()).

-export_type([heartbeat_timeout/0]).
%% Timeout in seconds (it is multiplied by 1000 before being used as a
%% receive timeout). 0 means "do not start this heartbeater".
-type heartbeat_timeout() :: non_neg_integer().

-spec start
        (pid(), rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
         heartbeat_timeout(), heartbeat_callback()) ->
            heartbeaters().

-spec start
        (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
         heartbeat_timeout(), heartbeat_callback(), heartbeat_timeout(),
         heartbeat_callback()) ->
            heartbeaters().

-spec start_heartbeat_sender
        (rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
         rabbit_types:proc_type_and_name()) ->
            rabbit_types:ok(pid()).
-spec start_heartbeat_receiver
        (rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
         rabbit_types:proc_type_and_name()) ->
            rabbit_types:ok(pid()).

-spec pause_monitor(heartbeaters()) -> 'ok'.
-spec resume_monitor(heartbeaters()) -> 'ok'.

-spec system_code_change(_,_,_,_) -> {'ok',_}.
-spec system_continue(_,_,{_, _}) -> any().
-spec system_terminate(_,_,_,_) -> no_return().

%%----------------------------------------------------------------------------

%% Same as start/7, for callers that have no process identity to
%% record: the identity is registered as 'unknown'.
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    Identity = unknown,
    start(SupPid, Sock, Identity,
          SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).

%% Starts the sender and then the receiver under SupPid and returns
%% {Sender, Receiver}. Either element is 'none' when its timeout is 0.
%% Crashes with badmatch if a child cannot be started.
start(SupPid, Sock, Identity,
      SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    Launch =
        fun (TimeoutSec, TimeoutFun, Name, Callback) ->
                {ok, PidOrNone} =
                    start_heartbeater(TimeoutSec, SupPid, Sock,
                                      TimeoutFun, Name, Callback, Identity),
                PidOrNone
        end,
    SenderPid = Launch(SendTimeoutSec, SendFun,
                       heartbeat_sender, start_heartbeat_sender),
    ReceiverPid = Launch(ReceiveTimeoutSec, ReceiveFun,
                         heartbeat_receiver, start_heartbeat_receiver),
    {SenderPid, ReceiverPid}.

%% Child start function for the sending side. Watches send_oct and
%% invokes SendFun when nothing has been written during an interval;
%% the process keeps running afterwards.
start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
    %% Sample at half the timeout. With a full-length interval, data
    %% sent right after a heartbeat could delay the next heartbeat to
    %% almost 2 * TimeoutSec.
    IntervalMs = TimeoutSec * 1000 div 2,
    OnIdle = fun () -> SendFun(), continue end,
    heartbeater({Sock, IntervalMs, send_oct, 0, OnIdle}, Identity).

%% Child start function for the receiving side. Watches recv_oct and
%% invokes ReceiveFun once the peer has been silent for too long; the
%% process then stops.
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
    %% Incoming traffic is sampled once per interval and two unchanged
    %% samples in a row are required, so expiry happens between 2 and 3
    %% intervals after the last byte was received.
    IntervalMs = TimeoutSec * 1000,
    OnSilence = fun () -> ReceiveFun(), stop end,
    heartbeater({Sock, IntervalMs, recv_oct, 1, OnSilence}, Identity).

%% Suspends receive-side monitoring; a no-op when no receiver exists.
pause_monitor(Heartbeaters) ->
    signal_receiver(Heartbeaters, pause).

%% Re-enables receive-side monitoring; a no-op when no receiver exists.
resume_monitor(Heartbeaters) ->
    signal_receiver(Heartbeaters, resume).

%% Sends Msg to the receiver half of a heartbeaters() pair, if any.
signal_receiver({_Sender, none}, _Msg) ->
    ok;
signal_receiver({_Sender, Receiver}, Msg) ->
    Receiver ! Msg,
    ok.

%% sys callback: re-enter the heartbeat loop after a system message
%% has been handled, using the (possibly updated) debug options.
system_continue(_Parent, DebugOpts, {LoopParams, LoopState}) ->
    heartbeater(LoopParams, DebugOpts, LoopState).

%% sys callback: terminate with the reason supplied by sys.
system_terminate(Why, _Parent, _DebugOpts, _LoopData) ->
    exit(Why).

%% sys callback: no state conversion is needed on code change.
system_code_change(LoopData, _Module, _OldVsn, _Extra) ->
    {ok, LoopData}.

%%----------------------------------------------------------------------------

%% Adds a heartbeater child to SupPid.
%%
%% A timeout of 0 means heartbeats are disabled: no process is started
%% and {ok, none} is returned. Otherwise a transient worker is started
%% whose start function is rabbit_heartbeat:Callback/4, and the result
%% of supervisor2:start_child/2 is returned.
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
                  _Identity) ->
    {ok, none};
start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
                  Identity) ->
    supervisor2:start_child(
      SupPid, {Name,
               {rabbit_heartbeat, Callback,
                [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
               transient, ?WORKER_WAIT, worker, [rabbit_heartbeat]}).

%% Spawns a linked heartbeat process and returns {ok, Pid}.
%%
%% Params is {Sock, TimeoutMillisec, StatName, Threshold, Handler}.
%% The new process records Identity via rabbit_misc:store_proc_name/1
%% and enters the loop with an initial state of {0, 0}.
heartbeater(Params, Identity) ->
    Deb = sys:debug_options([]),
    {ok, proc_lib:spawn_link(fun () ->
                                     rabbit_misc:store_proc_name(Identity),
                                     heartbeater(Params, Deb, {0, 0})
                             end)}.

%% Main loop.
%%
%% State is {StatVal0, SameCount}: the counter value seen at the last
%% sample and the number of consecutive samples without change. It can
%% also be {paused, State} when the loop is re-entered through
%% system_continue/3 while monitoring is paused.
%%
%% Messages understood: 'pause', and {system, From, Req} from sys.
%% Any other message terminates the process with
%% {unexpected_message, Msg}.
heartbeater(Params, Deb, {paused, State}) ->
    %% Re-entered from a system message handled while paused: stay
    %% paused rather than silently resuming the timed loop.
    paused(Params, Deb, State);
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
            Deb, {StatVal0, SameCount} = State) ->
    Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
    receive
        pause ->
            paused(Params, Deb, State);
        {system, From, Req} ->
            sys:handle_system_msg(
              Req, From, self(), ?MODULE, Deb, {Params, State});
        Other ->
            exit({unexpected_message, Other})
    after TimeoutMillisec ->
            OkFun = fun(StatVal1) ->
                            if StatVal0 =:= 0 andalso StatName =:= send_oct ->
                                    %% Note: this clause is necessary to
                                    %% ensure the first RMQ -> client
                                    %% heartbeat is sent at the first
                                    %% interval, instead of waiting the
                                    %% first two intervals
                                    {run_handler, {StatVal1, 0}};
                               StatVal1 =/= StatVal0 ->
                                    %% traffic seen: restart the count
                                    {recurse, {StatVal1, 0}};
                               SameCount < Threshold ->
                                    %% idle, but not for long enough yet
                                    {recurse, {StatVal1, SameCount + 1}};
                               true ->
                                    {run_handler, {StatVal1, 0}}
                            end
                    end,
            SSResult = get_sock_stats(Sock, StatName, OkFun),
            handle_get_sock_stats(SSResult, Sock, StatName, Recurse, Handler)
    end.

%% Paused loop: no timeout, so no sampling takes place.
%%
%% 'resume' restarts monitoring from a fresh {0, 0} state. System
%% messages are serviced with the state tagged as {paused, State}, so
%% that system_continue/3 brings the process back here instead of into
%% the timed loop. Without the tag a later 'resume' would arrive in the
%% timed loop and terminate the process as an unexpected message. Any
%% other message (including a second 'pause') terminates the process.
paused(Params, Deb, State) ->
    receive
        resume ->
            heartbeater(Params, Deb, {0, 0});
        {system, From, Req} ->
            sys:handle_system_msg(
              Req, From, self(), ?MODULE, Deb, {Params, {paused, State}});
        Other ->
            exit({unexpected_message, Other})
    end.

%% Acts on the verdict produced by get_sock_stats/3:
%%   stop                 - leave the loop, returning ok;
%%   {recurse, State}     - continue the loop with State;
%%   {run_handler, State} - call Handler; on 'stop' return ok, on
%%                          'continue' sample the counter again and
%%                          carry on with the fresh value.
handle_get_sock_stats(stop, _Sock, _StatName, _Recurse, _Handler) ->
    ok;
handle_get_sock_stats({recurse, NextState}, _Sock, _StatName, Recurse,
                      _Handler) ->
    Recurse(NextState);
handle_get_sock_stats({run_handler, {_, SameCount}}, Sock, StatName, Recurse,
                      Handler) ->
    case Handler() of
        continue ->
            %% Re-read the counter after the handler ran, so the next
            %% comparison starts from the value observed now
            %% (presumably this absorbs bytes written by the handler).
            Resample = fun (Fresh) -> {recurse, {Fresh, SameCount}} end,
            Verdict = get_sock_stats(Sock, StatName, Resample),
            handle_get_sock_stats(Verdict, Sock, StatName, Recurse, Handler);
        stop ->
            ok
    end.

%% Reads the single socket statistic StatName and passes its value to
%% OkFun, returning whatever OkFun returns. Returns 'stop' when the
%% socket reports einval; exits with {cannot_get_socket_stats, Reason}
%% on any other error.
get_sock_stats(Sock, StatName, OkFun) ->
    Reply = rabbit_net:getstat(Sock, [StatName]),
    case Reply of
        {error, einval} ->
            %% the socket is gone, most likely because the connection
            %% is shutting down -> terminate quietly
            stop;
        {error, Reason} ->
            exit({cannot_get_socket_stats, Reason});
        {ok, [{StatName, StatVal}]} ->
            OkFun(StatVal)
    end.