%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% This module implements (as a process) the state machine documented
%% in Appendix A of RFC 3539.
%%

-module(diameter_watchdog).
-behaviour(gen_server).

%% towards diameter_service
-export([start/2]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% diameter_watchdog_sup callback
-export([start_link/1]).

-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").

-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
-define(NOMASK, {0,32}).  %% default sequence mask

-define(BASE, ?DIAMETER_DICT_COMMON).

%% Guard test: N is a non-negative integer.
-define(IS_NATURAL(N), (is_integer(N) andalso 0 =< N)).

%% Process state.
-record(watchdog,
        {%% PCB - Peer Control Block; see RFC 3539, Appendix A
         status = initial :: initial | okay | suspect | down | reopen,
         pending = false  :: boolean(),  %% DWA: true from the sending
                                         %% of DWR until DWA is received
         tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
                                %% {M,F,A} -> integer() >= 0
         num_dwa = 0 :: -1 | non_neg_integer(),
                     %% number of DWAs received in reopen,
                     %% or number of timeouts before okay -> suspect
         %% end PCB
         parent = self() :: pid(),              %% service process
         transport       :: pid() | undefined,  %% peer_fsm process
         tref :: reference()      %% reference for current watchdog timer
               | integer()        %% monotonic time
               | undefined,
         dictionary :: module(),  %% common dictionary
         receive_data :: term(),  %% term passed with incoming message
         config :: #{sequence := diameter:sequence(),  %% mask
                     restrict_connections := diameter:restriction(),
                     restrict := boolean(),
                     suspect := non_neg_integer(),  %% OKAY -> SUSPECT
                     okay := non_neg_integer()},    %% REOPEN -> OKAY
         codec :: #{decode_format := none,
                    string_decode := false,
                    strict_mbit := boolean(),
                    rfc := 3588 | 6733,
                    ordered_encode := false},
         shutdown = false :: boolean()}).

%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
%%
%% The watchdog is started as a child of diameter_watchdog_sup and is
%% handed a fresh reference, Ack, together with the pid of the caller.
%% Ack is sent to the new process only once the monitor is in place
%% (or has failed): the send is in the 'after' of the try. Returns
%% {MonitorRef, WatchdogPid}.
%% ---------------------------------------------------------------------------

-spec start(Type, {[Opt], SvcOpts, RecvData, #diameter_service{}})
   -> {reference(), pid()}
 when Type :: {connect|accept, diameter:transport_ref()},
      Opt :: diameter:transport_opt(),
      SvcOpts :: map(),
      RecvData :: term().

start({_,_} = Type, T) ->
    Ack = make_ref(),
    {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
    try
        {monitor(process, Pid), Pid}
    after
        send(Pid, Ack)
    end.

%% start_link/1
%%
%% Spawn the watchdog process with proc_lib, init/1 being the entry
%% point. There is no timeout on the start (infinity) and the spawn
%% options are those returned by diameter_lib:spawn_opts(server, []).

start_link(T) ->
    {ok, _} = proc_lib:start_link(?MODULE,
                                  init,
                                  [T],
                                  infinity,
                                  diameter_lib:spawn_opts(server, [])).

%% ===========================================================================
%% ===========================================================================

%% init/1
%%
%% Acknowledge the start before building the state in i/1, so that
%% the starting process isn't blocked while this one waits in wait/2,
%% then enter the gen_server loop with the resulting state. This
%% function does not return.

init(T) ->
    proc_lib:init_ack({ok, self()}),
    gen_server:enter_loop(?MODULE, [], i(T)).

%% i/1
%%
%% Build the initial state. The process identified by Pid (the one
%% that called start/2) is monitored and then awaited: we proceed only
%% when it has sent Ack, and exit if it dies first.
%%
%% The data needed to restart a transport and the DWR to send are
%% stored in the process dictionary rather than in the state. The
%% transport process is started here, as part of constructing the
%% state.
%%
%% Configuration defaults are suspect = 1 and okay = 3, both of which
%% can be overridden by a watchdog_config transport option: see
%% config/2. The codec map forces decode_format = none, string_decode
%% = false and ordered_encode = false. Note the := on the first two:
%% these keys must already be present in the service options.

i({Ack, T, Pid, {Opts,
                 #{restrict_connections := Restrict}
                 = SvcOpts0,
                 RecvData,
                 #diameter_service{applications = Apps,
                                   capabilities = Caps}
                 = Svc}}) ->
    monitor(process, Pid),
    wait(Ack, Pid),

    Dict0 = common_dictionary(Apps),
    SvcOpts = SvcOpts0#{rfc => rfc(Dict0)},
    putr(restart, {T, Opts, Svc, SvcOpts}),  %% save seeing it in trace
    putr(dwr, dwr(Caps)),                    %%
    Nodes = restrict_nodes(Restrict),
    #watchdog{parent = Pid,
              transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc),
              tw = proplists:get_value(watchdog_timer,
                                       Opts,
                                       ?DEFAULT_TW_INIT),
              receive_data = RecvData,
              dictionary = Dict0,
              config = maps:with([sequence,
                                  restrict_connections,
                                  restrict,
                                  suspect,
                                  okay],
                                 config(SvcOpts#{restrict => restrict(Nodes),
                                                 suspect => 1,
                                                 okay => 3},
                                        Opts)),
              codec = maps:with([decode_format,
                                 strict_mbit,
                                 string_decode,
                                 rfc,
                                 ordered_encode],
                                SvcOpts#{decode_format := none,
                                         string_decode := false,
                                         ordered_encode => false})}.

