1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21-module(diameter_sctp).
22-behaviour(gen_server).
23
24%% interface
25-export([start/3]).
26
27%% child start from supervisor
28-export([start_link/1]).
29
30%% child start from here
31-export([init/1]).
32
33%% gen_server callbacks
34-export([handle_call/3,
35         handle_cast/2,
36         handle_info/2,
37         code_change/3,
38         terminate/2]).
39
40-export([listener/1,%% diameter_sync callback
41         info/1]).  %% service_info callback
42
43-export([ports/0,
44         ports/1]).
45
46-export_type([listen_option/0,
47              connect_option/0]).
48
49-include_lib("kernel/include/inet_sctp.hrl").
50-include_lib("diameter/include/diameter.hrl").
51
%% Keys into process dictionary. putr/2 and getr/1 wrap each key as
%% {?MODULE, Key} before using it.
-define(INFO_KEY, info).            %% {gen_sctp, Sock}, stored by publish/4
-define(REF_KEY,  ref).             %% transport reference, stored by i/1
-define(TRANSPORT_KEY, transport).  %% transport pid, stored by a monitor

%% Raise an error tagged with this module and the line of the call.
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).

%% The default port for a listener. Also used as the remote port of a
%% connecting transport when no rport option is given.
-define(DEFAULT_PORT, 3868).  %% RFC 3588, ch 2.1

%% How long to wait for a transport process to attach after
%% association establishment. Passed to erlang:send_after/3, so in
%% milliseconds.
-define(ACCEPT_TIMEOUT, 5000).
65
%% Options for a connecting transport: remote address(es) and port,
%% the options common to both transport types, and anything else,
%% which is passed on to gen_sctp:open/1.
-type connect_option() :: {raddr, inet:ip_address()}
                        | {rport, inet:port_number()}
                        | option()
                        | term(). %% gen_sctp:open_option().

%% Specification of peer addresses from which a listener accepts
%% associations.
-type match() :: inet:ip_address()
               | string()
               | [match()].

%% Options for a listening transport.
-type listen_option() :: {accept, match()}
                       | option()
                       | term().  %% gen_sctp:open_option().

%% Options interpreted by this module for both transport types. The
%% unordered option is extracted together with the others in i/1 and
%% stored in the unordered field of #transport{}.
-type option() :: {sender, boolean()}
                | sender
                | {packet, boolean() | raw}
                | {message_cb, false | diameter:eval()}
                | {unordered, boolean() | pos_integer()}.

-type uint() :: non_neg_integer().
85
%% Accepting/connecting transport process state.
%%
%% mode is a tuple until the association is established, at which
%% point up/1 replaces it with the bare atom accept or connect.
%% assoc_id is true on an accepting transport between comm_up and the
%% first inbound message, from which the actual identifier is taken.
-record(transport,
        {parent  :: pid() | undefined,
         mode :: {accept, pid()}
               | accept
               | {connect, {[inet:ip_address()], uint(), list()}}
                        %% {RAs, RP, Errors}
               | connect,
         socket   :: gen_sctp:sctp_socket() | undefined,
         active = false :: boolean(),      %% is socket active?
         recv = true    :: boolean(),      %% should it be active?
         assoc_id :: gen_sctp:assoc_id()   %% association identifier
                   | undefined
                   | true,
         peer     :: {[inet:ip_address()], uint()} %% {RAs, RP}
                   | undefined,
         streams  :: {uint(), uint()}      %% {InStream, OutStream} counts
                   | undefined,
         os = 0   :: uint(),               %% next output stream
         rotate = 1 :: boolean() | 0 | 1,  %% rotate os?
         unordered = false :: boolean()    %% always send unordered?
                            | pos_integer(),% or if =< N outbound streams?
         packet = true :: boolean()        %% legacy transport_data?
                        | raw,
         message_cb = false :: false | diameter:eval(),
         send = false :: pid() | boolean()}).      %% sending process
112
%% Monitor process state. A monitor is started by send/2 on the first
%% send of a transport configured with a sender, and does the actual
%% gen_sctp send on behalf of the transport (see m/2). When ack is
%% true, each sent message is echoed back to the transport process.
-record(monitor,
        {transport :: pid(),
         ack = false :: boolean(),
         socket :: gen_sctp:sctp_socket(),
         assoc_id :: gen_sctp:assoc_id()}).
119
%% Listener process state. opts is the list
%% [Matches, Packet, Sender, MessageCB, Unordered] built in i/1 and
%% sent to an accepting transport together with its peeled-off
%% socket. pending is {Count, Queue}, a signed count together with a
%% queue of transport pids.
-record(listener,
        {ref       :: reference(),
         socket    :: gen_sctp:sctp_socket(),
         service   :: pid(), %% service process
         pending = {0, queue:new()},
         opts      :: [[match()] | boolean() | diameter:eval()]}).
127%% Field pending implements two queues: the first of transport-to-be
128%% processes to which an association has been assigned but for which
129%% diameter hasn't yet spawned a transport process, a short-lived
130%% state of affairs as a new transport is spawned as a consequence of
131%% a peer being taken up, transport processes being spawned by the
132%% listener on demand; the second of started transport processes that
133%% have not yet been assigned an association.
134%%
135%% When diameter calls start/3, the transport process is either taken
136%% from the first queue or spawned and placed in the second queue
137%% until an association is established. When an association is
138%% established, a controlling process is either taken from the second
%% queue or spawned and placed in the first queue. Thus, there are
%% only elements in one queue at a time, so both share a single queue
%% (from the queue module), tagged with a positive length if it
%% contains the first queue, a negative length if it contains the
%% second queue.
143
144%% ---------------------------------------------------------------------------
145%% # start/3
146%% ---------------------------------------------------------------------------
147
%% Transport start function called by diameter. Starts the
%% supervisors of this module on demand, then an accepting or
%% connecting transport process using the Host-IP-Address values of
%% the service as default local addresses. Returns the transport pid
%% and the local addresses of its socket, or {error, Reason} if the
%% listener call or child start fails.

