1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_heartbeat).
9
10-export([start/6, start/7]).
11-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
12         pause_monitor/1, resume_monitor/1]).
13
14-export([system_continue/3, system_terminate/4, system_code_change/4]).
15
16-include("rabbit.hrl").
17
18%%----------------------------------------------------------------------------
19
20-export_type([heartbeaters/0]).
21
22-type heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}.
23
24-type heartbeat_callback() :: fun (() -> any()).
25
26-export_type([heartbeat_timeout/0]).
27-type heartbeat_timeout() :: non_neg_integer().
28
29-spec start
30        (pid(), rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
31         heartbeat_timeout(), heartbeat_callback()) ->
32            heartbeaters().
33
34-spec start
35        (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
36         heartbeat_timeout(), heartbeat_callback(), heartbeat_timeout(),
37         heartbeat_callback()) ->
38            heartbeaters().
39
40-spec start_heartbeat_sender
41        (rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
42         rabbit_types:proc_type_and_name()) ->
43            rabbit_types:ok(pid()).
44-spec start_heartbeat_receiver
45        (rabbit_net:socket(), heartbeat_timeout(), heartbeat_callback(),
46         rabbit_types:proc_type_and_name()) ->
47            rabbit_types:ok(pid()).
48
49-spec pause_monitor(heartbeaters()) -> 'ok'.
50-spec resume_monitor(heartbeaters()) -> 'ok'.
51
52-spec system_code_change(_,_,_,_) -> {'ok',_}.
53-spec system_continue(_,_,{_, _}) -> any().
54-spec system_terminate(_,_,_,_) -> no_return().
55
56%%----------------------------------------------------------------------------
%% Starts the heartbeat sender and receiver for Sock under SupPid.
%% Convenience form of start/7 that tags both processes with the
%% identity 'unknown'.
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    DefaultIdentity = unknown,
    start(SupPid, Sock, DefaultIdentity,
          SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
60
%% Starts the heartbeat sender and then the heartbeat receiver for Sock as
%% children of SupPid, both tagged with Identity.
%% A timeout of 0 disables the corresponding heartbeater, which is then
%% reported as 'none'. Crashes (badmatch) if either child fails to start.
%% Returns {SenderPid | none, ReceiverPid | none}.
start(SupPid, Sock, Identity,
      SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    {ok, SenderPid} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun,
                                        heartbeat_sender,
                                        start_heartbeat_sender,
                                        Identity),
    {ok, ReceiverPid} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
                                          ReceiveFun,
                                          heartbeat_receiver,
                                          start_heartbeat_receiver,
                                          Identity),
    {SenderPid, ReceiverPid}.
72
%% Spawns the sending heartbeater for Sock.
%% Every half timeout it samples send_oct. If nothing has been sent since
%% the previous sample (threshold 0), it calls SendFun and keeps running.
start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
    %% Halving the interval avoids waiting almost 2 * TimeoutSec before a
    %% heartbeat in the boundary case where the last frame went out just
    %% after a check.
    IntervalMillisec = (TimeoutSec * 1000) div 2,
    Threshold = 0,
    OnIdle = fun () -> SendFun(), continue end,
    heartbeater({Sock, IntervalMillisec, send_oct, Threshold, OnIdle},
                Identity).
79
%% Spawns the receiving heartbeater for Sock.
%% Every TimeoutSec it samples recv_oct. After two consecutive samples with
%% no change (threshold 1), it calls ReceiveFun and stops. The timeout
%% therefore fires between 2 and 3 intervals after the last data arrived.
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
    IntervalMillisec = TimeoutSec * 1000,
    Threshold = 1,
    OnTimeout = fun () -> ReceiveFun(), stop end,
    heartbeater({Sock, IntervalMillisec, recv_oct, Threshold, OnTimeout},
                Identity).
86
%% Asks the receiving heartbeater (if one is running) to stop monitoring.
%% The sender is never paused.
pause_monitor({_Sender, Receiver}) -> notify_receiver(Receiver, pause).

%% Asks the receiving heartbeater (if one is running) to resume monitoring.
resume_monitor({_Sender, Receiver}) -> notify_receiver(Receiver, resume).

%% Sends Msg to the receiver heartbeater; a disabled receiver ('none') is
%% skipped. Always returns ok.
notify_receiver(none, _Msg) ->
    ok;
notify_receiver(Receiver, Msg) ->
    Receiver ! Msg,
    ok.
92
%% sys callback: re-enter the heartbeat loop after a system message has been
%% handled. Misc is the {Params, LoopState} pair the loop handed to
%% sys:handle_system_msg/6.
system_continue(_ParentPid, DebugOpts, {LoopParams, LoopState}) ->
    heartbeater(LoopParams, DebugOpts, LoopState).
95
%% sys callback: the process was asked to terminate; exit with that reason.
system_terminate(ExitReason, _ParentPid, _DebugOpts, _Misc) ->
    erlang:exit(ExitReason).
98
%% sys callback: code upgrades need no state conversion; keep Misc as is.
system_code_change(LoopMisc, _Mod, _FromVsn, _Extra) ->
    {ok, LoopMisc}.
101
102%%----------------------------------------------------------------------------
%% Starts one heartbeater as a transient worker child of SupPid.
%% The child's id is Name, and it is started via rabbit_heartbeat:Callback/4.
%% A timeout of 0 means heartbeats are disabled: nothing is started and
%% {ok, none} is returned.
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
                  _Identity) ->
    {ok, none};