%% wait/2
%%
%% Block until the message Ref arrives, or exit with reason
%% {shutdown, DownMessage} if the monitored process Pid dies first.

wait(Ref, Pid) ->
    receive
        Ref ->
            ok;
        {'DOWN', _, process, Pid, _} = D ->
            exit({shutdown, D})
    end.

%% Regard anything but the generated RFC 3588 dictionary as modern.
%% This affects the interpretation of defaults during the decode
%% of values of type DiameterURI, this having changed from RFC 3588.
%% (So much for backwards compatibility.)
rfc(?BASE) ->
    3588;
rfc(_) ->
    6733.

%% config/2
%%
%% Could also configure counts for SUSPECT to DOWN and REOPEN to DOWN,
%% but don't.
%%
%% Folds the tuples of a watchdog_config transport option (default [])
%% into Map. Only {suspect, N} and {okay, N} with N a non-negative
%% integer are accepted: anything else fails with function_clause in
%% cfg/2. The := update additionally requires that the key already
%% exists in Map.

config(Map, Opts) ->
    Config = proplists:get_value(watchdog_config, Opts, []),
    lists:foldl(fun cfg/2, Map, Config).

cfg({suspect, N}, Map)
  when ?IS_NATURAL(N) ->
    Map#{suspect := N};

cfg({okay, N}, Map)
  when ?IS_NATURAL(N) ->
    Map#{okay := N}.

%% start/6
%%
%% Start the peer_fsm process and return its pid, discarding the
%% reference returned along with it.

start(T, Opts, SvcOpts, Nodes, Dict0, Svc) ->
    {_MRef, Pid}
        = diameter_peer_fsm:start(T, Opts, {SvcOpts, Nodes, Dict0, Svc}),
    Pid.

%% common_dictionary/1
%%
%% Determine the dictionary of the Diameter common application with
%% Application Id 0. Fail on config errors.
%%
%% The dictionary modules are first grouped by application id (as
%% returned by the id/0 of each module) in an orddict, which is then
%% folded with dict0/3.