-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
   -> {ok, pid(), [inet:ip_address()]}
    | {error, term()}
 when Ref :: diameter:transport_ref();
           ({connect, Ref}, #diameter_service{}, [connect_option()])
   -> {ok, pid(), [inet:ip_address()]}
    | {error, term()}
 when Ref :: diameter:transport_ref().

start(T, Svc, Opts)
  when is_list(Opts) ->
    #diameter_service{capabilities = Caps,
                      pid = Pid}
        = Svc,
    diameter_sctp_sup:start(),  %% start supervisors on demand
    Addrs = Caps#diameter_caps.host_ip_address,
    s(T, Addrs, Pid, Opts).
163
%% s/4
%%
%% Accepting case: ensure a listener exists for the transport
%% reference and ask it for a transport process. The listener either
%% spawns one, if there is no association waiting, or hands back one
%% it spawned at comm_up on a new association. It is done this way
%% since gen_sctp has no accept call with which to accept a new
%% association only *after* an accepting transport has been spawned.
s({accept, Ref} = Key, Addrs, SvcPid, Opts) ->
    {ok, Listener, LocalAddrs} = listener(Ref, {Opts, SvcPid, Addrs}),
    Request = {Key, self()},
    try gen_server:call(Listener, Request, infinity) of
        {ok, Transport} ->
            {ok, Transport, LocalAddrs};
        Other ->
            {error, Other}
    catch
        exit:Why ->
            {error, Why}
    end;

%% Connecting case: the transport process is a child started directly.
s({connect, Ref}, Addrs, _SvcPid, Opts) ->
    diameter_sctp_sup:start_child({connect, self(), Opts, Addrs, Ref}).
185
%% start_link/1
%%
%% Child start function: spawn a linked process running init/1 with
%% the spawn options diameter_lib provides for servers.

start_link(Arg) ->
    SpawnOpts = diameter_lib:spawn_opts(server, []),
    proc_lib:start_link(?MODULE, init, [Arg], infinity, SpawnOpts).
194
195%% ---------------------------------------------------------------------------
196%% # info/1
197%% ---------------------------------------------------------------------------
198
%% Collect socket names, peer names and statistics of a socket as a
%% list of {Key, Value}, omitting whatever inet can't supply.
info({gen_sctp, Sock}) ->
    Items = [{socket, socknames},
             {peer, peernames},
             {statistics, getstat}],
    lists:append([info(Item, Sock) || Item <- Items]).
204
%% Call inet:Fun(Sock) and return a one-element list with the mapped
%% value on {ok, Value}, the empty list on anything else.
info({Key, Fun}, Sock) ->
    Result = inet:Fun(Sock),
    [{Key, map(Fun, Val)} || {ok, Val} <- [Result]].
212
%% map/2
%%
%% inet:{sock,peer}names/1 returns a list of {Addr, Port}. When every
%% tuple has the same port as the first, condense the list to a
%% single {[Addr], Port}. Otherwise, and for any other key, return
%% the value unchanged.
map(Key, [{_, Port} | _] = AddrPorts)
  when Key == socknames;
       Key == peernames ->
    SamePort = fun({_, P}) -> P == Port;
                  (_)      -> true   %% non-pairs are skipped, not compared
               end,
    case lists:all(SamePort, AddrPorts) of
        true ->
            {[A || {A, _} <- AddrPorts], Port};
        false ->
            AddrPorts
    end;

map(_, Val) ->
    Val.
227
228%% ---------------------------------------------------------------------------
229%% # init/1
230%% ---------------------------------------------------------------------------
231
%% Entry point of a process started through start_link/1: compute the
%% initial state (i/1 acknowledges the start) and become a gen_server.
init(Arg) ->
    State = i(Arg),
    gen_server:enter_loop(?MODULE, [], State).
234
%% i/1
%%
%% Initialize a process according to its start argument and return
%% the initial gen_server state: #monitor{}, #listener{} or
%% #transport{}. Each clause acknowledges the start to the spawning
%% process with proc_lib:init_ack/1.

%% A monitor process, sending on behalf of a transport process.
i(#monitor{transport = TPid} = S) ->
    monitor(process, TPid),
    putr(?TRANSPORT_KEY, TPid),
    proc_lib:init_ack({ok, self()}),
    S;

%% A process owning a listening socket.
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
    monitor(process, SvcPid),
    [_] = diameter_config:subscribe(Ref, transport), %% assert existence
    %% Separate options interpreted here from those for gen_sctp.
    {Split, Rest} = proplists:split(Opts, [accept,
                                           packet,
                                           sender,
                                           message_cb,
                                           unordered]),
    OwnOpts = lists:append(Split),
    {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
    ok = gen_sctp:listen(Sock, true),
    true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
    proc_lib:init_ack({ok, self(), LAs}),
    %% opts is [Matches, Packet, Sender, MessageCB, Unordered], the
    %% order in which i/2 destructures it.
    #listener{ref = Ref,
              service = SvcPid,
              socket = Sock,
              opts = [[[M] || {accept, M} <- OwnOpts],
                      proplists:get_value(packet, OwnOpts, true)
                      | [proplists:get_value(K, OwnOpts, false)
                         || K <- [sender, message_cb, unordered]]]};

%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
    {[Ps | Split], Rest} = proplists:split(Opts, [rport,
                                                  raddr,
                                                  packet,
                                                  sender,
                                                  message_cb,
                                                  unordered]),
    OwnOpts = lists:append(Split),
    CB = proplists:get_value(message_cb, OwnOpts, false),
    false == CB orelse (Pid ! {diameter, ack}),
    RAs  = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
    %% At most one rport, defaulting to the standard port.
    [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
    {LAs, Sock} = open(Addrs, Rest, 0),
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self(), LAs}),
    monitor(process, Pid),
    #transport{parent = Pid,
               mode = {connect, connect(Sock, RAs, RP, [])},
               socket = Sock,
               message_cb = CB,
               %% The option split out above is unordered: reading any
               %% other key would always yield the default.
               unordered = proplists:get_value(unordered, OwnOpts, false),
               packet = proplists:get_value(packet, OwnOpts, true),
               send = proplists:get_value(sender, OwnOpts, false)};

