1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2010-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22%% Implements the process that represents a service.
23%%
24
25-module(diameter_service).
26-behaviour(gen_server).
27
28%% towards diameter_service_sup
29-export([start_link/1]).
30
31%% towards diameter
32-export([subscribe/1,
33         unsubscribe/1,
34         services/0,
35         peer_info/1,
36         info/2]).
37
38%% towards diameter_config
39-export([start/1,
40         stop/1,
41         start_transport/2,
42         stop_transport/2]).
43
44%% towards diameter_peer
45-export([notify/2]).
46
47%% towards diameter_traffic
48-export([find_incoming_app/4,
49         pick_peer/3]).
50
51%% test/debug
52-export([services/1,
53         subscriptions/1,
54         subscriptions/0,
55         call_module/3,
56         whois/1,
57         state/1,
58         uptime/1]).
59
60%% gen_server callbacks
61-export([init/1,
62         handle_call/3,
63         handle_cast/2,
64         handle_info/2,
65         terminate/2,
66         code_change/3]).
67
68-include_lib("diameter/include/diameter.hrl").
69-include("diameter_internal.hrl").
70
71%% RFC 3539 watchdog states.
72-define(WD_INITIAL, initial).
73-define(WD_OKAY,    okay).
74-define(WD_SUSPECT, suspect).
75-define(WD_DOWN,    down).
76-define(WD_REOPEN,  reopen).
77
78-type wd_state() :: ?WD_INITIAL
79                  | ?WD_OKAY
80                  | ?WD_SUSPECT
81                  | ?WD_DOWN
82                  | ?WD_REOPEN.
83
84-define(DEFAULT_TC,     30000).  %% RFC 3588 ch 2.1
85-define(RESTART_TC,      1000).  %% if restart was this recent
86
87%% Maintain state in a table since a service's state is accessed
88%% outside of the service process.
89-define(STATE_TABLE, ?MODULE).
90
91%% Workaround for dialyzer's lack of understanding of match specs.
92-type match(T)
93   :: T | '_' | '$1' | '$2'.
94
95%% State of service gen_server. Note that the state term itself
96%% doesn't change, which is relevant for the stateless application
97%% callbacks since the state is retrieved from ?STATE_TABLE from
98%% outside the service process. The pid in the service record is used
99%% to determine whether or not we need to call the process for a
100%% pick_peer callback in the statefull case.
101-record(state,
102        {id = diameter_lib:now(),
103         service_name :: diameter:service_name(), %% key in ?STATE_TABLE
104         service :: #diameter_service{},
105         watchdogT = ets_new(watchdogs) %% #watchdog{} at start
106                  :: ets:tid(),
107         local  :: {ets:tid(), ets:tid(), ets:tid()},
108         remote :: {ets:tid(), ets:tid(), ets:tid()},
109         monitor = false :: false | pid(),   %% process to die with
110         options :: #{sequence := diameter:sequence(),  %% sequence mask
111                      share_peers := diameter:remotes(),%% broadcast to
112                      use_shared_peers := diameter:remotes(),%% use from
113                      restrict_connections := diameter:restriction(),
114                      incoming_maxlen := diameter:message_length(),
115                      strict_arities => diameter:strict_arities(),
116                      strict_mbit := boolean(),
117                      decode_format := diameter:decode_format(),
118                      avp_dictionaries => nonempty_list(module()),
119                      traffic_counters := boolean(),
120                      string_decode := boolean(),
121                      capabilities_cb => diameter:evaluable(),
122                      pool_size => pos_integer(),
123                      capx_timeout => diameter:'Unsigned32'(),
124                      strict_capx => boolean(),
125                      disconnect_cb => diameter:evaluable(),
126                      dpr_timeout => diameter:'Unsigned32'(),
127                      dpa_timeout => diameter:'Unsigned32'(),
128                      length_errors => exit | handle | discard,
129                      connect_timer => diameter:'Unsigned32'(),
130                      watchdog_timer => diameter:'Unsigned32'()
131                                      | {module(), atom(), list()},
132                      watchdog_config => [{okay|suspect, non_neg_integer()}],
133                      spawn_opt := list() | {module(), atom(), list()}}}).
134
135%% Record representing an RFC 3539 watchdog process implemented by
136%% diameter_watchdog.
137-record(watchdog,
138        {pid  :: match(pid()) | undefined,
139         type :: match(connect | accept),
140         ref  :: match(reference()),  %% key into diameter_config
141         options :: match([diameter:transport_opt()]),%% from start_transport
142         state = ?WD_INITIAL :: match(wd_state()),
143         started = diameter_lib:now(),%% at process start
144         peer = false :: match(boolean() | pid())}).
145                      %% true at accepted/remove, pid() at okay/reopen
146
147%% Record representing a Peer State Machine processes implemented by
148%% diameter_peer_fsm.
149-record(peer,
150        {pid  :: pid(),
151         apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias}
152                       | [diameter:app_alias()]),               %% remote
153         caps :: match(#diameter_caps{}),
154         started = diameter_lib:now(),     %% at connection_up
155         watchdog :: match(pid()           %% key into watchdogT
156                           | undefined)}). %% undefined if remote
157
158%% ---------------------------------------------------------------------------
159%% # start/1
160%% ---------------------------------------------------------------------------
161
162start(SvcName) ->
163    diameter_service_sup:start_child(SvcName).
164
165start_link(SvcName) ->
166    Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
167    gen_server:start_link(?MODULE, [SvcName], Options).
168%% Put the arbitrary term SvcName in a list in case we ever want to
169%% send more than this and need to distinguish old from new.
170
171%% ---------------------------------------------------------------------------
172%% # stop/1
173%% ---------------------------------------------------------------------------
174
175stop(SvcName) ->
176    case whois(SvcName) of
177        undefined ->
178            {error, not_started};
179        Pid ->
180            stop(call_service(Pid, stop), Pid)
181    end.
182
183stop(ok, Pid) ->
184    MRef = monitor(process, Pid),
185    receive {'DOWN', MRef, process, _, _} -> ok end;
186stop(No, _) ->
187    No.
188
189%% ---------------------------------------------------------------------------
190%% # start_transport/3
191%% ---------------------------------------------------------------------------
192
193start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
194    call_service_by_name(SvcName, {start, T}).
195
196%% ---------------------------------------------------------------------------
197%% # stop_transport/2
198%% ---------------------------------------------------------------------------
199
200stop_transport(_, []) ->
201    ok;
202stop_transport(SvcName, [_|_] = Refs) ->
203    call_service_by_name(SvcName, {stop, Refs}).
204
205%% ---------------------------------------------------------------------------
206%% # info/2
207%% ---------------------------------------------------------------------------
208
209info(SvcName, Item) ->
210    case lookup_state(SvcName) of
211        [S] ->
212            service_info(Item, S);
213        [] ->
214            undefined
215    end.
216
217%% lookup_state/1
218
219lookup_state(SvcName) ->
220    case ets:lookup(?STATE_TABLE, SvcName) of
221        [#state{}] = L ->
222            L;
223        _ ->
224            []
225    end.
226
227%% ---------------------------------------------------------------------------
228%% # peer_info/2
229%% ---------------------------------------------------------------------------
230
231%% An extended version of info_peer/1 for peer_info/1.
232peer_info(Pid) ->
233    try
234        {_, PD} = process_info(Pid, dictionary),
235        {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
236        {TPid, {{Type, Ref}, TMod, Cfg}} = T,
237        {_, TD} = process_info(TPid, dictionary),
238        {_, Data} = lists:keyfind({TMod, info}, 1, TD),
239        [{ref, Ref},
240         {type, Type},
241         {owner, TPid},
242         {module, TMod},
243         {config, Cfg}
244         | try TMod:info(Data) catch _:_ -> [] end]
245    catch
246        error:_ ->
247            []
248    end.
249
250%% ---------------------------------------------------------------------------
251%% # subscribe/1
252%% # unsubscribe/1
253%% ---------------------------------------------------------------------------
254
255subscribe(SvcName) ->
256    diameter_reg:add({?MODULE, subscriber, SvcName}).
257
258unsubscribe(SvcName) ->
259    diameter_reg:remove({?MODULE, subscriber, SvcName}).
260
261subscriptions(Pat) ->
262    pmap(diameter_reg:match({?MODULE, subscriber, Pat})).
263
264subscriptions() ->
265    subscriptions('_').
266
267pmap(Props) ->
268    lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
269
270%% ---------------------------------------------------------------------------
271%% # services/1
272%% ---------------------------------------------------------------------------
273
274services(Pat) ->
275    pmap(diameter_reg:match({?MODULE, service, Pat})).
276
277services() ->
278    services('_').
279
280whois(SvcName) ->
281    case diameter_reg:match({?MODULE, service, SvcName}) of
282        [{_, Pid}] ->
283            Pid;
284        [] ->
285            undefined
286    end.
287
288%% ---------------------------------------------------------------------------
289%% # pick_peer/3
290%% ---------------------------------------------------------------------------
291
292-spec pick_peer(SvcName, AppOrAlias, Opts)
293   -> {{{TPid, Caps}, App}, SvcOpts}
294    | false  %% no selection
295    | {error, no_service}
296 when SvcName :: diameter:service_name(),
297      AppOrAlias :: #diameter_app{}
298                  | {alias, diameter:app_alias()},
299      Opts :: {fun((Dict :: module()) -> [term()]),
300               diameter:peer_filter(),
301               Xtra :: list(),
302               [diameter:peer_ref()]},
303      TPid :: pid(),
304      Caps :: #diameter_caps{},
305      App  :: #diameter_app{},
306      SvcOpts :: map().
307
308pick_peer(SvcName, App, Opts) ->
309    pick(lookup_state(SvcName), App, Opts).
310
311pick([], _, _) ->
312    {error, no_service};
313
314pick([S], App, Opts) ->
315    pick(S, App, Opts);
316
317pick(#state{service = #diameter_service{applications = Apps}}
318     = S,
319     {alias, Alias},
320     Opts) ->  %% initial call from diameter:call/4
321    pick(S, find_outgoing_app(Alias, Apps), Opts);
322
323pick(_, false = No, _) ->
324    No;
325
326pick(#state{options = SvcOpts}
327     = S,
328     #diameter_app{module = ModX, dictionary = Dict}
329     = App0,
330     {DestF, Filter, Xtra, TPids}) ->
331    App = App0#diameter_app{module = ModX ++ Xtra},
332    [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
333    case pick_peer(App, RealmAndHost, [Filter | TPids], S) of
334        {_TPid, _Caps} = TC ->
335            {{TC, App}, SvcOpts};
336        false = No ->
337            No
338    end.
339
340%% ---------------------------------------------------------------------------
341%% # find_incoming_app/4
342%% ---------------------------------------------------------------------------
343
344-spec find_incoming_app(PeerT, TPid, Id, Apps)
345   -> {#diameter_app{}, #diameter_caps{}}  %% connection and suitable app
346    | #diameter_caps{}                     %% connection but no suitable app
347    | false                                %% no connection
348 when PeerT :: ets:tid(),
349      TPid  :: pid(),
350      Id    :: non_neg_integer(),
351      Apps  :: [#diameter_app{}].
352
353find_incoming_app(PeerT, TPid, Id, Apps) ->
354    try ets:lookup(PeerT, TPid) of
355        [#peer{} = P] ->
356            find_incoming_app(P, Id, Apps);
357        [] ->             %% transport has gone down
358            false
359    catch
360        error: badarg ->  %% service has gone down (and taken table with it)
361            false
362    end.
363
364%% ---------------------------------------------------------------------------
365%% # notify/2
366%% ---------------------------------------------------------------------------
367
368notify(SvcName, Msg) ->
369    Pid = whois(SvcName),
370    is_pid(Pid) andalso (Pid ! Msg).
371
372%% ===========================================================================
373%% ===========================================================================
374
375state(Svc) ->
376    call_service(Svc, state).
377
378uptime(Svc) ->
379    call_service(Svc, uptime).
380
381%% call_module/3
382
383call_module(Service, AppMod, Request) ->
384    call_service(Service, {call_module, AppMod, Request}).
385
386%% ---------------------------------------------------------------------------
387%% # init/1
388%% ---------------------------------------------------------------------------
389
390init([SvcName]) ->
391    process_flag(trap_exit, true),  %% ensure terminate(shutdown, _)
392    i(SvcName, diameter_reg:add_new({?MODULE, service, SvcName})).
393
394i(SvcName, true) ->
395    {ok, i(SvcName)};
396i(_, false) ->
397    {stop, {shutdown, already_started}}.
398
399%% ---------------------------------------------------------------------------
400%% # handle_call/3
401%% ---------------------------------------------------------------------------
402
403handle_call(state, _, S) ->
404    {reply, S, S};
405
406handle_call(uptime, _, #state{id = T} = S) ->
407    {reply, diameter_lib:now_diff(T), S};
408
409%% Start a transport.
410handle_call({start, {Ref, Type, Opts}}, _From, S) ->
411    {reply, start(Ref, {Type, Opts}, S), S};
412
413%% Stop transports.
414handle_call({stop, Refs}, _From, S) ->
415    shutdown(Refs, S),
416    {reply, ok, S};
417
418%% pick_peer with mutable state
419handle_call({pick_peer, Local, Remote, App}, _From, S) ->
420    #diameter_app{mutable = true} = App,  %% assert
421    {reply, pick_peer(Local, Remote, self(), S#state.service_name, App), S};
422
423handle_call({call_module, AppMod, Req}, From, S) ->
424    call_module(AppMod, Req, From, S);
425
426handle_call(stop, _From, S) ->
427    shutdown(service, S),
428    {stop, normal, ok, S};
429%% The server currently isn't guaranteed to be dead when the caller
430%% gets the reply. We deal with this in the call to the server,
431%% stating a monitor that waits for DOWN before returning.
432
433handle_call(Req, From, S) ->
434    unexpected(handle_call, [Req, From], S),
435    {reply, nok, S}.
436
437%% ---------------------------------------------------------------------------
438%% # handle_cast/2
439%% ---------------------------------------------------------------------------
440
441handle_cast(Req, S) ->
442    unexpected(handle_cast, [Req], S),
443    {noreply, S}.
444
445%% ---------------------------------------------------------------------------
446%% # handle_info/2
447%% ---------------------------------------------------------------------------
448
449handle_info(T, #state{} = S) ->
450    case transition(T,S) of
451        ok ->
452            {noreply, S};
453        {stop, Reason} ->
454            {stop, {shutdown, Reason}, S}
455    end.
456
457%% transition/2
458
459%% Peer process is telling us to start a new accept process.
460transition({accepted, Pid, TPid}, S) ->
461    accepted(Pid, TPid, S),
462    ok;
463
464%% Connecting transport is being restarted by watchdog.
465transition({reconnect, Pid}, S) ->
466    reconnect(Pid, S),
467    ok;
468
469%% Watchdog is sending notification of transport death.
470transition({close, Pid, Reason}, #state{service_name = SvcName,
471                                        watchdogT = WatchdogT}) ->
472    #watchdog{state = WS,
473              ref = Ref,
474              type = Type,
475              options = Opts}
476        = fetch(WatchdogT, Pid),
477    WS /= ?WD_OKAY
478        andalso
479        send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
480    ok;
481
482%% Watchdog is sending notification of a state transition.
483transition({watchdog, Pid, {[TPid | Data], From, To}},
484           #state{service_name = SvcName,
485                  watchdogT = WatchdogT}
486           = S) ->
487    #watchdog{ref = Ref, type = T, options = Opts}
488        = Wd
489        = fetch(WatchdogT, Pid),
490    watchdog(TPid, Data, From, To, Wd, S),
491    send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
492    ok;
493%% Death of a watchdog process (#watchdog.pid) results in the removal of
494%% it's peer and any associated conn record when 'DOWN' is received.
495%% Death of a peer process process (#peer.pid, #watchdog.peer) results in
496%% ?WD_DOWN.
497
498%% Monitor process has died. Just die with a reason that tells
499%% diameter_config about the happening. If a cleaner shutdown is
500%% required then someone should stop us.
501transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
502    {stop, {monitor, Reason}};
503
504%% Local watchdog process has died.
505transition({'DOWN', _, process, Pid, _Reason}, S)
506  when node(Pid) == node() ->
507    watchdog_down(Pid, S),
508    ok;
509
510%% Remote service wants to know about shared peers.
511transition({service, Pid}, S) ->
512    share_peers(Pid, S),
513    ok;
514
515%% Remote service is communicating a shared peer.
516transition({peer, TPid, Aliases, Caps}, S) ->
517    remote_peer_up(TPid, Aliases, Caps, S),
518    ok;
519transition({peer, TPid}, S) ->
520    remote_peer_down(TPid, S),
521    ok;
522
523%% Remote peer process has died.
524transition({'DOWN', _, process, TPid, _}, S) ->
525    remote_peer_down(TPid, S),
526    ok;
527
528%% Restart after tc expiry.
529transition({tc_timeout, T}, S) ->
530    tc_timeout(T, S),
531    ok;
532
533transition({nodeup, Node, _}, S) ->
534    nodeup(Node, S),
535    ok;
536
537transition({nodedown, _Node, _}, _) ->
538    ok;
539
540transition(Req, S) ->
541    unexpected(handle_info, [Req], S),
542    ok.
543
544%% ---------------------------------------------------------------------------
545%% # terminate/2
546%% ---------------------------------------------------------------------------
547
548terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) ->
549    send_event(Name, stop),
550    ets:delete(?STATE_TABLE, Name),
551
552    %% Communicate pending loss of any peers that connection_down/3
553    %% won't. This is needed when stopping a service since we don't
554    %% wait for watchdog state changes to take care of if. That this
555    %% takes place after deleting the state entry ensures that the
556    %% resulting failover by request processes accomplishes nothing.
557    ets:foldl(fun peer_down/2, ok, PeerT),
558
559    shutdown == Reason  %% application shutdown
560        andalso shutdown(application, S).
561
562%% peer_down/1
563%%
564%% Entries with watchdog state SUSPECT are already down: ignore the
565%% expected failure. This assumes the current implementation, but
566%% double the number of lookups (in the typical case) could be the
567%% greater evil if there are many peer connections.
568
569peer_down(#peer{pid = TPid}, _) ->
570    try
571        diameter_traffic:peer_down(TPid)
572    catch
573        error: {badmatch, []} -> ok
574    end.
575
576%% ---------------------------------------------------------------------------
577%% # code_change/3
578%% ---------------------------------------------------------------------------
579
580code_change(_FromVsn, S, _Extra) ->
581    {ok, S}.
582
583%% ===========================================================================
584%% ===========================================================================
585
586unexpected(F, A, #state{service_name = Name}) ->
587    ?UNEXPECTED(F, A ++ [Name]).
588
589eval([M|X], F, A) ->
590    apply(M, F, A ++ X).
591
592%% Callback with state.
593
594state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
595         pick_peer = F,
596         A) ->
597    eval(ModX, F, A ++ [S]);
598
599state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
600    eval(ModX, F, A ++ [mod_state(Alias)]).
601
602choose(true, X, _)  -> X;
603choose(false, _, X) -> X.
604
605ets_new(Tbl) ->
606    ets:new(Tbl, [{keypos, 2}]).
607
608%% Using the process dictionary for the callback state was initially
609%% just a way to make what was horrendous trace (big state record and
610%% much else everywhere) somewhat more readable. There's not as much
611%% need for it now but it's no worse (except possibly that we don't
612%% see the table identifier being passed around) than an ets table so
613%% keep it.
614
615mod_state(Alias) ->
616    get({?MODULE, mod_state, Alias}).
617
618mod_state(Alias, ModS) ->
619    put({?MODULE, mod_state, Alias}, ModS).
620
621%% ---------------------------------------------------------------------------
622%% # shutdown/2
623%% ---------------------------------------------------------------------------
624
625%% remove_transport
626shutdown(Refs, #state{watchdogT = WatchdogT})
627  when is_list(Refs) ->
628    ets:insert(WatchdogT, ets:foldl(fun(R,A) -> st(R, Refs, A) end,
629                                    [],
630                                    WatchdogT));
631
632%% application/service shutdown
633shutdown(Reason, #state{watchdogT = WatchdogT})
634  when Reason == application;
635       Reason == service ->
636    diameter_lib:wait(ets:foldl(fun(P,A) -> ss(P, Reason, A) end,
637                                [],
638                                WatchdogT)).
639
640%% st/3
641
642%% Mark replacement as started so that a subsequent accept doesn't
643%% result in a new process that isn't terminated.
644st(#watchdog{ref = Ref, pid = Pid, peer = P} = Rec, Refs, Acc) ->
645    case lists:member(Ref, Refs) of
646        true ->
647            Pid ! {shutdown, self(), transport},  %% 'DOWN' cleans up
648            [Rec#watchdog{peer = true} || P == false] ++ Acc;
649        false ->
650            Acc
651    end.
652
653%% ss/3
654
655ss(#watchdog{pid = Pid}, Reason, Acc) ->
656    MRef = monitor(process, Pid),
657    Pid ! {shutdown, self(), Reason},
658    [MRef | Acc].
659
660%% ---------------------------------------------------------------------------
661%% # call_service/2
662%% ---------------------------------------------------------------------------
663
664call_service(Pid, Req)
665  when is_pid(Pid) ->
666    cs(Pid, Req);
667call_service(SvcName, Req) ->
668    call_service_by_name(SvcName, Req).
669
670call_service_by_name(SvcName, Req) ->
671    cs(whois(SvcName), Req).
672
673cs(Pid, Req)
674  when is_pid(Pid) ->
675    try
676        gen_server:call(Pid, Req, infinity)
677    catch
678        E: Reason when E == exit ->
679            {error, {E, Reason}}
680    end;
681
682cs(undefined, _) ->
683    {error, no_service}.
684
685%% ---------------------------------------------------------------------------
686%% # i/1
687%% ---------------------------------------------------------------------------
688
689%% Intialize the state of a service gen_server.
690
691i(SvcName) ->
692    %% Split the config into a server state and a list of transports.
693    {#state{} = S, CL} = lists:foldl(fun cfg_acc/2,
694                                     {false, []},
695                                     diameter_config:lookup(SvcName)),
696
697    %% Publish the state in order to be able to access it outside of
698    %% the service process. Originally table identifiers were only
699    %% known to the service process but we now want to provide the
700    %% option of application callbacks being 'stateless' in order to
701    %% avoid having to go through a common process. (Eg. An agent that
702    %% sends a request for every incoming request.)
703    true = ets:insert_new(?STATE_TABLE, S),
704
705    %% Start fsms for each transport.
706    send_event(SvcName, start),
707    lists:foreach(fun(T) -> start_fsm(T,S) end, CL),
708
709    init_shared(S),
710    S.
711
712cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
713        {false, Acc}) ->
714    lists:foreach(fun init_mod/1, Apps),
715    #{monitor := M}
716        = SvcOpts
717        = service_opts(Opts),
718    S = #state{service_name = SvcName,
719               service = Rec#diameter_service{pid = self()},
720               local   = init_peers(),
721               remote  = init_peers(),
722               monitor = mref(M),
723               options = maps:remove(monitor, SvcOpts)},
724    {S, Acc};
725
726cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
727  when Type == connect;
728       Type == listen ->
729    {S, [T | Acc]}.
730
731init_peers() ->
732    {ets_new(caps),            %% #peer{}
733     ets:new(apps, [bag]),     %% {Alias, TPid}
734     ets:new(idents, [bag])}.  %% {{host, OH} | {realm, OR} | {OR, OH},
735                               %%  Alias,
736                               %%  TPid}
737
738service_opts(Opts) ->
739    remove([{strict_arities, true},
740            {avp_dictionaries, []}],
741           maps:merge(maps:from_list([{monitor, false} | def_opts()]),
742                      maps:from_list(Opts))).
743
744remove(List, Map) ->
745    maps:filter(fun(K,V) -> not lists:member({K,V}, List) end,
746                Map).
747
748def_opts() ->  %% defaults on the service map
749    [{share_peers, false},
750     {use_shared_peers, false},
751     {sequence, {0,32}},
752     {restrict_connections, nodes},
753     {incoming_maxlen, 16#FFFFFF},
754     {strict_arities, true},
755     {strict_mbit, true},
756     {decode_format, record},
757     {avp_dictionaries, []},
758     {traffic_counters, true},
759     {string_decode, true},
760     {spawn_opt, []}].
761
762mref(false = No) ->
763    No;
764mref(P) ->
765    monitor(process, P).
766
767init_shared(#state{options = #{use_shared_peers := T},
768                   service_name = Svc}) ->
769    T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible},
770                                                      nodedown_reason]),
771    notify(T, Svc, {service, self()}).
772
773init_mod(#diameter_app{alias = Alias,
774                       init_state = S}) ->
775    mod_state(Alias, S).
776
777start_fsm({Ref, Type, Opts}, S) ->
778    start(Ref, {Type, Opts}, S).
779
780notify(Share, SvcName, T) ->
781    Nodes = remotes(Share),
782    [] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
783%% Test for the empty list for upgrade reasons: there's no
784%% diameter_peer:notify/3 in old code.
785
786nodeup(Node, #state{options = #{share_peers := SP},
787                    service_name = SvcName}) ->
788    lists:member(Node, remotes(SP))
789        andalso diameter_peer:notify([Node], SvcName, {service, self()}).
790
791remotes(false) ->
792    [];
793
794remotes(true) ->
795    nodes();
796
797remotes(Nodes)
798  when is_atom(hd(Nodes));
799       Nodes == [] ->
800    Nodes;
801
802remotes(F) ->
803    try diameter_lib:eval(F) of
804        L when is_list(L) ->
805            L;
806        T ->
807            ?LOG(invalid_return, {F,T}),
808            error_report(invalid_return, share_peers, F),
809            []
810    catch
811        E:R:S ->
812            ?LOG(failure, {E, R, F, diameter_lib:stacktrace(S)}),
813            error_report(failure, share_peers, F),
814            []
815    end.
816
817%% error_report/3
818
819error_report(T, What, F) ->
820    Reason = io_lib:format("~s from ~p callback", [reason(T), What]),
821    diameter_lib:error_report(Reason, diameter_lib:eval_name(F)).
822
823reason(invalid_return) ->
824    "invalid return";
825reason(failure) ->
826    "failure".
827
828%% ---------------------------------------------------------------------------
829%% # start/3
830%% ---------------------------------------------------------------------------
831
832%% If the initial start/3 at service/transport start succeeds then
833%% subsequent calls to start/4 on the same service will also succeed
834%% since they involve the same call to merge_service/2. We merge here
835%% rather than earlier since the service may not yet be configured
836%% when the transport is configured.
837
838start(Ref, {T, Opts}, S)
839  when T == connect;
840       T == listen ->
841    N = proplists:get_value(pool_size, Opts, 1),
842    try
843        {ok, start(Ref, type(T), Opts, N, S)}
844    catch
845        ?FAILURE(Reason) ->
846            {error, Reason}
847    end.
848%% TODO: don't actually raise any errors yet
849
850%% There used to be a difference here between the handling of
851%% configured listening and connecting transports but now we simply
852%% tell the transport_module to start an accepting or connecting
853%% process respectively, the transport implementation initiating
854%% listening on a port as required.
855type(listen)      -> accept;
856type(accept)      -> listen;
857type(connect = T) -> T.
858
859%% start/4
860
861start(Ref, Type, Opts, State) ->
862    start(Ref, Type, Opts, 1, State).
863
864%% start/5
865
866start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
867                                 local = {PeerT, _, _},
868                                 options = #{string_decode := SD}
869                                 = SvcOpts,
870                                 service_name = SvcName,
871                                 service = Svc0})
872  when Type == connect;
873       Type == accept ->
874    #diameter_service{applications = Apps}
875        = Svc1
876        = merge_service(Opts, Svc0),
877    Svc = binary_caps(Svc1, SD),
878    {SOpts, TOpts} = merge_opts(SvcOpts, Opts),
879    RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SOpts]),
880    T = {TOpts, SOpts, RecvData, Svc},
881    Rec = #watchdog{type = Type,
882                    ref = Ref,
883                    options = TOpts},
884
885    diameter_lib:fold_n(fun(_,A) ->
886                                [wd(Type, Ref, T, WatchdogT, Rec) | A]
887                        end,
888                        [],
889                        N).
890
891merge_opts(SvcOpts, Opts) ->
892    Keys = [K || {K,_} <- def_opts()],
893    SO = [T || {K,_} = T <- Opts, lists:member(K, Keys)],
894    TO = Opts -- SO,
895    {maps:merge(maps:with(Keys, SvcOpts), maps:from_list(SO)),
896     TO ++ [T || {K,_} = T <- maps:to_list(SvcOpts),
897                 not lists:member(K, Keys),
898                 not lists:keymember(K, 1, Opts)]}.
899
900binary_caps(Svc, true) ->
901    Svc;
902binary_caps(#diameter_service{capabilities = Caps} = Svc, false) ->
903    Svc#diameter_service{capabilities = diameter_capx:binary_caps(Caps)}.
904
905wd(Type, Ref, T, WatchdogT, Rec) ->
906    Pid = start_watchdog(Type, Ref, T),
907    ets:insert(WatchdogT, Rec#watchdog{pid = Pid}),
908    Pid.
909
910%% Note that the service record passed into the watchdog is the merged
911%% record so that each watchdog may get a different record. This
912%% record is what is passed back into application callbacks.
913
914start_watchdog(Type, Ref, T) ->
915    {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
916    Pid.
917
918%% merge_service/2
919
920merge_service(Opts, Svc) ->
921    lists:foldl(fun ms/2, Svc, Opts).
922
923%% Limit the applications known to the fsm to those in the 'apps'
924%% option. That this might be empty is checked by the fsm. It's not
925%% checked at config-time since there's no requirement that the
926%% service be configured first. (Which could be considered a bit odd.)
927ms({applications, As}, #diameter_service{applications = Apps} = S)
928  when is_list(As) ->
929    S#diameter_service{applications
930                       = [A || A <- Apps,
931                               lists:member(A#diameter_app.alias, As)]};
932
933%% The fact that all capabilities can be configured on the transports
934%% means that the service doesn't necessarily represent a single
935%% locally implemented Diameter node as identified by Origin-Host: a
936%% transport can configure its own Origin-Host. This means that the
937%% service little more than a placeholder for default capabilities
938%% plus a list of applications that individual transports can choose
939%% to support (or not).
940ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
941  when is_list(Opts) ->
942    %% make_caps has already succeeded in diameter_config so it will succeed
943    %% again here.
944    {ok, Caps} = diameter_capx:make_caps(Caps0, Opts),
945    Svc#diameter_service{capabilities = Caps};
946
947ms(_, Svc) ->
948    Svc.
949
950%% ---------------------------------------------------------------------------
951%% # accepted/3
952%% ---------------------------------------------------------------------------
953
954accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
955    #watchdog{type = accept = T, peer = P}
956        = Wd
957        = fetch(WatchdogT, Pid),
958    if not P ->
959            #watchdog{ref = Ref, options = Opts} = Wd,
960            %% Mark replacement started, and start new watchdog.
961            ets:insert(WatchdogT, Wd#watchdog{peer = true}),
962            start(Ref, T, Opts, S);
963       P ->
964            %% Transport removal in progress: true has been set in
965            %% shutdown/2, and the transport will die as a
966            %% consequence.
967            ok
968    end.
969
970%% fetch/2
971
972fetch(Tid, Key) ->
973    [T] = ets:lookup(Tid, Key),
974    T.
975
976%% ---------------------------------------------------------------------------
977%% # watchdog/6
978%%
979%% React to a watchdog state transition.
980%% ---------------------------------------------------------------------------
981
982%% Watchdog has a new open connection.
983watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
984    connection_up({TPid, T}, Wd, State);
985
986%% Watchdog has a new connection that will be opened after DW[RA]
987%% exchange.
988watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
989    reopen({TPid, T}, Wd, State);
990
991%% Watchdog has recovered a suspect connection.
992watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
993    #watchdog{peer = TPid} = Wd,  %% assert
994    connection_up(Wd, State);
995
996%% Watchdog has an unresponsive connection. Note that the peer table
997%% entry isn't removed until DOWN.
998watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
999    #watchdog{peer = TPid} = Wd,  %% assert
1000    watchdog_down(Wd, To, State);
1001
1002%% Watchdog has lost its connection.
1003watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{local = {PeerT, _, _}} = S) ->
1004    close(Wd),
1005    watchdog_down(Wd, To, S),
1006    ets:delete(PeerT, TPid);
1007
1008watchdog(_, [], _, _, _, _) ->
1009    ok.
1010
1011watchdog_down(Wd, To, #state{watchdogT = WatchdogT} = S) ->
1012    ets:insert(WatchdogT, Wd#watchdog{state = To}),
1013    connection_down(Wd, To, S).
1014
1015%% ---------------------------------------------------------------------------
1016%% # connection_up/3
1017%% ---------------------------------------------------------------------------
1018
1019%% Watchdog process has reached state OKAY.
1020
1021connection_up({TPid, {Caps, SupportedApps, Pkt}},
1022              #watchdog{pid = Pid}
1023              = Wd,
1024              #state{local = {PeerT, _, _}}
1025              = S) ->
1026    Rec = #peer{pid = TPid,
1027                apps = SupportedApps,
1028                caps = Caps,
1029                watchdog = Pid},
1030    ets:insert(PeerT, Rec),
1031    connection_up([Pkt], Wd#watchdog{peer = TPid}, Rec, S).
1032
1033%% ---------------------------------------------------------------------------
1034%% # reopen/3
1035%% ---------------------------------------------------------------------------
1036
1037reopen({TPid, {Caps, SupportedApps, _Pkt}},
1038       #watchdog{pid = Pid}
1039       = Wd,
1040       #state{watchdogT = WatchdogT,
1041              local = {PeerT, _, _}}) ->
1042    ets:insert(PeerT, #peer{pid = TPid,
1043                            apps = SupportedApps,
1044                            caps = Caps,
1045                            watchdog = Pid}),
1046    ets:insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
1047                                      peer = TPid}).
1048
1049%% ---------------------------------------------------------------------------
1050%% # connection_up/2
1051%% ---------------------------------------------------------------------------
1052
1053%% Watchdog has recovered a suspect connection. Note that there has
1054%% been no new capabilties exchange in this case.
1055
1056connection_up(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}}
1057                                           = S) ->
1058    connection_up([], Wd, fetch(PeerT, TPid), S).
1059
1060%% connection_up/4
1061
1062connection_up(Extra,
1063              #watchdog{peer = TPid}
1064              = Wd,
1065              #peer{apps = SApps, caps = Caps}
1066              = Pr,
1067              #state{watchdogT = WatchdogT,
1068                     local = LT,
1069                     service_name = SvcName,
1070                     service = #diameter_service{applications = Apps}}
1071              = S) ->
1072    ets:insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
1073    diameter_traffic:peer_up(TPid),
1074    local_peer_up(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
1075    report_status(up, Wd, Pr, S, Extra).
1076
1077local_peer_up(SApps, {TPid, Caps} = TC, SA, LT) ->
1078    insert_peer(TPid, [A || {_,A} <- SApps], Caps, LT),
1079    lists:foreach(fun(A) -> peer_up(A, TC, SA) end, SApps).
1080
1081peer_up({Id, Alias}, TC, SA) ->
1082    peer_up(Id, Alias, TC, SA).
1083
1084peer_up(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
1085    #diameter_app{id = Id}  %% assert
1086        = App
1087        = find_app(Alias, Apps),
1088
1089    peer_cb(App, peer_up, [SvcName, TC])
1090        orelse exit(TPid, kill).  %% fake transport failure
1091
1092%% ---------------------------------------------------------------------------
1093%% # find_incoming_app/3
1094%% ---------------------------------------------------------------------------
1095
1096%% No one should be sending the relay identifier.
1097find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
1098    Caps;
1099
1100find_incoming_app(Peer, Id, Apps)
1101  when is_integer(Id) ->
1102    find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);
1103
1104%% Note that the apps represented in SApps may be a strict subset of
1105%% those in Apps.
1106find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
1107    case keyfind(Ids, 1, SApps) of
1108        {_Id, Alias} ->
1109            {#diameter_app{} = find_app(Alias, Apps), Caps};
1110        false ->
1111            Caps
1112    end.
1113
1114%% keyfind/3
1115
1116keyfind([], _, _) ->
1117    false;
1118keyfind([Key | Rest], Pos, L) ->
1119    case lists:keyfind(Key, Pos, L) of
1120        false ->
1121            keyfind(Rest, Pos, L);
1122        T ->
1123            T
1124    end.
1125
1126%% find_outgoing_app/2
1127
1128find_outgoing_app(Alias, Apps) ->
1129    case find_app(Alias, Apps) of
1130        #diameter_app{id = ?APP_ID_RELAY} ->
1131            false;
1132        A ->
1133            A
1134    end.
1135
1136%% find_app/2
1137
1138find_app(Alias, Apps) ->
1139    lists:keyfind(Alias, #diameter_app.alias, Apps).
1140
1141%% Don't bring down the service (and all associated connections)
1142%% regardless of what happens.
1143peer_cb(App, F, A) ->
1144    try state_cb(App, F, A) of
1145        ModS ->
1146            mod_state(App#diameter_app.alias, ModS),
1147            true
1148    catch
1149        E:R:S ->
1150            %% Don't include arguments since a #diameter_caps{} strings
1151            %% from the peer, which could be anything (especially, large).
1152            [Mod|X] = App#diameter_app.module,
1153            ?LOG(failure, {E, R, Mod, F, diameter_lib:stacktrace(S)}),
1154            error_report(failure, F, {Mod, F, A ++ X}),
1155            false
1156    end.
1157
1158%% ---------------------------------------------------------------------------
1159%% # connection_down/3
1160%% ---------------------------------------------------------------------------
1161
1162connection_down(#watchdog{state = ?WD_OKAY,
1163                          peer = TPid}
1164                = Wd,
1165                #peer{caps = Caps,
1166                      apps = SApps}
1167                = Pr,
1168                #state{service_name = SvcName,
1169                       service = #diameter_service{applications = Apps},
1170                       local = LT}
1171                = S) ->
1172    report_status(down, Wd, Pr, S, []),
1173    local_peer_down(SApps, {TPid, Caps}, {SvcName, Apps}, LT),
1174    diameter_traffic:peer_down(TPid);
1175
1176connection_down(#watchdog{state = ?WD_OKAY,
1177                          peer = TPid}
1178                = Wd,
1179                To,
1180                #state{local = {PeerT, _, _}}
1181                = S)
1182  when is_atom(To) ->
1183    connection_down(Wd, #peer{} = fetch(PeerT, TPid), S);
1184
1185connection_down(#watchdog{}, _, _) ->
1186    ok.
1187
1188local_peer_down(SApps, {TPid, _Caps} = TC, SA, LT) ->
1189    delete_peer(TPid, LT),
1190    lists:foreach(fun(A) -> peer_down(A, TC, SA) end, SApps).
1191
1192peer_down({Id, Alias}, TC, SA) ->
1193    peer_down(Id, Alias, TC, SA).
1194
1195peer_down(Id, Alias, TC, {SvcName, Apps}) ->
1196    #diameter_app{id = Id}  %% assert
1197        = App
1198        = find_app(Alias, Apps),
1199
1200    peer_cb(App, peer_down, [SvcName, TC]).
1201
1202%% ---------------------------------------------------------------------------
1203%% # watchdog_down/2
1204%% ---------------------------------------------------------------------------
1205
1206%% Watchdog process has died.
1207
1208watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
1209    Wd = fetch(WatchdogT, Pid),
1210    ets:delete_object(WatchdogT, Wd),
1211    restart(Wd,S),
1212    wd_down(Wd,S).
1213
1214%% Watchdog has never reached OKAY ...
1215wd_down(#watchdog{peer = B}, _)
1216  when is_boolean(B) ->
1217    ok;
1218
1219%% ... or maybe it has.
1220wd_down(#watchdog{peer = TPid} = Wd, #state{local = {PeerT, _, _}} = S) ->
1221    connection_down(Wd, ?WD_DOWN, S),
1222    ets:delete(PeerT, TPid).
1223
1224%% restart/2
1225
1226restart(Wd, S) ->
1227    q_restart(restart(Wd), S).
1228
1229%% restart/1
1230
1231%% Always try to reconnect.
1232restart(#watchdog{ref = Ref,
1233                  type = connect = T,
1234                  options = Opts,
1235                  started = Time}) ->
1236    {Time, {Ref, T, Opts}};
1237
1238%% Transport connection hasn't yet been accepted ...
1239restart(#watchdog{ref = Ref,
1240                  type = accept = T,
1241                  options = Opts,
1242                  peer = false,
1243                  started = Time}) ->
1244    {Time, {Ref, T, Opts}};
1245
1246%% ... or it has: a replacement has already been spawned.
1247restart(#watchdog{type = accept}) ->
1248    false.
1249
1250%% q_restart/2
1251
1252%% Start the reconnect timer.
1253q_restart({Time, {_Ref, Type, Opts} = T}, S) ->
1254    start_tc(tc(Time, default_tc(Type, Opts)), T, S);
1255q_restart(false, _) ->
1256    ok.
1257
1258%% RFC 3588, 2.1:
1259%%
1260%%   When no transport connection exists with a peer, an attempt to
1261%%   connect SHOULD be periodically made.  This behavior is handled via
1262%%   the Tc timer, whose recommended value is 30 seconds.  There are
1263%%   certain exceptions to this rule, such as when a peer has terminated
1264%%   the transport connection stating that it does not wish to
1265%%   communicate.
1266
1267default_tc(connect, Opts) ->
1268    connect_timer(Opts, ?DEFAULT_TC);
1269default_tc(accept, _) ->
1270    0.
1271
1272%% Accept both connect_timer and the (older) reconnect_timer, the
1273%% latter being a remnant from a time in which the timer did apply to
1274%% reconnect attempts.
1275connect_timer(Opts, Def0) ->
1276    Def = proplists:get_value(reconnect_timer, Opts, Def0),
1277    proplists:get_value(connect_timer, Opts, Def).
1278
1279%% Bound tc below if the watchdog was restarted recently to avoid
1280%% continuous restarted in case of faulty config or other problems.
1281tc(Time, Tc) ->
1282    choose(Tc > ?RESTART_TC
1283             orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC,
1284           Tc,
1285           ?RESTART_TC).
1286
1287start_tc(0, T, S) ->
1288    tc_timeout(T, S);
1289start_tc(Tc, T, _) ->
1290    erlang:send_after(Tc, self(), {tc_timeout, T}).
1291
1292%% tc_timeout/2
1293
1294tc_timeout({Ref, _Type, _Opts} = T, #state{service_name = SvcName} = S) ->
1295    tc(diameter_config:have_transport(SvcName, Ref), T, S).
1296
1297tc(true, {Ref, Type, Opts}, #state{service_name = SvcName} = S) ->
1298    send_event(SvcName, {reconnect, Ref, Opts}),
1299    start(Ref, Type, Opts, S);
1300tc(false = No, _, _) ->  %% removed
1301    No.
1302
1303%% ---------------------------------------------------------------------------
1304%% # close/2
1305%% ---------------------------------------------------------------------------
1306
1307%% The watchdog doesn't start a new fsm in the accept case, it
1308%% simply stays alive until someone tells it to die in order for
1309%% another watchdog to be able to detect that it should transition
1310%% from initial into reopen rather than okay. That someone is either
1311%% the accepting watchdog upon reception of a CER from the previously
1312%% connected peer, or us after connect_timer timeout or immediately.
1313
1314close(#watchdog{type = connect}) ->
1315    ok;
1316
1317close(#watchdog{type = accept,
1318                pid = Pid,
1319                options = Opts}) ->
1320    Tc = connect_timer(Opts, 2*?DEFAULT_TC),
1321    erlang:send_after(Tc, Pid, close).
1322%% The RFC's only document the behaviour of Tc, our connect_timer,
1323%% for the establishment of connections but we also give
1324%% connect_timer semantics for a listener, being the time within
1325%% which a new connection attempt is expected of a connecting peer.
1326%% The value should be greater than the peer's Tc + jitter.
1327
1328%% ---------------------------------------------------------------------------
1329%% # reconnect/2
1330%% ---------------------------------------------------------------------------
1331
1332reconnect(Pid, #state{service_name = SvcName,
1333                      watchdogT = WatchdogT}) ->
1334    #watchdog{ref = Ref,
1335              type = connect,
1336              options = Opts}
1337        = fetch(WatchdogT, Pid),
1338    send_event(SvcName, {reconnect, Ref, Opts}).
1339
1340%% ---------------------------------------------------------------------------
1341%% # call_module/4
1342%% ---------------------------------------------------------------------------
1343
1344%% Backwards compatibility and never documented/advertised. May be
1345%% removed.
1346
1347call_module(Mod, Req, From, #state{service
1348                                   = #diameter_service{applications = Apps},
1349                                   service_name = Svc}
1350                            = S) ->
1351    case cm([A || A <- Apps, Mod == hd(A#diameter_app.module)],
1352            Req,
1353            From,
1354            Svc)
1355    of
1356        {reply = T, RC} ->
1357            {T, RC, S};
1358        noreply = T ->
1359            {T, S};
1360        Reason ->
1361            {reply, {error, Reason}, S}
1362    end.
1363
1364cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
1365    Args = [Req, From, Svc],
1366
1367    try state_cb(App, handle_call, Args) of
1368        {noreply = T, ModS} ->
1369            mod_state(Alias, ModS),
1370            T;
1371        {reply = T, RC, ModS} ->
1372            mod_state(Alias, ModS),
1373            {T, RC};
1374        T ->
1375            ModX = App#diameter_app.module,
1376            ?LOG(invalid_return, {ModX, handle_call, Args, T}),
1377            invalid
1378    catch
1379        E: Reason: S ->
1380            ModX = App#diameter_app.module,
1381            Stack = diameter_lib:stacktrace(S),
1382            ?LOG(failure, {E, Reason, ModX, handle_call, Stack}),
1383            failure
1384    end;
1385
1386cm([], _, _, _) ->
1387    unknown;
1388
1389cm([_,_|_], _, _, _) ->
1390    multiple.
1391
1392%% ---------------------------------------------------------------------------
1393%% # report_status/5
1394%% ---------------------------------------------------------------------------
1395
1396report_status(Status,
1397              #watchdog{ref = Ref,
1398                        peer = TPid,
1399                        type = Type,
1400                        options = Opts},
1401              #peer{apps = [_|_] = Apps,
1402                    caps = Caps},
1403              #state{service_name = SvcName}
1404              = S,
1405              Extra) ->
1406    share_peer(Status, Caps, Apps, TPid, S),
1407    Info = [Status, Ref, {TPid, Caps}, {type(Type), Opts} | Extra],
1408    send_event(SvcName, list_to_tuple(Info)).
1409
1410%% send_event/2
1411
1412send_event(SvcName, Info) ->
1413    send_event(#diameter_event{service = SvcName,
1414                               info = Info}).
1415
1416send_event(#diameter_event{service = SvcName} = E) ->
1417    lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
1418
1419%% ---------------------------------------------------------------------------
1420%% # share_peer/5
1421%% ---------------------------------------------------------------------------
1422
1423share_peer(up, Caps, Apps, TPid, #state{options = #{share_peers := SP},
1424                                        service_name = Svc}) ->
1425    notify(SP, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
1426
1427share_peer(down, _Caps, _Apps, TPid, #state{options = #{share_peers := SP},
1428                                            service_name = Svc}) ->
1429    notify(SP, Svc, {peer, TPid}).
1430
1431%% ---------------------------------------------------------------------------
1432%% # share_peers/2
1433%% ---------------------------------------------------------------------------
1434
1435share_peers(Pid, #state{options = #{share_peers := SP},
1436                        local = {PeerT, AppT, _}}) ->
1437    is_remote(Pid, SP)
1438        andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end,
1439                          0,
1440                          PeerT).
1441
1442%% An entry in the peer table doesn't mean the watchdog state is OKAY,
1443%% an entry in the app table does.
1444
1445sp(Pid, AppT, #peer{pid = TPid,
1446                    apps = [{_, Alias} | _] = Apps,
1447                    caps = Caps}) ->
1448    Spec = [{{'$1', TPid},
1449             [{'==', '$1', {const, Alias}}],
1450             ['$_']}],
1451    case ets:select(AppT, Spec, 1) of
1452        '$end_of_table' ->
1453            0;
1454        _ ->
1455            Pid ! {peer, TPid, [A || {_,A} <- Apps], Caps},
1456            1
1457    end.
1458
1459is_remote(Pid, T) ->
1460    Node = node(Pid),
1461    Node /= node() andalso lists:member(Node, remotes(T)).
1462
1463%% ---------------------------------------------------------------------------
1464%% # remote_peer_up/4
1465%% ---------------------------------------------------------------------------
1466
1467remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T},
1468                                           remote = {PeerT, _, _}}
1469                                    = S) ->
1470    is_remote(TPid, T)
1471        andalso not ets:member(PeerT, TPid)
1472        andalso rpu(TPid, Aliases, Caps, S).
1473
1474%% Notification can be duplicate since remote nodes push and the local
1475%% node pulls.
1476
1477rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
1478    #diameter_service{applications = Apps} = Svc,
1479    Key = #diameter_app.alias,
1480    F = fun(A) -> lists:keymember(A, Key, Apps) end,
1481    rpu(TPid, lists:filter(F, Aliases), Caps, RT);
1482
1483rpu(_, [] = No, _, _) ->
1484    No;
1485
1486rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
1487    monitor(process, TPid),
1488    ets:insert(PeerT, #peer{pid = TPid,
1489                            apps = Aliases,
1490                            caps = Caps}),
1491    insert_peer(TPid, Aliases, Caps, RT).
1492
1493%% insert_peer/4
1494
1495insert_peer(TPid, Aliases, Caps, {_PeerT, AppT, IdentT}) ->
1496    #diameter_caps{origin_host = {_, OH},
1497                   origin_realm = {_, OR}}
1498        = Caps,
1499    ets:insert(AppT, [{A, TPid} || A <- Aliases]),
1500    H = iolist_to_binary(OH),
1501    R = iolist_to_binary(OR),
1502    ets:insert(IdentT, [{T, A, TPid} || T <- [{host, H}, {realm, R}, {R, H}],
1503                                        A <- Aliases]).
1504
1505%% ---------------------------------------------------------------------------
1506%% # remote_peer_down/2
1507%% ---------------------------------------------------------------------------
1508
1509remote_peer_down(TPid, #state{remote = {PeerT, _, _} = RT}) ->
1510    ets:delete(PeerT, TPid),
1511    delete_peer(TPid, RT).
1512
1513%% delete_peer/2
1514
1515delete_peer(TPid, {_PeerT, AppT, IdentT}) ->
1516    ets:select_delete(AppT, [{{'_', TPid}, [], [true]}]),
1517    ets:select_delete(IdentT, [{{'_', '_', TPid}, [], [true]}]).
1518
1519%% ---------------------------------------------------------------------------
1520%% pick_peer/4
1521%% ---------------------------------------------------------------------------
1522
1523pick_peer(#diameter_app{alias = Alias}
1524          = App,
1525          RealmAndHost,
1526          Filter,
1527          #state{local = LT,
1528                 remote = RT,
1529                 service_name = SvcName,
1530                 service = #diameter_service{pid = Pid}}) ->
1531    pick_peer(peers(Alias, RealmAndHost, Filter, LT),
1532              peers(Alias, RealmAndHost, Filter, RT),
1533              Pid,
1534              SvcName,
1535              App).
1536
1537%% pick_peer/5
1538
1539pick_peer([], [], _, _, _) ->
1540    false;
1541
1542%% App state is mutable but we're not in the service process: go there.
1543pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
1544  when self() /= Pid ->
1545    case call_service(Pid, {pick_peer, Local, Remote, App}) of
1546        {TPid, _} = T when is_pid(TPid) ->
1547            T;
1548        false = No ->
1549            No;
1550        {error, _} ->
1551            false
1552    end;
1553
1554%% App state isn't mutable or it is and we're in the service process:
1555%% do the deed.
1556pick_peer(Local,
1557          Remote,
1558          _Pid,
1559          SvcName,
1560          #diameter_app{alias = Alias,
1561                        init_state = S,
1562                        mutable = M}
1563          = App) ->
1564    Args = [Local, Remote, SvcName],
1565
1566    try state_cb(App, pick_peer, Args) of
1567        {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
1568            T;
1569        {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
1570            mod_state(Alias, ModS),
1571            T;
1572        {false = No, ModS} when M ->
1573            mod_state(Alias, ModS),
1574            No;
1575        {ok, false = No} ->
1576            No;
1577        false = No ->
1578            No;
1579        {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
1580            T;                     %% Accept returned state in the immutable
1581        {false = No, S} ->         %% case as long it isn't changed.
1582            No;
1583        T when M ->
1584            ModX = App#diameter_app.module,
1585            ?LOG(invalid_return, {ModX, pick_peer, T}),
1586            false
1587    catch
1588        E: Reason: Stack when M ->
1589            ModX = App#diameter_app.module,
1590            Z = diameter_lib:stacktrace(Stack),
1591            ?LOG(failure, {E, Reason, ModX, pick_peer, Z}),
1592            false
1593    end.
1594
1595%% peers/4
1596
1597%% No peer options pointing at specific peers: search for them.
1598peers(Alias, RH, [Filter], T) ->
1599    filter(Alias, RH, Filter, T, true);
1600
1601%% Or just lookup.
1602peers(_Alias, RH, [Filter | TPids], {PeerT, _AppT, _IdentT}) ->
1603    {Ts, _} = filter(caps(PeerT, TPids), RH, Filter),
1604    Ts.
1605
1606%% filter/5
1607%%
1608%% Try to limit the peers list by starting with a host/realm lookup.
1609
1610filter(Alias, RH, {neg, F}, T, B) ->
1611    filter(Alias, RH, F, T, not B);
1612
1613filter(_, _, none, _, false) ->
1614    [];
1615
1616filter(Alias, _, none, T, true) ->
1617    all_peers(Alias, T);
1618
1619filter(Alias, [DR,DH] = RH, K, T, B)
1620  when K == realm, DR == undefined;
1621       K == host, DH == undefined ->
1622    filter(Alias, RH, none, T, B);
1623
1624filter(Alias, [DR,_] = RH, realm = K, T, B) ->
1625    filter(Alias, RH, {K, DR}, T, B);
1626
1627filter(Alias, [_,DH] = RH, host = K, T, B) ->
1628    filter(Alias, RH, {K, DH}, T, B);
1629
1630filter(Alias, _, {K, D}, {PeerT, _AppT, IdentT}, true)
1631  when K == host;
1632       K == realm ->
1633    try iolist_to_binary(D) of
1634        B ->
1635            caps(PeerT, ets:select(IdentT, [{{{K, B}, '$1', '$2'},
1636                                             [{'==', '$1', {const, Alias}}],
1637                                             ['$2']}]))
1638    catch
1639        error:_ ->
1640            []
1641    end;
1642
1643filter(Alias, RH, {all, Filters}, T, B)
1644  when is_list(Filters) ->
1645    fltr_all(Alias, RH, Filters, T, B);
1646
1647filter(Alias, RH, {first, Filters}, T, B)
1648  when is_list(Filters) ->
1649    fltr_first(Alias, RH, Filters, T, B);
1650
1651filter(Alias, RH, Filter, T, B) ->
1652    {Ts, Fs} = filter(all_peers(Alias, T), RH, Filter),
1653    choose(B, Ts, Fs).
1654
1655%% fltr_all/5
1656
1657fltr_all(Alias, RH, [{K, any} | Filters], T, B)
1658  when K == host;
1659       K == realm ->
1660    fltr_all(Alias, RH, Filters, T, B);
1661
1662fltr_all(Alias, RH, [{host, _} = H, {realm, _} = R | Filters], T, B) ->
1663    fltr_all(Alias, RH, [R, H | Filters], T, B);
1664
1665fltr_all(Alias, RH, [{realm, _} = R, {host, any} | Filters], T, B) ->
1666    fltr_all(Alias, RH, [R | Filters], T, B);
1667
1668fltr_all(Alias, RH, [{realm, OR}, {host, OH} | Filters], T, true) ->
1669    {PeerT, _AppT, IdentT} = T,
1670    try {iolist_to_binary(OR), iolist_to_binary(OH)} of
1671        BT ->
1672            Peers = caps(PeerT,
1673                         ets:select(IdentT, [{{BT, '$1', '$2'},
1674                                              [{'==', '$1', {const, Alias}}],
1675                                              ['$2']}])),
1676            {Ts, _} = filter(Peers, RH, {all, Filters}),
1677            Ts
1678    catch
1679        error:_ ->
1680            []
1681    end;
1682
1683fltr_all(Alias, [undefined,_] = RH, [realm | Filters], T, B) ->
1684    fltr_all(Alias, RH, Filters, T, B);
1685
1686fltr_all(Alias, [DR,_] = RH, [realm | Filters], T, B) ->
1687    fltr_all(Alias, RH, [{realm, DR} | Filters], T, B);
1688
1689fltr_all(Alias, [_,undefined] = RH, [host | Filters], T, B) ->
1690    fltr_all(Alias, RH, Filters, T, B);
1691
1692fltr_all(Alias, [_,DH] = RH, [host | Filters], T, B) ->
1693    fltr_all(Alias, RH, [{host, DH} | Filters], T, B);
1694
1695fltr_all(Alias, RH, [{K, _} = KT, KA | Filters], T, B)
1696  when K == host, KA == realm;
1697       K == realm, KA == host ->
1698    fltr_all(Alias, RH, [KA, KT | Filters], T, B);
1699
1700fltr_all(Alias, RH, [F | Filters], T, B) ->
1701    {Ts, Fs} = filter(filter(Alias, RH, F, T, B), RH, {all, Filters}),
1702    choose(B, Ts, Fs);
1703
1704fltr_all(Alias, RH, [], T, B) ->
1705    filter(Alias, RH, none, T, B).
1706
1707%% fltr_first/5
1708%%
1709%% Like any, but stop at the first filter with any matches.
1710
1711fltr_first(Alias, RH, [F | Filters], T, B) ->
1712    case filter(Alias, RH, F, T, B) of
1713        [] ->
1714            fltr_first(Alias, RH, Filters, T, B);
1715        [_|_] = Ts ->
1716            Ts
1717    end;
1718
1719fltr_first(Alias, RH, [], T, B) ->
1720    filter(Alias, RH, none, T, not B).
1721
1722%% all_peers/2
1723
1724all_peers(Alias, {PeerT, AppT, _}) ->
1725    ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
1726                        [],
1727                        [{{P, '$1'}}]}
1728                       || {_,P} <- ets:lookup(AppT, Alias)]).
1729
1730%% caps/2
1731
1732caps(PeerT, Pids) ->
1733    ets:select(PeerT, [{#peer{pid = P, caps = '$1', _ = '_'},
1734                        [],
1735                        [{{P, '$1'}}]}
1736                       || P <- Pids]).
1737
1738%% filter/3
1739%%
1740%% Return peers in match order.
1741
1742filter(Peers, _, none) ->
1743    {Peers, []};
1744
1745filter(Peers, RH, {neg, F}) ->
1746    {Ts, Fs} = filter(Peers, RH, F),
1747    {Fs, Ts};
1748
1749filter(Peers, RH, {all, L})
1750  when is_list(L) ->
1751    lists:foldl(fun(F,A) -> fltr_all(F, A, RH) end,
1752                {Peers, []},
1753                L);
1754
1755filter(Peers, RH, {any, L})
1756  when is_list(L) ->
1757    lists:foldl(fun(F,A) -> fltr_any(F, A, RH) end,
1758                {[], Peers},
1759                L);
1760
1761filter(Peers, RH, {first, L})
1762  when is_list(L) ->
1763    fltr_first(Peers, RH, L);
1764
1765filter(Peers, RH, F) ->
1766    lists:partition(fun({_,C}) -> caps_filter(C, RH, F) end, Peers).
1767
1768%% fltr_all/3
1769
1770fltr_all(F, {Ts0, Fs0}, RH) ->
1771    {Ts1, Fs1} = filter(Ts0, RH, F),
1772    {Ts1, Fs0 ++ Fs1}.
1773
1774%% fltr_any/3
1775
1776fltr_any(F, {Ts0, Fs0}, RH) ->
1777    {Ts1, Fs1} = filter(Fs0, RH, F),
1778    {Ts0 ++ Ts1, Fs1}.
1779
1780%% fltr_first/3
1781
1782fltr_first(Peers, _, []) ->
1783    {[], Peers};
1784
1785fltr_first(Peers, RH, [F | Filters]) ->
1786    case filter(Peers, RH, F) of
1787        {[], _} ->
1788            fltr_first(Peers, RH, Filters);
1789        {_, _} = T ->
1790            T
1791    end.
1792
1793%% caps_filter/3
1794
1795caps_filter(#diameter_caps{origin_host = {_,OH}}, [_,DH], host) ->
1796    eq(undefined, DH, OH);
1797
1798caps_filter(#diameter_caps{origin_realm = {_,OR}}, [DR,_], realm) ->
1799    eq(undefined, DR, OR);
1800
1801caps_filter(C, _, Filter) ->
1802    caps_filter(C, Filter).
1803
1804%% caps_filter/2
1805
1806caps_filter(#diameter_caps{origin_host = {_,OH}}, {host, H}) ->
1807    eq(any, H, OH);
1808
1809caps_filter(#diameter_caps{origin_realm = {_,OR}}, {realm, R}) ->
1810    eq(any, R, OR);
1811
1812%% Anything else is expected to be an eval filter. Filter failure is
1813%% documented as being equivalent to a non-matching filter.
1814
1815caps_filter(C, T) ->
1816    try
1817        {eval, F} = T,
1818        diameter_lib:eval([F,C])
1819    catch
1820        _:_ -> false
1821    end.
1822
1823eq(Any, Id, PeerId) ->
1824    Any == Id orelse try
1825                         iolist_to_binary(Id) == iolist_to_binary(PeerId)
1826                     catch
1827                         _:_ -> false
1828                     end.
1829%% OctetString() can be specified as an iolist() so test for string
1830%% rather then term equality.
1831
1832%% ---------------------------------------------------------------------------
1833%% # service_info/2
1834%% ---------------------------------------------------------------------------
1835
1836%% The config passed to diameter:start_service/2.
1837-define(CAP_INFO, ['Origin-Host',
1838                   'Origin-Realm',
1839                   'Vendor-Id',
1840                   'Product-Name',
1841                   'Origin-State-Id',
1842                   'Host-IP-Address',
1843                   'Supported-Vendor-Id',
1844                   'Auth-Application-Id',
1845                   'Inband-Security-Id',
1846                   'Acct-Application-Id',
1847                   'Vendor-Specific-Application-Id',
1848                   'Firmware-Revision']).
1849
1850%% The config returned by diameter:service_info(SvcName, all).
1851-define(ALL_INFO, [capabilities,
1852                   applications,
1853                   transport,
1854                   options]).
1855
1856%% The rest.
1857-define(OTHER_INFO, [connections,
1858                     name,
1859                     peers,
1860                     statistics,
1861                     info]).
1862
1863service_info(Item, S)
1864  when is_atom(Item) ->
1865    case tagged_info(Item, S) of
1866        {_, T} -> T;
1867        undefined = No -> No
1868    end;
1869
1870service_info(Items, S) ->
1871    tagged_info(Items, S).
1872
1873tagged_info(Item, S)
1874  when is_atom(Item) ->
1875    case complete(Item) of
1876        {value, I} ->
1877            {I, complete_info(I,S)};
1878        false ->
1879            undefined
1880    end;
1881
1882tagged_info(TPid, #state{watchdogT = WatchdogT, local = {PeerT, _, _}})
1883  when is_pid(TPid) ->
1884    try
1885        [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
1886        [#watchdog{ref = Ref, type = Type, options = Opts}]
1887            = ets:lookup(WatchdogT, Pid),
1888        [{ref, Ref},
1889         {type, Type},
1890         {options, Opts}]
1891    catch
1892        error:_ ->
1893            []
1894    end;
1895
1896tagged_info(Items, S)
1897  when is_list(Items) ->
1898    [T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];
1899
1900tagged_info(_, _) ->
1901    undefined.
1902
1903complete_info(Item, #state{service = Svc} = S) ->
1904    case Item of
1905        name ->
1906            S#state.service_name;
1907        'Origin-Host' ->
1908            (Svc#diameter_service.capabilities)
1909                #diameter_caps.origin_host;
1910        'Origin-Realm' ->
1911            (Svc#diameter_service.capabilities)
1912                #diameter_caps.origin_realm;
1913        'Vendor-Id' ->
1914            (Svc#diameter_service.capabilities)
1915                #diameter_caps.vendor_id;
1916        'Product-Name' ->
1917            (Svc#diameter_service.capabilities)
1918                #diameter_caps.product_name;
1919        'Origin-State-Id' ->
1920            (Svc#diameter_service.capabilities)
1921                #diameter_caps.origin_state_id;
1922        'Host-IP-Address' ->
1923            (Svc#diameter_service.capabilities)
1924                #diameter_caps.host_ip_address;
1925        'Supported-Vendor-Id' ->
1926            (Svc#diameter_service.capabilities)
1927                #diameter_caps.supported_vendor_id;
1928        'Auth-Application-Id' ->
1929            (Svc#diameter_service.capabilities)
1930                #diameter_caps.auth_application_id;
1931        'Inband-Security-Id'  ->
1932            (Svc#diameter_service.capabilities)
1933                #diameter_caps.inband_security_id;
1934        'Acct-Application-Id' ->
1935            (Svc#diameter_service.capabilities)
1936                #diameter_caps.acct_application_id;
1937        'Vendor-Specific-Application-Id' ->
1938            (Svc#diameter_service.capabilities)
1939                #diameter_caps.vendor_specific_application_id;
1940        'Firmware-Revision' ->
1941            (Svc#diameter_service.capabilities)
1942                #diameter_caps.firmware_revision;
1943        capabilities -> service_info(?CAP_INFO, S);
1944        applications -> info_apps(S);
1945        transport    -> info_transport(S);
1946        options      -> info_options(S);
1947        keys         -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
1948        all          -> service_info(?ALL_INFO, S);
1949        statistics   -> info_stats(S);
1950        info         -> info_info(S);
1951        connections  -> info_connections(S);
1952        peers        -> info_peers(S)
1953    end.
1954
1955complete(I)
1956  when I == keys;
1957       I == all ->
1958    {value, I};
1959complete(Pre) ->
1960    P = atom_to_list(Pre),
1961    case [I || I <- ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO,
1962               lists:prefix(P, atom_to_list(I))]
1963    of
1964        [I] -> {value, I};
1965        _   -> false
1966    end.
1967
1968%% info_stats/1
1969
1970info_stats(#state{watchdogT = WatchdogT}) ->
1971    MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
1972                  [{'is_pid', '$2'}],
1973                  [['$1', '$2']]}],
1974    try ets:select(WatchdogT, MatchSpec) of
1975        L ->
1976            diameter_stats:read(lists:append(L))
1977    catch
1978        error: badarg -> []  %% service  has gone down
1979    end.
1980
1981%% info_transport/1
1982%%
1983%% One entry per configured transport. Statistics for each entry are
1984%% the accumulated values for the ref and associated watchdog/peer
1985%% pids.
1986
1987info_transport(S) ->
1988    PeerD = peer_dict(S, config_dict(S)),
1989    Stats = diameter_stats:sum(dict:fetch_keys(PeerD)),
1990    dict:fold(fun(R, Ls, A) ->
1991                      Cs = proplists:get_value(R, Stats, []),
1992                      [[{ref, R} | transport(Ls)] ++ [{statistics, Cs}] | A]
1993              end,
1994              [],
1995              PeerD).
1996
1997%% Single config entry. Distinguish between pool_size config or not on
1998%% a connecting transport for backwards compatibility: with the option
1999%% the form is similar to the listening case, with connections grouped
2000%% in a pool tuple (for lack of a better name), without as before.
2001transport([[{type, Type}, {options, Opts}] = L])
2002  when Type == listen;
2003       Type == connect ->
2004    L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]];
2005
2006%% Peer entries: discard config. Note that the peer entries have
2007%% length at least 3.
2008transport([[_,_] | L]) ->
2009    transport(L);
2010
2011%% Multiple tranports. Note that all have the same options by
2012%% construction, which is not terribly space efficient.
2013transport([[{type, Type}, {options, Opts} | _] | _] = Ls) ->
2014    transport(keys(Type, Opts), Ls).
2015
2016%% Group transports in an accept or pool tuple ...
2017transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) ->
2018    [{type, Type},
2019     {options, Opts},
2020     {Key, [tl(tl(L)) || L <- Ls]}];
2021
2022%% ... or not: there can only be one.
2023transport([], [L]) ->
2024    L.
2025
2026keys(connect = T, Opts) ->
2027    [{T, pool} || lists:keymember(pool_size, 1, Opts)];
2028keys(_, _) ->
2029    [{listen, accept}].
2030
2031peer_dict(#state{watchdogT = WatchdogT, local = {PeerT, _, _}}, Dict0) ->
2032    try ets:tab2list(WatchdogT) of
2033        L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
2034    catch
2035        error: badarg -> Dict0  %% service has gone down
2036    end.
2037
2038peer_acc(PeerT, Acc, #watchdog{pid = Pid,
2039                               type = Type,
2040                               ref = Ref,
2041                               options = Opts,
2042                               state = WS,
2043                               started = At,
2044                               peer = TPid}) ->
2045    Info = [{type, Type},
2046            {options, Opts},
2047            {watchdog, {Pid, At, WS}}
2048            | info_peer(PeerT, TPid, WS)],
2049    dict:append(Ref, Info ++ [{info, info_process_info(Info)}], Acc).
2050
2051info_peer(PeerT, TPid, WS)
2052  when is_pid(TPid), WS /= ?WD_DOWN ->
2053    try ets:lookup(PeerT, TPid) of
2054        T -> info_peer(T)
2055    catch
2056        error: badarg -> []  %% service has gone down
2057    end;
2058info_peer(_, _, _) ->
2059    [].
2060
2061info_process_info(Info) ->
2062    lists:flatmap(fun ipi/1, Info).
2063
2064ipi({watchdog, {Pid, _, _}}) ->
2065    info_pid(Pid);
2066
2067ipi({peer, {Pid, _}}) ->
2068    info_pid(Pid);
2069
2070ipi({port, [{owner, Pid} | _]}) ->
2071    info_pid(Pid);
2072
2073ipi(_) ->
2074    [].
2075
2076info_pid(Pid) ->
2077    case process_info(Pid, [message_queue_len, memory, binary]) of
2078        undefined ->
2079            [];
2080        L ->
2081            [{Pid, lists:map(fun({K,V}) -> {K, map_info(K,V)} end, L)}]
2082    end.
2083
2084%% The binary list consists of 3-tuples {Ptr, Size, Count}, where Ptr
2085%% is a C pointer value, Size is the size of a referenced binary in
2086%% bytes, and Count is a global reference count. The same Ptr can
2087%% occur multiple times, once for each reference on the process heap.
2088%% In this case, the corresponding tuples will have Size in common but
2089%% Count may differ just because no global lock is taken when the
2090%% value is retrieved.
2091%%
2092%% The list can be quite large, and we aren't often interested in the
2093%% pointers or counts, so whittle this down to the number of binaries
2094%% referenced and their total byte count.
2095map_info(binary, L) ->
2096    SzD = lists:foldl(fun({P,S,_}, D) -> dict:store(P,S,D) end,
2097                      dict:new(),
2098                      L),
2099    {dict:size(SzD), dict:fold(fun(_,S,N) -> S + N end, 0, SzD)};
2100
2101map_info(_, T) ->
2102    T.
2103
2104%% The point of extracting the config here is so that 'transport' info
2105%% has one entry for each transport ref, the peer table only
2106%% containing entries that have a living watchdog.
2107
2108config_dict(#state{service_name = SvcName}) ->
2109    lists:foldl(fun config_acc/2,
2110                dict:new(),
2111                diameter_config:lookup(SvcName)).
2112
2113config_acc({Ref, T, Opts}, Dict)
2114  when T == listen;
2115       T == connect ->
2116    dict:store(Ref, [[{type, T}, {options, Opts}]], Dict);
2117config_acc(_, Dict) ->
2118    Dict.
2119
2120info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
2121    [{peer, {Pid, T}},
2122     {apps, SApps},
2123     {caps, info_caps(Caps)}
2124     | try [{port, info_port(Pid)}] catch _:_ -> [] end];
2125info_peer([] = No) ->
2126    No.
2127
2128%% Extract information that the processes involved are expected to
2129%% "publish" in their process dictionaries. Simple but backhanded.
2130info_port(Pid) ->
2131    {_, PD} = process_info(Pid, dictionary),
2132    {_, T} = lists:keyfind({diameter_peer_fsm, start}, 1, PD),
2133    {TPid, {_Type, TMod, _Cfg}} = T,
2134    {_, TD} = process_info(TPid, dictionary),
2135    {_, Data} = lists:keyfind({TMod, info}, 1, TD),
2136    [{owner, TPid},
2137     {module, TMod}
2138     | try TMod:info(Data) catch _:_ -> [] end].
2139
2140%% Use the fields names from diameter_caps instead of
2141%% diameter_base_CER to distinguish between the 2-tuple values
2142%% compared to the single capabilities values. Note also that the
2143%% returned list is tagged 'caps' rather than 'capabilities' to
2144%% emphasize the difference.
2145info_caps(#diameter_caps{} = C) ->
2146    lists:zip(record_info(fields, diameter_caps), tl(tuple_to_list(C))).
2147
2148info_apps(#state{service = #diameter_service{applications = Apps}}) ->
2149    lists:map(fun mk_app/1, Apps).
2150
2151mk_app(#diameter_app{} = A) ->
2152    lists:zip(record_info(fields, diameter_app), tl(tuple_to_list(A))).
2153
2154%% info_info/1
2155%%
2156%% Extract process_info from connections info.
2157
2158info_info(S) ->
2159    [I || L <- conn_list(S), {info, I} <- L].
2160
2161%% info_connections/1
2162%%
2163%% One entry per transport connection. Statistics for each entry are
2164%% for the peer pid only.
2165
2166info_connections(S) ->
2167    ConnL = conn_list(S),
2168    Stats = diameter_stats:read([P || L <- ConnL, {peer, {P,_}} <- L]),
2169    [L ++ [stats([P], Stats)] || L <- ConnL, {peer, {P,_}} <- L].
2170
2171conn_list(S) ->
2172    lists:append(dict:fold(fun conn_acc/3, [], peer_dict(S, dict:new()))).
2173
2174conn_acc(Ref, Peers, Acc) ->
2175    [[[{ref, Ref} | L] || L <- Peers, lists:keymember(peer, 1, L)]
2176     | Acc].
2177
2178stats(Refs, Stats) ->
2179    {statistics, dict:to_list(lists:foldl(fun(R,D) ->
2180                                                  stats_acc(R, D, Stats)
2181                                          end,
2182                                          dict:new(),
2183                                          Refs))}.
2184
2185stats_acc(Ref, Dict, Stats) ->
2186    lists:foldl(fun({C,N}, D) -> dict:update_counter(C, N, D) end,
2187                Dict,
2188                proplists:get_value(Ref, Stats, [])).
2189
2190%% info_peers/1
2191%%
2192%% One entry per peer Origin-Host. Statistics for each entry are
2193%% accumulated values for all peer pids.
2194
2195info_peers(S) ->
2196    {PeerD, RefD} = lists:foldl(fun peer_acc/2,
2197                                {dict:new(), dict:new()},
2198                                conn_list(S)),
2199    Refs = lists:append(dict:fold(fun(_, Rs, A) -> [Rs|A] end,
2200                                  [],
2201                                  RefD)),
2202    Stats = diameter_stats:read(Refs),
2203    dict:fold(fun(OH, Cs, A) ->
2204                      Rs = dict:fetch(OH, RefD),
2205                      [{OH, [{connections, Cs}, stats(Rs, Stats)]} | A]
2206              end,
2207              [],
2208              PeerD).
2209
2210peer_acc(Peer, {PeerD, RefD}) ->
2211    [{TPid, _}, [{origin_host, {_, OH}} | _]]
2212        = [proplists:get_value(K, Peer) || K <- [peer, caps]],
2213    {dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}.
2214
2215%% info_options/1
2216
2217info_options(S) ->
2218    S#state.options.
2219