common_dictionary(Apps) ->
    case
        orddict:fold(fun dict0/3,
                     false,
                     lists:foldl(fun(#diameter_app{dictionary = M}, D) ->
                                         orddict:append(M:id(), M, D)
                                 end,
                                 orddict:new(),
                                 Apps))
    of
        {value, Mod} ->
            Mod;
        false ->
            %% A transport should configure a common dictionary but
            %% don't require it. Not configuring a common dictionary
            %% means a user won't be able to either send or receive
            %% messages in the common dictionary: incoming requests
            %% will be answered with 3007 and outgoing requests cannot
            %% be sent. The dictionary returned here is only used for
            %% messages diameter sends and receives: CER/CEA, DPR/DPA
            %% and DWR/DWA.
            ?BASE
    end.

%% Each application should be represented by a single dictionary.
dict0(Id, [_,_|_] = Ms, _) ->
    config_error({multiple_dictionaries, Ms, {application_id, Id}});

%% An explicit common dictionary.
dict0(?APP_ID_COMMON, [Mod], _) ->
    {value, Mod};

%% A pure relay, in which case the common application is implicit.
%% This uses the fact that the common application will already have
%% been folded.
dict0(?APP_ID_RELAY, _, false) ->
    {value, ?BASE};

%% Any other application: leave the accumulator as is.
dict0(_, _, Acc) ->
    Acc.

%% config_error/1
%%
%% Exit the calling process with a shutdown reason identifying a
%% configuration error.

config_error(T) ->
    exit({shutdown, {configuration_error, T}}).

%% handle_call/3
%%
%% No calls are expected: all are answered with nok.

handle_call(_, _, State) ->
    {reply, nok, State}.

%% handle_cast/2
%%
%% No casts are expected: all are ignored.

handle_cast(_, State) ->
    {noreply, State}.

%% handle_info/2
%%
%% All messages are passed to transition/2, which returns one of the
%% following.
%%
%%   ok           - state is unchanged, nothing to report
%%   #watchdog{}  - new state: notify the parent with close/2 and
%%                  event/3, in that order
%%   stop         - report a transition to down and terminate with
%%                  reason {shutdown, Msg}

handle_info(T, #watchdog{} = State) ->
    case transition(T, State) of
        ok ->
            {noreply, State};
        #watchdog{} = S ->
            close(T, State),     %% service expects 'close' message
            event(T, State, S),  %% before 'watchdog'
            {noreply, S};
        stop ->
            ?LOG(stop, truncate(T)),
            event(T, State, State#watchdog{status = down}),
            {stop, {shutdown, T}, State}
    end.

%% truncate/1
%%
%% Reduce a 'DOWN' message to {'DOWN', Pid} for logging, leaving any
%% other message as is.

truncate({'DOWN' = T, _, process, Pid, _}) ->
    {T, Pid};
truncate(T) ->
    T.

%% close/2
%%
%% Send {close, self(), Reason} to the parent if the message is the
%% 'DOWN' of the current transport with reason {shutdown, Reason}.
%% Do nothing for any other message.

close({'DOWN', _, process, TPid, {shutdown, Reason}},
      #watchdog{transport = TPid,
                parent = Pid}) ->
    send(Pid, {close, self(), Reason});

close(_, _) ->
    ok.

%% event/3
%%
%% Notify the parent of a state transition, given the message that
%% caused it and the states before and after. The message sent is
%% {watchdog, self(), {[TPid | Data], From, To}}, with Data as
%% returned by data/4. Nothing is sent in the cases excluded by the
%% guard of the first clause.

event(_,
      #watchdog{status = From, transport = F},
      #watchdog{status = To, transport = T})
  when F == undefined, T == undefined;  %% transport not started
       From == initial, To == down;     %% never really left INITIAL
       From == To ->                    %% no state transition
    ok;
%% Note that there is no INITIAL -> DOWN transition in RFC 3539: ours
%% is just a consequence of stop.

event(Msg,
      #watchdog{status = From, transport = F, parent = Pid},
      #watchdog{status = To, transport = T}) ->
    TPid = tpid(F,T),
    E = {[TPid | data(Msg, TPid, From, To)], From, To},
    send(Pid, {watchdog, self(), E}),
    ?LOG(transition, {From, To}).

%% data/4
%%
%% Additional data to pass with an event. On REOPEN -> OKAY this is
%% the term stored in the process dictionary under key 'open' when
%% REOPEN was entered, which is erased in the process. On receiving an
%% open message that leads to OKAY or REOPEN it's the last element of
%% that message. There's no additional data otherwise.

data(Msg, TPid, reopen, okay) ->
    {recv, TPid, _, 'DWA', _Pkt} = Msg,  %% assert
    {TPid, T} = eraser(open),
    [T];

data({open, TPid, _Hosts, T}, TPid, _From, To)
  when To == okay;
       To == reopen ->
    [T];

data(_, _, _, _) ->
    [].

%% tpid/2
%%
%% Choose the transport pid to report: that of the new state if it's
%% a pid, otherwise that of the old state.

tpid(_, Pid)
  when is_pid(Pid) ->
    Pid;

tpid(Pid, _) ->
    Pid.

%% send/2

send(Pid, T) ->
    Pid ! T.

%% terminate/2

terminate(_, _) ->
    ok.

%% code_change/3

code_change(_, State, _) ->
    {ok, State}.

%% ===========================================================================
%% ===========================================================================

%% transition/2
%%
%% The state transitions documented here are extracted from RFC 3539,
%% the commentary is ours.
%%
%% Returns ok if the state is unchanged, a new #watchdog{} record, or
%% stop if the process should terminate: see handle_info/2. There is
%% no catch-all clause, so a message that matches none of the clauses
%% below causes the process to fail with function_clause.

%% Service is telling the watchdog of an accepting transport to die
%% following transport death in state INITIAL, or after connect_timer
%% expiry; or another watchdog is saying the same after reestablishing
%% a connection previously had by this one.
transition(close, #watchdog{}) ->
    {accept, _} = role(),  %% assert
    stop;

%% Service is asking for the peer to be taken down gracefully.
transition({shutdown, Pid, _}, #watchdog{parent = Pid,
                                         transport = undefined}) ->
    stop;
transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
                                                  transport = TPid}
                                        = S) ->
    send(TPid, {T, self(), Reason}),
    S#watchdog{shutdown = true};

%% Transport is telling us that DPA has been sent in response to DPR,
%% or that DPR has been explicitly sent: transport death should lead
%% to ours.
transition({'DPR', TPid}, #watchdog{transport = TPid} = S) ->
    S#watchdog{shutdown = true};

%% Parent process has died.
transition({'DOWN', _, process, Pid, _Reason},
           #watchdog{parent = Pid}) ->
    stop;

%% Transport has accepted a connection.
transition({accepted = T, TPid}, #watchdog{transport = TPid,
                                           parent = Pid}) ->
    send(Pid, {T, self(), TPid}),
    ok;

%%   STATE         Event                Actions              New State
%%   =====         ------               -------              ----------
%%   INITIAL       Connection up        SetWatchdog()        OKAY

%% By construction, the watchdog timer isn't set until we move into
%% state okay as the result of the Peer State Machine reaching the
%% Open state.
%%
%% If we're accepting then we may be resuming a connection that went
%% down in another watchdog process, in which case this is the
%% transition below, from down to reopen. That is, it's not until we
%% know the identity of the peer (ie. now) that we know that we're in
%% state down rather than initial.

transition({open, TPid, Hosts, _} = Open,
           #watchdog{transport = TPid,
                     status = initial,
                     config = #{restrict := R,
                                suspect := OS}}
           = S) ->
    case okay(role(), Hosts, R) of
        okay ->
            set_watchdog(S#watchdog{status = okay,
                                    num_dwa = OS});
        reopen ->
            transition(Open, S#watchdog{status = down})
    end;

%%   DOWN          Connection up        NumDWA = 0
%%                                      SendWatchdog()
%%                                      SetWatchdog()
%%                                      Pending = TRUE       REOPEN

transition({open = Key, TPid, _Hosts, T},
           #watchdog{transport = TPid,
                     status = down,
                     config = #{suspect := OS,
                                okay := RO}}
           = S) ->
    case RO of
        0 ->  %% non-standard: skip REOPEN
            set_watchdog(S#watchdog{status = okay,
                                    num_dwa = OS});
        _ ->
            %% Store the info we need to notify the parent to reopen
            %% the connection after the requisite DWA's are received,
            %% at which time we eraser(open).
            putr(Key, {TPid, T}),
            set_watchdog(send_watchdog(S#watchdog{status = reopen,
                                                  num_dwa = 0}))
    end;

