1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21%%
22%% Tests of diameter_tcp/sctp as implementations of the diameter
23%% transport interface.
24%%
25
26-module(diameter_transport_SUITE).
27
28-export([suite/0,
29         all/0,
30         groups/0,
31         init_per_suite/1,
32         end_per_suite/1]).
33
34%% testcases
35-export([start/1,
36         tcp_accept/1,
37         tcp_connect/1,
38         sctp_accept/1,
39         sctp_connect/1,
40         reconnect/1, reconnect/0,
41         stop/1]).
42
43-export([accept/1,
44         connect/1,
45         init/2]).
46
47-include_lib("kernel/include/inet_sctp.hrl").
48-include("diameter.hrl").
49
50-define(util, diameter_util).
51
52%% Corresponding to diameter_* transport modules.
53-define(TRANSPORTS, [tcp, sctp]).
54
55%% Receive a message.
56-define(RECV(Pat, Ret), receive Pat -> Ret end).
57-define(RECV(Pat), ?RECV(Pat, diameter_lib:now())).
58
59%% Sockets are opened on the loopback address.
60-define(ADDR, {127,0,0,1}).
61
62%% diameter_tcp doesn't use anything but host_ip_address, and that
63%% only is a local address isn't configured as at transport start.
64-define(SVC(Addrs), #diameter_service{capabilities
65                                      = #diameter_caps{host_ip_address
66                                                       = Addrs}}).
67
68%% The term we register after open a listening port with gen_{tcp,sctp}.
69-define(TEST_LISTENER(Ref, PortNr),
70        {?MODULE, listen, Ref, PortNr}).
71
72%% Message over the transport interface.
73-define(TMSG(T), {diameter, T}).
74
75%% Options for gen_tcp/gen_sctp.
76-define(TCP_OPTS,  [binary, {active, true}, {packet, 0}]).
77-define(SCTP_OPTS, [binary, {active, true}, {sctp_initmsg, ?SCTP_INIT}]).
78
79%% Request a specific number of streams just because we can.
80-define(SCTP_INIT, #sctp_initmsg{num_ostreams = 5,
81                                 max_instreams = 5}).
82
83%% Messages from gen_sctp.
84-define(SCTP(Sock, Data), {sctp, Sock, _, _, Data}).
85
86%% ===========================================================================
87
88suite() ->
89    [{timetrap, {seconds, 15}}].
90
91all() ->
92    [start,
93     {group, all},
94     {group, all, [parallel]},
95     stop].
96
97groups() ->
98    [{all, [], tc()}].
99
100tc() ->
101    [tcp_accept,
102     tcp_connect,
103     sctp_accept,
104     sctp_connect,
105     reconnect].
106
107init_per_suite(Config) ->
108    [{sctp, ?util:have_sctp()} | Config].
109
110end_per_suite(_Config) ->
111    ok.
112
113%% ===========================================================================
114
115start(_Config) ->
116    ok = diameter:start().
117
118stop(_Config) ->
119    ok = diameter:stop().
120
121%% ===========================================================================
122%% tcp_accept/1
123%% sctp_accept/1
124%%
125%% diameter transport accepting, test code connecting.
126
127tcp_accept(_) ->
128    accept(tcp).
129
130sctp_accept(Config) ->
131    case lists:member({sctp, true}, Config) of
132        true  -> accept(sctp);
133        false -> {skip, no_sctp}
134    end.
135
136%% Start multiple accepting transport processes that are connected to
137%% with an equal number of connecting processes using gen_tcp/sctp
138%% directly.
139
140-define(PEER_COUNT, 8).
141
142accept(Prot) ->
143    Ref = make_ref(),
144    true = diameter_reg:add_new({diameter_config, transport, Ref}), %% fake it
145    T = {Prot, Ref},
146    [] = ?util:run(?util:scramble(acc(2*?PEER_COUNT, T, []))).
147
148acc(0, _, Acc) ->
149    Acc;
150acc(N, T, Acc) ->
151    acc(N-1, T, [{?MODULE, [init,
152                            element(1 + N rem 2, {accept, gen_connect}),
153                            T]}
154                 | Acc]).
155
156%% ===========================================================================
157%% tcp_connect/1
158%% sctp_connect/1
159%%
160%% Test code accepting, diameter transport connecting.
161
162tcp_connect(_) ->
163    connect(tcp).
164
165sctp_connect(Config) ->
166    case lists:member({sctp, true}, Config) of
167        true  -> connect(sctp);
168        false -> {skip, no_sctp}
169    end.
170
171connect(Prot) ->
172    T = {Prot, make_ref()},
173    [] = ?util:run([{?MODULE, [init, X, T]} || X <- [gen_accept, connect]]).
174
175%% ===========================================================================
176%% reconnect/1
177%%
178%% Exercise reconnection behaviour: that a connecting transport
179%% doesn't try to establish a new connection until the old one is
180%% broken.
181
182reconnect() ->
183    [{timetrap, {minutes, 4}}].
184
185reconnect({listen, Ref}) ->
186    SvcName = make_ref(),
187    ok = start_service(SvcName),
188    LRef = ?util:listen(SvcName, tcp, [{watchdog_timer, 6000}]),
189    [_] = diameter_reg:wait({diameter_tcp, listener, {LRef, '_'}}),
190    true = diameter_reg:add_new({?MODULE, Ref, LRef}),
191
192    %% Wait for partner to request transport death.
193    TPid = abort(SvcName, LRef, Ref),
194
195    %% Kill transport to force the peer to reconnect.
196    exit(TPid, kill),
197
198    %% Wait for the partner again.
199    abort(SvcName, LRef, Ref);
200
201reconnect({connect, Ref}) ->
202    SvcName = make_ref(),
203    true = diameter:subscribe(SvcName),
204    ok = start_service(SvcName),
205    [{{_, _, LRef}, Pid}] = diameter_reg:wait({?MODULE, Ref, '_'}),
206    CRef = ?util:connect(SvcName, tcp, LRef, [{connect_timer, 2000},
207                                              {watchdog_timer, 6000}]),
208
209    %% Tell partner to kill transport after seeing that there are no
210    %% reconnection attempts.
211    abort(SvcName, Pid, Ref),
212
213    %% Transport goes down and is reestablished.
214    ?RECV(#diameter_event{service = SvcName, info = {down, CRef, _, _}}),
215    ?RECV(#diameter_event{service = SvcName, info = {reconnect, CRef, _}}),
216    ?RECV(#diameter_event{service = SvcName, info = {up, CRef, _, _, _}}),
217
218    %% Kill again.
219    abort(SvcName, Pid, Ref),
220
221    %% Wait for partner to die.
222    MRef = erlang:monitor(process, Pid),
223    ?RECV({'DOWN', MRef, process, _, _});
224
225reconnect(_) ->
226    Ref = make_ref(),
227    [] = ?util:run([{?MODULE, [reconnect, {T, Ref}]}
228                    || T <- [listen, connect]]).
229
230start_service(SvcName) ->
231    OH = diameter_util:unique_string(),
232    Opts = [{application, [{dictionary, diameter_gen_base_rfc6733},
233                           {module, diameter_callback}]},
234            {'Origin-Host', OH},
235            {'Origin-Realm', OH ++ ".org"},
236            {'Vendor-Id', 0},
237            {'Product-Name', "x"},
238            {'Auth-Application-Id', [0]}],
239    diameter:start_service(SvcName, Opts).
240
241abort(SvcName, Pid, Ref)
242  when is_pid(Pid) ->
243    receive
244        #diameter_event{service = SvcName, info = {reconnect, _, _}} = E ->
245            erlang:error(E)
246    after 45000 ->
247            ok
248    end,
249    Pid ! {abort, Ref};
250
251abort(SvcName, LRef, Ref)
252  when is_reference(LRef) ->
253    ?RECV({abort, Ref}),
254    [[{ref, LRef}, {type, listen}, {options, _}, {accept, [_,_] = Ts} | _]]
255                                                 %% assert on two accepting
256        = diameter:service_info(SvcName, transport),
257    [TPid] = [P || [{watchdog, {_,_,okay}}, {peer, {P,_}} | _] <- Ts],
258    TPid.
259
260%% ===========================================================================
261%% ===========================================================================
262
263%% init/2
264
265init(accept, {Prot, Ref}) ->
266    %% Start an accepting transport and receive notification of a
267    %% connection.
268    TPid = start_accept(Prot, Ref),
269
270    %% Receive a message and send it back.
271    <<_:8, Len:24, _/binary>> = Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)),
272
273    Len = size(Bin),
274    TPid ! ?TMSG({send, Bin}),
275
276    %% Expect the transport process to die as a result of the peer
277    %% closing the connection.
278    MRef = erlang:monitor(process, TPid),
279    ?RECV({'DOWN', MRef, process, _, _});
280
281init(gen_connect, {Prot, Ref}) ->
282    %% Lookup the peer's listening socket.
283    [PortNr] = ?util:lport(Prot, Ref),
284
285    %% Connect, send a message and receive it back.
286    {ok, Sock} = gen_connect(Prot, PortNr),
287    Bin = make_msg(),
288    ok = gen_send(Prot, Sock, Bin),
289    Bin = gen_recv(Prot, Sock);
290
291init(gen_accept, {Prot, Ref}) ->
292    %% Open a listening socket and publish the port number.
293    {ok, LSock} = gen_listen(Prot),
294    {ok, PortNr} = inet:port(LSock),
295    true = diameter_reg:add_new(?TEST_LISTENER(Ref, PortNr)),
296
297    %% Accept a connection, receive a message send it back, and wait
298    %% for the peer to close the connection.
299    {ok, Sock} = gen_accept(Prot, LSock),
300    Bin = gen_recv(Prot, Sock),
301    ok = gen_send(Prot, Sock, Bin),
302    receive
303        {tcp_closed, Sock} = T ->
304            T;
305        ?SCTP(Sock, {_, #sctp_assoc_change{}}) = T ->
306            T
307    end;
308
309init(connect, {Prot, Ref}) ->
310    %% Lookup the peer's listening socket.
311    [{?TEST_LISTENER(_, PortNr), _}]
312        = diameter_reg:wait(?TEST_LISTENER(Ref, '_')),
313
314    %% Start a connecting transport and receive notification of
315    %% the connection.
316    TPid = start_connect(Prot, PortNr, Ref),
317
318    %% Send a message and receive it back.
319    Bin = make_msg(),
320    TPid ! ?TMSG({send, Bin}),
321    Bin = bin(Prot, ?RECV(?TMSG({recv, P}), P)).
322
323bin(sctp, #diameter_packet{bin = Bin}) ->
324    Bin;
325bin(tcp, Bin) ->
326    Bin.
327
328%% make_msg/0
329%%
330%% A valid Diameter message in as far as diameter_tcp examines it,
331%% the module examining the length in the Diameter header to locate
332%% message boundaries.
333
334make_msg() ->
335    N = 1024,
336    Bin = rand_bytes(4*N),
337    Len = 4*(N+1),
338    <<1:8, Len:24, Bin/binary>>.
339
340%% crypto:rand_bytes/1 isn't available on all platforms (since openssl
341%% isn't) so roll our own. Not particularly random, but less verbose
342%% in trace.
343rand_bytes(N) ->
344    Oct = rand:uniform(256) - 1,
345    binary:copy(<<Oct>>, N).
346
347%% ===========================================================================
348
349%% start_connect/3
350
351start_connect(Prot, PortNr, Ref) ->
352    {ok, TPid} = start_connect(Prot,
353                               {connect, Ref},
354                               ?SVC([]),
355                               [{raddr, ?ADDR},
356                                {rport, PortNr},
357                                {ip, ?ADDR},
358                                {port, 0}]),
359    connected(Prot, TPid),
360    TPid.
361
362connected(sctp, TPid) ->
363    ?RECV(?TMSG({TPid, connected, _}));
364connected(tcp, TPid) ->
365    ?RECV(?TMSG({TPid, connected, _, [?ADDR]})).
366
367start_connect(sctp, T, Svc, Opts) ->
368    {ok, TPid, [?ADDR]}
369        = diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]),
370    {ok, TPid};
371start_connect(tcp, T, Svc, Opts) ->
372    diameter_tcp:start(T, Svc, Opts).
373
374%% start_accept/2
375
376start_accept(Prot, Ref) ->
377    {ok, TPid, [?ADDR]}
378        = start_accept(Prot, {accept, Ref}, ?SVC([?ADDR]), [{port, 0}]),
379    ?RECV(?TMSG({TPid, connected})),
380    TPid.
381
382start_accept(sctp, T, Svc, Opts) ->
383    diameter_sctp:start(T, Svc, [{sctp_initmsg, ?SCTP_INIT} | Opts]);
384start_accept(tcp, T, Svc, Opts) ->
385    diameter_tcp:start(T, Svc, Opts).
386
387%% ===========================================================================
388
389%% gen_connect/2
390
391gen_connect(sctp = P, PortNr) ->
392    {ok, Sock} = Ok = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
393    ok = gen_sctp:connect_init(Sock, ?ADDR, PortNr, []),
394    Ok = gen_accept(P, Sock);
395gen_connect(tcp, PortNr) ->
396    gen_tcp:connect(?ADDR, PortNr, ?TCP_OPTS).
397
398%% gen_listen/1
399
400gen_listen(sctp) ->
401    {ok, Sock} = gen_sctp:open([{ip, ?ADDR}, {port, 0} | ?SCTP_OPTS]),
402    {gen_sctp:listen(Sock, true), Sock};
403gen_listen(tcp) ->
404    gen_tcp:listen(0, [{ip, ?ADDR} | ?TCP_OPTS]).
405
406%% gen_accept/2
407
408gen_accept(sctp, Sock) ->
409    #sctp_assoc_change{state = comm_up,
410                       outbound_streams = OS,
411                       inbound_streams = IS,
412                       assoc_id = Id}
413        = ?RECV(?SCTP(Sock, {_, #sctp_assoc_change{} = S}), S),
414
415    putr(assoc, {OS, IS, Id}),
416    {ok, Sock};
417gen_accept(tcp, LSock) ->
418    gen_tcp:accept(LSock).
419
420%% gen_send/3
421
422gen_send(sctp, Sock, Bin) ->
423    {OS, _IS, Id} = getr(assoc),
424    gen_sctp:send(Sock, Id, erlang:unique_integer([positive]) rem OS, Bin);
425gen_send(tcp, Sock, Bin) ->
426    gen_tcp:send(Sock, Bin).
427
428%% gen_recv/2
429
430gen_recv(sctp, Sock) ->
431    {_OS, _IS, Id} = getr(assoc),
432    receive
433        ?SCTP(Sock, {[#sctp_sndrcvinfo{assoc_id = Id}], Bin})
434          when is_binary(Bin) ->
435            Bin
436    end;
437gen_recv(tcp, Sock) ->
438    tcp_recv(Sock, <<>>).
439
440tcp_recv(_, <<_:8, Len:24, _/binary>> = Bin)
441  when Len =< size(Bin) ->
442    Bin;
443tcp_recv(Sock, B) ->
444    receive {tcp, Sock, Bin} -> tcp_recv(Sock, <<B/binary, Bin/binary>>) end.
445
446%% putr/2
447
448putr(Key, Val) ->
449    put({?MODULE, Key}, Val).
450
451%% getr/1
452
453getr(Key) ->
454    get({?MODULE, Key}).
455