1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2010-2018. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% 22%% Implements the process that represents a service. 23%% 24 25-module(diameter_service). 26-behaviour(gen_server). 27 28%% towards diameter_service_sup 29-export([start_link/1]). 30 31%% towards diameter 32-export([subscribe/1, 33 unsubscribe/1, 34 services/0, 35 peer_info/1, 36 info/2]). 37 38%% towards diameter_config 39-export([start/1, 40 stop/1, 41 start_transport/2, 42 stop_transport/2]). 43 44%% towards diameter_peer 45-export([notify/2]). 46 47%% towards diameter_traffic 48-export([find_incoming_app/4, 49 pick_peer/3]). 50 51%% test/debug 52-export([services/1, 53 subscriptions/1, 54 subscriptions/0, 55 call_module/3, 56 whois/1, 57 state/1, 58 uptime/1]). 59 60%% gen_server callbacks 61-export([init/1, 62 handle_call/3, 63 handle_cast/2, 64 handle_info/2, 65 terminate/2, 66 code_change/3]). 67 68-include_lib("diameter/include/diameter.hrl"). 69-include("diameter_internal.hrl"). 70 71%% RFC 3539 watchdog states. 72-define(WD_INITIAL, initial). 73-define(WD_OKAY, okay). 74-define(WD_SUSPECT, suspect). 75-define(WD_DOWN, down). 76-define(WD_REOPEN, reopen). 77 78-type wd_state() :: ?WD_INITIAL 79 | ?WD_OKAY 80 | ?WD_SUSPECT 81 | ?WD_DOWN 82 | ?WD_REOPEN. 83 84-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1 85-define(RESTART_TC, 1000). %% if restart was this recent 86 87%% Maintain state in a table since a service's state is accessed 88%% outside of the service process. 89-define(STATE_TABLE, ?MODULE). 90 91%% Workaround for dialyzer's lack of understanding of match specs. 92-type match(T) 93 :: T | '_' | '$1' | '$2'. 94 95%% State of service gen_server. Note that the state term itself 96%% doesn't change, which is relevant for the stateless application 97%% callbacks since the state is retrieved from ?STATE_TABLE from 98%% outside the service process. The pid in the service record is used 99%% to determine whether or not we need to call the process for a 100%% pick_peer callback in the statefull case. 101-record(state, 102 {id = diameter_lib:now(), 103 service_name :: diameter:service_name(), %% key in ?STATE_TABLE 104 service :: #diameter_service{}, 105 watchdogT = ets_new(watchdogs) %% #watchdog{} at start 106 :: ets:tid(), 107 local :: {ets:tid(), ets:tid(), ets:tid()}, 108 remote :: {ets:tid(), ets:tid(), ets:tid()}, 109 monitor = false :: false | pid(), %% process to die with 110 options :: #{sequence := diameter:sequence(), %% sequence mask 111 share_peers := diameter:remotes(),%% broadcast to 112 use_shared_peers := diameter:remotes(),%% use from 113 restrict_connections := diameter:restriction(), 114 incoming_maxlen := diameter:message_length(), 115 strict_arities => diameter:strict_arities(), 116 strict_mbit := boolean(), 117 decode_format := diameter:decode_format(), 118 avp_dictionaries => nonempty_list(module()), 119 traffic_counters := boolean(), 120 string_decode := boolean(), 121 capabilities_cb => diameter:evaluable(), 122 pool_size => pos_integer(), 123 capx_timeout => diameter:'Unsigned32'(), 124 strict_capx => boolean(), 125 disconnect_cb => diameter:evaluable(), 126 dpr_timeout => diameter:'Unsigned32'(), 127 dpa_timeout => diameter:'Unsigned32'(), 128 length_errors => exit | handle | discard, 129 connect_timer => diameter:'Unsigned32'(), 130 watchdog_timer => diameter:'Unsigned32'() 131 | {module(), atom(), list()}, 132 watchdog_config => [{okay|suspect, non_neg_integer()}], 133 spawn_opt := list() | {module(), atom(), list()}}}). 134 135%% Record representing an RFC 3539 watchdog process implemented by 136%% diameter_watchdog. 137-record(watchdog, 138 {pid :: match(pid()) | undefined, 139 type :: match(connect | accept), 140 ref :: match(reference()), %% key into diameter_config 141 options :: match([diameter:transport_opt()]),%% from start_transport 142 state = ?WD_INITIAL :: match(wd_state()), 143 started = diameter_lib:now(),%% at process start 144 peer = false :: match(boolean() | pid())}). 145 %% true at accepted/remove, pid() at okay/reopen 146 147%% Record representing a Peer State Machine processes implemented by 148%% diameter_peer_fsm. 149-record(peer, 150 {pid :: pid(), 151 apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias} 152 | [diameter:app_alias()]), %% remote 153 caps :: match(#diameter_caps{}), 154 started = diameter_lib:now(), %% at connection_up 155 watchdog :: match(pid() %% key into watchdogT 156 | undefined)}). %% undefined if remote 157 158%% --------------------------------------------------------------------------- 159%% # start/1 160%% --------------------------------------------------------------------------- 161 162start(SvcName) -> 163 diameter_service_sup:start_child(SvcName). 164 165start_link(SvcName) -> 166 Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], 167 gen_server:start_link(?MODULE, [SvcName], Options). 168%% Put the arbitrary term SvcName in a list in case we ever want to 169%% send more than this and need to distinguish old from new. 170 171%% --------------------------------------------------------------------------- 172%% # stop/1 173%% --------------------------------------------------------------------------- 174 175stop(SvcName) -> 176 case whois(SvcName) of 177 undefined -> 178 {error, not_started}; 179 Pid -> 180 stop(call_service(Pid, stop), Pid) 181 end. 182 183stop(ok, Pid) -> 184 MRef = monitor(process, Pid), 185 receive {'DOWN', MRef, process, _, _} -> ok end; 186stop(No, _) -> 187 No. 188 189%% --------------------------------------------------------------------------- 190%% # start_transport/3 191%% --------------------------------------------------------------------------- 192 193start_transport(SvcName, {_Ref, _Type, _Opts} = T) -> 194 call_service_by_name(SvcName, {start, T}). 195 196%% --------------------------------------------------------------------------- 197%% # stop_transport/2 198%% --------------------------------------------------------------------------- 199 200stop_transport(_, []) -> 201 ok; 202stop_transport(SvcName, [_|_] = Refs) -> 203 call_service_by_name(SvcName, {stop, Refs}). 204 205%% --------------------------------------------------------------------------- 206%% # info/2 207%% --------------------------------------------------------------------------- 208 209info(SvcName, Item) -> 210 case lookup_state(SvcName) of 211 [S] -> 212 service_info(Item, S); 213 [] -> 214 undefined 215 end. 216 217%% lookup_state/1 218 219lookup_state(SvcName) -> 220 case ets:lookup(?STATE_TABLE, SvcName) of 221 [#state{}] = L -> 222 L; 223 _ -> 224 [] 225 end. 226 227%% --------------------------------------------------------------------------- 228%% # peer_info/2 229%% --------------------------------------------------------------------------- 230 231%% An extended version of info_peer/1 for peer_info/1. 232peer_info(Pid) -> 233 try 234 {_, PD} = process_info(Pid, dictionary), 235 {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), 236 {TPid, {{Type, Ref}, TMod, Cfg}} = T, 237 {_, TD} = process_info(TPid, dictionary), 238 {_, Data} = lists:keyfind({TMod, info}, 1, TD), 239 [{ref, Ref}, 240 {type, Type}, 241 {owner, TPid}, 242 {module, TMod}, 243 {config, Cfg} 244 | try TMod:info(Data) catch _:_ -> [] end] 245 catch 246 error:_ -> 247 [] 248 end. 249 250%% --------------------------------------------------------------------------- 251%% # subscribe/1 252%% # unsubscribe/1 253%% --------------------------------------------------------------------------- 254 255subscribe(SvcName) -> 256 diameter_reg:add({?MODULE, subscriber, SvcName}). 257 258unsubscribe(SvcName) -> 259 diameter_reg:remove({?MODULE, subscriber, SvcName}). 260 261subscriptions(Pat) -> 262 pmap(diameter_reg:match({?MODULE, subscriber, Pat})). 263 264subscriptions() -> 265 subscriptions('_'). 266 267pmap(Props) -> 268 lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props). 269 270%% --------------------------------------------------------------------------- 271%% # services/1 272%% --------------------------------------------------------------------------- 273 274services(Pat) -> 275 pmap(diameter_reg:match({?MODULE, service, Pat})). 276 277services() -> 278 services('_'). 279 280whois(SvcName) -> 281 case diameter_reg:match({?MODULE, service, SvcName}) of 282 [{_, Pid}] -> 283 Pid; 284 [] -> 285 undefined 286 end. 287 288%% --------------------------------------------------------------------------- 289%% # pick_peer/3 290%% --------------------------------------------------------------------------- 291 292-spec pick_peer(SvcName, AppOrAlias, Opts) 293 -> {{{TPid, Caps}, App}, SvcOpts} 294 | false %% no selection 295 | {error, no_service} 296 when SvcName :: diameter:service_name(), 297 AppOrAlias :: #diameter_app{} 298 | {alias, diameter:app_alias()}, 299 Opts :: {fun((Dict :: module()) -> [term()]), 300 diameter:peer_filter(), 301 Xtra :: list(), 302 [diameter:peer_ref()]}, 303 TPid :: pid(), 304 Caps :: #diameter_caps{}, 305 App :: #diameter_app{}, 306 SvcOpts :: map(). 307 308pick_peer(SvcName, App, Opts) -> 309 pick(lookup_state(SvcName), App, Opts). 310 311pick([], _, _) -> 312 {error, no_service}; 313 314pick([S], App, Opts) -> 315 pick(S, App, Opts); 316 317pick(#state{service = #diameter_service{applications = Apps}} 318 = S, 319 {alias, Alias}, 320 Opts) -> %% initial call from diameter:call/4 321 pick(S, find_outgoing_app(Alias, Apps), Opts); 322 323pick(_, false = No, _) -> 324 No; 325 326pick(#state{options = SvcOpts} 327 = S, 328 #diameter_app{module = ModX, dictionary = Dict} 329 = App0, 330 {DestF, Filter, Xtra, TPids}) -> 331 App = App0#diameter_app{module = ModX ++ Xtra}, 332 [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]), 333 case pick_peer(App, RealmAndHost, [Filter | TPids], S) of 334 {_TPid, _Caps} = TC -> 335 {{TC, App}, SvcOpts}; 336 false = No -> 337 No 338 end. 339 340%% --------------------------------------------------------------------------- 341%% # find_incoming_app/4 342%% --------------------------------------------------------------------------- 343 344-spec find_incoming_app(PeerT, TPid, Id, Apps) 345 -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app 346 | #diameter_caps{} %% connection but no suitable app 347 | false %% no connection 348 when PeerT :: ets:tid(), 349 TPid :: pid(), 350 Id :: non_neg_integer(), 351 Apps :: [#diameter_app{}]. 352 353find_incoming_app(PeerT, TPid, Id, Apps) -> 354 try ets:lookup(PeerT, TPid) of 355 [#peer{} = P] -> 356 find_incoming_app(P, Id, Apps); 357 [] -> %% transport has gone down 358 false 359 catch 360 error: badarg -> %% service has gone down (and taken table with it) 361 false 362 end. 363 364%% --------------------------------------------------------------------------- 365%% # notify/2 366%% --------------------------------------------------------------------------- 367 368notify(SvcName, Msg) -> 369 Pid = whois(SvcName), 370 is_pid(Pid) andalso (Pid ! Msg). 371 372%% =========================================================================== 373%% =========================================================================== 374 375state(Svc) -> 376 call_service(Svc, state). 377 378uptime(Svc) -> 379 call_service(Svc, uptime). 380 381%% call_module/3 382 383call_module(Service, AppMod, Request) -> 384 call_service(Service, {call_module, AppMod, Request}). 385 386%% --------------------------------------------------------------------------- 387%% # init/1 388%% --------------------------------------------------------------------------- 389 390init([SvcName]) -> 391 process_flag(trap_exit, true), %% ensure terminate(shutdown, _) 392 i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})). 393 394i(SvcName, true) -> 395 {ok, i(SvcName)}; 396i(_, false) -> 397 {stop, {shutdown, already_started}}. 398 399%% --------------------------------------------------------------------------- 400%% # handle_call/3 401%% --------------------------------------------------------------------------- 402 403handle_call(state, _, S) -> 404 {reply, S, S}; 405 406handle_call(uptime, _, #state{id = T} = S) -> 407 {reply, diameter_lib:now_diff(T), S}; 408 409%% Start a transport. 410handle_call({start, {Ref, Type, Opts}}, _From, S) -> 411 {reply, start(Ref, {Type, Opts}, S), S}; 412 413%% Stop transports. 414handle_call({stop, Refs}, _From, S) -> 415 shutdown(Refs, S), 416 {reply, ok, S}; 417 418%% pick_peer with mutable state 419handle_call({pick_peer, Local, Remote, App}, _From, S) -> 420 #diameter_app{mutable = true} = App, %% assert 421 {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S}; 422 423handle_call({call_module, AppMod, Req}, From, S) -> 424 call_module(AppMod, Req, From, S); 425 426handle_call(stop, _From, S) -> 427 shutdown(service, S), 428 {stop, normal, ok, S}; 429%% The server currently isn't guaranteed to be dead when the caller 430%% gets the reply. We deal with this in the call to the server, 431%% stating a monitor that waits for DOWN before returning. 432 433handle_call(Req, From, S) -> 434 unexpected(handle_call, [Req, From], S), 435 {reply, nok, S}. 436 437%% --------------------------------------------------------------------------- 438%% # handle_cast/2 439%% --------------------------------------------------------------------------- 440 441handle_cast(Req, S) -> 442 unexpected(handle_cast, [Req], S), 443 {noreply, S}. 444 445%% --------------------------------------------------------------------------- 446%% # handle_info/2 447%% --------------------------------------------------------------------------- 448 449handle_info(T, #state{} = S) -> 450 case transition(T,S) of 451 ok -> 452 {noreply, S}; 453 {stop, Reason} -> 454 {stop, {shutdown, Reason}, S} 455 end. 456 457%% transition/2 458 459%% Peer process is telling us to start a new accept process. 460transition({accepted, Pid, TPid}, S) -> 461 accepted(Pid, TPid, S), 462 ok; 463 464%% Connecting transport is being restarted by watchdog. 465transition({reconnect, Pid}, S) -> 466 reconnect(Pid, S), 467 ok; 468 469%% Watchdog is sending notification of transport death. 470transition({close, Pid, Reason}, #state{service_name = SvcName, 471 watchdogT = WatchdogT}) -> 472 #watchdog{state = WS, 473 ref = Ref, 474 type = Type, 475 options = Opts} 476 = fetch(WatchdogT, Pid), 477 WS /= ?WD_OKAY 478 andalso 479 send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}), 480 ok; 481 482%% Watchdog is sending notification of a state transition. 483transition({watchdog, Pid, {[TPid | Data], From, To}}, 484 #state{service_name = SvcName, 485 watchdogT = WatchdogT} 486 = S) -> 487 #watchdog{ref = Ref, type = T, options = Opts} 488 = Wd 489 = fetch(WatchdogT, Pid), 490 watchdog(TPid, Data, From, To, Wd, S), 491 send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), 492 ok; 493%% Death of a watchdog process (#watchdog.pid) results in the removal of 494%% it's peer and any associated conn record when 'DOWN' is received. 495%% Death of a peer process process (#peer.pid, #watchdog.peer) results in 496%% ?WD_DOWN. 497 498%% Monitor process has died. Just die with a reason that tells 499%% diameter_config about the happening. If a cleaner shutdown is 500%% required then someone should stop us. 501transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> 502 {stop, {monitor, Reason}}; 503 504%% Local watchdog process has died. 505transition({'DOWN', _, process, Pid, _Reason}, S) 506 when node(Pid) == node() -> 507 watchdog_down(Pid, S), 508 ok; 509 510%% Remote service wants to know about shared peers. 511transition({service, Pid}, S) -> 512 share_peers(Pid, S), 513 ok; 514 515%% Remote service is communicating a shared peer. 516transition({peer, TPid, Aliases, Caps}, S) -> 517 remote_peer_up(TPid, Aliases, Caps, S), 518 ok; 519transition({peer, TPid}, S) -> 520 remote_peer_down(TPid, S), 521 ok; 522 523%% Remote peer process has died. 524transition({'DOWN', _, process, TPid, _}, S) -> 525 remote_peer_down(TPid, S), 526 ok; 527 528%% Restart after tc expiry. 529transition({tc_timeout, T}, S) -> 530 tc_timeout(T, S), 531 ok; 532 533transition({nodeup, Node, _}, S) -> 534 nodeup(Node, S), 535 ok; 536 537transition({nodedown, _Node, _}, _) -> 538 ok; 539 540transition(Req, S) -> 541 unexpected(handle_info, [Req], S), 542 ok. 543 544%% --------------------------------------------------------------------------- 545%% # terminate/2 546%% --------------------------------------------------------------------------- 547 548terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) -> 549 send_event(Name, stop), 550 ets:delete(?STATE_TABLE, Name), 551 552 %% Communicate pending loss of any peers that connection_down/3 553 %% won't. This is needed when stopping a service since we don't 554 %% wait for watchdog state changes to take care of if. That this 555 %% takes place after deleting the state entry ensures that the 556 %% resulting failover by request processes accomplishes nothing. 557 ets:foldl(fun peer_down/2, ok, PeerT), 558 559 shutdown == Reason %% application shutdown 560 andalso shutdown(application, S). 561 562%% peer_down/1 563%% 564%% Entries with watchdog state SUSPECT are already down: ignore the 565%% expected failure. This assumes the current implementation, but 566%% double the number of lookups (in the typical case) could be the 567%% greater evil if there are many peer connections. 568 569peer_down(#peer{pid = TPid}, _) -> 570 try 571 diameter_traffic:peer_down(TPid) 572 catch 573 error: {badmatch, []} -> ok 574 end. 575 576%% --------------------------------------------------------------------------- 577%% # code_change/3 578%% --------------------------------------------------------------------------- 579 580code_change(_FromVsn, S, _Extra) -> 581 {ok, S}. 582 583%% =========================================================================== 584%% =========================================================================== 585 586unexpected(F, A, #state{service_name = Name}) -> 587 ?UNEXPECTED(F, A ++ [Name]). 588 589eval([M|X], F, A) -> 590 apply(M, F, A ++ X). 591 592%% Callback with state. 593 594state_cb(#diameter_app{module = ModX, mutable = false, init_state = S}, 595 pick_peer = F, 596 A) -> 597 eval(ModX, F, A ++ [S]); 598 599state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) -> 600 eval(ModX, F, A ++ [mod_state(Alias)]). 601 602choose(true, X, _) -> X; 603choose(false, _, X) -> X. 604 605ets_new(Tbl) -> 606 ets:new(Tbl, [{keypos, 2}]). 607 608%% Using the process dictionary for the callback state was initially 609%% just a way to make what was horrendous trace (big state record and 610%% much else everywhere) somewhat more readable. There's not as much 611%% need for it now but it's no worse (except possibly that we don't 612%% see the table identifier being passed around) than an ets table so 613%% keep it. 614 615mod_state(Alias) -> 616 get({?MODULE, mod_state, Alias}). 617 618mod_state(Alias, ModS) -> 619 put({?MODULE, mod_state, Alias}, ModS). 620 621%% --------------------------------------------------------------------------- 622%% # shutdown/2 623%% --------------------------------------------------------------------------- 624 625%% remove_transport 626shutdown(Refs, #state{watchdogT = WatchdogT}) 627 when is_list(Refs) -> 628 ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end, 629 [], 630 WatchdogT)); 631 632%% application/service shutdown 633shutdown(Reason, #state{watchdogT = WatchdogT}) 634 when Reason == application; 635 Reason == service -> 636 diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end, 637 [], 638 WatchdogT)). 639 640%% st/3 641 642%% Mark replacement as started so that a subsequent accept doesn't 643%% result in a new process that isn't terminated. 644st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) -> 645 case lists:member(Ref, Refs) of 646 true -> 647 Pid ! {shutdown, self(), transport}, %% 'DOWN' cleans up 648 [Rec#watchdog{peer = true} || P == false] ++ Acc; 649 false -> 650 Acc 651 end. 652 653%% ss/3 654 655ss(#watchdog{pid = Pid}, Reason, Acc) -> 656 MRef = monitor(process, Pid), 657 Pid ! {shutdown, self(), Reason}, 658 [MRef | Acc]. 659 660%% --------------------------------------------------------------------------- 661%% # call_service/2 662%% --------------------------------------------------------------------------- 663 664call_service(Pid, Req) 665 when is_pid(Pid) -> 666 cs(Pid, Req); 667call_service(SvcName, Req) -> 668 call_service_by_name(SvcName, Req). 669 670call_service_by_name(SvcName, Req) -> 671 cs(whois(SvcName), Req). 672 673cs(Pid, Req) 674 when is_pid(Pid) -> 675 try 676 gen_server:call(Pid, Req, infinity) 677 catch 678 E: Reason when E == exit -> 679 {error, {E, Reason}} 680 end; 681 682cs(undefined, _) -> 683 {error, no_service}. 684 685%% --------------------------------------------------------------------------- 686%% # i/1 687%% --------------------------------------------------------------------------- 688 689%% Intialize the state of a service gen_server. 690 691i(SvcName) -> 692 %% Split the config into a server state and a list of transports. 693 {#state{} = S, CL} = lists:foldl(fun cfg_acc/2, 694 {false, []}, 695 diameter_config:lookup(SvcName)), 696 697 %% Publish the state in order to be able to access it outside of 698 %% the service process. Originally table identifiers were only 699 %% known to the service process but we now want to provide the 700 %% option of application callbacks being 'stateless' in order to 701 %% avoid having to go through a common process. (Eg. An agent that 702 %% sends a request for every incoming request.) 703 true = ets:insert_new(?STATE_TABLE, S), 704 705 %% Start fsms for each transport. 706 send_event(SvcName, start), 707 lists:foreach(fun(T) -> start_fsm(T,S) end, CL), 708 709 init_shared(S), 710 S. 711 712cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts}, 713 {false, Acc}) -> 714 lists:foreach(fun init_mod/1, Apps), 715 #{monitor := M} 716 = SvcOpts 717 = service_opts(Opts), 718 S = #state{service_name = SvcName, 719 service = Rec#diameter_service{pid = self()}, 720 local = init_peers(), 721 remote = init_peers(), 722 monitor = mref(M), 723 options = maps:remove(monitor, SvcOpts)}, 724 {S, Acc}; 725 726cfg_acc({_Ref, Type, _Opts} = T, {S, Acc}) 727 when Type == connect; 728 Type == listen -> 729 {S, [T | Acc]}. 730 731init_peers() -> 732 {ets_new(caps), %% #peer{} 733 ets:new(apps, [bag]), %% {Alias, TPid} 734 ets:new(idents, [bag])}. %% {{host, OH} | {realm, OR} | {OR, OH}, 735 %% Alias, 736 %% TPid} 737 738service_opts(Opts) -> 739 remove([{strict_arities, true}, 740 {avp_dictionaries, []}], 741 maps:merge(maps:from_list([{monitor, false} | def_opts()]), 742 maps:from_list(Opts))). 743 744remove(List, Map) -> 745 maps:filter(fun(K,V) -> not lists:member({K,V}, List) end, 746 Map). 747 748def_opts() -> %% defaults on the service map 749 [{share_peers, false}, 750 {use_shared_peers, false}, 751 {sequence, {0,32}}, 752 {restrict_connections, nodes}, 753 {incoming_maxlen, 16#FFFFFF}, 754 {strict_arities, true}, 755 {strict_mbit, true}, 756 {decode_format, record}, 757 {avp_dictionaries, []}, 758 {traffic_counters, true}, 759 {string_decode, true}, 760 {spawn_opt, []}]. 761 762mref(false = No) -> 763 No; 764mref(P) -> 765 monitor(process, P). 766 767init_shared(#state{options = #{use_shared_peers := T}, 768 service_name = Svc}) -> 769 T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible}, 770 nodedown_reason]), 771 notify(T, Svc, {service, self()}). 772 773init_mod(#diameter_app{alias = Alias, 774 init_state = S}) -> 775 mod_state(Alias, S). 776 777start_fsm({Ref, Type, Opts}, S) -> 778 start(Ref, {Type, Opts}, S). 779 780notify(Share, SvcName, T) -> 781 Nodes = remotes(Share), 782 [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T). 783%% Test for the empty list for upgrade reasons: there's no 784%% diameter_peer:notify/3 in old code. 785 786nodeup(Node, #state{options = #{share_peers := SP}, 787 service_name = SvcName}) -> 788 lists:member(Node, remotes(SP)) 789 andalso diameter_peer:notify([Node], SvcName, {service, self()}). 790 791remotes(false) -> 792 []; 793 794remotes(true) -> 795 nodes(); 796 797remotes(Nodes) 798 when is_atom(hd(Nodes)); 799 Nodes == [] -> 800 Nodes; 801 802remotes(F) -> 803 try diameter_lib:eval(F) of 804 L when is_list(L) -> 805 L; 806 T -> 807 ?LOG(invalid_return, {F,T}), 808 error_report(invalid_return, share_peers, F), 809 [] 810 catch 811 E:R:S -> 812 ?LOG(failure, {E, R, F, diameter_lib:stacktrace(S)}), 813 error_report(failure, share_peers, F), 814 [] 815 end. 816 817%% error_report/3 818 819error_report(T, What, F) -> 820 Reason = io_lib:format("~s from ~p callback", [reason(T), What]), 821 diameter_lib:error_report(Reason, diameter_lib:eval_name(F)). 822 823reason(invalid_return) -> 824 "invalid return"; 825reason(failure) -> 826 "failure". 827 828%% --------------------------------------------------------------------------- 829%% # start/3 830%% --------------------------------------------------------------------------- 831 832%% If the initial start/3 at service/transport start succeeds then 833%% subsequent calls to start/4 on the same service will also succeed 834%% since they involve the same call to merge_service/2. We merge here 835%% rather than earlier since the service may not yet be configured 836%% when the transport is configured. 837 838start(Ref, {T, Opts}, S) 839 when T == connect; 840 T == listen -> 841 N = proplists:get_value(pool_size, Opts, 1), 842 try 843 {ok, start(Ref, type(T), Opts, N, S)} 844 catch 845 ?FAILURE(Reason) -> 846 {error, Reason} 847 end. 848%% TODO: don't actually raise any errors yet 849 850%% There used to be a difference here between the handling of 851%% configured listening and connecting transports but now we simply 852%% tell the transport_module to start an accepting or connecting 853%% process respectively, the transport implementation initiating 854%% listening on a port as required. 855type(listen) -> accept; 856type(accept) -> listen; 857type(connect = T) -> T. 858 859%% start/4 860 861start(Ref, Type, Opts, State) -> 862 start(Ref, Type, Opts, 1, State). 863 864%% start/5 865 866start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT, 867 local = {PeerT, _, _}, 868 options = #{string_decode := SD} 869 = SvcOpts, 870 service_name = SvcName, 871 service = Svc0}) 872 when Type == connect; 873 Type == accept -> 874 #diameter_service{applications = Apps} 875 = Svc1 876 = merge_service(Opts, Svc0), 877 Svc = binary_caps(Svc1, SD), 878 {SOpts, TOpts} = merge_opts(SvcOpts, Opts), 879 RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SOpts]), 880 T = {TOpts, SOpts, RecvData, Svc}, 881 Rec = #watchdog{type = Type, 882 ref = Ref, 883 options = TOpts}, 884 885 diameter_lib:fold_n(fun(_,A) -> 886 [wd(Type, Ref, T, WatchdogT, Rec) | A] 887 end, 888 [], 889 N). 890 891merge_opts(SvcOpts, Opts) -> 892 Keys = [K || {K,_} <- def_opts()], 893 SO = [T || {K,_} = T <- Opts, lists:member(K, Keys)], 894 TO = Opts -- SO, 895 {maps:merge(maps:with(Keys, SvcOpts), maps:from_list(SO)), 896 TO ++ [T || {K,_} = T <- maps:to_list(SvcOpts), 897 not lists:member(K, Keys), 898 not lists:keymember(K, 1, Opts)]}. 899 900binary_caps(Svc, true) -> 901 Svc; 902binary_caps(#diameter_service{capabilities = Caps} = Svc, false) -> 903 Svc#diameter_service{capabilities = diameter_capx:binary_caps(Caps)}. 904 905wd(Type, Ref, T, WatchdogT, Rec) -> 906 Pid = start_watchdog(Type, Ref, T), 907 ets:insert(WatchdogT, Rec#watchdog{pid = Pid}), 908 Pid. 909 910%% Note that the service record passed into the watchdog is the merged 911%% record so that each watchdog may get a different record. This 912%% record is what is passed back into application callbacks. 913 914start_watchdog(Type, Ref, T) -> 915 {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T), 916 Pid. 917 918%% merge_service/2 919 920merge_service(Opts, Svc) -> 921 lists:foldl(fun ms/2, Svc, Opts). 922 923%% Limit the applications known to the fsm to those in the 'apps' 924%% option. That this might be empty is checked by the fsm. It's not 925%% checked at config-time since there's no requirement that the 926%% service be configured first. (Which could be considered a bit odd.) 927ms({applications, As}, #diameter_service{applications = Apps} = S) 928 when is_list(As) -> 929 S#diameter_service{applications 930 = [A || A <- Apps, 931 lists:member(A#diameter_app.alias, As)]}; 932 933%% The fact that all capabilities can be configured on the transports 934%% means that the service doesn't necessarily represent a single 935%% locally implemented Diameter node as identified by Origin-Host: a 936%% transport can configure its own Origin-Host. This means that the 937%% service little more than a placeholder for default capabilities 938%% plus a list of applications that individual transports can choose 939%% to support (or not). 940ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc) 941 when is_list(Opts) -> 942 %% make_caps has already succeeded in diameter_config so it will succeed 943 %% again here. 944 {ok, Caps} = diameter_capx:make_caps(Caps0, Opts), 945 Svc#diameter_service{capabilities = Caps}; 946 947ms(_, Svc) -> 948 Svc. 949 950%% --------------------------------------------------------------------------- 951%% # accepted/3 952%% --------------------------------------------------------------------------- 953 954accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> 955 #watchdog{type = accept = T, peer = P} 956 = Wd 957 = fetch(WatchdogT, Pid), 958 if not P -> 959 #watchdog{ref = Ref, options = Opts} = Wd, 960 %% Mark replacement started, and start new watchdog. 961 ets:insert(WatchdogT, Wd#watchdog{peer = true}), 962 start(Ref, T, Opts, S); 963 P -> 964 %% Transport removal in progress: true has been set in 965 %% shutdown/2, and the transport will die as a 966 %% consequence. 967 ok 968 end. 969 970%% fetch/2 971 972fetch(Tid, Key) -> 973 [T] = ets:lookup(Tid, Key), 974 T. 975 976%% --------------------------------------------------------------------------- 977%% # watchdog/6 978%% 979%% React to a watchdog state transition. 980%% --------------------------------------------------------------------------- 981 982%% Watchdog has a new open connection. 983watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) -> 984 connection_up({TPid, T}, Wd, State); 985 986%% Watchdog has a new connection that will be opened after DW[RA] 987%% exchange. 988watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) -> 989 reopen({TPid, T}, Wd, State); 990 991%% Watchdog has recovered a suspect connection. 992watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> 993 #watchdog{peer = TPid} = Wd, %% assert 994 connection_up(Wd, State); 995 996%% Watchdog has an unresponsive connection. Note that the peer table 997%% entry isn't removed until DOWN. 998watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> 999 #watchdog{peer = TPid} = Wd, %% assert 1000 watchdog_down(Wd, To, State); 1001 1002%% Watchdog has lost its connection. 1003watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) -> 1004 close(Wd), 1005 watchdog_down(Wd, To, S), 1006 ets:delete(PeerT, TPid); 1007 1008watchdog(_, [], _, _, _, _) -> 1009 ok. 1010 1011watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) -> 1012 ets:insert(WatchdogT, Wd#watchdog{state = To}), 1013 connection_down(Wd, To, S). 1014 1015%% --------------------------------------------------------------------------- 1016%% # connection_up/3 1017%% --------------------------------------------------------------------------- 1018 1019%% Watchdog process has reached state OKAY. 1020 1021connection_up({TPid, {Caps, SupportedApps, Pkt}}, 1022 #watchdog{pid = Pid} 1023 = Wd, 1024 #state{local = {PeerT, _, _}} 1025 = S) -> 1026 Rec = #peer{pid = TPid, 1027 apps = SupportedApps, 1028 caps = Caps, 1029 watchdog = Pid}, 1030 ets:insert(PeerT, Rec), 1031 connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S). 1032 1033%% --------------------------------------------------------------------------- 1034%% # reopen/3 1035%% --------------------------------------------------------------------------- 1036 1037reopen({TPid, {Caps, SupportedApps, _Pkt}}, 1038 #watchdog{pid = Pid} 1039 = Wd, 1040 #state{watchdogT = WatchdogT, 1041 local = {PeerT, _, _}}) -> 1042 ets:insert(PeerT, #peer{pid = TPid, 1043 apps = SupportedApps, 1044 caps = Caps, 1045 watchdog = Pid}), 1046 ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, 1047 peer = TPid}). 1048 1049%% --------------------------------------------------------------------------- 1050%% # connection_up/2 1051%% --------------------------------------------------------------------------- 1052 1053%% Watchdog has recovered a suspect connection. Note that there has 1054%% been no new capabilties exchange in this case. 1055 1056connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} 1057 = S) -> 1058 connection_up([], Wd, fetch(PeerT, TPid), S). 1059 1060%% connection_up/4 1061 1062connection_up(Extra, 1063 #watchdog{peer = TPid} 1064 = Wd, 1065 #peer{apps = SApps, caps = Caps} 1066 = Pr, 1067 #state{watchdogT = WatchdogT, 1068 local = LT, 1069 service_name = SvcName, 1070 service = #diameter_service{applications = Apps}} 1071 = S) -> 1072 ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), 1073 diameter_traffic:peer_up(TPid), 1074 local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT), 1075 report_status(up, Wd, Pr, S, Extra). 1076 1077local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) -> 1078 insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT), 1079 lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps). 1080 1081peer_up({Id, Alias}, TC, SA) -> 1082 peer_up(Id, Alias, TC, SA). 1083 1084peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) -> 1085 #diameter_app{id = Id} %% assert 1086 = App 1087 = find_app(Alias, Apps), 1088 1089 peer_cb(App, peer_up, [SvcName, TC]) 1090 orelse exit(TPid, kill). %% fake transport failure 1091 1092%% --------------------------------------------------------------------------- 1093%% # find_incoming_app/3 1094%% --------------------------------------------------------------------------- 1095 1096%% No one should be sending the relay identifier. 1097find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) -> 1098 Caps; 1099 1100find_incoming_app(Peer, Id, Apps) 1101 when is_integer(Id) -> 1102 find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps); 1103 1104%% Note that the apps represented in SApps may be a strict subset of 1105%% those in Apps. 1106find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) -> 1107 case keyfind(Ids, 1, SApps) of 1108 {_Id, Alias} -> 1109 {#diameter_app{} = find_app(Alias, Apps), Caps}; 1110 false -> 1111 Caps 1112 end. 1113 1114%% keyfind/3 1115 1116keyfind([], _, _) -> 1117 false; 1118keyfind([Key | Rest], Pos, L) -> 1119 case lists:keyfind(Key, Pos, L) of 1120 false -> 1121 keyfind(Rest, Pos, L); 1122 T -> 1123 T 1124 end. 1125 1126%% find_outgoing_app/2 1127 1128find_outgoing_app(Alias, Apps) -> 1129 case find_app(Alias, Apps) of 1130 #diameter_app{id = ?APP_ID_RELAY} -> 1131 false; 1132 A -> 1133 A 1134 end. 1135 1136%% find_app/2 1137 1138find_app(Alias, Apps) -> 1139 lists:keyfind(Alias, #diameter_app.alias, Apps). 1140 1141%% Don't bring down the service (and all associated connections) 1142%% regardless of what happens. 1143peer_cb(App, F, A) -> 1144 try state_cb(App, F, A) of 1145 ModS -> 1146 mod_state(App#diameter_app.alias, ModS), 1147 true 1148 catch 1149 E:R:S -> 1150 %% Don't include arguments since a #diameter_caps{} strings 1151 %% from the peer, which could be anything (especially, large). 1152 [Mod|X] = App#diameter_app.module, 1153 ?LOG(failure, {E, R, Mod, F, diameter_lib:stacktrace(S)}), 1154 error_report(failure, F, {Mod, F, A ++ X}), 1155 false 1156 end. 1157 1158%% --------------------------------------------------------------------------- 1159%% # connection_down/3 1160%% --------------------------------------------------------------------------- 1161 1162connection_down(#watchdog{state = ?WD_OKAY, 1163 peer = TPid} 1164 = Wd, 1165 #peer{caps = Caps, 1166 apps = SApps} 1167 = Pr, 1168 #state{service_name = SvcName, 1169 service = #diameter_service{applications = Apps}, 1170 local = LT} 1171 = S) -> 1172 report_status(down, Wd, Pr, S, []), 1173 local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT), 1174 diameter_traffic:peer_down(TPid); 1175 1176connection_down(#watchdog{state = ?WD_OKAY, 1177 peer = TPid} 1178 = Wd, 1179 To, 1180 #state{local = {PeerT, _, _}} 1181 = S) 1182 when is_atom(To) -> 1183 connection_down(Wd, #peer{} = fetch(PeerT, TPid), S); 1184 1185connection_down(#watchdog{}, _, _) -> 1186 ok. 1187 1188local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) -> 1189 delete_peer(TPid, LT), 1190 lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps). 1191 1192peer_down({Id, Alias}, TC, SA) -> 1193 peer_down(Id, Alias, TC, SA). 1194 1195peer_down(Id, Alias, TC, {SvcName, Apps}) -> 1196 #diameter_app{id = Id} %% assert 1197 = App 1198 = find_app(Alias, Apps), 1199 1200 peer_cb(App, peer_down, [SvcName, TC]). 1201 1202%% --------------------------------------------------------------------------- 1203%% # watchdog_down/2 1204%% --------------------------------------------------------------------------- 1205 1206%% Watchdog process has died. 1207 1208watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) -> 1209 Wd = fetch(WatchdogT, Pid), 1210 ets:delete_object(WatchdogT, Wd), 1211 restart(Wd,S), 1212 wd_down(Wd,S). 1213 1214%% Watchdog has never reached OKAY ... 1215wd_down(#watchdog{peer = B}, _) 1216 when is_boolean(B) -> 1217 ok; 1218 1219%% ... or maybe it has. 1220wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) -> 1221 connection_down(Wd, ?WD_DOWN, S), 1222 ets:delete(PeerT, TPid). 1223 1224%% restart/2 1225 1226restart(Wd, S) -> 1227 q_restart(restart(Wd), S). 1228 1229%% restart/1 1230 1231%% Always try to reconnect. 1232restart(#watchdog{ref = Ref, 1233 type = connect = T, 1234 options = Opts, 1235 started = Time}) -> 1236 {Time, {Ref, T, Opts}}; 1237 1238%% Transport connection hasn't yet been accepted ... 1239restart(#watchdog{ref = Ref, 1240 type = accept = T, 1241 options = Opts, 1242 peer = false, 1243 started = Time}) -> 1244 {Time, {Ref, T, Opts}}; 1245 1246%% ... or it has: a replacement has already been spawned. 1247restart(#watchdog{type = accept}) -> 1248 false. 1249 1250%% q_restart/2 1251 1252%% Start the reconnect timer. 1253q_restart({Time, {_Ref, Type, Opts} = T}, S) -> 1254 start_tc(tc(Time, default_tc(Type, Opts)), T, S); 1255q_restart(false, _) -> 1256 ok. 1257 1258%% RFC 3588, 2.1: 1259%% 1260%% When no transport connection exists with a peer, an attempt to 1261%% connect SHOULD be periodically made. This behavior is handled via 1262%% the Tc timer, whose recommended value is 30 seconds. There are 1263%% certain exceptions to this rule, such as when a peer has terminated 1264%% the transport connection stating that it does not wish to 1265%% communicate. 1266 1267default_tc(connect, Opts) -> 1268 connect_timer(Opts, ?DEFAULT_TC); 1269default_tc(accept, _) -> 1270 0. 1271 1272%% Accept both connect_timer and the (older) reconnect_timer, the 1273%% latter being a remnant from a time in which the timer did apply to 1274%% reconnect attempts. 1275connect_timer(Opts, Def0) -> 1276 Def = proplists:get_value(reconnect_timer, Opts, Def0), 1277 proplists:get_value(connect_timer, Opts, Def). 1278 1279%% Bound tc below if the watchdog was restarted recently to avoid 1280%% continuous restarted in case of faulty config or other problems. 1281tc(Time, Tc) -> 1282 choose(Tc > ?RESTART_TC 1283 orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC, 1284 Tc, 1285 ?RESTART_TC). 1286 1287start_tc(0, T, S) -> 1288 tc_timeout(T, S); 1289start_tc(Tc, T, _) -> 1290 erlang:send_after(Tc, self(), {tc_timeout, T}). 1291 1292%% tc_timeout/2 1293 1294tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) -> 1295 tc(diameter_config:have_transport(SvcName, Ref), T, S). 1296 1297tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) -> 1298 send_event(SvcName, {reconnect, Ref, Opts}), 1299 start(Ref, Type, Opts, S); 1300tc(false = No, _, _) -> %% removed 1301 No. 1302 1303%% --------------------------------------------------------------------------- 1304%% # close/2 1305%% --------------------------------------------------------------------------- 1306 1307%% The watchdog doesn't start a new fsm in the accept case, it 1308%% simply stays alive until someone tells it to die in order for 1309%% another watchdog to be able to detect that it should transition 1310%% from initial into reopen rather than okay. That someone is either 1311%% the accepting watchdog upon reception of a CER from the previously 1312%% connected peer, or us after connect_timer timeout or immediately. 1313 1314close(#watchdog{type = connect}) -> 1315 ok; 1316 1317close(#watchdog{type = accept, 1318 pid = Pid, 1319 options = Opts}) -> 1320 Tc = connect_timer(Opts, 2*?DEFAULT_TC), 1321 erlang:send_after(Tc, Pid, close). 1322%% The RFC's only document the behaviour of Tc, our connect_timer, 1323%% for the establishment of connections but we also give 1324%% connect_timer semantics for a listener, being the time within 1325%% which a new connection attempt is expected of a connecting peer. 1326%% The value should be greater than the peer's Tc + jitter. 1327 1328%% --------------------------------------------------------------------------- 1329%% # reconnect/2 1330%% --------------------------------------------------------------------------- 1331 1332reconnect(Pid, #state{service_name = SvcName, 1333 watchdogT = WatchdogT}) -> 1334 #watchdog{ref = Ref, 1335 type = connect, 1336 options = Opts} 1337 = fetch(WatchdogT, Pid), 1338 send_event(SvcName, {reconnect, Ref, Opts}). 1339 1340%% --------------------------------------------------------------------------- 1341%% # call_module/4 1342%% --------------------------------------------------------------------------- 1343 1344%% Backwards compatibility and never documented/advertised. May be 1345%% removed. 1346 1347call_module(Mod, Req, From, #state{service 1348 = #diameter_service{applications = Apps}, 1349 service_name = Svc} 1350 = S) -> 1351 case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)], 1352 Req, 1353 From, 1354 Svc) 1355 of 1356 {reply = T, RC} -> 1357 {T, RC, S}; 1358 noreply = T -> 1359 {T, S}; 1360 Reason -> 1361 {reply, {error, Reason}, S} 1362 end. 1363 1364cm([#diameter_app{alias = Alias} = App], Req, From, Svc) -> 1365 Args = [Req, From, Svc], 1366 1367 try state_cb(App, handle_call, Args) of 1368 {noreply = T, ModS} -> 1369 mod_state(Alias, ModS), 1370 T; 1371 {reply = T, RC, ModS} -> 1372 mod_state(Alias, ModS), 1373 {T, RC}; 1374 T -> 1375 ModX = App#diameter_app.module, 1376 ?LOG(invalid_return, {ModX, handle_call, Args, T}), 1377 invalid 1378 catch 1379 E: Reason: S -> 1380 ModX = App#diameter_app.module, 1381 Stack = diameter_lib:stacktrace(S), 1382 ?LOG(failure, {E, Reason, ModX, handle_call, Stack}), 1383 failure 1384 end; 1385 1386cm([], _, _, _) -> 1387 unknown; 1388 1389cm([_,_|_], _, _, _) -> 1390 multiple. 1391 1392%% --------------------------------------------------------------------------- 1393%% # report_status/5 1394%% --------------------------------------------------------------------------- 1395 1396report_status(Status, 1397 #watchdog{ref = Ref, 1398 peer = TPid, 1399 type = Type, 1400 options = Opts}, 1401 #peer{apps = [_|_] = Apps, 1402 caps = Caps}, 1403 #state{service_name = SvcName} 1404 = S, 1405 Extra) -> 1406 share_peer(Status, Caps, Apps, TPid, S), 1407 Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra], 1408 send_event(SvcName, list_to_tuple(Info)). 1409 1410%% send_event/2 1411 1412send_event(SvcName, Info) -> 1413 send_event(#diameter_event{service = SvcName, 1414 info = Info}). 1415 1416send_event(#diameter_event{service = SvcName} = E) -> 1417 lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)). 1418 1419%% --------------------------------------------------------------------------- 1420%% # share_peer/5 1421%% --------------------------------------------------------------------------- 1422 1423share_peer(up, Caps, Apps, TPid, #state{options = #{share_peers := SP}, 1424 service_name = Svc}) -> 1425 notify(SP, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps}); 1426 1427share_peer(down, _Caps, _Apps, TPid, #state{options = #{share_peers := SP}, 1428 service_name = Svc}) -> 1429 notify(SP, Svc, {peer, TPid}). 1430 1431%% --------------------------------------------------------------------------- 1432%% # share_peers/2 1433%% --------------------------------------------------------------------------- 1434 1435share_peers(Pid, #state{options = #{share_peers := SP}, 1436 local = {PeerT, AppT, _}}) -> 1437 is_remote(Pid, SP) 1438 andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end, 1439 0, 1440 PeerT). 1441 1442%% An entry in the peer table doesn't mean the watchdog state is OKAY, 1443%% an entry in the app table does. 1444 1445sp(Pid, AppT, #peer{pid = TPid, 1446 apps = [{_, Alias} | _] = Apps, 1447 caps = Caps}) -> 1448 Spec = [{{'$1', TPid}, 1449 [{'==', '$1', {const, Alias}}], 1450 ['$_']}], 1451 case ets:select(AppT, Spec, 1) of 1452 '$end_of_table' -> 1453 0; 1454 _ -> 1455 Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps}, 1456 1 1457 end. 1458 1459is_remote(Pid, T) -> 1460 Node = node(Pid), 1461 Node /= node() andalso lists:member(Node, remotes(T)). 1462 1463%% --------------------------------------------------------------------------- 1464%% # remote_peer_up/4 1465%% --------------------------------------------------------------------------- 1466 1467remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}, 1468 remote = {PeerT, _, _}} 1469 = S) -> 1470 is_remote(TPid, T) 1471 andalso not ets:member(PeerT, TPid) 1472 andalso rpu(TPid, Aliases, Caps, S). 1473 1474%% Notification can be duplicate since remote nodes push and the local 1475%% node pulls. 1476 1477rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) -> 1478 #diameter_service{applications = Apps} = Svc, 1479 Key = #diameter_app.alias, 1480 F = fun(A) -> lists:keymember(A, Key, Apps) end, 1481 rpu(TPid, lists:filter(F, Aliases), Caps, RT); 1482 1483rpu(_, [] = No, _, _) -> 1484 No; 1485 1486rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) -> 1487 monitor(process, TPid), 1488 ets:insert(PeerT, #peer{pid = TPid, 1489 apps = Aliases, 1490 caps = Caps}), 1491 insert_peer(TPid, Aliases, Caps, RT). 1492 1493%% insert_peer/4 1494 1495insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) -> 1496 #diameter_caps{origin_host = {_, OH}, 1497 origin_realm = {_, OR}} 1498 = Caps, 1499 ets:insert(AppT, [{A, TPid} || A <- Aliases]), 1500 H = iolist_to_binary(OH), 1501 R = iolist_to_binary(OR), 1502 ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}], 1503 A <- Aliases]). 1504 1505%% --------------------------------------------------------------------------- 1506%% # remote_peer_down/2 1507%% --------------------------------------------------------------------------- 1508 1509remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) -> 1510 ets:delete(PeerT, TPid), 1511 delete_peer(TPid, RT). 1512 1513%% delete_peer/2 1514 1515delete_peer(TPid, {_PeerT, AppT, IdentT}) -> 1516 ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]), 1517 ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]). 1518 1519%% --------------------------------------------------------------------------- 1520%% pick_peer/4 1521%% --------------------------------------------------------------------------- 1522 1523pick_peer(#diameter_app{alias = Alias} 1524 = App, 1525 RealmAndHost, 1526 Filter, 1527 #state{local = LT, 1528 remote = RT, 1529 service_name = SvcName, 1530 service = #diameter_service{pid = Pid}}) -> 1531 pick_peer(peers(Alias, RealmAndHost, Filter, LT), 1532 peers(Alias, RealmAndHost, Filter, RT), 1533 Pid, 1534 SvcName, 1535 App). 1536 1537%% pick_peer/5 1538 1539pick_peer([], [], _, _, _) -> 1540 false; 1541 1542%% App state is mutable but we're not in the service process: go there. 1543pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App) 1544 when self() /= Pid -> 1545 case call_service(Pid, {pick_peer, Local, Remote, App}) of 1546 {TPid, _} = T when is_pid(TPid) -> 1547 T; 1548 false = No -> 1549 No; 1550 {error, _} -> 1551 false 1552 end; 1553 1554%% App state isn't mutable or it is and we're in the service process: 1555%% do the deed. 1556pick_peer(Local, 1557 Remote, 1558 _Pid, 1559 SvcName, 1560 #diameter_app{alias = Alias, 1561 init_state = S, 1562 mutable = M} 1563 = App) -> 1564 Args = [Local, Remote, SvcName], 1565 1566 try state_cb(App, pick_peer, Args) of 1567 {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) -> 1568 T; 1569 {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M -> 1570 mod_state(Alias, ModS), 1571 T; 1572 {false = No, ModS} when M -> 1573 mod_state(Alias, ModS), 1574 No; 1575 {ok, false = No} -> 1576 No; 1577 false = No -> 1578 No; 1579 {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) -> 1580 T; %% Accept returned state in the immutable 1581 {false = No, S} -> %% case as long it isn't changed. 1582 No; 1583 T when M -> 1584 ModX = App#diameter_app.module, 1585 ?LOG(invalid_return, {ModX, pick_peer, T}), 1586 false 1587 catch 1588 E: Reason: Stack when M -> 1589 ModX = App#diameter_app.module, 1590 Z = diameter_lib:stacktrace(Stack), 1591 ?LOG(failure, {E, Reason, ModX, pick_peer, Z}), 1592 false 1593 end. 1594 1595%% peers/4 1596 1597%% No peer options pointing at specific peers: search for them. 1598peers(Alias, RH, [Filter], T) -> 1599 filter(Alias, RH, Filter, T, true); 1600 1601%% Or just lookup. 1602peers(_Alias, RH, [Filter | TPids], {PeerT, _AppT, _IdentT}) -> 1603 {Ts, _} = filter(caps(PeerT, TPids), RH, Filter), 1604 Ts. 1605 1606%% filter/5 1607%% 1608%% Try to limit the peers list by starting with a host/realm lookup. 1609 1610filter(Alias, RH, {neg, F}, T, B) -> 1611 filter(Alias, RH, F, T, not B); 1612 1613filter(_, _, none, _, false) -> 1614 []; 1615 1616filter(Alias, _, none, T, true) -> 1617 all_peers(Alias, T); 1618 1619filter(Alias, [DR,DH] = RH, K, T, B) 1620 when K == realm, DR == undefined; 1621 K == host, DH == undefined -> 1622 filter(Alias, RH, none, T, B); 1623 1624filter(Alias, [DR,_] = RH, realm = K, T, B) -> 1625 filter(Alias, RH, {K, DR}, T, B); 1626 1627filter(Alias, [_,DH] = RH, host = K, T, B) -> 1628 filter(Alias, RH, {K, DH}, T, B); 1629 1630filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true) 1631 when K == host; 1632 K == realm -> 1633 try iolist_to_binary(D) of 1634 B -> 1635 caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'}, 1636 [{'==', '$1', {const, Alias}}], 1637 ['$2']}])) 1638 catch 1639 error:_ -> 1640 [] 1641 end; 1642 1643filter(Alias, RH, {all, Filters}, T, B) 1644 when is_list(Filters) -> 1645 fltr_all(Alias, RH, Filters, T, B); 1646 1647filter(Alias, RH, {first, Filters}, T, B) 1648 when is_list(Filters) -> 1649 fltr_first(Alias, RH, Filters, T, B); 1650 1651filter(Alias, RH, Filter, T, B) -> 1652 {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter), 1653 choose(B, Ts, Fs). 1654 1655%% fltr_all/5 1656 1657fltr_all(Alias, RH, [{K, any} | Filters], T, B) 1658 when K == host; 1659 K == realm -> 1660 fltr_all(Alias, RH, Filters, T, B); 1661 1662fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) -> 1663 fltr_all(Alias, RH, [R, H | Filters], T, B); 1664 1665fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) -> 1666 fltr_all(Alias, RH, [R | Filters], T, B); 1667 1668fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) -> 1669 {PeerT, _AppT, IdentT} = T, 1670 try {iolist_to_binary(OR), iolist_to_binary(OH)} of 1671 BT -> 1672 Peers = caps(PeerT, 1673 ets:select(IdentT, [{{BT, '$1', '$2'}, 1674 [{'==', '$1', {const, Alias}}], 1675 ['$2']}])), 1676 {Ts, _} = filter(Peers, RH, {all, Filters}), 1677 Ts 1678 catch 1679 error:_ -> 1680 [] 1681 end; 1682 1683fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) -> 1684 fltr_all(Alias, RH, Filters, T, B); 1685 1686fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) -> 1687 fltr_all(Alias, RH, [{realm, DR} | Filters], T, B); 1688 1689fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) -> 1690 fltr_all(Alias, RH, Filters, T, B); 1691 1692fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) -> 1693 fltr_all(Alias, RH, [{host, DH} | Filters], T, B); 1694 1695fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B) 1696 when K == host, KA == realm; 1697 K == realm, KA == host -> 1698 fltr_all(Alias, RH, [KA, KT | Filters], T, B); 1699 1700fltr_all(Alias, RH, [F | Filters], T, B) -> 1701 {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}), 1702 choose(B, Ts, Fs); 1703 1704fltr_all(Alias, RH, [], T, B) -> 1705 filter(Alias, RH, none, T, B). 1706 1707%% fltr_first/5 1708%% 1709%% Like any, but stop at the first filter with any matches. 1710 1711fltr_first(Alias, RH, [F | Filters], T, B) -> 1712 case filter(Alias, RH, F, T, B) of 1713 [] -> 1714 fltr_first(Alias, RH, Filters, T, B); 1715 [_|_] = Ts -> 1716 Ts 1717 end; 1718 1719fltr_first(Alias, RH, [], T, B) -> 1720 filter(Alias, RH, none, T, not B). 1721 1722%% all_peers/2 1723 1724all_peers(Alias, {PeerT, AppT, _}) -> 1725 ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, 1726 [], 1727 [{{P, '$1'}}]} 1728 || {_,P} <- ets:lookup(AppT, Alias)]). 1729 1730%% caps/2 1731 1732caps(PeerT, Pids) -> 1733 ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'}, 1734 [], 1735 [{{P, '$1'}}]} 1736 || P <- Pids]). 1737 1738%% filter/3 1739%% 1740%% Return peers in match order. 1741 1742filter(Peers, _, none) -> 1743 {Peers, []}; 1744 1745filter(Peers, RH, {neg, F}) -> 1746 {Ts, Fs} = filter(Peers, RH, F), 1747 {Fs, Ts}; 1748 1749filter(Peers, RH, {all, L}) 1750 when is_list(L) -> 1751 lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end, 1752 {Peers, []}, 1753 L); 1754 1755filter(Peers, RH, {any, L}) 1756 when is_list(L) -> 1757 lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end, 1758 {[], Peers}, 1759 L); 1760 1761filter(Peers, RH, {first, L}) 1762 when is_list(L) -> 1763 fltr_first(Peers, RH, L); 1764 1765filter(Peers, RH, F) -> 1766 lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers). 1767 1768%% fltr_all/3 1769 1770fltr_all(F, {Ts0, Fs0}, RH) -> 1771 {Ts1, Fs1} = filter(Ts0, RH, F), 1772 {Ts1, Fs0 ++ Fs1}. 1773 1774%% fltr_any/3 1775 1776fltr_any(F, {Ts0, Fs0}, RH) -> 1777 {Ts1, Fs1} = filter(Fs0, RH, F), 1778 {Ts0 ++ Ts1, Fs1}. 1779 1780%% fltr_first/3 1781 1782fltr_first(Peers, _, []) -> 1783 {[], Peers}; 1784 1785fltr_first(Peers, RH, [F | Filters]) -> 1786 case filter(Peers, RH, F) of 1787 {[], _} -> 1788 fltr_first(Peers, RH, Filters); 1789 {_, _} = T -> 1790 T 1791 end. 1792 1793%% caps_filter/3 1794 1795caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) -> 1796 eq(undefined, DH, OH); 1797 1798caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) -> 1799 eq(undefined, DR, OR); 1800 1801caps_filter(C, _, Filter) -> 1802 caps_filter(C, Filter). 1803 1804%% caps_filter/2 1805 1806caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) -> 1807 eq(any, H, OH); 1808 1809caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) -> 1810 eq(any, R, OR); 1811 1812%% Anything else is expected to be an eval filter. Filter failure is 1813%% documented as being equivalent to a non-matching filter. 1814 1815caps_filter(C, T) -> 1816 try 1817 {eval, F} = T, 1818 diameter_lib:eval([F,C]) 1819 catch 1820 _:_ -> false 1821 end. 1822 1823eq(Any, Id, PeerId) -> 1824 Any == Id orelse try 1825 iolist_to_binary(Id) == iolist_to_binary(PeerId) 1826 catch 1827 _:_ -> false 1828 end. 1829%% OctetString() can be specified as an iolist() so test for string 1830%% rather then term equality. 1831 1832%% --------------------------------------------------------------------------- 1833%% # service_info/2 1834%% --------------------------------------------------------------------------- 1835 1836%% The config passed to diameter:start_service/2. 1837-define(CAP_INFO, ['Origin-Host', 1838 'Origin-Realm', 1839 'Vendor-Id', 1840 'Product-Name', 1841 'Origin-State-Id', 1842 'Host-IP-Address', 1843 'Supported-Vendor-Id', 1844 'Auth-Application-Id', 1845 'Inband-Security-Id', 1846 'Acct-Application-Id', 1847 'Vendor-Specific-Application-Id', 1848 'Firmware-Revision']). 1849 1850%% The config returned by diameter:service_info(SvcName, all). 1851-define(ALL_INFO, [capabilities, 1852 applications, 1853 transport, 1854 options]). 1855 1856%% The rest. 1857-define(OTHER_INFO, [connections, 1858 name, 1859 peers, 1860 statistics, 1861 info]). 1862 1863service_info(Item, S) 1864 when is_atom(Item) -> 1865 case tagged_info(Item, S) of 1866 {_, T} -> T; 1867 undefined = No -> No 1868 end; 1869 1870service_info(Items, S) -> 1871 tagged_info(Items, S). 1872 1873tagged_info(Item, S) 1874 when is_atom(Item) -> 1875 case complete(Item) of 1876 {value, I} -> 1877 {I, complete_info(I,S)}; 1878 false -> 1879 undefined 1880 end; 1881 1882tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}}) 1883 when is_pid(TPid) -> 1884 try 1885 [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), 1886 [#watchdog{ref = Ref, type = Type, options = Opts}] 1887 = ets:lookup(WatchdogT, Pid), 1888 [{ref, Ref}, 1889 {type, Type}, 1890 {options, Opts}] 1891 catch 1892 error:_ -> 1893 [] 1894 end; 1895 1896tagged_info(Items, S) 1897 when is_list(Items) -> 1898 [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []]; 1899 1900tagged_info(_, _) -> 1901 undefined. 1902 1903complete_info(Item, #state{service = Svc} = S) -> 1904 case Item of 1905 name -> 1906 S#state.service_name; 1907 'Origin-Host' -> 1908 (Svc#diameter_service.capabilities) 1909 #diameter_caps.origin_host; 1910 'Origin-Realm' -> 1911 (Svc#diameter_service.capabilities) 1912 #diameter_caps.origin_realm; 1913 'Vendor-Id' -> 1914 (Svc#diameter_service.capabilities) 1915 #diameter_caps.vendor_id; 1916 'Product-Name' -> 1917 (Svc#diameter_service.capabilities) 1918 #diameter_caps.product_name; 1919 'Origin-State-Id' -> 1920 (Svc#diameter_service.capabilities) 1921 #diameter_caps.origin_state_id; 1922 'Host-IP-Address' -> 1923 (Svc#diameter_service.capabilities) 1924 #diameter_caps.host_ip_address; 1925 'Supported-Vendor-Id' -> 1926 (Svc#diameter_service.capabilities) 1927 #diameter_caps.supported_vendor_id; 1928 'Auth-Application-Id' -> 1929 (Svc#diameter_service.capabilities) 1930 #diameter_caps.auth_application_id; 1931 'Inband-Security-Id' -> 1932 (Svc#diameter_service.capabilities) 1933 #diameter_caps.inband_security_id; 1934 'Acct-Application-Id' -> 1935 (Svc#diameter_service.capabilities) 1936 #diameter_caps.acct_application_id; 1937 'Vendor-Specific-Application-Id' -> 1938 (Svc#diameter_service.capabilities) 1939 #diameter_caps.vendor_specific_application_id; 1940 'Firmware-Revision' -> 1941 (Svc#diameter_service.capabilities) 1942 #diameter_caps.firmware_revision; 1943 capabilities -> service_info(?CAP_INFO, S); 1944 applications -> info_apps(S); 1945 transport -> info_transport(S); 1946 options -> info_options(S); 1947 keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO; 1948 all -> service_info(?ALL_INFO, S); 1949 statistics -> info_stats(S); 1950 info -> info_info(S); 1951 connections -> info_connections(S); 1952 peers -> info_peers(S) 1953 end. 1954 1955complete(I) 1956 when I == keys; 1957 I == all -> 1958 {value, I}; 1959complete(Pre) -> 1960 P = atom_to_list(Pre), 1961 case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO, 1962 lists:prefix(P, atom_to_list(I))] 1963 of 1964 [I] -> {value, I}; 1965 _ -> false 1966 end. 1967 1968%% info_stats/1 1969 1970info_stats(#state{watchdogT = WatchdogT}) -> 1971 MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'}, 1972 [{'is_pid', '$2'}], 1973 [['$1', '$2']]}], 1974 try ets:select(WatchdogT, MatchSpec) of 1975 L -> 1976 diameter_stats:read(lists:append(L)) 1977 catch 1978 error: badarg -> [] %% service has gone down 1979 end. 1980 1981%% info_transport/1 1982%% 1983%% One entry per configured transport. Statistics for each entry are 1984%% the accumulated values for the ref and associated watchdog/peer 1985%% pids. 1986 1987info_transport(S) -> 1988 PeerD = peer_dict(S, config_dict(S)), 1989 Stats = diameter_stats:sum(dict:fetch_keys(PeerD)), 1990 dict:fold(fun(R, Ls, A) -> 1991 Cs = proplists:get_value(R, Stats, []), 1992 [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A] 1993 end, 1994 [], 1995 PeerD). 1996 1997%% Single config entry. Distinguish between pool_size config or not on 1998%% a connecting transport for backwards compatibility: with the option 1999%% the form is similar to the listening case, with connections grouped 2000%% in a pool tuple (for lack of a better name), without as before. 2001transport([[{type, Type}, {options, Opts}] = L]) 2002 when Type == listen; 2003 Type == connect -> 2004 L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]]; 2005 2006%% Peer entries: discard config. Note that the peer entries have 2007%% length at least 3. 2008transport([[_,_] | L]) -> 2009 transport(L); 2010 2011%% Multiple tranports. Note that all have the same options by 2012%% construction, which is not terribly space efficient. 2013transport([[{type, Type}, {options, Opts} | _] | _] = Ls) -> 2014 transport(keys(Type, Opts), Ls). 2015 2016%% Group transports in an accept or pool tuple ... 2017transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) -> 2018 [{type, Type}, 2019 {options, Opts}, 2020 {Key, [tl(tl(L)) || L <- Ls]}]; 2021 2022%% ... or not: there can only be one. 2023transport([], [L]) -> 2024 L. 2025 2026keys(connect = T, Opts) -> 2027 [{T, pool} || lists:keymember(pool_size, 1, Opts)]; 2028keys(_, _) -> 2029 [{listen, accept}]. 2030 2031peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) -> 2032 try ets:tab2list(WatchdogT) of 2033 L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) 2034 catch 2035 error: badarg -> Dict0 %% service has gone down 2036 end. 2037 2038peer_acc(PeerT, Acc, #watchdog{pid = Pid, 2039 type = Type, 2040 ref = Ref, 2041 options = Opts, 2042 state = WS, 2043 started = At, 2044 peer = TPid}) -> 2045 Info = [{type, Type}, 2046 {options, Opts}, 2047 {watchdog, {Pid, At, WS}} 2048 | info_peer(PeerT, TPid, WS)], 2049 dict:append(Ref, Info ++ [{info, info_process_info(Info)}], Acc). 2050 2051info_peer(PeerT, TPid, WS) 2052 when is_pid(TPid), WS /= ?WD_DOWN -> 2053 try ets:lookup(PeerT, TPid) of 2054 T -> info_peer(T) 2055 catch 2056 error: badarg -> [] %% service has gone down 2057 end; 2058info_peer(_, _, _) -> 2059 []. 2060 2061info_process_info(Info) -> 2062 lists:flatmap(fun ipi/1, Info). 2063 2064ipi({watchdog, {Pid, _, _}}) -> 2065 info_pid(Pid); 2066 2067ipi({peer, {Pid, _}}) -> 2068 info_pid(Pid); 2069 2070ipi({port, [{owner, Pid} | _]}) -> 2071 info_pid(Pid); 2072 2073ipi(_) -> 2074 []. 2075 2076info_pid(Pid) -> 2077 case process_info(Pid, [message_queue_len, memory, binary]) of 2078 undefined -> 2079 []; 2080 L -> 2081 [{Pid, lists:map(fun({K,V}) -> {K, map_info(K,V)} end, L)}] 2082 end. 2083 2084%% The binary list consists of 3-tuples {Ptr, Size, Count}, where Ptr 2085%% is a C pointer value, Size is the size of a referenced binary in 2086%% bytes, and Count is a global reference count. The same Ptr can 2087%% occur multiple times, once for each reference on the process heap. 2088%% In this case, the corresponding tuples will have Size in common but 2089%% Count may differ just because no global lock is taken when the 2090%% value is retrieved. 2091%% 2092%% The list can be quite large, and we aren't often interested in the 2093%% pointers or counts, so whittle this down to the number of binaries 2094%% referenced and their total byte count. 2095map_info(binary, L) -> 2096 SzD = lists:foldl(fun({P,S,_}, D) -> dict:store(P,S,D) end, 2097 dict:new(), 2098 L), 2099 {dict:size(SzD), dict:fold(fun(_,S,N) -> S + N end, 0, SzD)}; 2100 2101map_info(_, T) -> 2102 T. 2103 2104%% The point of extracting the config here is so that 'transport' info 2105%% has one entry for each transport ref, the peer table only 2106%% containing entries that have a living watchdog. 2107 2108config_dict(#state{service_name = SvcName}) -> 2109 lists:foldl(fun config_acc/2, 2110 dict:new(), 2111 diameter_config:lookup(SvcName)). 2112 2113config_acc({Ref, T, Opts}, Dict) 2114 when T == listen; 2115 T == connect -> 2116 dict:store(Ref, [[{type, T}, {options, Opts}]], Dict); 2117config_acc(_, Dict) -> 2118 Dict. 2119 2120info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> 2121 [{peer, {Pid, T}}, 2122 {apps, SApps}, 2123 {caps, info_caps(Caps)} 2124 | try [{port, info_port(Pid)}] catch _:_ -> [] end]; 2125info_peer([] = No) -> 2126 No. 2127 2128%% Extract information that the processes involved are expected to 2129%% "publish" in their process dictionaries. Simple but backhanded. 2130info_port(Pid) -> 2131 {_, PD} = process_info(Pid, dictionary), 2132 {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD), 2133 {TPid, {_Type, TMod, _Cfg}} = T, 2134 {_, TD} = process_info(TPid, dictionary), 2135 {_, Data} = lists:keyfind({TMod, info}, 1, TD), 2136 [{owner, TPid}, 2137 {module, TMod} 2138 | try TMod:info(Data) catch _:_ -> [] end]. 2139 2140%% Use the fields names from diameter_caps instead of 2141%% diameter_base_CER to distinguish between the 2-tuple values 2142%% compared to the single capabilities values. Note also that the 2143%% returned list is tagged 'caps' rather than 'capabilities' to 2144%% emphasize the difference. 2145info_caps(#diameter_caps{} = C) -> 2146 lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))). 2147 2148info_apps(#state{service = #diameter_service{applications = Apps}}) -> 2149 lists:map(fun mk_app/1, Apps). 2150 2151mk_app(#diameter_app{} = A) -> 2152 lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))). 2153 2154%% info_info/1 2155%% 2156%% Extract process_info from connections info. 2157 2158info_info(S) -> 2159 [I || L <- conn_list(S), {info, I} <- L]. 2160 2161%% info_connections/1 2162%% 2163%% One entry per transport connection. Statistics for each entry are 2164%% for the peer pid only. 2165 2166info_connections(S) -> 2167 ConnL = conn_list(S), 2168 Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]), 2169 [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L]. 2170 2171conn_list(S) -> 2172 lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S, dict:new()))). 2173 2174conn_acc(Ref, Peers, Acc) -> 2175 [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)] 2176 | Acc]. 2177 2178stats(Refs, Stats) -> 2179 {statistics, dict:to_list(lists:foldl(fun(R,D) -> 2180 stats_acc(R, D, Stats) 2181 end, 2182 dict:new(), 2183 Refs))}. 2184 2185stats_acc(Ref, Dict, Stats) -> 2186 lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end, 2187 Dict, 2188 proplists:get_value(Ref, Stats, [])). 2189 2190%% info_peers/1 2191%% 2192%% One entry per peer Origin-Host. Statistics for each entry are 2193%% accumulated values for all peer pids. 2194 2195info_peers(S) -> 2196 {PeerD, RefD} = lists:foldl(fun peer_acc/2, 2197 {dict:new(), dict:new()}, 2198 conn_list(S)), 2199 Refs = lists:append(dict:fold(fun(_, Rs, A) -> [Rs|A] end, 2200 [], 2201 RefD)), 2202 Stats = diameter_stats:read(Refs), 2203 dict:fold(fun(OH, Cs, A) -> 2204 Rs = dict:fetch(OH, RefD), 2205 [{OH, [{connections, Cs}, stats(Rs, Stats)]} | A] 2206 end, 2207 [], 2208 PeerD). 2209 2210peer_acc(Peer, {PeerD, RefD}) -> 2211 [{TPid, _}, [{origin_host, {_, OH}} | _]] 2212 = [proplists:get_value(K, Peer) || K <- [peer, caps]], 2213 {dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}. 2214 2215%% info_options/1 2216 2217info_options(S) -> 2218 S#state.options. 2219