%%   OKAY          Connection down      CloseConnection()
%%                                      Failover()
%%                                      SetWatchdog()        DOWN
%%   SUSPECT       Connection down      CloseConnection()
%%                                      SetWatchdog()        DOWN
%%   REOPEN        Connection down      CloseConnection()
%%                                      SetWatchdog()        DOWN

%% Transport has died after DPA or service requested termination ...
transition({'DOWN', _, process, TPid, _Reason},
           #watchdog{transport = TPid,
                     shutdown = true}) ->
    stop;

%% ... or not.
transition({'DOWN', _, process, TPid, _Reason} = D,
           #watchdog{transport = TPid,
                     status = T,
                     config = #{restrict := R}}
           = S0) ->
    S = S0#watchdog{pending = false,
                    transport = undefined},
    {M,_} = role(),

    %% Close an accepting watchdog immediately if there's no
    %% restriction on the number of connections to the same peer: the
    %% state machine never enters state REOPEN in this case.

    if T == initial;
       M == accept, not R ->
            %% handle_info/2 doesn't call close/2 on stop, so do it here.
            close(D, S0),
            stop;
       true ->
            set_watchdog(S#watchdog{status = down})
    end;

%% Incoming message. The message is passed on to diameter_traffic only
%% if incoming/4 returns normally: a state thrown from there means that
%% the message has been dealt with here or is to be discarded.
transition({recv, TPid, Route, Name, Pkt},
           #watchdog{transport = TPid}
           = S) ->
    try incoming(Route, Name, Pkt, S) of
        #watchdog{dictionary = Dict0, receive_data = T} = NS ->
            diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T),
            NS
    catch
        #watchdog{} = NS ->
            NS
    end;

