%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% This module implements (as a process) the state machine documented
%% in Appendix A of RFC 3539.
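%%
%% The watchdog state is one of INITIAL, OKAY, SUSPECT, DOWN and
%% REOPEN, represented by the status field of the watchdog record
%% below.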
%%

-module(diameter_watchdog).
-behaviour(gen_server).

%% towards diameter_service
-export([start/2]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% diameter_watchdog_sup callback
-export([start_link/1]).

-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").

-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
-define(NOMASK, {0,32}).  %% default sequence mask

-define(BASE, ?DIAMETER_DICT_COMMON).

-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)).

-record(watchdog,
        {%% PCB - Peer Control Block; see RFC 3539, Appendix A
         status = initial :: initial | okay | suspect | down | reopen,
         pending = false  :: boolean(),  %% DWA
         tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
                                %% {M,F,A} -> integer() >= 0
         num_dwa = 0 :: -1 | non_neg_integer(),
                     %% number of DWAs received in reopen,
                     %% or number of timeouts before okay -> suspect
         %% end PCB
         parent = self() :: pid(),              %% service process
         transport       :: pid() | undefined,  %% peer_fsm process
         tref :: reference()      %% reference for current watchdog timer
               | integer()        %% monotonic time
               | undefined,
         dictionary :: module(),  %% common dictionary
         receive_data :: term(),  %% term passed with incoming message
         config :: #{sequence := diameter:sequence(),  %% mask
                     restrict_connections := diameter:restriction(),
                     restrict := boolean(),
                     suspect := non_neg_integer(), %% OKAY -> SUSPECT
                     okay := non_neg_integer()},   %% REOPEN -> OKAY
         codec :: #{decode_format := none,
                    string_decode := false,
                    strict_mbit := boolean(),
                    rfc := 3588 | 6733,
                    ordered_encode := false},
         shutdown = false :: boolean()}).

%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
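%%
%% The new process doesn't proceed with init until the ack is
%% received, ensuring that the monitor is in place before anything
%% that can fail does so.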
%% ---------------------------------------------------------------------------

-spec start(Type, {[Opt], SvcOpts, RecvData, #diameter_service{}})
   -> {reference(), pid()}
 when Type :: {connect|accept, diameter:transport_ref()},
      Opt :: diameter:transport_opt(),
      SvcOpts :: map(),
      RecvData :: term().

start({_,_} = Type, T) ->
    Ack = make_ref(),
    {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
    try
        {monitor(process, Pid), Pid}
    after
        send(Pid, Ack)
    end.

start_link(T) ->
    {ok, _} = proc_lib:start_link(?MODULE,
                                  init,
                                  [T],
                                  infinity,
                                  diameter_lib:spawn_opts(server, [])).

%% ===========================================================================
%% ===========================================================================

%% init/1

init(T) ->
    proc_lib:init_ack({ok, self()}),
    gen_server:enter_loop(?MODULE, [], i(T)).

i({Ack, T, Pid, {Opts,
                 #{restrict_connections := Restrict}
                 = SvcOpts0,
                 RecvData,
                 #diameter_service{applications = Apps,
                                   capabilities = Caps}
                 = Svc}}) ->
    monitor(process, Pid),
    wait(Ack, Pid),

    Dict0 = common_dictionary(Apps),
    SvcOpts = SvcOpts0#{rfc => rfc(Dict0)},
    putr(restart, {T, Opts, Svc, SvcOpts}),  %% kept out of the state to
    putr(dwr, dwr(Caps)),                    %% avoid seeing it in trace
    Nodes = restrict_nodes(Restrict),
    #watchdog{parent = Pid,
              transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc),
              tw = proplists:get_value(watchdog_timer,
                                       Opts,
                                       ?DEFAULT_TW_INIT),
              receive_data = RecvData,
              dictionary = Dict0,
              config = maps:with([sequence,
                                  restrict_connections,
                                  restrict,
                                  suspect,
                                  okay],
                                 config(SvcOpts#{restrict => restrict(Nodes),
                                                 suspect => 1,
                                                 okay => 3},
                                        Opts)),
              codec = maps:with([decode_format,
                                 strict_mbit,
                                 string_decode,
                                 rfc,
                                 ordered_encode],
                                SvcOpts#{decode_format := none,
                                         string_decode := false,
                                         ordered_encode => false})}.

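%% wait/2
%%
%% Wait for the ack sent by start/2, or for the service process to
%% die in the meantime.
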
wait(Ref, Pid) ->
    receive
        Ref ->
            ok;
        {'DOWN', _, process, Pid, _} = D ->
            exit({shutdown, D})
    end.

%% Regard anything but the generated RFC 3588 dictionary as modern.
%% This affects the interpretation of defaults during the decode
%% of values of type DiameterURI, this having changed from RFC 3588.
%% (So much for backwards compatibility.)
rfc(?BASE) ->
    3588;
rfc(_) ->
    6733.

%% config/2
%%
%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN,
%% but don't.
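%%
%% The counts are configured with the watchdog_config transport
%% option; for example:
%%
%%   {watchdog_config, [{okay, 3}, {suspect, 1}]}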

config(Map, Opts) ->
    Config = proplists:get_value(watchdog_config, Opts, []),
    lists:foldl(fun cfg/2, Map, Config).

cfg({suspect, N}, Map)
  when ?IS_NATURAL(N) ->
    Map#{suspect := N};

cfg({okay, N}, Map)
  when ?IS_NATURAL(N) ->
    Map#{okay := N}.

%% start/6

start(T, Opts, SvcOpts, Nodes, Dict0, Svc) ->
    {_MRef, Pid}
        = diameter_peer_fsm:start(T, Opts, {SvcOpts, Nodes, Dict0, Svc}),
    Pid.

%% common_dictionary/1
%%
%% Determine the dictionary of the Diameter common application with
%% Application Id 0. Fail on config errors.

common_dictionary(Apps) ->
    case
        orddict:fold(fun dict0/3,
                     false,
                     lists:foldl(fun(#diameter_app{dictionary = M}, D) ->
                                         orddict:append(M:id(), M, D)
                                 end,
                                 orddict:new(),
                                 Apps))
    of
        {value, Mod} ->
            Mod;
        false ->
            %% A transport should configure a common dictionary, but
            %% we don't require it. Without a common dictionary, a
            %% user can neither send nor receive messages in the
            %% common dictionary: incoming requests are answered
            %% with 3007 and outgoing requests cannot be sent. The
            %% dictionary returned here is only used for messages
            %% diameter itself sends and receives: CER/CEA, DPR/DPA
            %% and DWR/DWA.
            ?BASE
    end.

%% Each application should be represented by a single dictionary.
dict0(Id, [_,_|_] = Ms, _) ->
    config_error({multiple_dictionaries, Ms, {application_id, Id}});

%% An explicit common dictionary.
dict0(?APP_ID_COMMON, [Mod], _) ->
    {value, Mod};

%% A pure relay, in which case the common application is implicit.
%% This uses the fact that the common application will already have
%% been folded.
dict0(?APP_ID_RELAY, _, false) ->
    {value, ?BASE};

dict0(_, _, Acc) ->
    Acc.

config_error(T) ->
    exit({shutdown, {configuration_error, T}}).

%% handle_call/3

handle_call(_, _, State) ->
    {reply, nok, State}.

%% handle_cast/2

handle_cast(_, State) ->
    {noreply, State}.

%% handle_info/2

handle_info(T, #watchdog{} = State) ->
    case transition(T, State) of
        ok ->
            {noreply, State};
        #watchdog{} = S ->
            close(T, State),     %% service expects 'close' message
            event(T, State, S),  %%   before 'watchdog'
            {noreply, S};
        stop ->
            ?LOG(stop, truncate(T)),
            event(T, State, State#watchdog{status = down}),
            {stop, {shutdown, T}, State}
    end.

truncate({'DOWN' = T, _, process, Pid, _}) ->
    {T, Pid};
truncate(T) ->
    T.

close({'DOWN', _, process, TPid, {shutdown, Reason}},
      #watchdog{transport = TPid,
                parent = Pid}) ->
    send(Pid, {close, self(), Reason});

close(_, _) ->
    ok.

event(_,
      #watchdog{status = From, transport = F},
      #watchdog{status = To, transport = T})
  when F == undefined, T == undefined;  %% transport not started
       From == initial, To == down;     %% never really left INITIAL
       From == To ->                    %% no state transition
    ok;
%% Note that there is no INITIAL -> DOWN transition in RFC 3539: ours
%% is just a consequence of stop.

event(Msg,
      #watchdog{status = From, transport = F, parent = Pid},
      #watchdog{status = To, transport = T}) ->
    TPid = tpid(F,T),
    E = {[TPid | data(Msg, TPid, From, To)], From, To},
    send(Pid, {watchdog, self(), E}),
    ?LOG(transition, {From, To}).

data(Msg, TPid, reopen, okay) ->
    {recv, TPid, _, 'DWA', _Pkt} = Msg,  %% assert
    {TPid, T} = eraser(open),
    [T];

data({open, TPid, _Hosts, T}, TPid, _From, To)
  when To == okay;
       To == reopen ->
    [T];

data(_, _, _, _) ->
    [].

tpid(_, Pid)
  when is_pid(Pid) ->
    Pid;

tpid(Pid, _) ->
    Pid.

%% send/2

send(Pid, T) ->
    Pid ! T.

%% terminate/2

terminate(_, _) ->
    ok.

%% code_change/3

code_change(_, State, _) ->
    {ok, State}.

%% ===========================================================================
%% ===========================================================================

%% transition/2
%%
%% The state transitions documented here are extracted from RFC 3539;
%% the commentary is ours.
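%%
%% A return of ok from transition/2 means no state change, a new
%% watchdog record is a transition to be reported to the service
%% process, and stop causes the process to terminate; see
%% handle_info/2.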

%% Service is telling the watchdog of an accepting transport to die
%% following transport death in state INITIAL, or after connect_timer
%% expiry; or another watchdog is saying the same after reestablishing
%% a connection previously had by this one.
transition(close, #watchdog{}) ->
    {accept, _} = role(), %% assert
    stop;

%% Service is asking for the peer to be taken down gracefully.
transition({shutdown, Pid, _}, #watchdog{parent = Pid,
                                         transport = undefined}) ->
    stop;
transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
                                                  transport = TPid}
                                        = S) ->
    send(TPid, {T, self(), Reason}),
    S#watchdog{shutdown = true};

%% Transport is telling us that DPA has been sent in response to DPR,
%% or that DPR has been explicitly sent: transport death should lead
%% to ours.
transition({'DPR', TPid}, #watchdog{transport = TPid} = S) ->
    S#watchdog{shutdown = true};

%% Parent process has died.
transition({'DOWN', _, process, Pid, _Reason},
           #watchdog{parent = Pid}) ->
    stop;

%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
                                           parent = Pid}) ->
    send(Pid, {T, self(), TPid}),
    ok;

%%   STATE         Event                Actions              New State
%%   =====         ------               -------              ----------
%%   INITIAL       Connection up        SetWatchdog()        OKAY

%% By construction, the watchdog timer isn't set until we move into
%% state okay as the result of the Peer State Machine reaching the
%% Open state.
%%
%% If we're accepting then we may be resuming a connection that went
%% down in another watchdog process, in which case this is the
%% transition below, from down to reopen. That is, it's not until we
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.

transition({open, TPid, Hosts, _} = Open,
           #watchdog{transport = TPid,
                     status = initial,
                     config = #{restrict := R,
                                suspect := OS}}
           = S) ->
    case okay(role(), Hosts, R) of
        okay ->
            set_watchdog(S#watchdog{status = okay,
                                    num_dwa = OS});
        reopen ->
            transition(Open, S#watchdog{status = down})
    end;

%%   DOWN          Connection up        NumDWA = 0
%%                                      SendWatchdog()
%%                                      SetWatchdog()
%%                                      Pending = TRUE       REOPEN

transition({open = Key, TPid, _Hosts, T},
           #watchdog{transport = TPid,
                     status = down,
                     config = #{suspect := OS,
                                okay := RO}}
           = S) ->
    case RO of
        0 ->  %% non-standard: skip REOPEN
            set_watchdog(S#watchdog{status = okay,
                                    num_dwa = OS});
        _ ->
            %% Store the info we need to notify the parent to reopen
            %% the connection after the requisite DWAs are received,
            %% at which time we eraser(open).
            putr(Key, {TPid, T}),
            set_watchdog(send_watchdog(S#watchdog{status = reopen,
                                                  num_dwa = 0}))
    end;

%%   OKAY          Connection down      CloseConnection()
%%                                      Failover()
%%                                      SetWatchdog()        DOWN
%%   SUSPECT       Connection down      CloseConnection()
%%                                      SetWatchdog()        DOWN
%%   REOPEN        Connection down      CloseConnection()
%%                                      SetWatchdog()        DOWN

%% Transport has died after DPA or service requested termination ...
transition({'DOWN', _, process, TPid, _Reason},
           #watchdog{transport = TPid,
                     shutdown = true}) ->
    stop;

%% ... or not.
transition({'DOWN', _, process, TPid, _Reason} = D,
           #watchdog{transport = TPid,
                     status = T,
                     config = #{restrict := R}}
           = S0) ->
    S = S0#watchdog{pending = false,
                    transport = undefined},
    {M,_} = role(),

    %% Close an accepting watchdog immediately if there's no
    %% restriction on the number of connections to the same peer: the
    %% state machine never enters state REOPEN in this case.

    if T == initial;
       M == accept, not R ->
            close(D, S0),
            stop;
       true ->
            set_watchdog(S#watchdog{status = down})
    end;

%% Incoming message.
transition({recv, TPid, Route, Name, Pkt},
           #watchdog{transport = TPid}
           = S) ->
    try incoming(Route, Name, Pkt, S) of
        #watchdog{dictionary = Dict0, receive_data = T} = NS ->
            diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T),
            NS
    catch
        #watchdog{} = NS ->
            NS
    end;

%% Current watchdog has timed out.
transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) ->
    set_watchdog(0, timeout(S));

%% Message has arrived since the timer was started: subtract time
%% already elapsed from new timer.
transition({timeout, _, tw}, #watchdog{tref = T0} = S) ->
    set_watchdog(diameter_lib:micro_diff(T0) div 1000, S);

%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
    send(Pid, {self(), S}),
    ok.

%% ===========================================================================

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

getr(Key) ->
    get({?MODULE, Key}).

eraser(Key) ->
    erase({?MODULE, Key}).

%% encode/4
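%%
%% Encode a DWR with fresh identifiers drawn from the configured
%% sequence mask, or a DWA reusing the header and transport_data of
%% the request it answers.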

encode(dwr = M, Dict0, Opts, Mask) ->
    Msg = getr(M),
    Seq = diameter_session:sequence(Mask),
    Hdr = #diameter_header{version = ?DIAMETER_VERSION,
                           end_to_end_id = Seq,
                           hop_by_hop_id = Seq},
    Pkt = #diameter_packet{header = Hdr,
                           msg = Msg},
    diameter_codec:encode(Dict0, Opts, Pkt);

encode(dwa, Dict0, Opts, #diameter_packet{header = H, transport_data = TD}
                         = ReqPkt) ->
    AnsPkt = #diameter_packet{header
                              = H#diameter_header{is_request = false,
                                                  is_error = undefined,
                                                  is_retransmitted = false},
                              msg = dwa(ReqPkt),
                              transport_data = TD},

    diameter_codec:encode(Dict0, Opts, AnsPkt).

%% okay/3

okay({accept, Ref}, Hosts, Restrict) ->
    T = {?MODULE, connection, Ref, Hosts},
    diameter_reg:add(T),
    if Restrict ->
            okay(diameter_reg:match(T));
       true ->
            okay
    end;
%% Register before matching so that at least one of two registering
%% processes will match the other.

okay({connect, _}, _, _) ->
    okay.

%% okay/2

%% The peer hasn't been connected recently ...
okay([{_,P}]) ->
    P = self(),  %% assert
    okay;

%% ... or it has.
okay(C) ->
    [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
    reopen.

%% role/0

role() ->
    element(1, getr(restart)).

%% set_watchdog/1

%% Timer not yet set.
set_watchdog(#watchdog{tref = undefined} = S) ->
    set_watchdog(0, S);

%% Timer already set: start a new one only at expiry.
set_watchdog(#watchdog{} = S) ->
    S#watchdog{tref = diameter_lib:now()}.

%% set_watchdog/2

set_watchdog(_, stop = No) ->
    No;

set_watchdog(Ms, #watchdog{tw = TwInit} = S) ->
    S#watchdog{tref = erlang:start_timer(tw(TwInit, Ms), self(), tw)}.

%% A callback could return anything, so ensure the result isn't
%% negative. Don't prevent abuse, even though the smallest valid
%% timeout is 4000.
tw(TwInit, Ms) ->
    max(tw(TwInit) - Ms, 0).

tw(T)
  when is_integer(T), T >= 6000 ->
    T - 2000 + (rand:uniform(4001) - 1); %% RFC 3539 jitter of +/- 2 sec.
tw({M,F,A}) ->
    apply(M,F,A).
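%% With the default TwInit of 30000, for example, the integer clause
%% yields a timeout uniformly distributed over [28000, 32000].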

%% send_watchdog/1

send_watchdog(#watchdog{pending = false,
                        transport = TPid,
                        dictionary = Dict0,
                        config = #{sequence := Mask},
                        codec = Opts}
              = S) ->
    #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Opts, Mask),
    diameter_traffic:incr(send, EPkt, TPid, Dict0),
    send(TPid, {send, Bin}),
    ?LOG(send, 'DWR'),
    S#watchdog{pending = true}.

%% Don't count encode errors since we don't expect any on DWR/DWA.

%% incoming/4
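%%
%% A watchdog state thrown from rcv/2 or rcv/3 means that the message
%% has been handled here (DWR/DWA in particular) and is not to be
%% passed on to diameter_traffic in transition/2.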

incoming(Route, Name, Pkt, S) ->
    try rcv(Name, S) of
        NS -> rcv(Name, Pkt, NS)
    catch
        #watchdog{transport = TPid} = NS when Route ->  %% incoming request
            send(TPid, {send, false}),                  %%    requiring ack
            throw(NS)
    end.

%% rcv/3

rcv('DWR', Pkt, #watchdog{transport = TPid,
                          dictionary = Dict0,
                          codec = Opts}
                = S) ->
    ?LOG(recv, 'DWR'),
    DPkt = diameter_codec:decode(Dict0, Opts, Pkt),
    diameter_traffic:incr(recv, DPkt, TPid, Dict0),
    diameter_traffic:incr_error(recv, DPkt, TPid, Dict0),
    #diameter_packet{header = H,
                     transport_data = T,
                     bin = Bin}
        = EPkt
        = encode(dwa, Dict0, Opts, Pkt),
    diameter_traffic:incr(send, EPkt, TPid, Dict0),
    diameter_traffic:incr_rc(send, EPkt, TPid, Dict0),

    %% Strip potentially large message terms.
    send(TPid, {send, #diameter_packet{header = H,
                                       transport_data = T,
                                       bin = Bin}}),
    ?LOG(send, 'DWA'),
    throw(S);

rcv('DWA', Pkt, #watchdog{transport = TPid,
                          dictionary = Dict0,
                          codec = Opts}
                = S) ->
    ?LOG(recv, 'DWA'),
    diameter_traffic:incr(recv, Pkt, TPid, Dict0),
    diameter_traffic:incr_rc(recv,
                             diameter_codec:decode(Dict0, Opts, Pkt),
                             TPid,
                             Dict0),
    throw(S);

rcv(N, _, S)
  when N == 'CER';
       N == 'CEA';
       N == 'DPR' ->
    throw(S);
%% DPR can be sent explicitly with diameter:call/4. Only the
%% corresponding DPAs arrive here.

rcv(_, _, S) ->
    S.

%% rcv/2
%%
%% The lack of Hop-by-Hop and End-to-End Identifier checks in a
%% received DWA is intentional. The purpose of the message is to
%% demonstrate life, but a peer that consistently bungles it by
%% sending the wrong identifiers would cause the connection to toggle
%% between OPEN and SUSPECT, with failover and failback as a result,
%% despite there being no real problem with connectivity. Thus, relax
%% and accept any incoming DWA as being in response to an outgoing
%% DWR.

%%   INITIAL       Receive DWA          Pending = FALSE
%%                                      Throwaway()          INITIAL
%%   INITIAL       Receive non-DWA      Throwaway()          INITIAL

rcv('DWA', #watchdog{status = initial} = S) ->
    throw(S#watchdog{pending = false});

rcv(_, #watchdog{status = initial} = S) ->
    throw(S);

%%   DOWN          Receive DWA          Pending = FALSE
%%                                      Throwaway()          DOWN
%%   DOWN          Receive non-DWA      Throwaway()          DOWN

rcv('DWA', #watchdog{status = down} = S) ->
    throw(S#watchdog{pending = false});

rcv(_, #watchdog{status = down} = S) ->
    throw(S);

%%   OKAY          Receive DWA          Pending = FALSE
%%                                      SetWatchdog()        OKAY
%%   OKAY          Receive non-DWA      SetWatchdog()        OKAY

rcv('DWA', #watchdog{status = okay} = S) ->
    set_watchdog(S#watchdog{pending = false});

rcv(_, #watchdog{status = okay} = S) ->
    set_watchdog(S);

%%   SUSPECT       Receive DWA          Pending = FALSE
%%                                      Failback()
%%                                      SetWatchdog()        OKAY
%%   SUSPECT       Receive non-DWA      Failback()
%%                                      SetWatchdog()        OKAY

rcv('DWA', #watchdog{status = suspect, config = #{suspect := OS}} = S) ->
    set_watchdog(S#watchdog{status = okay,
                            num_dwa = OS,
                            pending = false});

rcv(_, #watchdog{status = suspect, config = #{suspect := OS}} = S) ->
    set_watchdog(S#watchdog{status = okay,
                            num_dwa = OS});

%%   REOPEN        Receive DWA &        Pending = FALSE
%%                 NumDWA == 2          NumDWA++
%%                                      Failback()           OKAY

rcv('DWA', #watchdog{status = reopen,
                     num_dwa = N,
                     config = #{suspect := OS,
                                okay := RO}}
           = S)
  when N+1 == RO ->
    S#watchdog{status = okay,
               num_dwa = OS,
               pending = false};

%%   REOPEN        Receive DWA &        Pending = FALSE
%%                 NumDWA < 2           NumDWA++             REOPEN

rcv('DWA', #watchdog{status = reopen,
                     num_dwa = N}
           = S) ->
    S#watchdog{num_dwa = N+1,
               pending = false};

%%   REOPEN        Receive non-DWA      Throwaway()          REOPEN

rcv('DWR', #watchdog{status = reopen} = S) ->
    S;  %% ensure DWA: the RFC isn't explicit about answering

rcv(_, #watchdog{status = reopen} = S) ->
    throw(S).

%% timeout/1
%%
%% The caller sets the watchdog on the return value.

%%   OKAY          Timer expires &      SendWatchdog()
%%                 !Pending             SetWatchdog()
%%                                      Pending = TRUE       OKAY
%%   REOPEN        Timer expires &      SendWatchdog()
%%                 !Pending             SetWatchdog()
%%                                      Pending = TRUE       REOPEN

timeout(#watchdog{status = T,
                  pending = false}
        = S)
  when T == okay;
       T == reopen ->
    send_watchdog(S);

%%   OKAY          Timer expires &      Failover()
%%                 Pending              SetWatchdog()        SUSPECT

timeout(#watchdog{status = okay,
                  pending = true,
                  num_dwa = N}
        = S) ->
    case N of
        1 ->
            S#watchdog{status = suspect};
        0 ->  %% non-standard: never move to suspect
            S;
        N ->  %% non-standard: more timeouts before moving
            S#watchdog{num_dwa = N-1}
    end;

%%   SUSPECT       Timer expires        CloseConnection()
%%                                      SetWatchdog()        DOWN
%%   REOPEN        Timer expires &      CloseConnection()
%%                 Pending &            SetWatchdog()
%%                 NumDWA < 0                                DOWN

timeout(#watchdog{status = T,
                  pending = P,
                  num_dwa = N,
                  transport = TPid}
        = S)
  when T == suspect;
       T == reopen, P, N < 0 ->
    exit(TPid, {shutdown, watchdog_timeout}),
    S#watchdog{status = down};

%%   REOPEN        Timer expires &      NumDWA = -1
%%                 Pending &            SetWatchdog()
%%                 NumDWA >= 0                               REOPEN

timeout(#watchdog{status = reopen,
                  pending = true,
                  num_dwa = N}
        = S)
  when 0 =< N ->
    S#watchdog{num_dwa = -1};

%%   DOWN          Timer expires        AttemptOpen()
%%                                      SetWatchdog()        DOWN
%%   INITIAL       Timer expires        AttemptOpen()
%%                                      SetWatchdog()        INITIAL

%% RFC 3539, 3.4.1:
%%
%%   [5] While the connection is in the closed state, the AAA client MUST
%%       NOT attempt to send further watchdog messages on the connection.
%%       However, after the connection is closed, the AAA client continues
%%       to periodically attempt to reopen the connection.
%%
%%       The AAA client SHOULD wait for the transport layer to report
%%       connection failure before attempting again, but MAY choose to
%%       bound this wait time by the watchdog interval, Tw.

%% Don't bound the wait, restarting the peer process only when the
%% previous process has died. We only need to handle state down since
%% we start the first watchdog when transitioning out of initial.

timeout(#watchdog{status = T} = S)
  when T == initial;
       T == down ->
    restart(S).

%% restart/1

restart(#watchdog{transport = undefined} = S) ->
    restart(getr(restart), S);
restart(S) ->  %% reconnect has won race with timeout
    S.

%% restart/2
%%
%% Only restart the transport in the connecting case. For an accepting
%% transport, there's no guarantee that an accepted connection in a
%% restarted transport is from the peer we've lost contact with, so we
%% have to be prepared for another watchdog to handle it. This is what
%% the diameter_reg registration in this module is for: the peer
%% connection is registered when leaving state initial, and this is
%% used by a new accepting watchdog to realize that it's actually in
%% state down rather than initial when receiving notification of an
%% open connection.

restart({{connect, _} = T, Opts, Svc, SvcOpts},
        #watchdog{parent = Pid,
                  config = #{restrict_connections := R}
                         = M,
                  dictionary = Dict0}
        = S) ->
    send(Pid, {reconnect, self()}),
    Nodes = restrict_nodes(R),
    S#watchdog{transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc),
               config = M#{restrict => restrict(Nodes)}};

%% No restriction on the number of connections to the same peer: just
%% die. Note that a state machine never enters state REOPEN in this
%% case.
restart({{accept, _}, _, _, _}, #watchdog{config = #{restrict := false}}) ->
    stop;

%% Otherwise hang around until told to die, either by the service or
%% by another watchdog.
restart({{accept, _}, _, _, _}, S) ->
    S.

%% Don't currently use Opts/Svc in the accept case.

%% dwr/1

dwr(#diameter_caps{origin_host = OH,
                   origin_realm = OR,
                   origin_state_id = OSI}) ->
    ['DWR', {'Origin-Host', OH},
            {'Origin-Realm', OR},
            {'Origin-State-Id', OSI}].

%% dwa/1
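%%
%% Reuse the AVPs of the stored DWR when constructing the answer:
%% tl/1 drops the 'DWR' message name, leaving the Origin-* AVPs.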

dwa(#diameter_packet{header = H, errors = Es}) ->
    {RC, FailedAVP} = diameter_peer_fsm:result_code(H, Es),
    ['DWA', {'Result-Code', RC}
          | tl(getr(dwr)) ++ FailedAVP].

%% restrict_nodes/1
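%%
%% Return the list of nodes on which connections to the same peer
%% are restricted, as implied by a diameter:restriction().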

restrict_nodes(false) ->
    [];

restrict_nodes(nodes) ->
    [node() | nodes()];

restrict_nodes(node) ->
    [node()];

restrict_nodes(Nodes)
  when [] == Nodes;
       is_atom(hd(Nodes)) ->
    Nodes;

restrict_nodes(F) ->
    diameter_lib:eval(F).

%% restrict/1

restrict(Nodes) ->
    lists:member(node(), Nodes).
