1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2010-2017. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21-module(diameter_sctp). 22-behaviour(gen_server). 23 24%% interface 25-export([start/3]). 26 27%% child start from supervisor 28-export([start_link/1]). 29 30%% child start from here 31-export([init/1]). 32 33%% gen_server callbacks 34-export([handle_call/3, 35 handle_cast/2, 36 handle_info/2, 37 code_change/3, 38 terminate/2]). 39 40-export([listener/1,%% diameter_sync callback 41 info/1]). %% service_info callback 42 43-export([ports/0, 44 ports/1]). 45 46-export_type([listen_option/0, 47 connect_option/0]). 48 49-include_lib("kernel/include/inet_sctp.hrl"). 50-include_lib("diameter/include/diameter.hrl"). 51 52%% Keys into process dictionary. 53-define(INFO_KEY, info). 54-define(REF_KEY, ref). 55-define(TRANSPORT_KEY, transport). 56 57-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). 58 59%% The default port for a listener. 60-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 61 62%% How long to wait for a transport process to attach after 63%% association establishment. 64-define(ACCEPT_TIMEOUT, 5000). 65 66-type connect_option() :: {raddr, inet:ip_address()} 67 | {rport, inet:port_number()} 68 | option() 69 | term(). %% gen_sctp:open_option(). 70 71-type match() :: inet:ip_address() 72 | string() 73 | [match()]. 74 75-type listen_option() :: {accept, match()} 76 | option() 77 | term(). %% gen_sctp:open_option(). 78 79-type option() :: {sender, boolean()} 80 | sender 81 | {packet, boolean() | raw} 82 | {message_cb, false | diameter:eval()}. 83 84-type uint() :: non_neg_integer(). 85 86%% Accepting/connecting transport process state. 87-record(transport, 88 {parent :: pid() | undefined, 89 mode :: {accept, pid()} 90 | accept 91 | {connect, {[inet:ip_address()], uint(), list()}} 92 %% {RAs, RP, Errors} 93 | connect, 94 socket :: gen_sctp:sctp_socket() | undefined, 95 active = false :: boolean(), %% is socket active? 96 recv = true :: boolean(), %% should it be active? 97 assoc_id :: gen_sctp:assoc_id() %% association identifier 98 | undefined 99 | true, 100 peer :: {[inet:ip_address()], uint()} %% {RAs, RP} 101 | undefined, 102 streams :: {uint(), uint()} %% {InStream, OutStream} counts 103 | undefined, 104 os = 0 :: uint(), %% next output stream 105 rotate = 1 :: boolean() | 0 | 1, %% rotate os? 106 unordered = false :: boolean() %% always send unordered? 107 | pos_integer(),% or if =< N outbound streams? 108 packet = true :: boolean() %% legacy transport_data? 109 | raw, 110 message_cb = false :: false | diameter:eval(), 111 send = false :: pid() | boolean()}). %% sending process 112 113%% Monitor process state. 114-record(monitor, 115 {transport :: pid(), 116 ack = false :: boolean(), 117 socket :: gen_sctp:sctp_socket(), 118 assoc_id :: gen_sctp:assoc_id()}). 119 120%% Listener process state. 121-record(listener, 122 {ref :: reference(), 123 socket :: gen_sctp:sctp_socket(), 124 service :: pid(), %% service process 125 pending = {0, queue:new()}, 126 opts :: [[match()] | boolean() | diameter:eval()]}). 127%% Field pending implements two queues: the first of transport-to-be 128%% processes to which an association has been assigned but for which 129%% diameter hasn't yet spawned a transport process, a short-lived 130%% state of affairs as a new transport is spawned as a consequence of 131%% a peer being taken up, transport processes being spawned by the 132%% listener on demand; the second of started transport processes that 133%% have not yet been assigned an association. 134%% 135%% When diameter calls start/3, the transport process is either taken 136%% from the first queue or spawned and placed in the second queue 137%% until an association is established. When an association is 138%% established, a controlling process is either taken from the second 139%% queue or spawned and placed in the first queue. Thus, there are 140%% only elements in one queue at a time, so share an ets table queue 141%% and tag it with a positive length if it contains the first queue, a 142%% negative length if it contains the second queue. 143 144%% --------------------------------------------------------------------------- 145%% # start/3 146%% --------------------------------------------------------------------------- 147 148-spec start({accept, Ref}, #diameter_service{}, [listen_option()]) 149 -> {ok, pid(), [inet:ip_address()]} 150 when Ref :: diameter:transport_ref(); 151 ({connect, Ref}, #diameter_service{}, [connect_option()]) 152 -> {ok, pid(), [inet:ip_address()]} 153 when Ref :: diameter:transport_ref(). 154 155start(T, Svc, Opts) 156 when is_list(Opts) -> 157 #diameter_service{capabilities = Caps, 158 pid = Pid} 159 = Svc, 160 diameter_sctp_sup:start(), %% start supervisors on demand 161 Addrs = Caps#diameter_caps.host_ip_address, 162 s(T, Addrs, Pid, Opts). 163 164%% A listener spawns transports either as a consequence of this call 165%% when there is not yet an association to assign it, or at comm_up on 166%% a new association in which case the call retrieves a transport from 167%% the pending queue. 168s({accept, Ref} = A, Addrs, SvcPid, Opts) -> 169 {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), 170 try gen_server:call(LPid, {A, self()}, infinity) of 171 {ok, TPid} -> 172 {ok, TPid, LAs}; 173 No -> 174 {error, No} 175 catch 176 exit: Reason -> 177 {error, Reason} 178 end; 179%% This implementation is due to there being no accept call in 180%% gen_sctp in order to be able to accept a new association only 181%% *after* an accepting transport has been spawned. 182 183s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> 184 diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). 185 186%% start_link/1 187 188start_link(T) -> 189 proc_lib:start_link(?MODULE, 190 init, 191 [T], 192 infinity, 193 diameter_lib:spawn_opts(server, [])). 194 195%% --------------------------------------------------------------------------- 196%% # info/1 197%% --------------------------------------------------------------------------- 198 199info({gen_sctp, Sock}) -> 200 lists:flatmap(fun(K) -> info(K, Sock) end, 201 [{socket, socknames}, 202 {peer, peernames}, 203 {statistics, getstat}]). 204 205info({K,F}, Sock) -> 206 case inet:F(Sock) of 207 {ok, V} -> 208 [{K, map(F,V)}]; 209 _ -> 210 [] 211 end. 212 213%% inet:{sock,peer}names/1 returns [{Addr, Port}] but the port number 214%% should be the same in each tuple. Map to a {[Addr], Port} tuple if 215%% so. 216map(K, [{_, Port} | _] = APs) 217 when K == socknames; 218 K == peernames -> 219 try [A || {A,P} <- APs, P == Port orelse throw(?MODULE)] of 220 As -> {As, Port} 221 catch 222 ?MODULE -> APs 223 end; 224 225map(_, V) -> 226 V. 227 228%% --------------------------------------------------------------------------- 229%% # init/1 230%% --------------------------------------------------------------------------- 231 232init(T) -> 233 gen_server:enter_loop(?MODULE, [], i(T)). 234 235%% i/1 236 237i(#monitor{transport = TPid} = S) -> 238 monitor(process, TPid), 239 putr(?TRANSPORT_KEY, TPid), 240 proc_lib:init_ack({ok, self()}), 241 S; 242 243%% A process owning a listening socket. 244i({listen, Ref, {Opts, SvcPid, Addrs}}) -> 245 monitor(process, SvcPid), 246 [_] = diameter_config:subscribe(Ref, transport), %% assert existence 247 {Split, Rest} = proplists:split(Opts, [accept, 248 packet, 249 sender, 250 message_cb, 251 unordered]), 252 OwnOpts = lists:append(Split), 253 {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), 254 ok = gen_sctp:listen(Sock, true), 255 true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), 256 proc_lib:init_ack({ok, self(), LAs}), 257 #listener{ref = Ref, 258 service = SvcPid, 259 socket = Sock, 260 opts = [[[M] || {accept, M} <- OwnOpts], 261 proplists:get_value(packet, OwnOpts, true) 262 | [proplists:get_value(K, OwnOpts, false) 263 || K <- [sender, message_cb, unordered]]]}; 264 265%% A connecting transport. 266i({connect, Pid, Opts, Addrs, Ref}) -> 267 {[Ps | Split], Rest} = proplists:split(Opts, [rport, 268 raddr, 269 packet, 270 sender, 271 message_cb, 272 unordered]), 273 OwnOpts = lists:append(Split), 274 CB = proplists:get_value(message_cb, OwnOpts, false), 275 false == CB orelse (Pid ! {diameter, ack}), 276 RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], 277 [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], 278 {LAs, Sock} = open(Addrs, Rest, 0), 279 putr(?REF_KEY, Ref), 280 proc_lib:init_ack({ok, self(), LAs}), 281 monitor(process, Pid), 282 #transport{parent = Pid, 283 mode = {connect, connect(Sock, RAs, RP, [])}, 284 socket = Sock, 285 message_cb = CB, 286 unordered = proplists:get_value(ordered, OwnOpts, false), 287 packet = proplists:get_value(packet, OwnOpts, true), 288 send = proplists:get_value(sender, OwnOpts, false)}; 289 290%% An accepting transport spawned by diameter, not yet owning an 291%% association. 292i({accept, Ref, LPid, Pid}) 293 when is_pid(Pid) -> 294 putr(?REF_KEY, Ref), 295 proc_lib:init_ack({ok, self()}), 296 monitor(process, Pid), 297 MRef = monitor(process, LPid), 298 wait([{peeloff, MRef}], #transport{parent = Pid, 299 mode = {accept, LPid}}); 300 301%% An accepting transport spawned at association establishment, whose 302%% parent is not yet known. 303i({accept, Ref, LPid}) -> 304 putr(?REF_KEY, Ref), 305 proc_lib:init_ack({ok, self()}), 306 erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), 307 MRef = monitor(process, LPid), 308 wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}). 309 310%% wait/2 311%% 312%% Wait for diameter to start the transport process and for the 313%% association to be peeled off before processing other messages. 314 315wait(Keys, S) -> 316 lists:foldl(fun i/2, S, Keys). 317 318i({K, Ref}, #transport{mode = {accept, _}} = S) -> 319 receive 320 {Ref, Pid} when K == parent -> %% transport process started 321 S#transport{parent = Pid}; 322 {K, T, Opts} when K == peeloff -> %% association 323 {sctp, Sock, _RA, _RP, _Data} = T, 324 [Matches, Packet, Sender, CB, Unordered] = Opts, 325 ok = accept_peer(Sock, Matches), 326 demonitor(Ref, [flush]), 327 false == CB orelse (S#transport.parent ! {diameter, ack}), 328 t(T, S#transport{socket = Sock, 329 message_cb = CB, 330 unordered = Unordered, 331 packet = Packet, 332 send = Sender}); 333 accept_timeout = T -> 334 x(T); 335 {'DOWN', _, process, _, _} = T -> 336 x(T) 337 end. 338 339%% listener/2 340 341%% Accepting processes can be started concurrently: ensure only one 342%% listener is started. 343listener(Ref, T) -> 344 diameter_sync:call({?MODULE, listener, Ref}, 345 {?MODULE, listener, [{Ref, T}]}, 346 infinity, 347 infinity). 348 349listener({Ref, T}) -> 350 l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T). 351 352%% Existing listening process ... 353l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> 354 {LAs, _Sock} = AS, 355 {ok, LPid, LAs}; 356 357%% ... or not. 358l([], Ref, T) -> 359 diameter_sctp_sup:start_child({listen, Ref, T}). 360 361%% open/3 362 363open(Addrs, Opts, PortNr) -> 364 case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of 365 {ok, Sock} -> 366 {addrs(Sock), Sock}; 367 {error, Reason} -> 368 x({open, Reason}) 369 end. 370 371addrs(Addrs, Opts) -> 372 case lists:mapfoldl(fun ipaddr/2, false, Opts) of 373 {Os, true} -> 374 Os; 375 {_, false} -> 376 Opts ++ [{ip, A} || A <- Addrs] 377 end. 378 379ipaddr({K,A}, _) 380 when K == ifaddr; 381 K == ip -> 382 {{ip, ipaddr(A)}, true}; 383ipaddr(T, B) -> 384 {T, B}. 385 386ipaddr(A) 387 when A == loopback; 388 A == any -> 389 A; 390ipaddr(A) -> 391 diameter_lib:ipaddr(A). 392 393portnr(Opts, PortNr) -> 394 case proplists:get_value(port, Opts) of 395 undefined -> 396 [{port, PortNr} | Opts]; 397 _ -> 398 Opts 399 end. 400 401addrs(Sock) -> 402 case inet:socknames(Sock) of 403 {ok, As} -> 404 [A || {A,_} <- As]; 405 {error, Reason} -> 406 x({socknames, Reason}) 407 end. 408 409%% x/1 410 411x(Reason) -> 412 exit({shutdown, Reason}). 413 414%% gen_opts/1 415 416gen_opts(Opts) -> 417 {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]), 418 [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), 419 [binary, {active, once} | Opts]. 420 421%% --------------------------------------------------------------------------- 422%% # ports/0-1 423%% --------------------------------------------------------------------------- 424 425ports() -> 426 Ts = diameter_reg:match({?MODULE, '_', '_'}), 427 [{type(T), N, Pid} || {{?MODULE, T, {_, {_, S}}}, Pid} <- Ts, 428 {ok, N} <- [inet:port(S)]]. 429 430ports(Ref) -> 431 Ts = diameter_reg:match({?MODULE, '_', {Ref, '_'}}), 432 [{type(T), N, Pid} || {{?MODULE, T, {R, {_, S}}}, Pid} <- Ts, 433 R == Ref, 434 {ok, N} <- [inet:port(S)]]. 435 436type(listener) -> 437 listen; 438type(T) -> 439 T. 440 441%% --------------------------------------------------------------------------- 442%% # handle_call/3 443%% --------------------------------------------------------------------------- 444 445handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> 446 {TPid, NewS} = accept(Ref, Pid, S), 447 {reply, {ok, TPid}, NewS}; 448 449%% Transport is telling us of parent death. 450handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> 451 {stop, {shutdown, Reason}, ok, S}; 452 453handle_call(_, _, State) -> 454 {reply, nok, State}. 455 456%% --------------------------------------------------------------------------- 457%% # handle_cast/2 458%% --------------------------------------------------------------------------- 459 460handle_cast(_, State) -> 461 {noreply, State}. 462 463%% --------------------------------------------------------------------------- 464%% # handle_info/2 465%% --------------------------------------------------------------------------- 466 467handle_info(T, #transport{} = S) -> 468 {noreply, #transport{} = t(T,S)}; 469 470handle_info(T, #listener{} = S) -> 471 {noreply, #listener{} = l(T,S)}; 472 473handle_info(T, #monitor{} = S) -> 474 m(T,S), 475 {noreply, S}. 476 477%% Prior to the possibility of setting pool_size on in transport 478%% configuration, a new accepting transport was only started following 479%% the death of a predecessor, so that there was only at most one 480%% previously started transport process waiting for an association. 481%% This assumption no longer holds with pool_size > 1, in which case 482%% several accepting transports are started concurrently. Deal with 483%% this by placing the started transports in a new queue of transport 484%% processes waiting for an association. 485 486%% --------------------------------------------------------------------------- 487%% # code_change/3 488%% --------------------------------------------------------------------------- 489 490code_change(_, State, _) -> 491 {ok, State}. 492 493%% --------------------------------------------------------------------------- 494%% # terminate/2 495%% --------------------------------------------------------------------------- 496 497terminate(_, #monitor{}) -> 498 ok; 499 500terminate(_, #transport{assoc_id = undefined}) -> 501 ok; 502 503terminate(_, #transport{socket = Sock}) -> 504 gen_sctp:close(Sock); 505 506terminate(_, #listener{socket = Sock}) -> 507 gen_sctp:close(Sock). 508 509%% --------------------------------------------------------------------------- 510 511putr(Key, Val) -> 512 put({?MODULE, Key}, Val). 513 514getr(Key) -> 515 get({?MODULE, Key}). 516 517%% l/2 518%% 519%% Transition listener state. 520 521%% Incoming message from SCTP. 522l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, 523 opts = Opts} 524 = S) -> 525 Id = assoc_id(Data), 526 {TPid, NewS} = accept(S), 527 TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, 528 setopts(Sock), 529 NewS; 530 531%% Service process has died. 532l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, 533 socket = Sock}) -> 534 gen_sctp:close(Sock), 535 x(T); 536 537%% Accepting process has died. 538l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> 539 down(queue:member(TPid, Q), TPid, S); 540 541%% Transport has been removed. 542l({transport, remove, _} = T, #listener{socket = Sock}) -> 543 gen_sctp:close(Sock), 544 x(T). 545 546%% down/3 547%% 548%% Accepting transport has died. 549 550%% One that's waiting for transport start in the pending queue ... 551down(true, TPid, #listener{pending = {N,Q}} = S) -> 552 NQ = queue:filter(fun(P) -> P /= TPid end, Q), 553 if N < 0 -> %% awaiting an association ... 554 S#listener{pending = {N+1, NQ}}; 555 true -> %% ... or one has been assigned 556 S#listener{pending = {N-1, NQ}} 557 end; 558 559%% ... or one that's already attached. 560down(false, _TPid, S) -> 561 S. 562 563%% t/2 564%% 565%% Transition transport state. 566 567t(T,S) -> 568 case transition(T,S) of 569 ok -> 570 S; 571 #transport{} = NS -> 572 NS; 573 stop -> 574 x(T) 575 end. 576 577%% transition/2 578 579%% Incoming message. 580transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> 581 setopts(S, recv(Data, S#transport{active = false})); 582 583%% Outgoing message. 584transition({diameter, {send, Msg}}, S) -> 585 message(send, Msg, S); 586 587%% Monitor has sent an outgoing message. 588transition(Msg, S) 589 when is_record(Msg, diameter_packet); 590 is_binary(Msg) -> 591 message(ack, Msg, S); 592 593%% Deferred actions from a message_cb. 594transition({actions, Dir, Acts}, S) -> 595 setopts(ok, actions(Acts, Dir, S)); 596 597%% Request to close the transport connection. 598transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> 599 stop; 600 601%% TLS over SCTP is described in RFC 3436 but has limitations as 602%% described in RFC 6083. The latter describes DTLS over SCTP, which 603%% addresses these limitations, DTLS itself being described in RFC 604%% 4347. TLS is primarily used over TCP, which RFC 6733 acknowledges 605%% by equating TLS with TLS/TCP and DTLS/SCTP. 606transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> 607 stop; 608 609%% Parent process has died: call the monitor to not close the socket 610%% during an ongoing send, but don't let it take forever. 611transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, 612 send = MPid}) -> 613 is_boolean(MPid) 614 orelse ok == (catch gen_server:call(MPid, {stop, Pid})) 615 orelse exit(MPid, kill), 616 stop; 617 618%% Monitor process has died. 619transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) 620 when is_pid(MPid) -> 621 stop; 622 623%% Timeout after transport process has been started. 624transition(accept_timeout, _) -> 625 ok; 626 627%% Request for the local port number. 628transition({resolve_port, Pid}, #transport{socket = Sock}) 629 when is_pid(Pid) -> 630 Pid ! inet:port(Sock), 631 ok. 632 633%% m/2 634 635m({Msg, StreamId}, #monitor{socket = Sock, 636 transport = TPid, 637 assoc_id = AId, 638 ack = B}) -> 639 send(Sock, AId, StreamId, Msg), 640 B andalso (TPid ! Msg); 641 642m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> 643 x(T). 644 645%% Crash on anything unexpected. 646 647ok({ok, T}) -> 648 T; 649ok(T) -> 650 x(T). 651 652%% accept_peer/2 653 654accept_peer(_, []) -> 655 ok; 656 657accept_peer(Sock, Matches) -> 658 RAddrs = [A || {A,_} <- ok(inet:peernames(Sock))], 659 diameter_peer:match(RAddrs, Matches) 660 orelse x({accept, RAddrs, Matches}), 661 ok. 662 663%% accept/3 664%% 665%% Start a new transport process or use one that's already been 666%% started as a consequence of diameter requesting a transport 667%% process. 668 669accept(Ref, Pid, #listener{pending = {N,_}} = S) -> 670 {TPid, NQ} = q(Ref, Pid, S), 671 {TPid, S#listener{pending = {N-1, NQ}}}. 672 673%% Pending associations: attach to the first in the queue. 674q(_, Pid, #listener{ref = Ref, 675 pending = {N,Q}}) 676 when 0 < N -> 677 {TPid, _} = T = dq(Q), 678 TPid ! {Ref, Pid}, 679 T; 680 681%% No pending associations: spawn a new transport. 682q(Ref, Pid, #listener{pending = {_,Q}}) -> 683 nq({accept, Ref, self(), Pid}, Q). 684 685%% send/2 686 687%% Start monitor process on first send. 688send(Msg, #transport{send = true, 689 socket = Sock, 690 assoc_id = AId, 691 message_cb = CB} 692 = S) -> 693 {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), 694 socket = Sock, 695 assoc_id = AId, 696 ack = false /= CB}), 697 monitor(process, MPid), 698 send(Msg, S#transport{send = MPid}); 699 700%% Outbound Diameter message on a specified stream ... 701send(#diameter_packet{transport_data = {outstream, SId}} 702 = Msg, 703 #transport{streams = {_, OS}} 704 = S) -> 705 send(SId rem OS, Msg, S); 706 707%% ... or not: rotate when sending on multiple streams ... 708send(Msg, #transport{rotate = true, 709 streams = {_, OS}, 710 os = N} 711 = S) -> 712 send(N, Msg, S#transport{os = (N + 1) rem OS}); 713 714%% ... or send on the only stream available. 715send(Msg, S) -> 716 send(0, Msg, S). 717 718%% send/3 719 720send(StreamId, Msg, #transport{send = false, 721 socket = Sock, 722 assoc_id = AId} 723 = S) -> 724 send(Sock, AId, StreamId, Msg), 725 message(ack, Msg, S); 726 727send(StreamId, Msg, #transport{send = MPid} = S) -> 728 MPid ! {Msg, StreamId}, 729 S. 730 731%% send/4 732 733send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> 734 send(Sock, AssocId, StreamId, Bin); 735 736send(Sock, AssocId, StreamId, Bin) -> 737 case gen_sctp:send(Sock, AssocId, StreamId, Bin) of 738 ok -> 739 ok; 740 {error, Reason} -> 741 x({send, Reason}) 742 end. 743 744%% recv/2 745 746%% Association established ... 747recv({_, #sctp_assoc_change{state = comm_up, 748 outbound_streams = OS, 749 inbound_streams = IS, 750 assoc_id = Id}}, 751 #transport{assoc_id = undefined, 752 mode = {T, _}, 753 socket = Sock} 754 = S) -> 755 Ref = getr(?REF_KEY), 756 publish(T, Ref, Id, Sock), 757 %% Deal with different association id after peeloff on Solaris by 758 %% taking the id from the first reception. 759 up(S#transport{assoc_id = T == accept orelse Id, 760 streams = {IS, OS}}); 761 762%% ... or not: try the next address. 763recv({_, #sctp_assoc_change{} = E}, 764 #transport{assoc_id = undefined, 765 socket = Sock, 766 mode = {connect = C, {[RA|RAs], RP, Es}}} 767 = S) -> 768 S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; 769 770%% Association failure. 771recv({_, #sctp_assoc_change{}}, _) -> 772 stop; 773 774%% First inbound on an accepting transport. 775recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} 776 = T, 777 #transport{assoc_id = true} 778 = S) -> 779 recv(T, S#transport{assoc_id = Id}); 780 781%% Inbound Diameter message. 782recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) 783 when is_binary(Bin) -> 784 message(recv, Msg, recv(S)); 785 786recv({_, #sctp_shutdown_event{}}, _) -> 787 stop; 788 789%% Note that diameter_sctp(3) documents that sctp_events cannot be 790%% specified in the list of options passed to gen_sctp and that 791%% gen_opts/1 guards against this. This is to ensure that we know what 792%% events to expect and also to ensure that we receive 793%% #sctp_sndrcvinfo{} with each incoming message (data_io_event = 794%% true). Adaptation layer events (ie. #sctp_adaptation_event{}) are 795%% disabled by default so don't handle it. We could simply disable 796%% events we don't react to but don't. 797 798recv({_, #sctp_paddr_change{}}, _) -> 799 ok; 800 801recv({_, #sctp_pdapi_event{}}, _) -> 802 ok. 803 804%% recv/1 805%% 806%% Start sending unordered after the second reception, so that an 807%% outgoing CER/CEA will arrive at the peer before another request. 808 809recv(#transport{rotate = B} = S) 810 when is_boolean(B) -> 811 S; 812 813recv(#transport{rotate = 0, 814 streams = {_,OS}, 815 socket = Sock, 816 unordered = B} 817 = S) -> 818 ok = unordered(Sock, OS, B), 819 S#transport{rotate = 1 < OS}; 820 821recv(#transport{rotate = N} = S) -> 822 S#transport{rotate = N-1}. 823 824%% unordered/3 825 826unordered(Sock, OS, B) 827 when B; 828 is_integer(B), OS =< B -> 829 inet:setopts(Sock, [{sctp_default_send_param, 830 #sctp_sndrcvinfo{flags = [unordered]}}]); 831 832unordered(_, OS, B) 833 when not B; 834 is_integer(B), B < OS -> 835 ok. 836 837%% publish/4 838 839publish(T, Ref, Id, Sock) -> 840 true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}), 841 putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1 842 843%% up/1 844 845up(#transport{parent = Pid, 846 mode = {connect = C, {[RA | _], RP, _}}} 847 = S) -> 848 diameter_peer:up(Pid, {RA,RP}), 849 S#transport{mode = C}; 850 851up(#transport{parent = Pid, 852 mode = {accept = A, _}} 853 = S) -> 854 diameter_peer:up(Pid), 855 S#transport{mode = A}. 856 857%% accept/1 858%% 859%% Start a new transport process or use one that's already been 860%% started as a consequence of an event to a listener process. 861 862accept(#listener{pending = {N,_}} = S) -> 863 {TPid, NQ} = q(S), 864 {TPid, S#listener{pending = {N+1, NQ}}}. 865 866%% Transport waiting for an association: use it. 867q(#listener{pending = {N,Q}}) 868 when N < 0 -> 869 dq(Q); 870 871%% No transport start yet: spawn one and queue. 872q(#listener{ref = Ref, 873 pending = {_,Q}}) -> 874 nq({accept, Ref, self()}, Q). 875 876%% nq/2 877%% 878%% Place a transport process in the second pending queue to make it 879%% available to the next association. 880 881nq(Arg, Q) -> 882 {ok, TPid} = diameter_sctp_sup:start_child(Arg), 883 monitor(process, TPid), 884 {TPid, queue:in(TPid, Q)}. 885 886%% dq/1 887%% 888%% Remove a transport process from the first pending queue to assign 889%% it to an existing association. 890 891dq(Q) -> 892 {{value, TPid}, NQ} = queue:out(Q), 893 {TPid, NQ}. 894 895%% assoc_id/1 896%% 897%% It's unclear if this is needed, or if the first message on an 898%% association is always sctp_assoc_change, but don't assume since 899%% SCTP behaviour differs between operating systems. 900 901assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> 902 Id; 903assoc_id({_, Rec}) -> 904 id(Rec). 905 906id(#sctp_shutdown_event{assoc_id = Id}) -> 907 Id; 908id(#sctp_assoc_change{assoc_id = Id}) -> 909 Id; 910id(#sctp_sndrcvinfo{assoc_id = Id}) -> 911 Id; 912id(#sctp_paddr_change{assoc_id = Id}) -> 913 Id; 914id(#sctp_adaptation_event{assoc_id = Id}) -> 915 Id. 916 917%% peeloff/3 918 919peeloff(LSock, Id, TPid) -> 920 {ok, Sock} = gen_sctp:peeloff(LSock, Id), 921 ok = gen_sctp:controlling_process(Sock, TPid), 922 Sock. 923 924%% connect/4 925 926connect(_, [], _, Reasons) -> 927 x({connect, lists:reverse(Reasons)}); 928 929connect(Sock, [Addr | AT] = As, Port, Reasons) -> 930 case gen_sctp:connect_init(Sock, Addr, Port, []) of 931 ok -> 932 {As, Port, Reasons}; 933 {error, _} = E -> 934 connect(Sock, AT, Port, [{Addr, E} | Reasons]) 935 end. 936 937%% setopts/2 938 939setopts(_, #transport{socket = Sock, 940 active = A, 941 recv = B} 942 = S) 943 when B, not A -> 944 setopts(Sock), 945 S#transport{active = true}; 946 947setopts(_, #transport{} = S) -> 948 S; 949 950setopts(#transport{socket = Sock}, T) -> 951 setopts(Sock), 952 T. 953 954%% setopts/1 955 956setopts(Sock) -> 957 case inet:setopts(Sock, [{active, once}]) of 958 ok -> ok; 959 X -> x({setopts, Sock, X}) %% possibly on peer disconnect 960 end. 961 962%% A message_cb is invoked whenever a message is sent or received, or 963%% to provide acknowledgement of a completed send or discarded 964%% request. See diameter_tcp for semantics, the only difference being 965%% that a recv callback can get a diameter_packet record as Msg 966%% depending on how/if option packet has been specified. 967 968%% message/3 969 970message(send, false = M, S) -> 971 message(ack, M, S); 972 973message(ack, _, #transport{message_cb = false} = S) -> 974 S; 975 976message(Dir, Msg, S) -> 977 setopts(S, actions(cb(S, Dir, Msg), Dir, S)). 978 979%% actions/3 980 981actions([], _, S) -> 982 S; 983 984actions([B | As], Dir, S) 985 when is_boolean(B) -> 986 actions(As, Dir, S#transport{recv = B}); 987 988actions([Dir | As], _, S) 989 when Dir == send; 990 Dir == recv -> 991 actions(As, Dir, S); 992 993actions([Msg | As], send = Dir, S) 994 when is_record(Msg, diameter_packet); 995 is_binary(Msg) -> 996 actions(As, Dir, send(Msg, S)); 997 998actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) 999 when is_record(Msg, diameter_packet); 1000 is_binary(Msg) -> 1001 diameter_peer:recv(Pid, Msg), 1002 actions(As, Dir, S); 1003 1004actions([{defer, Tmo, Acts} | As], Dir, S) -> 1005 erlang:send_after(Tmo, self(), {actions, Dir, Acts}), 1006 actions(As, Dir, S); 1007 1008actions(CB, _, S) -> 1009 S#transport{message_cb = CB}. 1010 1011%% cb/3 1012 1013cb(#transport{message_cb = false, packet = P}, recv, Msg) -> 1014 [pkt(P, true, Msg)]; 1015 1016cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> 1017 cb(CB, D, pkt(P, false, Msg)); 1018 1019cb(#transport{message_cb = CB}, Dir, Msg) -> 1020 cb(CB, Dir, Msg); 1021 1022cb(false, send, Msg) -> 1023 [Msg]; 1024 1025cb(CB, Dir, Msg) -> 1026 diameter_lib:eval([CB, Dir, Msg]). 1027 1028%% pkt/3 1029 1030pkt(false, _, {_Info, Bin}) -> 1031 Bin; 1032 1033pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> 1034 #diameter_packet{bin = Bin, transport_data = {stream, Id}}; 1035 1036pkt(raw, true, {[Info], Bin}) -> 1037 #diameter_packet{bin = Bin, transport_data = Info}; 1038 1039pkt(raw, false, {[_], _} = Msg) -> 1040 Msg. 1041