%% Current watchdog has timed out.
transition({timeout, TRef, tw}, #watchdog{tref = TRef} = S) ->
    set_watchdog(0, timeout(S));

%% Message has arrived since the timer was started: subtract time
%% already elapsed from new timer. In this case tref is the timestamp
%% stored by set_watchdog/1, not a timer reference.
transition({timeout, _, tw}, #watchdog{tref = T0} = S) ->
    set_watchdog(diameter_lib:micro_diff(T0) div 1000, S);

%% State query.
transition({state, Pid}, #watchdog{status = S}) ->
    send(Pid, {self(), S}),
    ok.

%% ===========================================================================

%% putr/2, getr/1, eraser/1
%%
%% Process dictionary access, with keys tagged by module name.

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

getr(Key) ->
    get({?MODULE, Key}).

eraser(Key) ->
    erase({?MODULE, Key}).

%% encode/4
%%
%% Encode an outgoing DWR or DWA. A DWR is built from the message
%% stored in the process dictionary at init, with a new sequence
%% number used as both End-to-End and Hop-by-Hop Identifier. A DWA
%% reuses the header and transport_data of the request packet being
%% answered.

encode(dwr = M, Dict0, Opts, Mask) ->
    Msg = getr(M),
    Seq = diameter_session:sequence(Mask),
    Hdr = #diameter_header{version = ?DIAMETER_VERSION,
                           end_to_end_id = Seq,
                           hop_by_hop_id = Seq},
    Pkt = #diameter_packet{header = Hdr,
                           msg = Msg},
    diameter_codec:encode(Dict0, Opts, Pkt);

encode(dwa, Dict0, Opts, #diameter_packet{header = H, transport_data = TD}
                         = ReqPkt) ->
    AnsPkt = #diameter_packet{header
                              = H#diameter_header{is_request = false,
                                                  is_error = undefined,
                                                  is_retransmitted = false},
                              msg = dwa(ReqPkt),
                              transport_data = TD},

    diameter_codec:encode(Dict0, Opts, AnsPkt).

%% okay/3
%%
%% Decide whether a newly opened connection leads to state okay or
%% reopen. A connecting watchdog is always okay. An accepting one
%% registers the connection and, if connections are restricted, is
%% okay only if no other process has registered the same term.

okay({accept, Ref}, Hosts, Restrict) ->
    T = {?MODULE, connection, Ref, Hosts},
    diameter_reg:add(T),
    if Restrict ->
            okay(diameter_reg:match(T));
       true ->
            okay
    end;
%% Register before matching so that at least one of two registering
%% processes will match the other.

okay({connect, _}, _, _) ->
    okay.

%% okay/1

%% The peer hasn't been connected recently ...
okay([{_,P}]) ->
    P = self(),  %% assert
    okay;

%% ... or it has. Tell every other registered process to close; the
%% match asserts that there is at least one.
okay(C) ->
    [_|_] = [send(P, close) || {_,P} <- C, self() /= P],
    reopen.

%% role/0
%%
%% The {connect|accept, Ref} tuple stored under key 'restart' at init.

role() ->
    element(1, getr(restart)).

%% set_watchdog/1

