1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2010-2017. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% 22%% Tests of diameter_tcp/sctp as implementations of the diameter 23%% transport interface. 24%% 25 26-module(diameter_transport_SUITE). 27 28-export([suite/0, 29 all/0, 30 groups/0, 31 init_per_suite/1, 32 end_per_suite/1]). 33 34%% testcases 35-export([start/1, 36 tcp_accept/1, 37 tcp_connect/1, 38 sctp_accept/1, 39 sctp_connect/1, 40 reconnect/1, reconnect/0, 41 stop/1]). 42 43-export([accept/1, 44 connect/1, 45 init/2]). 46 47-include_lib("kernel/include/inet_sctp.hrl"). 48-include("diameter.hrl"). 49 50-define(util, diameter_util). 51 52%% Corresponding to diameter_* transport modules. 53-define(TRANSPORTS, [tcp, sctp]). 54 55%% Receive a message. 56-define(RECV(Pat, Ret), receive Pat -> Ret end). 57-define(RECV(Pat), ?RECV(Pat, diameter_lib:now())). 58 59%% Sockets are opened on the loopback address. 60-define(ADDR, {127,0,0,1}). 61 62%% diameter_tcp doesn't use anything but host_ip_address, and that 63%% only is a local address isn't configured as at transport start. 64-define(SVC(Addrs), #diameter_service{capabilities 65 = #diameter_caps{host_ip_address 66 = Addrs}}). 67 68%% The term we register after open a listening port with gen_{tcp,sctp}. 69-define(TEST_LISTENER(Ref, PortNr), 70 {?MODULE, listen, Ref, PortNr}). 71 72%% Message over the transport interface. 73-define(TMSG(T), {diameter, T}). 74 75%% Options for gen_tcp/gen_sctp. 76-define(TCP_OPTS, [binary, {active, true}, {packet, 0}]). 77-define(SCTP_OPTS, [binary, {active, true}, {sctp_initmsg, ?SCTP_INIT}]). 78 79%% Request a specific number of streams just because we can. 80-define(SCTP_INIT, #sctp_initmsg{num_ostreams = 5, 81 max_instreams = 5}). 82 83%% Messages from gen_sctp. 84-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}). 85 86%% =========================================================================== 87 88suite() -> 89 [{timetrap, {seconds, 15}}]. 90 91all() -> 92 [start, 93 {group, all}, 94 {group, all, [parallel]}, 95 stop]. 96 97groups() -> 98 [{all, [], tc()}]. 99 100tc() -> 101 [tcp_accept, 102 tcp_connect, 103 sctp_accept, 104 sctp_connect, 105 reconnect]. 106 107init_per_suite(Config) -> 108 [{sctp, ?util:have_sctp()} | Config]. 109 110end_per_suite(_Config) -> 111 ok. 112 113%% =========================================================================== 114 115start(_Config) -> 116 ok = diameter:start(). 117 118stop(_Config) -> 119 ok = diameter:stop(). 120 121%% =========================================================================== 122%% tcp_accept/1 123%% sctp_accept/1 124%% 125%% diameter transport accepting, test code connecting. 126 127tcp_accept(_) -> 128 accept(tcp). 129 130sctp_accept(Config) -> 131 case lists:member({sctp, true}, Config) of 132 true -> accept(sctp); 133 false -> {skip, no_sctp} 134 end. 135 136%% Start multiple accepting transport processes that are connected to 137%% with an equal number of connecting processes using gen_tcp/sctp 138%% directly. 139 140-define(PEER_COUNT, 8). 141 142accept(Prot) -> 143 Ref = make_ref(), 144 true = diameter_reg:add_new({diameter_config, transport, Ref}), %% fake it 145 T = {Prot, Ref}, 146 [] = ?util:run(?util:scramble(acc(2*?PEER_COUNT, T, []))). 147 148acc(0, _, Acc) -> 149 Acc; 150acc(N, T, Acc) -> 151 acc(N-1, T, [{?MODULE, [init, 152 element(1 + N rem 2, {accept, gen_connect}), 153 T]} 154 | Acc]). 155 156%% =========================================================================== 157%% tcp_connect/1 158%% sctp_connect/1 159%% 160%% Test code accepting, diameter transport connecting. 161 162tcp_connect(_) -> 163 connect(tcp). 164 165sctp_connect(Config) -> 166 case lists:member({sctp, true}, Config) of 167 true -> connect(sctp); 168 false -> {skip, no_sctp} 169 end. 170 171connect(Prot) -> 172 T = {Prot, make_ref()}, 173 [] = ?util:run([{?MODULE, [init, X, T]} || X <- [gen_accept, connect]]). 174 175%% =========================================================================== 176%% reconnect/1 177%% 178%% Exercise reconnection behaviour: that a connecting transport 179%% doesn't try to establish a new connection until the old one is 180%% broken. 181 182reconnect() -> 183 [{timetrap, {minutes, 4}}]. 184 185reconnect({listen, Ref}) -> 186 SvcName = make_ref(), 187 ok = start_service(SvcName), 188 LRef = ?util:listen(SvcName, tcp, [{watchdog_timer, 6000}]), 189 [_] = diameter_reg:wait({diameter_tcp, listener, {LRef, '_'}}), 190 true = diameter_reg:add_new({?MODULE, Ref, LRef}), 191 192 %% Wait for partner to request transport death. 193 TPid = abort(SvcName, LRef, Ref), 194 195 %% Kill transport to force the peer to reconnect. 196 exit(TPid, kill), 197 198 %% Wait for the partner again. 199 abort(SvcName, LRef, Ref); 200 201reconnect({connect, Ref}) -> 202 SvcName = make_ref(), 203 true = diameter:subscribe(SvcName), 204 ok = start_service(SvcName), 205 [{{_, _, LRef}, Pid}] = diameter_reg:wait({?MODULE, Ref, '_'}), 206 CRef = ?util:connect(SvcName, tcp, LRef, [{connect_timer, 2000}, 207 {watchdog_timer, 6000}]), 208 209 %% Tell partner to kill transport after seeing that there are no 210 %% reconnection attempts. 211 abort(SvcName, Pid, Ref), 212 213 %% Transport goes down and is reestablished. 214 ?RECV(#diameter_event{service = SvcName, info = {down, CRef, _, _}}), 215 ?RECV(#diameter_event{service = SvcName, info = {reconnect, CRef, _}}), 216 ?RECV(#diameter_event{service = SvcName, info = {up, CRef, _, _, _}}), 217 218 %% Kill again. 219 abort(SvcName, Pid, Ref), 220 221 %% Wait for partner to die. 222 MRef = erlang:monitor(process, Pid), 223 ?RECV({'DOWN', MRef, process, _, _}); 224 225reconnect(_) -> 226 Ref = make_ref(), 227 [] = ?util:run([{?MODULE, [reconnect, {T, Ref}]} 228 || T <- [listen, connect]]). 229 230start_service(SvcName) -> 231 OH = diameter_util:unique_string(), 232 Opts = [{application, [{dictionary, diameter_gen_base_rfc6733}, 233 {module, diameter_callback}]}, 234 {'Origin-Host', OH}, 235 {'Origin-Realm', OH ++ ".org"}, 236 {'Vendor-Id', 0}, 237 {'Product-Name', "x"}, 238 {'Auth-Application-Id', [0]}], 239 diameter:start_service(SvcName, Opts). 240 241abort(SvcName, Pid, Ref) 242 when is_pid(Pid) -> 243 receive 244 #diameter_event{service = SvcName, info = {reconnect, _, _}} = E -> 245 erlang:error(E) 246 after 45000 -> 247 ok 248 end, 249 Pid ! {abort, Ref}; 250 251abort(SvcName, LRef, Ref) 252 when is_reference(LRef) -> 253 ?RECV({abort, Ref}), 254 [[{ref, LRef}, {type, listen}, {options, _}, {accept, [_,_] = Ts} | _]] 255 %% assert on two accepting 256 = diameter:service_info(SvcName, transport), 257 [TPid] = [P || [{watchdog, {_,_,okay}}, {peer, {P,_}} | _] <- Ts], 258 TPid. 259 260%% =========================================================================== 261%% =========================================================================== 262 263%% init/2 264 265init(accept, {Prot, Ref}) -> 266 %% Start an accepting transport and receive notification of a 267 %% connection. 268 TPid = start_accept(Prot, Ref), 269 270 %% Receive a message and send it back. 271 <<_:8, Len:24, _/binary>> = Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)), 272 273 Len = size(Bin), 274 TPid ! ?TMSG({send, Bin}), 275 276 %% Expect the transport process to die as a result of the peer 277 %% closing the connection. 278 MRef = erlang:monitor(process, TPid), 279 ?RECV({'DOWN', MRef, process, _, _}); 280 281init(gen_connect, {Prot, Ref}) -> 282 %% Lookup the peer's listening socket. 283 [PortNr] = ?util:lport(Prot, Ref), 284 285 %% Connect, send a message and receive it back. 286 {ok, Sock} = gen_connect(Prot, PortNr), 287 Bin = make_msg(), 288 ok = gen_send(Prot, Sock, Bin), 289 Bin = gen_recv(Prot, Sock); 290 291init(gen_accept, {Prot, Ref}) -> 292 %% Open a listening socket and publish the port number. 293 {ok, LSock} = gen_listen(Prot), 294 {ok, PortNr} = inet:port(LSock), 295 true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)), 296 297 %% Accept a connection, receive a message send it back, and wait 298 %% for the peer to close the connection. 299 {ok, Sock} = gen_accept(Prot, LSock), 300 Bin = gen_recv(Prot, Sock), 301 ok = gen_send(Prot, Sock, Bin), 302 receive 303 {tcp_closed, Sock} = T -> 304 T; 305 ?SCTP(Sock, {_, #sctp_assoc_change{}}) = T -> 306 T 307 end; 308 309init(connect, {Prot, Ref}) -> 310 %% Lookup the peer's listening socket. 311 [{?TEST_LISTENER(_, PortNr), _}] 312 = diameter_reg:wait(?TEST_LISTENER(Ref, '_')), 313 314 %% Start a connecting transport and receive notification of 315 %% the connection. 316 TPid = start_connect(Prot, PortNr, Ref), 317 318 %% Send a message and receive it back. 319 Bin = make_msg(), 320 TPid ! ?TMSG({send, Bin}), 321 Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)). 322 323bin(sctp, #diameter_packet{bin = Bin}) -> 324 Bin; 325bin(tcp, Bin) -> 326 Bin. 327 328%% make_msg/0 329%% 330%% A valid Diameter message in as far as diameter_tcp examines it, 331%% the module examining the length in the Diameter header to locate 332%% message boundaries. 333 334make_msg() -> 335 N = 1024, 336 Bin = rand_bytes(4*N), 337 Len = 4*(N+1), 338 <<1:8, Len:24, Bin/binary>>. 339 340%% crypto:rand_bytes/1 isn't available on all platforms (since openssl 341%% isn't) so roll our own. Not particularly random, but less verbose 342%% in trace. 343rand_bytes(N) -> 344 Oct = rand:uniform(256) - 1, 345 binary:copy(<<Oct>>, N). 346 347%% =========================================================================== 348 349%% start_connect/3 350 351start_connect(Prot, PortNr, Ref) -> 352 {ok, TPid} = start_connect(Prot, 353 {connect, Ref}, 354 ?SVC([]), 355 [{raddr, ?ADDR}, 356 {rport, PortNr}, 357 {ip, ?ADDR}, 358 {port, 0}]), 359 connected(Prot, TPid), 360 TPid. 361 362connected(sctp, TPid) -> 363 ?RECV(?TMSG({TPid, connected, _})); 364connected(tcp, TPid) -> 365 ?RECV(?TMSG({TPid, connected, _, [?ADDR]})). 366 367start_connect(sctp, T, Svc, Opts) -> 368 {ok, TPid, [?ADDR]} 369 = diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]), 370 {ok, TPid}; 371start_connect(tcp, T, Svc, Opts) -> 372 diameter_tcp:start(T, Svc, Opts). 373 374%% start_accept/2 375 376start_accept(Prot, Ref) -> 377 {ok, TPid, [?ADDR]} 378 = start_accept(Prot, {accept, Ref}, ?SVC([?ADDR]), [{port, 0}]), 379 ?RECV(?TMSG({TPid, connected})), 380 TPid. 381 382start_accept(sctp, T, Svc, Opts) -> 383 diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]); 384start_accept(tcp, T, Svc, Opts) -> 385 diameter_tcp:start(T, Svc, Opts). 386 387%% =========================================================================== 388 389%% gen_connect/2 390 391gen_connect(sctp = P, PortNr) -> 392 {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), 393 ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []), 394 Ok = gen_accept(P, Sock); 395gen_connect(tcp, PortNr) -> 396 gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS). 397 398%% gen_listen/1 399 400gen_listen(sctp) -> 401 {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]), 402 {gen_sctp:listen(Sock, true), Sock}; 403gen_listen(tcp) -> 404 gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]). 405 406%% gen_accept/2 407 408gen_accept(sctp, Sock) -> 409 #sctp_assoc_change{state = comm_up, 410 outbound_streams = OS, 411 inbound_streams = IS, 412 assoc_id = Id} 413 = ?RECV(?SCTP(Sock, {_, #sctp_assoc_change{} = S}), S), 414 415 putr(assoc, {OS, IS, Id}), 416 {ok, Sock}; 417gen_accept(tcp, LSock) -> 418 gen_tcp:accept(LSock). 419 420%% gen_send/3 421 422gen_send(sctp, Sock, Bin) -> 423 {OS, _IS, Id} = getr(assoc), 424 gen_sctp:send(Sock, Id, erlang:unique_integer([positive]) rem OS, Bin); 425gen_send(tcp, Sock, Bin) -> 426 gen_tcp:send(Sock, Bin). 427 428%% gen_recv/2 429 430gen_recv(sctp, Sock) -> 431 {_OS, _IS, Id} = getr(assoc), 432 receive 433 ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin}) 434 when is_binary(Bin) -> 435 Bin 436 end; 437gen_recv(tcp, Sock) -> 438 tcp_recv(Sock, <<>>). 439 440tcp_recv(_, <<_:8, Len:24, _/binary>> = Bin) 441 when Len =< size(Bin) -> 442 Bin; 443tcp_recv(Sock, B) -> 444 receive {tcp, Sock, Bin} -> tcp_recv(Sock, <<B/binary, Bin/binary>>) end. 445 446%% putr/2 447 448putr(Key, Val) -> 449 put({?MODULE, Key}, Val). 450 451%% getr/1 452 453getr(Key) -> 454 get({?MODULE, Key}). 455