start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
                  Identity) ->
    StartArgs = [Sock, TimeoutSec, TimeoutFun, {Name, Identity}],
    StartMFA  = {rabbit_heartbeat, Callback, StartArgs},
    ChildSpec = {Name, StartMFA, transient, ?WORKER_WAIT, worker,
                 [rabbit_heartbeat]},
    supervisor2:start_child(SupPid, ChildSpec).
113
%% Spawns a linked heartbeater process running the loop with Params.
%% The process records Identity as its process name and starts from the
%% initial state {0, 0}. The sys debug options are computed in the caller
%% before spawning. Returns {ok, Pid}.
heartbeater(Params, Identity) ->
    DebugOpts = sys:debug_options([]),
    Loop = fun () ->
                   rabbit_misc:store_proc_name(Identity),
                   heartbeater(Params, DebugOpts, {0, 0})
           end,
    Pid = proc_lib:spawn_link(Loop),
    {ok, Pid}.
120
%% Main loop of a heartbeater process.
%%
%% Params = {Sock, TimeoutMillisec, StatName, Threshold, Handler}, as built by
%% start_heartbeat_sender/4 and start_heartbeat_receiver/4.
%% The third argument is the loop state. It is either {LastStatVal, SameCount}
%% while monitoring, or the atom 'paused' after a 'pause' message (see
%% pause_monitor/1).
%%
%% While monitoring, the loop samples the socket statistic StatName each time
%% TimeoutMillisec elapses without a message:
%%   - if the value changed, SameCount resets to 0;
%%   - if it stayed unchanged for Threshold + 1 consecutive samples, Handler
%%     is run (for send_oct, Handler also runs while LastStatVal is still 0,
%%     so the first heartbeat goes out after one interval).
%% Any unexpected message terminates the process with
%% {unexpected_message, Msg}.
heartbeater(Params, Deb, paused) ->
    %% Paused: wait for 'resume'. The 'paused' state is passed through
    %% sys:handle_system_msg/6 as part of Misc, so system_continue/3 comes
    %% back into this clause. A system message (e.g. sys:get_status/1) must
    %% not silently un-pause the monitor. Otherwise the receiver could time
    %% out a flow-blocked connection, and the later 'resume' would crash the
    %% process as an unexpected message.
    receive
        resume ->
            heartbeater(Params, Deb, {0, 0});
        {system, From, Req} ->
            sys:handle_system_msg(
              Req, From, self(), ?MODULE, Deb, {Params, paused});
        Other ->
            exit({unexpected_message, Other})
    end;
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
            Deb, {StatVal0, SameCount} = State) ->
    Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
    receive
        pause ->
            heartbeater(Params, Deb, paused);
        {system, From, Req} ->
            sys:handle_system_msg(
              Req, From, self(), ?MODULE, Deb, {Params, State});
        Other ->
            exit({unexpected_message, Other})
    after TimeoutMillisec ->
            OkFun = fun(StatVal1) ->
                            if StatVal0 =:= 0 andalso StatName =:= send_oct ->
                                   %% Needed so the first RMQ -> client
                                   %% heartbeat is sent at the first interval
                                   %% instead of after the first two.
                                   {run_handler, {StatVal1, 0}};
                               StatVal1 =/= StatVal0 ->
                                   {recurse, {StatVal1, 0}};
                               SameCount < Threshold ->
                                   {recurse, {StatVal1, SameCount + 1}};
                               true ->
                                   {run_handler, {StatVal1, 0}}
                            end
                    end,
            SSResult = get_sock_stats(Sock, StatName, OkFun),
            handle_get_sock_stats(SSResult, Sock, StatName, Recurse, Handler)
    end.
158
%% Acts on the outcome of one socket-statistics sample:
%%   stop                       - the socket is gone; return ok, ending the loop.
%%   {recurse, NextState}       - keep monitoring with NextState.
%%   {run_handler, {_, Count}}  - call Handler. If it returns 'stop', end the
%%                                loop. If it returns 'continue', resample the
%%                                statistic (so bytes the handler just wrote
%%                                are counted) and keep monitoring with Count.
handle_get_sock_stats(stop, _Sock, _StatName, _Recurse, _Handler) ->
    ok;
handle_get_sock_stats({recurse, NextState}, _Sock, _StatName, Recurse,
                      _Handler) ->
    Recurse(NextState);
handle_get_sock_stats({run_handler, {_StatVal, Count}}, Sock, StatName,
                      Recurse, Handler) ->
    case Handler() of
        continue ->
            KeepCount = fun (Val) -> {recurse, {Val, Count}} end,
            Resampled = get_sock_stats(Sock, StatName, KeepCount),
            handle_get_sock_stats(Resampled, Sock, StatName, Recurse, Handler);
        stop ->
            ok
    end.
173
%% Reads the single socket statistic StatName from Sock.
%% On success it returns OkFun(Value). {error, einval} is taken to mean the
%% socket is already dead (typically the connection is shutting down), and
%% 'stop' is returned. Any other error exits with
%% {cannot_get_socket_stats, Reason}.
get_sock_stats(Sock, StatName, OkFun) ->
    Reply = rabbit_net:getstat(Sock, [StatName]),
    case Reply of
        {error, einval} ->
            stop;
        {error, Reason} ->
            exit({cannot_get_socket_stats, Reason});
        {ok, [{StatName, Value}]} ->
            OkFun(Value)
    end.
185