%% Timer not yet set.
set_watchdog(#watchdog{tref = undefined} = S) ->
    set_watchdog(0, S);

%% Timer already set: start a new one only at expiry. The timestamp
%% stored here is used at expiry to compute the time already elapsed.
set_watchdog(#watchdog{} = S) ->
    S#watchdog{tref = diameter_lib:now()}.

%% set_watchdog/2
%%
%% Start a timer for the watchdog interval less Ms milliseconds,
%% passing stop through untouched.

set_watchdog(_, stop = No) ->
    No;

set_watchdog(Ms, #watchdog{tw = TwInit} = S) ->
    S#watchdog{tref = erlang:start_timer(tw(TwInit, Ms), self(), tw)}.

%% A callback could return anything, so ensure the result isn't
%% negative. Don't prevent abuse, even though the smallest valid
%% timeout is 4000.
tw(TwInit, Ms) ->
    max(tw(TwInit) - Ms, 0).

%% tw/1
%%
%% An integer Tw is jittered uniformly over [Tw - 2000, Tw + 2000]. An
%% {M,F,A} is applied, its return value being used as is.

tw(T)
  when is_integer(T), T >= 6000 ->
    T - 2000 + (rand:uniform(4001) - 1); %% RFC3539 jitter of +/- 2 sec.
tw({M,F,A}) ->
    apply(M,F,A).

%% send_watchdog/1
%%
%% Send DWR by way of the transport process and set Pending. Only
%% defined when no DWR is already pending.

send_watchdog(#watchdog{pending = false,
                        transport = TPid,
                        dictionary = Dict0,
                        config = #{sequence := Mask},
                        codec = Opts}
              = S) ->
    #diameter_packet{bin = Bin} = EPkt = encode(dwr, Dict0, Opts, Mask),
    diameter_traffic:incr(send, EPkt, TPid, Dict0),
    send(TPid, {send, Bin}),
    ?LOG(send, 'DWR'),
    S#watchdog{pending = true}.

%% Don't count encode errors since we don't expect any on DWR/DWA.

%% incoming/4
%%
%% Advance the state machine with rcv/2 and then deal with the message
%% itself in rcv/3. Both signal that the message should not be passed
%% on by throwing the new state, which the caller catches. Only the
%% call to rcv/2 is protected here: a throw from it is intercepted only
%% when Route is true, to inform the transport before rethrowing.

incoming(Route, Name, Pkt, S) ->
    try rcv(Name, S) of
        NS -> rcv(Name, Pkt, NS)
    catch
        #watchdog{transport = TPid} = NS when Route ->  %% incoming request
            send(TPid, {send, false}),                  %% requiring ack
            throw(NS)
    end.

%% rcv/3
%%
%% Handle a message that rcv/2 hasn't thrown away. DWR is answered
%% here, and both it and DWA are counted. Messages that the watchdog
%% deals with itself, or that aren't to be passed on, result in the
%% state being thrown. For anything else the state is returned.

rcv('DWR', Pkt, #watchdog{transport = TPid,
                          dictionary = Dict0,
                          codec = Opts}
                = S) ->
    ?LOG(recv, 'DWR'),
    DPkt = diameter_codec:decode(Dict0, Opts, Pkt),
    diameter_traffic:incr(recv, DPkt, TPid, Dict0),
    diameter_traffic:incr_error(recv, DPkt, TPid, Dict0),
    %% NOTE(review): the answer is built from Pkt as received, not from
    %% the decoded DPkt - confirm that this is intended.
    #diameter_packet{header = H,
                     transport_data = T,
                     bin = Bin}
        = EPkt
        = encode(dwa, Dict0, Opts, Pkt),
    diameter_traffic:incr(send, EPkt, TPid, Dict0),
    diameter_traffic:incr_rc(send, EPkt, TPid, Dict0),

    %% Strip potentially large message terms.
    send(TPid, {send, #diameter_packet{header = H,
                                       transport_data = T,
                                       bin = Bin}}),
    ?LOG(send, 'DWA'),
    throw(S);

rcv('DWA', Pkt, #watchdog{transport = TPid,
                          dictionary = Dict0,
                          codec = Opts}
                = S) ->
    ?LOG(recv, 'DWA'),
    diameter_traffic:incr(recv, Pkt, TPid, Dict0),
    diameter_traffic:incr_rc(recv,
                             diameter_codec:decode(Dict0, Opts, Pkt),
                             TPid,
                             Dict0),
    throw(S);

rcv(N, _, S)
  when N == 'CER';
       N == 'CEA';
       N == 'DPR' ->
    throw(S);
%% DPR can be sent explicitly with diameter:call/4. Only the
%% corresponding DPAs arrive here.

rcv(_, _, S)->
    S.

%% rcv/2
%%
%% The lack of Hop-by-Hop and End-to-End Identifiers checks in a
%% received DWA is intentional. The purpose of the message is to
%% demonstrate life but a peer that consistently bungles it by sending
%% the wrong identifiers causes the connection to toggle between OPEN
%% and SUSPECT, with failover and failback as result, despite there
%% being no real problem with connectivity. Thus, relax and accept any
%% incoming DWA as being in response to an outgoing DWR.

%% A clause below that throws its state causes the message to be
%% discarded (Throwaway), while one that returns it lets the message
%% proceed to rcv/3: see incoming/4.