%% An accepting transport spawned by diameter, not yet owning an
%% association.
i({accept, Ref, LPid, Pid})
  when is_pid(Pid) ->
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self()}),
    monitor(process, Pid),
    MRef = monitor(process, LPid),
    wait([{peeloff, MRef}], #transport{parent = Pid,
                                       mode = {accept, LPid}});

%% An accepting transport spawned at association establishment, whose
%% parent is not yet known.
i({accept, Ref, LPid}) ->
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self()}),
    %% Give up if diameter doesn't start a transport in time.
    erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout),
    MRef = monitor(process, LPid),
    wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}).
309
%% wait/2
%%
%% Block on each key in turn with i/2, threading the state through:
%% an accepting transport processes nothing else until diameter has
%% attached to it and its association has been peeled off.

wait([], State) ->
    State;
wait([Key | Keys], State) ->
    wait(Keys, i(Key, State)).
317
%% i/2
%%
%% Wait for one of the messages an accepting transport needs before
%% becoming operational. For key parent, Ref is the transport
%% reference tagging the message from the listener. For key peeloff,
%% Ref is the monitor on the listener, removed once the socket has
%% arrived. A timeout or any 'DOWN' received while waiting terminates
%% the process.
i({Key, Ref}, #transport{mode = {accept, _}} = State) ->
    receive
        {Ref, Parent} when Key == parent ->
            %% diameter has started a transport: this is its parent
            State#transport{parent = Parent};
        {Key, Msg, ListenerOpts} when Key == peeloff ->
            %% the listener has handed over a peeled-off socket
            {sctp, Sock, _, _, _} = Msg,
            [Matches, Packet, Sender, Callback, Unordered] = ListenerOpts,
            ok = accept_peer(Sock, Matches),
            demonitor(Ref, [flush]),
            false == Callback
                orelse (State#transport.parent ! {diameter, ack}),
            t(Msg, State#transport{socket = Sock,
                                   message_cb = Callback,
                                   unordered = Unordered,
                                   packet = Packet,
                                   send = Sender});
        accept_timeout = Timeout ->
            x(Timeout);
        {'DOWN', _, process, _, _} = Down ->
            x(Down)
    end.
338
%% listener/2
%%
%% Accepting transports can be started concurrently, so the lookup or
%% start of the listener for a transport reference is serialized
%% through diameter_sync to ensure that only one is started.

listener(Ref, T) ->
    Name = {?MODULE, listener, Ref},
    Callback = {?MODULE, listener, [{Ref, T}]},
    diameter_sync:call(Name, Callback, infinity, infinity).
348
%% diameter_sync callback: look up a registered listener for the
%% transport reference and let l/3 reuse it or start a new one.
listener({Ref, T}) ->
    Registered = diameter_reg:match({?MODULE, listener, {Ref, '_'}}),
    l(Registered, Ref, T).
351
%% l/3

%% A listener is already registered: return it with the local
%% addresses it registered ...
l([{{?MODULE, listener, {_, AddrsAndSock}}, Listener}], _, _) ->
    {LocalAddrs, _} = AddrsAndSock,
    {ok, Listener, LocalAddrs};

%% ... or there is none: start one.
l([], Ref, T) ->
    diameter_sctp_sup:start_child({listen, Ref, T}).
360
%% open/3
%%
%% Open an SCTP socket with the given options, adding default local
%% addresses and port number where the options specify none. Returns
%% {LocalAddrs, Sock} or exits if the socket can't be opened.

open(Addrs, Opts, PortNr) ->
    WithAddrs = addrs(Addrs, Opts),
    WithPort = portnr(WithAddrs, PortNr),
    case gen_sctp:open(gen_opts(WithPort)) of
        {error, Why} ->
            x({open, Why});
        {ok, Sock} ->
            {addrs(Sock), Sock}
    end.
370
%% Local addresses in the options take precedence: only when there
%% is no ip/ifaddr option are the default addresses appended.
addrs(Addrs, Opts) ->
    {Mapped, Found} = lists:mapfoldl(fun ipaddr/2, false, Opts),
    if Found ->
            Mapped;
       not Found ->
            Opts ++ lists:map(fun(A) -> {ip, A} end, Addrs)
    end.
378
%% mapfoldl callback: rewrite an ifaddr or ip option as an ip option
%% with a normalized address and flag that one was found. Any other
%% option passes through with the flag unchanged.
ipaddr({ifaddr, Addr}, _) ->
    {{ip, ipaddr(Addr)}, true};
ipaddr({ip, Addr}, _) ->
    {{ip, ipaddr(Addr)}, true};
ipaddr(Opt, Found) ->
    {Opt, Found}.
385
%% The atoms loopback and any are kept as they are. Anything else is
%% converted by diameter_lib:ipaddr/1.
ipaddr(loopback) ->
    loopback;
ipaddr(any) ->
    any;
ipaddr(Addr) ->
    diameter_lib:ipaddr(Addr).
392
%% Prepend the default port number unless the options already have a
%% port value.
portnr(Opts, PortNr) ->
    Configured = proplists:get_value(port, Opts),
    if Configured == undefined ->
            [{port, PortNr} | Opts];
       true ->
            Opts
    end.
400
%% The local addresses of a socket, without port numbers. Exits if
%% they can't be retrieved.
addrs(Sock) ->
    case inet:socknames(Sock) of
        {error, Why} ->
            x({socknames, Why});
        {ok, Names} ->
            [Addr || {Addr, _Port} <- Names]
    end.
408
%% x/1
%%
%% Terminate the calling process with a shutdown reason wrapping the
%% given term.

x(Reason) ->
    erlang:exit({shutdown, Reason}).
413
%% gen_opts/1
%%
%% Options for gen_sctp:open/1. The socket mode, active setting and
%% SCTP events are decided here, so it is an error for the user's
%% options to contain any of them.

gen_opts(Opts) ->
    Reserved = [binary, list, mode, active, sctp_events],
    case proplists:split(Opts, Reserved) of
        {[[], [], [], [], []], _} ->
            [binary, {active, once} | Opts];
        {_, _} ->
            ?ERROR({reserved_options, Opts})
    end.
420
421%% ---------------------------------------------------------------------------
422%% # ports/0-1
423%% ---------------------------------------------------------------------------
424
%% List {Type, PortNr, Pid} for every socket registered by this
%% module whose port number inet can still resolve.
ports() ->
    Registered = diameter_reg:match({?MODULE, '_', '_'}),
    [{type(Kind), PortNr, Pid}
     || {{?MODULE, Kind, {_, {_, Sock}}}, Pid} <- Registered,
        {ok, PortNr} <- [inet:port(Sock)]].
429
%% As ports/0, but only for sockets registered under the given
%% transport reference.
ports(Ref) ->
    Registered = diameter_reg:match({?MODULE, '_', {Ref, '_'}}),
    [{type(Kind), PortNr, Pid}
     || {{?MODULE, Kind, {R, {_, Sock}}}, Pid} <- Registered,
        R == Ref,
        {ok, PortNr} <- [inet:port(Sock)]].
435
%% Map a registration tag to the type reported by ports/0,1: a
%% listener is reported as listen, anything else as is.
type(Tag) ->
    case Tag of
        listener -> listen;
        _        -> Tag
    end.
440
441%% ---------------------------------------------------------------------------
442%% # handle_call/3
443%% ---------------------------------------------------------------------------
444
%% Listener: diameter (through s/4) is requesting an accepting
%% transport for the process Parent.
handle_call({{accept, Ref}, Parent}, _From, #listener{ref = Ref} = S0) ->
    {Transport, S1} = accept(Ref, Parent, S0),
    {reply, {ok, Transport}, S1};

%% Monitor: the transport is telling us of parent death. Reply and
%% terminate.
handle_call({stop, _Parent} = Why, _From, #monitor{} = S) ->
    {stop, {shutdown, Why}, ok, S};

%% Anything else is rejected.
handle_call(_Request, _From, S) ->
    {reply, nok, S}.
455
456%% ---------------------------------------------------------------------------
457%% # handle_cast/2
458%% ---------------------------------------------------------------------------
459
%% Casts are not used: ignore them.
handle_cast(_Msg, S) ->
    {noreply, S}.
462
463%% ---------------------------------------------------------------------------
464%% # handle_info/2
465%% ---------------------------------------------------------------------------
466
%% Dispatch on the kind of process, identified by its state record.
%% Transport and listener transitions must return a state of the same
%% kind. A monitor's state never changes.
handle_info(Msg, #transport{} = S0) ->
    #transport{} = S1 = t(Msg, S0),
    {noreply, S1};

handle_info(Msg, #listener{} = S0) ->
    #listener{} = S1 = l(Msg, S0),
    {noreply, S1};

handle_info(Msg, #monitor{} = S) ->
    m(Msg, S),
    {noreply, S}.
476
%% Prior to the possibility of setting pool_size in transport
%% configuration, a new accepting transport was only started following
479%% the death of a predecessor, so that there was only at most one
480%% previously started transport process waiting for an association.
481%% This assumption no longer holds with pool_size > 1, in which case
482%% several accepting transports are started concurrently. Deal with
483%% this by placing the started transports in a new queue of transport
484%% processes waiting for an association.
485
486%% ---------------------------------------------------------------------------
487%% # code_change/3
488%% ---------------------------------------------------------------------------
489
%% No state conversion is needed on code change.
code_change(_OldVsn, S, _Extra) ->
    {ok, S}.
492
493%% ---------------------------------------------------------------------------
494%% # terminate/2
495%% ---------------------------------------------------------------------------
496
%% A monitor has nothing to clean up ...
terminate(_Reason, #monitor{}) ->
    ok;

%% ... nor does a transport that never had an association ...
terminate(_Reason, #transport{assoc_id = undefined}) ->
    ok;

%% ... while any other transport closes its socket ...
terminate(_Reason, #transport{socket = S}) ->
    gen_sctp:close(S);

%% ... as does a listener.
terminate(_Reason, #listener{socket = S}) ->
    gen_sctp:close(S).
508
509%% ---------------------------------------------------------------------------
510
%% Store a value in the process dictionary under a key private to
%% this module, returning the previous value as put/2 does.
putr(Key, Val) ->
    erlang:put({?MODULE, Key}, Val).
513
%% Read a value stored with putr/2, or undefined if there is none.
getr(Key) ->
    erlang:get({?MODULE, Key}).
516
%% l/2
%%
%% Handle a message received by a listener process: return the new
%% listener state or exit.

%% Event or data from the listening socket: peel the association off
%% to an accepting transport and rearm the listening socket.
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock,
                                                opts = Opts}
                                      = S0) ->
    AssocId = assoc_id(Data),
    {Transport, S1} = accept(S0),
    PeeledSock = peeloff(Sock, AssocId, Transport),
    %% Forward the message with the peeled-off socket in place of
    %% the listening one.
    Transport ! {peeloff, setelement(2, Msg, PeeledSock), Opts},
    setopts(Sock),
    S1;

%% The service process is gone: so is this listener.
l({'DOWN', _, process, SvcPid, _} = Down, #listener{service = SvcPid,
                                                    socket = Sock}) ->
    gen_sctp:close(Sock),
    x(Down);

%% An accepting transport is gone: forget it if it's still pending.
l({'DOWN', _MRef, process, Transport, _}, #listener{pending = {_, Q}} = S) ->
    down(queue:member(Transport, Q), Transport, S);

%% The transport configuration has been removed.
l({transport, remove, _} = Removed, #listener{socket = Sock}) ->
    gen_sctp:close(Sock),
    x(Removed).
545
%% down/3
%%
%% An accepting transport has died. The first argument says whether
%% or not it was a member of the pending queue.

%% A pending transport: remove it and move the signed count one step
%% towards zero ...
down(true, TPid, #listener{pending = {N, Q}} = S) ->
    Remaining = queue:filter(fun(P) -> P /= TPid end, Q),
    Count = case N < 0 of
                true  -> N + 1;  %% was awaiting an association
                false -> N - 1   %% was awaiting transport start
            end,
    S#listener{pending = {Count, Remaining}};

%% ... or one no longer of interest to the listener.
down(false, _TPid, S) ->
    S.
562
%% t/2
%%
%% Apply a message to the transport state. transition/2 answers with
%% a new state, ok to keep the current one, or stop to terminate
%% with the message as reason.

t(Msg, S) ->
    case transition(Msg, S) of
        #transport{} = NewS ->
            NewS;
        ok ->
            S;
        stop ->
            x(Msg)
    end.
576
%% transition/2
%%
%% Map a message received by a transport process to a new state, ok
%% (no change) or stop (terminate).

%% Event or data from the socket. With {active, once} the socket is
%% now passive until setopts/2 rearms it.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
    Passive = S#transport{active = false},
    setopts(S, recv(Data, Passive));

%% Outgoing message from diameter.
transition({diameter, {send, Msg}}, S) ->
    message(send, Msg, S);

%% Message returned by the monitor process after sending it.
transition(#diameter_packet{} = Msg, S) ->
    message(ack, Msg, S);
transition(Msg, S)
  when is_binary(Msg) ->
    message(ack, Msg, S);

%% Actions deferred by a message_cb are now due.
transition({actions, Dir, Acts}, S) ->
    setopts(ok, actions(Acts, Dir, S));

%% The parent requests that the transport connection be closed.
transition({diameter, {close, Parent}}, #transport{parent = Parent}) ->
    stop;

%% TLS over SCTP is described in RFC 3436 but has limitations as
%% described in RFC 6083. The latter describes DTLS over SCTP, which
%% addresses these limitations, DTLS itself being described in RFC
%% 4347. TLS is primarily used over TCP, which RFC 6733 acknowledges
%% by equating TLS with TLS/TCP and DTLS/SCTP. A tls request
%% therefore stops the transport.
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
    stop;

%% The parent has died. If there's a monitor process then ask it to
%% stop, so as not to close the socket during an ongoing send, and
%% kill it if the call doesn't return ok.
transition({'DOWN', _, process, Parent, _}, #transport{parent = Parent,
                                                       send = Sender}) ->
    is_boolean(Sender)
        orelse ok == (catch gen_server:call(Sender, {stop, Parent}))
        orelse exit(Sender, kill),
    stop;

%% The monitor process has died.
transition({'DOWN', _, process, Sender, _}, #transport{send = Sender})
  when is_pid(Sender) ->
    stop;

%% The accept timer fired after the transport had already been
%% started: nothing to do.
transition(accept_timeout, _) ->
    ok;

%% Someone wants the local port number of the socket.
transition({resolve_port, Requester}, #transport{socket = Sock})
  when is_pid(Requester) ->
    Requester ! inet:port(Sock),
    ok.
632
%% m/2
%%
%% Handle a message received by a monitor process.

%% A message to send on a stream: send it and, if acknowledgements
%% are wanted, return it to the transport process.
m({Msg, StreamId}, #monitor{transport = Transport,
                            socket = Sock,
                            assoc_id = AssocId,
                            ack = Ack}) ->
    send(Sock, AssocId, StreamId, Msg),
    Ack andalso (Transport ! Msg);

%% The transport process has died: follow it.
m({'DOWN', _, process, Transport, _} = Down,
  #monitor{transport = Transport}) ->
    x(Down).
644
%% ok/1
%%
%% Unwrap an {ok, Value} result. Exit with anything else as reason.

ok({ok, Value}) ->
    Value;
ok(Other) ->
    x(Other).
651
%% accept_peer/2
%%
%% Check the peer addresses of a socket against the accept options
%% of the listener, exiting if diameter_peer:match/2 rejects them.

%% No accept options: any peer will do.
accept_peer(_, []) ->
    ok;

accept_peer(Sock, Matches) ->
    PeerNames = ok(inet:peernames(Sock)),
    RAddrs = [Addr || {Addr, _Port} <- PeerNames],
    diameter_peer:match(RAddrs, Matches)
        orelse x({accept, RAddrs, Matches}),
    ok.
662
%% accept/3
%%
%% diameter is requesting a transport process: supply one that was
%% spawned for an association already received, or spawn a new one.
%% Either way the pending count decreases by one.

accept(Ref, Parent, #listener{pending = {Count, _}} = S) ->
    {Transport, Queue} = q(Ref, Parent, S),
    Pending = {Count - 1, Queue},
    {Transport, S#listener{pending = Pending}}.
672
%% q/3

%% There are transports with an association but no parent: take the
%% first and tell it who its parent is ...
q(_, Parent, #listener{ref = Ref,
                       pending = {Count, Queue}})
  when 0 < Count ->
    {Transport, _} = Dequeued = dq(Queue),
    Transport ! {Ref, Parent},
    Dequeued;

%% ... or there are none: spawn a transport to wait for an
%% association.
q(Ref, Parent, #listener{pending = {_, Queue}}) ->
    nq({accept, Ref, self(), Parent}, Queue).
684
%% send/2
%%
%% Send an outgoing message from a transport process, choosing the
%% outbound stream. Returns the new transport state.

%% A sender is configured but not yet started: start the monitor
%% process on this first send and try again with its pid.
send(Msg, #transport{send = true,
                     socket = Sock,
                     assoc_id = AssocId,
                     message_cb = CB}
          = S) ->
    Monitor = #monitor{transport = self(),
                       socket = Sock,
                       assoc_id = AssocId,
                       ack = false /= CB},
    {ok, MPid} = diameter_sctp_sup:start_child(Monitor),
    monitor(process, MPid),
    send(Msg, S#transport{send = MPid});

%% The packet names its stream: map it onto those available.
send(#diameter_packet{transport_data = {outstream, SId}}
     = Pkt,
     #transport{streams = {_, OutStreams}}
     = S) ->
    send(SId rem OutStreams, Pkt, S);

%% No stream named and rotation is on: use the next stream in turn.
send(Msg, #transport{rotate = true,
                     streams = {_, OutStreams},
                     os = Next}
          = S) ->
    send(Next, Msg, S#transport{os = (Next + 1) rem OutStreams});

%% Otherwise use stream 0.
send(Msg, S) ->
    send(0, Msg, S).
717
%% send/3
%%
%% Send on a given stream, either directly from the transport process
%% or by way of its monitor process.

%% No sender: send from here and acknowledge immediately.
send(StreamId, Msg, #transport{send = false,
                               socket = Sock,
                               assoc_id = AssocId}
                    = S) ->
    send(Sock, AssocId, StreamId, Msg),
    message(ack, Msg, S);

%% Let the monitor do the sending.
send(StreamId, Msg, #transport{send = Monitor} = S) ->
    Monitor ! {Msg, StreamId},
    S.
730
%% send/4
%%
%% Send a binary, or the binary of a diameter_packet record, on the
%% given association and stream. Exits if gen_sctp reports an error.

send(Sock, AssocId, StreamId, #diameter_packet{bin = Payload}) ->
    send(Sock, AssocId, StreamId, Payload);

send(Sock, AssocId, StreamId, Payload) ->
    Result = gen_sctp:send(Sock, AssocId, StreamId, Payload),
    case Result of
        {error, Why} ->
            x({send, Why});
        ok ->
            ok
    end.
743
%% recv/2
%%
%% Handle the data part of an sctp message received by a transport
%% process. Returns a new state, ok or stop, as transition/2.

%% The association is up: register the socket and tell the parent.
recv({_, #sctp_assoc_change{state = comm_up,
                            outbound_streams = OutStreams,
                            inbound_streams = InStreams,
                            assoc_id = Id}},
     #transport{assoc_id = undefined,
                mode = {Mode, _},
                socket = Sock}
     = S) ->
    publish(Mode, getr(?REF_KEY), Id, Sock),
    %% Deal with different association id after peeloff on Solaris by
    %% taking the id from the first reception: an accepting transport
    %% records true for now.
    AssocId = Mode == accept orelse Id,
    up(S#transport{assoc_id = AssocId,
                   streams = {InStreams, OutStreams}});

%% Any other association change while connecting: note the failure
%% and try the next remote address.
recv({_, #sctp_assoc_change{} = Event},
     #transport{assoc_id = undefined,
                socket = Sock,
                mode = {connect, {[Addr | Addrs], Port, Errors}}}
     = S) ->
    Next = connect(Sock, Addrs, Port, [{Addr, Event} | Errors]),
    S#transport{mode = {connect, Next}};

%% Any other association change is failure.
recv({_, #sctp_assoc_change{}}, _) ->
    stop;

%% First inbound message on an accepting transport: take the
%% association id from it and handle the message as any other.
recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
     = Msg,
     #transport{assoc_id = true}
     = S) ->
    recv(Msg, S#transport{assoc_id = Id});

%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
  when is_binary(Bin) ->
    message(recv, Msg, recv(S));

%% The peer is shutting down.
recv({_, #sctp_shutdown_event{}}, _) ->
    stop;

%% Note that diameter_sctp(3) documents that sctp_events cannot be
%% specified in the list of options passed to gen_sctp and that
%% gen_opts/1 guards against this. This is to ensure that we know what
%% events to expect and also to ensure that we receive
%% #sctp_sndrcvinfo{} with each incoming message (data_io_event =
%% true). Adaptation layer events (ie. #sctp_adaptation_event{}) are
%% disabled by default so don't handle it. We could simply disable
%% events we don't react to but don't.

%% Peer address changes and partial delivery events are ignored.
recv({_, #sctp_paddr_change{}}, _) ->
    ok;

recv({_, #sctp_pdapi_event{}}, _) ->
    ok.
803
%% recv/1
%%
%% Called at each inbound message. The rotate field counts down from
%% its initial integer value. On reaching 0 the unordered setting is
%% applied and the field becomes a boolean, true if there is more
%% than one outbound stream. Sending unordered thus starts only after
%% the second reception, so that an outgoing CER/CEA will arrive at
%% the peer before another request.

%% Already decided.
recv(#transport{rotate = Decided} = S)
  when is_boolean(Decided) ->
    S;

%% Countdown complete: decide.
recv(#transport{rotate = 0,
                streams = {_, OutStreams},
                socket = Sock,
                unordered = Unordered}
     = S) ->
    ok = unordered(Sock, OutStreams, Unordered),
    S#transport{rotate = 1 < OutStreams};

%% Count down.
recv(#transport{rotate = Left} = S) ->
    S#transport{rotate = Left - 1}.
823
%% unordered/3
%%
%% Make unordered delivery the default on the socket if the setting
%% is true, or is an integer that the number of outbound streams
%% doesn't exceed. Leave the socket as it is otherwise.

unordered(Sock, _, true) ->
    set_unordered(Sock);
unordered(_, _, false) ->
    ok;
unordered(Sock, OutStreams, Max)
  when is_integer(Max), OutStreams =< Max ->
    set_unordered(Sock);
unordered(_, OutStreams, Max)
  when is_integer(Max), Max < OutStreams ->
    ok.

%% Set the unordered flag in the default send parameters.
set_unordered(Sock) ->
    Params = #sctp_sndrcvinfo{flags = [unordered]},
    inet:setopts(Sock, [{sctp_default_send_param, Params}]).
836
%% publish/4
%%
%% Register the association and socket of a transport with
%% diameter_reg, as read by ports/0,1, and remember the socket in the
%% process dictionary for info/1.

publish(Mode, Ref, AssocId, Sock) ->
    Key = {?MODULE, Mode, {Ref, {AssocId, Sock}}},
    true = diameter_reg:add_new(Key),
    putr(?INFO_KEY, {gen_sctp, Sock}).
842
%% up/1
%%
%% Tell the parent that the association is established and reduce
%% mode to the bare atom.

%% Connecting: report the remote address being connected to.
up(#transport{parent = Parent,
              mode = {connect, {[Addr | _], Port, _}}}
   = S) ->
    diameter_peer:up(Parent, {Addr, Port}),
    S#transport{mode = connect};

%% Accepting.
up(#transport{parent = Parent,
              mode = {accept, _}}
   = S) ->
    diameter_peer:up(Parent),
    S#transport{mode = accept}.
856
%% accept/1
%%
%% A new association has arrived at the listener: supply a transport
%% that diameter has already started and that is waiting for one, or
%% spawn a new one. Either way the pending count increases by one.

accept(#listener{pending = {Count, _}} = S) ->
    {Transport, Queue} = q(S),
    Pending = {Count + 1, Queue},
    {Transport, S#listener{pending = Pending}}.
865
%% q/1

%% There are started transports waiting for an association: take the
%% first ...
q(#listener{pending = {Count, Queue}})
  when Count < 0 ->
    dq(Queue);

%% ... or there are none: spawn a transport to wait for diameter to
%% start one.
q(#listener{ref = Ref,
            pending = {_, Queue}}) ->
    nq({accept, Ref, self()}, Queue).
875
%% nq/2
%%
%% Start a transport process with the given argument, monitor it and
%% append it to the pending queue. Returns the pid and the new queue.

nq(Arg, Queue) ->
    {ok, Transport} = diameter_sctp_sup:start_child(Arg),
    monitor(process, Transport),
    Extended = queue:in(Transport, Queue),
    {Transport, Extended}.
885
%% dq/1
%%
%% Take the first transport process from a pending queue, which must
%% not be empty. Returns the pid and the remaining queue.

dq(Queue) ->
    {{value, Head}, Remaining} = queue:out(Queue),
    {Head, Remaining}.
894
%% assoc_id/1
%%
%% The association id of whatever is first received on a new
%% association. It's unclear if the first message is always
%% sctp_assoc_change, and SCTP behaviour differs between operating
%% systems, so both data and events are handled.

%% Data: the id is in the ancillary data ...
assoc_id({[#sctp_sndrcvinfo{assoc_id = AssocId}], _}) ->
    AssocId;
%% ... or an event: the id is in the event record.
assoc_id({_, Event}) ->
    id(Event).
905
%% The association id carried by an SCTP event or ancillary data
%% record.
id(#sctp_assoc_change{assoc_id = AssocId}) ->
    AssocId;
id(#sctp_sndrcvinfo{assoc_id = AssocId}) ->
    AssocId;
id(#sctp_paddr_change{assoc_id = AssocId}) ->
    AssocId;
id(#sctp_shutdown_event{assoc_id = AssocId}) ->
    AssocId;
id(#sctp_adaptation_event{assoc_id = AssocId}) ->
    AssocId.
916
%% peeloff/3
%%
%% Branch an association off the listening socket into a socket of
%% its own and make the transport process its controlling process.
%% Returns the new socket.

peeloff(ListenSock, AssocId, Transport) ->
    {ok, Peeled} = gen_sctp:peeloff(ListenSock, AssocId),
    ok = gen_sctp:controlling_process(Peeled, Transport),
    Peeled.
923
%% connect/4
%%
%% Initiate an association to the first remote address that accepts
%% the attempt, collecting errors along the way. Returns
%% {RemainingAddrs, Port, Errors}, with the address being tried at
%% the head of the list, or exits when no address is left to try.

connect(_, [], _, Errors) ->
    x({connect, lists:reverse(Errors)});

connect(Sock, [Addr | Rest] = Addrs, Port, Errors) ->
    Attempt = gen_sctp:connect_init(Sock, Addr, Port, []),
    case Attempt of
        {error, _} = Err ->
            connect(Sock, Rest, Port, [{Addr, Err} | Errors]);
        ok ->
            {Addrs, Port, Errors}
    end.
936
%% setopts/2
%%
%% Rearm the socket after a transition if need be. The second
%% argument is the result of the transition and is what's returned,
%% with the active flag set if it's a state whose socket was rearmed.

%% New state wanting reception on a passive socket: rearm.
setopts(_, #transport{recv = true,
                      active = false,
                      socket = Sock}
        = S) ->
    setopts(Sock),
    S#transport{active = true};

%% New state otherwise: nothing to do.
setopts(_, #transport{} = S) ->
    S;

%% No new state: rearm the socket of the old one.
setopts(#transport{socket = Sock}, Result) ->
    setopts(Sock),
    Result.
953
%% setopts/1
%%
%% Request the next message from a socket, exiting on failure, which
%% is possible on peer disconnect.

setopts(Sock) ->
    Result = inet:setopts(Sock, [{active, once}]),
    case Result of
        ok ->
            ok;
        Failure ->
            x({setopts, Sock, Failure})
    end.
961
%% message/3
%%
%% A message_cb is invoked whenever a message is sent or received, or
%% to provide acknowledgement of a completed send or discarded
%% request. See diameter_tcp for semantics, the only difference being
%% that a recv callback can get a diameter_packet record as Msg
%% depending on how/if option packet has been specified.
%%
%% Evaluates the callback for the given direction and message, acts
%% on what it returns, and rearms the socket if need be.

%% A send of false is a discarded request: acknowledge it.
message(send, false = Discarded, S) ->
    message(ack, Discarded, S);

%% Without a callback there is nobody to acknowledge to.
message(ack, _, #transport{message_cb = false} = S) ->
    S;

message(Dir, Msg, S) ->
    Acts = cb(S, Dir, Msg),
    setopts(S, actions(Acts, Dir, S)).
978
%% actions/3
%%
%% Carry out the actions returned by a message_cb, in order, in the
%% given initial direction. Returns the new transport state.

%% Nothing more to do.
actions([], _, S) ->
    S;

%% A boolean says whether or not to continue receiving.
actions([Recv | Rest], Dir, S)
  when is_boolean(Recv) ->
    actions(Rest, Dir, S#transport{recv = Recv});

%% A direction applies to the messages that follow.
actions([NewDir | Rest], _, S)
  when NewDir == send;
       NewDir == recv ->
    actions(Rest, NewDir, S);

%% A message in the send direction goes to the peer ...
actions([Msg | Rest], send = Dir, S)
  when is_record(Msg, diameter_packet);
       is_binary(Msg) ->
    actions(Rest, Dir, send(Msg, S));

%% ... and one in the recv direction goes to diameter.
actions([Msg | Rest], recv = Dir, #transport{parent = Parent} = S)
  when is_record(Msg, diameter_packet);
       is_binary(Msg) ->
    diameter_peer:recv(Parent, Msg),
    actions(Rest, Dir, S);

%% Actions to carry out after a timeout, by way of transition/2.
actions([{defer, Tmo, Deferred} | Rest], Dir, S) ->
    erlang:send_after(Tmo, self(), {actions, Dir, Deferred}),
    actions(Rest, Dir, S);

%% Anything else, as tail, is the new callback.
actions(NewCB, _, S) ->
    S#transport{message_cb = NewCB}.
1010
%% cb/3
%%
%% Produce the list of actions for a message, by evaluating the
%% message_cb if there is one. The first argument is either the
%% transport state or the callback extracted from it.

%% Receiving without a callback: deliver the message, in the form
%% the packet option asks for.
cb(#transport{message_cb = false, packet = Packet}, recv, Msg) ->
    [pkt(Packet, true, Msg)];

%% Receiving with a callback: it gets the message in the form the
%% packet option asks for.
cb(#transport{message_cb = Callback, packet = Packet}, recv = Dir, Msg) ->
    cb(Callback, Dir, pkt(Packet, false, Msg));

%% Other directions: the message is passed as is.
cb(#transport{message_cb = Callback}, Dir, Msg) ->
    cb(Callback, Dir, Msg);

%% Sending without a callback: just send the message.
cb(false, send, Msg) ->
    [Msg];

%% Evaluate the callback on the direction and message.
cb(Callback, Dir, Msg) ->
    diameter_lib:eval([Callback, Dir, Msg]).
1027
%% pkt/3
%%
%% Convert received {AncData, Bin} to the form selected by the packet
%% option. The second argument says whether or not raw data should be
%% wrapped in a diameter_packet record, which is the case when there
%% is no message_cb to receive it.

%% packet = raw without a callback: the ancillary data record goes
%% in transport_data ...
pkt(raw, true, {[Info], Bin}) ->
    #diameter_packet{bin = Bin, transport_data = Info};

%% ... or with a callback: the message is passed untouched.
pkt(raw, false, {[_], _} = Msg) ->
    Msg;

%% packet = true: a packet record identifying the inbound stream.
pkt(true, _, {[#sctp_sndrcvinfo{stream = StreamId}], Bin}) ->
    #diameter_packet{bin = Bin, transport_data = {stream, StreamId}};

%% packet = false: just the binary.
pkt(false, _, {_Info, Bin}) ->
    Bin.
1041