%%   INITIAL       Receive DWA          Pending = FALSE
%%                                      Throwaway()          INITIAL
%%   INITIAL       Receive non-DWA      Throwaway()          INITIAL

rcv('DWA', #watchdog{status = initial} = S) ->
    throw(S#watchdog{pending = false});

rcv(_, #watchdog{status = initial} = S) ->
    throw(S);

%%   DOWN          Receive DWA          Pending = FALSE
%%                                      Throwaway()          DOWN
%%   DOWN          Receive non-DWA      Throwaway()          DOWN

rcv('DWA', #watchdog{status = down} = S) ->
    throw(S#watchdog{pending = false});

rcv(_, #watchdog{status = down} = S) ->
    throw(S);

%%   OKAY          Receive DWA          Pending = FALSE
%%                                      SetWatchdog()        OKAY
%%   OKAY          Receive non-DWA      SetWatchdog()        OKAY

rcv('DWA', #watchdog{status = okay} = S) ->
    set_watchdog(S#watchdog{pending = false});

rcv(_, #watchdog{status = okay} = S) ->
    set_watchdog(S);

%%   SUSPECT       Receive DWA          Pending = FALSE
%%                                      Failback()
%%                                      SetWatchdog()        OKAY
%%   SUSPECT       Receive non-DWA      Failback()
%%                                      SetWatchdog()        OKAY

rcv('DWA', #watchdog{status = suspect, config = #{suspect := OS}} = S) ->
    set_watchdog(S#watchdog{status = okay,
                            num_dwa = OS,
                            pending = false});

rcv(_, #watchdog{status = suspect, config = #{suspect := OS}} = S) ->
    set_watchdog(S#watchdog{status = okay,
                            num_dwa = OS});

%%   REOPEN        Receive DWA &        Pending = FALSE
%%                 NumDWA == 2          NumDWA++
%%                                      Failback()           OKAY

%% The number of DWAs required is the configured okay count rather
%% than the fixed 3 of the RFC.
rcv('DWA', #watchdog{status = reopen,
                     num_dwa = N,
                     config = #{suspect := OS,
                                okay := RO}}
           = S)
  when N+1 == RO ->
    S#watchdog{status = okay,
               num_dwa = OS,
               pending = false};

%%   REOPEN        Receive DWA &        Pending = FALSE
%%                 NumDWA < 2           NumDWA++             REOPEN

rcv('DWA', #watchdog{status = reopen,
                     num_dwa = N}
           = S) ->
    S#watchdog{num_dwa = N+1,
               pending = false};

%%   REOPEN        Receive non-DWA      Throwaway()          REOPEN

rcv('DWR', #watchdog{status = reopen} = S) ->
    S;  %% ensure DWA: the RFC isn't explicit about answering

rcv(_, #watchdog{status = reopen} = S) ->
    throw(S).

%% timeout/1
%%
%% The caller sets the watchdog on the return value.

%%   OKAY          Timer expires &      SendWatchdog()
%%                 !Pending             SetWatchdog()
%%                                      Pending = TRUE       OKAY
%%   REOPEN        Timer expires &      SendWatchdog()
%%                 !Pending             SetWatchdog()
%%                                      Pending = TRUE       REOPEN

timeout(#watchdog{status = T,
                  pending = false}
        = S)
  when T == okay;
       T == reopen ->
    send_watchdog(S);

%%   OKAY          Timer expires &      Failover()
%%                 Pending              SetWatchdog()        SUSPECT

%% In state okay, num_dwa counts the timeouts remaining before the
%% move to suspect.
timeout(#watchdog{status = okay,
                  pending = true,
                  num_dwa = N}
        = S) ->
    case N of
        1 ->
            S#watchdog{status = suspect};
        0 ->  %% non-standard: never move to suspect
            S;
        _ ->  %% non-standard: more timeouts before moving
            S#watchdog{num_dwa = N-1}
    end;

%%   SUSPECT       Timer expires        CloseConnection()
%%                                      SetWatchdog()        DOWN
%%   REOPEN        Timer expires &      CloseConnection()
%%                 Pending &            SetWatchdog()
%%                 NumDWA < 0                                DOWN

%% The connection is closed by killing the transport process.
timeout(#watchdog{status = T,
                  pending = P,
                  num_dwa = N,
                  transport = TPid}
        = S)
  when T == suspect;
       T == reopen, P, N < 0 ->
    exit(TPid, {shutdown, watchdog_timeout}),
    S#watchdog{status = down};

%%   REOPEN        Timer expires &      NumDWA = -1
%%                 Pending &            SetWatchdog()
%%                 NumDWA >= 0                               REOPEN

timeout(#watchdog{status = reopen,
                  pending = true,
                  num_dwa = N}
        = S)
  when 0 =< N ->
    S#watchdog{num_dwa = -1};

%%   DOWN          Timer expires        AttemptOpen()
%%                                      SetWatchdog()        DOWN
%%   INITIAL       Timer expires        AttemptOpen()
%%                                      SetWatchdog()        INITIAL

%% RFC 3539, 3.4.1:
%%
%%   [5] While the connection is in the closed state, the AAA client MUST
%%       NOT attempt to send further watchdog messages on the connection.
%%       However, after the connection is closed, the AAA client continues
%%       to periodically attempt to reopen the connection.
%%
%%       The AAA client SHOULD wait for the transport layer to report
%%       connection failure before attempting again, but MAY choose to
%%       bound this wait time by the watchdog interval, Tw.

%% Don't bound, restarting the peer process only when the previous
%% process has died. We only need to handle state down since we start
%% the first watchdog when transitioning out of initial.

timeout(#watchdog{status = T} = S)
  when T == initial;
       T == down ->
    restart(S).

%% restart/1
%%
%% Restart only if there's currently no transport process. Returns a
%% new state or stop.

restart(#watchdog{transport = undefined} = S) ->
    restart(getr(restart), S);
restart(S) ->  %% reconnect has won race with timeout
    S.

%% restart/2
%%
%% Only restart the transport in the connecting case. For an accepting
%% transport, there's no guarantee that an accepted connection in a
%% restarted transport is from the peer we've lost contact with so
%% have to be prepared for another watchdog to handle it. This is what
%% the diameter_reg registration in this module is for: the peer
%% connection is registered when leaving state initial and this is
%% used by a new accepting watchdog to realize that it's actually in
%% state down rather than initial when receiving notification of an
%% open connection.

restart({{connect, _} = T, Opts, Svc, SvcOpts},
        #watchdog{parent = Pid,
                  config = #{restrict_connections := R}
                  = M,
                  dictionary = Dict0}
        = S) ->
    send(Pid, {reconnect, self()}),
    Nodes = restrict_nodes(R),
    S#watchdog{transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc),
               config = M#{restrict => restrict(Nodes)}};

%% No restriction on the number of connections to the same peer: just
%% die. Note that a state machine never enters state REOPEN in this
%% case.
restart({{accept, _}, _, _, _}, #watchdog{config = #{restrict := false}}) ->
    stop;

%% Otherwise hang around until told to die, either by the service or
%% by another watchdog.
restart({{accept, _}, _, _, _}, S) ->
    S.

%% Don't currently use Opts/Svc in the accept case.

%% dwr/1
%%
%% The DWR to send, as a list headed by the message name.

dwr(#diameter_caps{origin_host = OH,
                   origin_realm = OR,
                   origin_state_id = OSI}) ->
    ['DWR', {'Origin-Host', OH},
            {'Origin-Realm', OR},
            {'Origin-State-Id', OSI}].

%% dwa/1
%%
%% The DWA with which to answer a DWR: a Result-Code followed by the
%% same AVPs as in the stored DWR, plus any Failed-AVP returned by
%% diameter_peer_fsm:result_code/2.

dwa(#diameter_packet{header = H, errors = Es}) ->
    {RC, FailedAVP} = diameter_peer_fsm:result_code(H, Es),
    ['DWA', {'Result-Code', RC}
            | tl(getr(dwr)) ++ FailedAVP].

%% restrict_nodes/1
%%
%% Map a restrict_connections value to a list of nodes. A list that is
%% empty or headed by an atom is taken to be a list of nodes already.
%% Anything else is passed to diameter_lib:eval/1.

restrict_nodes(false) ->
    [];

restrict_nodes(nodes) ->
    [node() | nodes()];

restrict_nodes(node) ->
    [node()];

restrict_nodes([]) ->
    [];

restrict_nodes([N | _] = Nodes)
  when is_atom(N) ->
    Nodes;

restrict_nodes(F) ->
    diameter_lib:eval(F).

%% restrict/1
%%
%% Whether or not the local node is among those listed.

restrict(Nodes) ->
    lists:member(node(), Nodes).