1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
%% Tests distribution and the tcp driver.
-module(distribution_SUITE).
-compile(r16).

-include_lib("common_test/include/ct.hrl").

%% Numeric constants used when the suite builds or inspects encoded terms
%% by hand: first the version marker, then the individual term tags.
-define(VERSION_MAGIC, 131).

-define(ATOM_EXT, 100).
-define(REFERENCE_EXT, 101).
-define(PORT_EXT, 102).
-define(PID_EXT, 103).
-define(NEW_REFERENCE_EXT, 114).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).

%% ?Line is a per-statement trace hook.  It normally expands to nothing;
%% swap in the commented-out definition to print every line reached.
%-define(Line, erlang:display({line,?LINE}),).
-define(Line,).
40
%% Common Test callbacks and test cases.
-export([all/0,
         suite/0,
         groups/0,
         ping/1,
         bulk_send_small/1,
         group_leader/1,
         optimistic_dflags/1,
         bulk_send_big/1,
         bulk_send_bigbig/1,
         local_send_small/1,
         local_send_big/1,
         local_send_legal/1,
         link_to_busy/1,
         exit_to_busy/1,
         lost_exit/1,
         link_to_dead/1,
         link_to_dead_new_node/1,
         ref_port_roundtrip/1,
         nil_roundtrip/1,
         trap_bif_1/1,
         trap_bif_2/1,
         trap_bif_3/1,
         stop_dist/1,
         dist_auto_connect_never/1,
         dist_auto_connect_once/1,
         dist_parallel_send/1,
         atom_roundtrip/1,
         unicode_atom_roundtrip/1,
         atom_roundtrip_r16b/1,
         contended_atom_cache_entry/1,
         contended_unicode_atom_cache_entry/1,
         bad_dist_structure/1,
         bad_dist_ext_receive/1,
         bad_dist_ext_process_info/1,
         bad_dist_ext_control/1,
         bad_dist_ext_connection_id/1,
         bad_dist_ext_size/1,
         start_epmd_false/1,
         epmd_module/1]).

%% Internal exports: entry points that are spawned, applied or called
%% via rpc by name, and therefore have to be visible outside the module.
-export([sender/3,
         receiver2/2,
         dummy_waiter/0,
         dead_process/0,
         group_leader_1/1,
         optimistic_dflags_echo/0,
         optimistic_dflags_sender/1,
         roundtrip/1,
         bounce/1,
         do_dist_auto_connect/1,
         inet_rpc_server/1,
         dist_parallel_sender/3,
         dist_parallel_receiver/0,
         dist_evil_parallel_receiver/0]).

%% epmd_module exports
-export([start_link/0,
         register_node/2,
         register_node/3,
         port_please/2,
         address_please/3]).
77
%% Suite-wide Common Test settings: install the ts_install_cth hook and
%% give every test case a four minute timetrap.
suite() ->
    Hooks = {ct_hooks, [ts_install_cth]},
    TimeTrap = {timetrap, {minutes, 4}},
    [Hooks, TimeTrap].
81
%% Test cases and groups, in the order they are to be executed.
all() ->
    [ping,
     {group, bulk_send},
     {group, local_send},
     group_leader,
     optimistic_dflags,
     link_to_busy,
     exit_to_busy,
     lost_exit,
     link_to_dead,
     link_to_dead_new_node,
     ref_port_roundtrip,
     nil_roundtrip,
     stop_dist,
     {group, trap_bif},
     {group, dist_auto_connect},
     dist_parallel_send,
     atom_roundtrip,
     unicode_atom_roundtrip,
     atom_roundtrip_r16b,
     contended_atom_cache_entry,
     contended_unicode_atom_cache_entry,
     bad_dist_structure,
     {group, bad_dist_ext},
     start_epmd_false,
     epmd_module].
95
%% Group definitions referenced from all/0.  No group sets any
%% properties; each one only names its member test cases.
groups() ->
    BulkSend = [bulk_send_small, bulk_send_big, bulk_send_bigbig],
    LocalSend = [local_send_small, local_send_big, local_send_legal],
    TrapBif = [trap_bif_1, trap_bif_2, trap_bif_3],
    AutoConnect = [dist_auto_connect_never, dist_auto_connect_once],
    BadDistExt = [bad_dist_ext_receive,
                  bad_dist_ext_process_info,
                  bad_dist_ext_size,
                  bad_dist_ext_control,
                  bad_dist_ext_connection_id],
    [{bulk_send, [], BulkSend},
     {local_send, [], LocalSend},
     {trap_bif, [], TrapBif},
     {dist_auto_connect, [], AutoConnect},
     {bad_dist_ext, [], BadDistExt}].
107
%% Pings three kinds of targets 1024 times each: a node that is assumed
%% not to exist, a freshly started peer node, and the local node itself.
ping(Config) when is_list(Config) ->
    Rounds = 1024,
    PingMany = fun(Target, Expected) ->
                       test_server:do_times(
                         Rounds,
                         fun() -> Expected = net_adm:ping(Target) end)
               end,

    %% Pinging a non-existing node many times used to crash the emulator
    %% on Windows.
    Missing = list_to_atom("__pucko__@" ++ hostname()),
    io:format("Pinging ~s (assumed to not exist)", [Missing]),
    PingMany(Missing, pang),

    %% A real peer node must answer every time.
    {ok, Peer} = start_node(distribution_SUITE_other),
    io:format("Pinging ~s (assumed to exist)", [Peer]),
    PingMany(Peer, pong),
    stop_node(Peer),

    %% And so must our own node.
    Local = node(),
    io:format("Pinging ~s (the same node)", [Local]),
    PingMany(Local, pong),

    ok.
135
%% Test erlang:group_leader(_, ExternalPid), i.e. DOP_GROUP_LEADER.
%% Two relay nodes are started; the actual checks run on the first one
%% (see group_leader_1/1) and target a process on the second one.  Both
%% relay nodes are stopped whether or not the checks succeed.
group_leader(Config) when is_list(Config) ->
    ?Line Driver = start_relay_node(group_leader_1, []),
    ?Line Target = start_relay_node(group_leader_2, []),
    try
        ?Line TargetName = inet_rpc_nodename(Target),
        ?Line {ok, ok} = do_inet_rpc(Driver, ?MODULE, group_leader_1,
                                     [TargetName])
    after
        ?Line stop_relay_node(Driver),
        ?Line stop_relay_node(Target)
    end,
    ok.
148
%% Body of the group_leader test, executed on the first relay node.
%% Spawns a process on Node2 that reports its group leader on request,
%% then sets that process' group leader twice: once over an established
%% connection and once after the connection has been torn down.
group_leader_1(Node2) ->
    ?Line Remote = spawn(Node2,
                         fun Serve() ->
                                 receive
                                     {Asker, group_leader} ->
                                         Asker ! {self(), group_leader,
                                                  group_leader()}
                                 end,
                                 Serve()
                         end),
    ?Line FirstLeader = self(),
    ?Line group_leader(FirstLeader, Remote),
    ?Line Remote ! {self(), group_leader},
    ?Line {Remote, group_leader, FirstLeader} = receive_one(),

    %% Kill connection and repeat test when group_leader/2 triggers auto-connect
    ?Line net_kernel:monitor_nodes(true),
    ?Line net_kernel:disconnect(Node2),
    ?Line {nodedown, Node2} = receive_one(),
    ?Line SecondLeader = spawn(fun() -> dummy end),
    ?Line group_leader(SecondLeader, Remote),
    ?Line {nodeup, Node2} = receive_one(),
    ?Line Remote ! {self(), group_leader},
    ?Line {Remote, group_leader, SecondLeader} = receive_one(),
    ok.
171
%% Test optimistic distribution flags toward pending connections
%% (DFLAG_DIST_HOPEFULLY).  An echo process is set up on one relay node
%% and the sending side of the test is run on another; both nodes are
%% stopped afterwards regardless of the outcome.
optimistic_dflags(Config) when is_list(Config) ->
    ?Line SenderSock = start_relay_node(optimistic_dflags_sender, []),
    ?Line EchoSock = start_relay_node(optimistic_dflags_echo, []),
    try
        ?Line {ok, ok} = do_inet_rpc(EchoSock, ?MODULE,
                                     optimistic_dflags_echo, []),

        ?Line EchoName = inet_rpc_nodename(EchoSock),
        ?Line {ok, ok} = do_inet_rpc(SenderSock, ?MODULE,
                                     optimistic_dflags_sender, [EchoName])
    after
        ?Line stop_relay_node(SenderSock),
        ?Line stop_relay_node(EchoSock)
    end,
    ok.
186
%% Starts a process registered as optimistic_dflags_echo that answers
%% every {From, Term} with {self(), Term}, and verifies with one local
%% round trip that it works before returning ok.
optimistic_dflags_echo() ->
    Echo = spawn(fun Loop() ->
                         receive
                             {Sender, Payload} ->
                                 Sender ! {self(), Payload}
                         end,
                         Loop()
                 end),
    register(optimistic_dflags_echo, Echo),
    optimistic_dflags_echo ! {self(), hello},
    {Echo, hello} = receive_one(),
    ok.
198
%% Sending side of the optimistic_dflags test.  Subscribes to node
%% up/down messages and runs the echo scenario with two terms: a
%% one-bit bitstring and an external fun.
optimistic_dflags_sender(EchoNode) ->
    ?Line net_kernel:monitor_nodes(true),

    BitString = <<1:1>>,
    ExternalFun = fun lists:map/2,
    optimistic_dflags_do(EchoNode, BitString),
    optimistic_dflags_do(EchoNode, ExternalFun),
    ok.
205
%% Sends Term to the echo process on EchoNode twice, each time over a
%% connection that has to be set up by the send itself: first addressed
%% by registered name, then by pid.  Expects a nodeup message followed
%% by the echoed term each time, and leaves the nodes disconnected.
optimistic_dflags_do(EchoNode, Term) ->
    ?Line {optimistic_dflags_echo, EchoNode} ! {self(), Term},
    ?Line {nodeup, EchoNode} = receive_one(),
    ?Line {Echo, Term} = receive_one(),

    %% repeat with pid destination
    ?Line net_kernel:disconnect(EchoNode),
    ?Line {nodedown, EchoNode} = receive_one(),
    ?Line Echo ! {self(), Term},
    ?Line {nodeup, EchoNode} = receive_one(),
    ?Line {Echo, Term} = receive_one(),

    %% Leave the connection down for the next round.
    ?Line net_kernel:disconnect(EchoNode),
    ?Line {nodedown, EchoNode} = receive_one(),
    ok.
220
221
%% Returns the next message in the mailbox, or the atom 'timeout' if
%% nothing arrives within one second.
receive_one() ->
    receive
        Msg ->
            Msg
    after 1000 ->
            timeout
    end.
224
225
%% Bulk send of 64 binaries of 32 K each.
bulk_send_small(Config) when is_list(Config) ->
    Terms = 64,
    KBytes = 32,
    bulk_send(Terms, KBytes).
228
%% Bulk send of 32 binaries of 64 K each.
bulk_send_big(Config) when is_list(Config) ->
    Terms = 32,
    KBytes = 64,
    bulk_send(Terms, KBytes).
231
%% Sends Terms binaries of BinSize kilobytes each to a receiver process
%% on a freshly started node and checks that the receiver saw exactly
%% that many terms and bytes.
%%
%% Returns {comment, String}, where String gives the transfer rate as
%% computed from the time reported by test_server:timecall/3.
%%
%% The receiver node is stopped in an 'after' clause so that a failed
%% assertion does not leave a node named bulk_receiver running; the
%% other bulk_send cases start a node under that same name.
bulk_send(Terms, BinSize) ->
    ct:timetrap({seconds, 30}),

    io:format("Sending ~w binaries, each of size ~w K", [Terms, BinSize]),
    {ok, Node} = start_node(bulk_receiver),
    try
        Recv = spawn(Node, erlang, apply, [fun receiver/2, [0, 0]]),
        Bin = binary:copy(<<253>>, BinSize*1024),
        Size = Terms*byte_size(Bin),
        %% sender/3 returns the receiver's {Terms, Size} reply, so this
        %% match is the actual correctness check.
        {Elapsed, {Terms, Size}} = test_server:timecall(?MODULE, sender,
                                                        [Recv, Bin, Terms]),
        %% max/2 guards against division by zero for very fast runs.
        {comment, integer_to_list(round(Size/1024/max(1,Elapsed))) ++ " K/s"}
    after
        stop_node(Node)
    end.
244
%% Sends {term, Bin} to To Left times, then {done, self()}, and returns
%% the next message that arrives in the caller's mailbox (expected to be
%% the receiver's reply).
sender(To, Bin, Left) when Left =/= 0 ->
    To ! {term, Bin},
    sender(To, Bin, Left-1);
sender(To, _Bin, 0) ->
    To ! {done, self()},
    receive
        Reply ->
            Reply
    end.
254
%% Runs the same bulk transfer (160 binaries of 4 K) twice, with the
%% sending node started with "+zdbbl 5" and "+zdbbl 995" respectively,
%% and reports both transfer rates, both busy_dist_port monitor message
%% counts and the ratio between the counts as the test case comment.
bulk_send_bigbig(Config) when is_list(Config) ->
    Terms = 32*5,
    BinSize = 4,
    {RateSmall, MonitorsSmall} = bulk_sendsend2(Terms, BinSize,   5),
    {RateLarge, MonitorsLarge} = bulk_sendsend2(Terms, BinSize, 995),
    %% Avoid dividing by zero when the large buffer never went busy.
    Ratio = case MonitorsLarge == 0 of
                true  -> MonitorsSmall / 1.0;
                false -> MonitorsSmall / MonitorsLarge
            end,
    Text = io_lib:format("~p K/s, ~p K/s, "
                         "~p monitor msgs, ~p monitor msgs, "
                         "~.1f monitor ratio",
                         [RateSmall, RateLarge, MonitorsSmall,
                          MonitorsLarge, Ratio]),
    {comment, lists:flatten(Text)}.
270
%% Starts a receiver node and a sender node, the latter with the
%% emulator flag "+zdbbl BusyBufSize", and lets sendersender/4 on the
%% sender node push Terms binaries of BinSize kilobytes each to a
%% receiver/2 process on the receiver node.
%%
%% Returns {Rate, MonitorCount}, where Rate is derived from the elapsed
%% time reported by the sender and MonitorCount is the number of
%% busy_dist_port monitor messages the sender counted.
%%
%% Both nodes are stopped in 'after' clauses so that a failed match does
%% not leave nodes named bulk_receiver/bulk_sender running and blocking
%% later test cases that start nodes under the same names.
bulk_sendsend2(Terms, BinSize, BusyBufSize) ->
    ct:timetrap({seconds, 30}),

    io:format("\nSending ~w binaries, each of size ~w K",
              [Terms, BinSize]),
    {ok, NodeRecv} = start_node(bulk_receiver),
    try
        Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]),
        Bin = binary:copy(<<253>>, BinSize*1024),

        %% SLF LEFT OFF HERE.
        %% When the caller uses small hunks, like 4k via
        %% bulk_sendsend2(32*5, 4, _), then (on my laptop at least), we
        %% get zero monitor messages.  But if we use "+zdbbl 5", then we
        %% get a lot of monitor messages.  So, if we can count up the
        %% total number of monitor messages that we get when running both
        %% default busy size and "+zdbbl 5", and if the 5 case gets
        %% "many many more" monitor messages, then we know we're working.

        {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++
                                        integer_to_list(BusyBufSize)),
        try
            _Send = spawn(NodeSend, erlang, apply,
                          [fun sendersender/4, [self(), Recv, Bin, Terms]]),
            {Elapsed, {_TermsN, SizeN}, MonitorCount} =
                receive
                    %% On some platforms (Windows), the time taken is 0
                    %% so we simulate that some little time has passed.
                    {sendersender, {0.0,T,MC}} ->
                        {0.0015, T, MC};
                    {sendersender, BigRes} ->
                        BigRes
                end,
            {round(SizeN/1024/Elapsed), MonitorCount}
        after
            stop_node(NodeSend)
        end
    after
        stop_node(NodeRecv)
    end.
305
%% Sender process to be run on a slave node.
%%
%% Enables busy_dist_port system monitoring with itself as the monitor
%% process, starts a helper process that sends the same Left terms to
%% To, and then times its own send loop plus the wait for the reply to
%% its {done, self()} message.  The result is reported to Parent as
%% {sendersender, {Seconds, Reply, MonitorCount}}.
sendersender(Parent, To, Bin, Left) ->
    erlang:system_monitor(self(), [busy_dist_port]),
    Helper = fun() ->
                     sendersender_send(To, Bin, Left),
                     exit(normal)
             end,
    _ = spawn(Helper),
    Timed = fun() ->
                    sendersender_send(To, Bin, Left),
                    To ! {done, self()},
                    count_monitors(0)
            end,
    {USec, {Res, MonitorCount}} = timer:tc(Timed),
    Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}.
321
%% Sends {term, Bin} to To Left times and returns ok.
sendersender_send(To, Bin, Left) when Left =/= 0 ->
    To ! {term, Bin},
    sendersender_send(To, Bin, Left-1);
sendersender_send(_To, _Bin, 0) ->
    ok.
327
%% Counts {monitor, Pid, Type, Info} messages until the first two-tuple
%% (the receiver's reply) shows up, and returns {Reply, MonitorCount}.
%%
%% Monitor messages are counted no matter when they arrive relative to
%% the reply: those received while waiting for it are handled by the
%% same receive, and those queued behind it are picked up by a final
%% non-blocking drain.  Waiting for the reply with a receive that only
%% matches two-tuples would silently leave such messages uncounted.
count_monitors(MonitorCount) ->
    receive
        {monitor, _Pid, _Type, _Info} ->
            count_monitors(MonitorCount + 1);
        {_,_}=Any ->
            {Any, drain_monitors(MonitorCount)}
    end.

%% Counts the monitor messages already in the mailbox without blocking.
drain_monitors(MonitorCount) ->
    receive
        {monitor, _Pid, _Type, _Info} ->
            drain_monitors(MonitorCount + 1)
    after 0 ->
            MonitorCount
    end.
338
%% Receiver process to be run on a slave node.
%%
%% Keeps a running count of the {term, Bin} messages seen and of their
%% total size in bytes.  On {done, ReplyTo} it sends {Terms, Size} to
%% ReplyTo and finishes.
receiver(Terms, Size) ->
    receive
        {done, ReplyTo} ->
            ReplyTo ! {Terms, Size};
        {term, Bin} ->
            receiver(Terms+1, Size+byte_size(Bin))
    end.
348
349
350
%% Sends a big message 4096 times to a name that is not registered on
%% the local node, addressed as {Name, node()}, and checks that the
%% send expression returns the message each time.
local_send_big(Config) when is_list(Config) ->
    Base = ["Tests sending small and big messages to a non-existing ",
            "local registered process."],
    Nested = [Base, [Base, Base, [Base], Base], Base],
    Flat = lists:flatten(Nested),
    Message = Base ++ Flat ++ list_to_binary(Flat),
    SendOnce = fun() ->
                       Message = {arbitrary_name, node()} ! Message
               end,
    test_server:do_times(4096, SendOnce),
    ok.
361
%% Sends a small message 4096 times to a name that is not registered on
%% the local node, addressed as {Name, node()}, and checks that the
%% send expression returns the message each time.
local_send_small(Config) when is_list(Config) ->
    Message = {some_stupid, "arbitrary", 'Data'},
    SendOnce = fun() ->
                       Message = {unregistered_name, node()} ! Message
               end,
    test_server:do_times(4096, SendOnce),
    ok.
368
%% Sends data to a registered process on the local node, addressing it
%% as {Name, node()} as if it lived on another node, and then asks the
%% process how much it received to verify that nothing was lost.
local_send_legal(Config) when is_list(Config) ->
    Rounds = 16384,
    Text = "Some Not so random Data",
    Triple = [Text, Text, Text],
    Message = {Triple, Triple},
    Counter = spawn(?MODULE, receiver2, [0, 0]),
    true = register(registered_process, Counter),

    %% receiver2/2 adds up size/1 of each message, i.e. the tuple arity.
    ExpectedSize = tuple_size(Message) * Rounds,
    SendOnce = fun() ->
                       Message = {registered_process, node()} ! Message
               end,
    test_server:do_times(Rounds, SendOnce),

    %% Check that all msgs really came through.
    Self = self(),
    {done, Self} = {registered_process, node()} ! {done, Self},
    receive
        {Rounds, ExpectedSize} ->
            ok;
        _ ->
            ct:fail("Wrong number of msgs received.")
    end,
    ok.
392
%% Counts incoming messages and sums up size/1 of each of them until a
%% {done, ReplyTo} message arrives; then replies with {Num, TotSize}.
receiver2(Num, TotSize) ->
    receive
        Msg ->
            case Msg of
                {done, ReplyTo} ->
                    ReplyTo ! {Num, TotSize};
                _ ->
                    receiver2(Num+1, TotSize+size(Msg))
            end
    end.
400
%% Test that link/1 to a busy distribution port works.
%%
%% For each variant a linker process is spawned which will try to link
%% to the other node.  The linker is expected to get suspended on the
%% busy distribution port when it runs link/1, and to be resumed and
%% finish normally once the port is no longer busy (see do_busy_test/2).
link_to_busy(Config) when is_list(Config) ->
    ct:timetrap({seconds, 60}),
    {ok, Node} = start_node(link_to_busy),
    Recv = spawn(Node, erlang, apply, [fun sink/1, [link_to_busy_sink]]),

    %% Tracing is opt-in through the environment.
    Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
                 "true" -> start_busy_dist_port_tracer();
                 _ -> false
             end,

    Variants =
        [%% link/1 called directly.
         fun () -> linker(Recv) end,
         %% Same thing, but we apply link/1 instead of calling it directly.
         fun () -> applied_linker(Recv) end,
         %% Same thing again, but we apply link/1 in the tail of a function.
         fun () -> tail_applied_linker(Recv) end],
    lists:foreach(fun (Variant) -> do_busy_test(Node, Variant) end,
                  Variants),

    %% Done.
    stop_node(Node),
    stop_busy_dist_port_tracer(Tracer),
    ok.
433
%% Links to Pid with a direct call to link/1 and verifies that Pid shows
%% up among the caller's links.
linker(Pid) ->
    true = link(Pid),
    {links, Linked} = process_info(self(), links),
    true = lists:member(Pid, Linked).
438
%% Like linker/1, but link/1 is invoked through apply/3.
applied_linker(Pid) ->
    true = apply(erlang, link, [Pid]),
    {links, Linked} = process_info(self(), links),
    true = lists:member(Pid, Linked).
443
%% Applies link/1 as the very last expression, i.e. in tail position,
%% and returns whatever link/1 returns.
tail_applied_linker(Pid) ->
    Args = [Pid],
    apply(erlang, link, Args).
446
%% Test that exit/2 to a busy distribution port works.
%%
%% For each variant a sink process is started on the other node and a
%% local process is spawned to kill it with exit/2.  The killer is
%% expected to block on the busy distribution port and to be allowed to
%% continue when the port becomes non-busy (see do_busy_test/2); the
%% sink must then go down with reason joey_said_die.
exit_to_busy(Config) when is_list(Config) ->
    ct:timetrap({seconds, 60}),
    {ok, Node} = start_node(exit_to_busy),

    %% Tracing is opt-in through the environment.
    Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
                 "true" -> start_busy_dist_port_tracer();
                 _ -> false
             end,

    KillAndVerify =
        fun (Killer) ->
                Victim = spawn(Node, fun () -> sink(exit_to_busy_sink) end),
                Mon = erlang:monitor(process, Victim),
                do_busy_test(Node, fun () -> Killer(Victim) end),
                receive
                    {'DOWN', Mon, process, Victim, Why} ->
                        joey_said_die = Why
                end
        end,

    %% exit/2 called directly.
    KillAndVerify(fun joey_killer/1),

    %% Same thing, but tail call to exit/2.
    KillAndVerify(fun tail_joey_killer/1),

    %% Same thing, but we apply exit/2 instead of calling it directly.
    KillAndVerify(fun applied_joey_killer/1),

    %% Same thing again, but we apply exit/2 in the tail of a function.
    KillAndVerify(fun tail_applied_joey_killer/1),

    %% Done.
    stop_node(Node),
    stop_busy_dist_port_tracer(Tracer),
    ok.
503
%% Returns a one megabyte binary in which every byte is 253.  The binary
%% is built on first use and then cached in the process dictionary of
%% the calling process; a cached value is sanity checked before reuse.
make_busy_data() ->
    Key = '__busy__port__data__',
    Size = 1024*1024,
    Cached = get(Key),
    if
        Cached =:= undefined ->
            Fresh = binary:copy(<<253>>, Size),
            put(Key, Fresh),
            Fresh;
        true ->
            true = is_binary(Cached),
            true = byte_size(Cached) == Size,
            Cached
    end.
517
%% Makes the distribution channel to Node busy.
%%
%% Node is frozen for Time plus 500 ms (the time this function itself
%% waits), and a linked process is started that keeps sending the busy
%% data to the name '__noone__' on Node through the distribution
%% controller.  When that process is seen to be stuck, further data is
%% sent with the nosuspend option until a send reports nosuspend.
%%
%% Returns the pid of the flooding process; pass it to unmake_busy/1.
make_busy(Node, Time) when is_integer(Time) ->
    Own = 500,
    freeze_node(Node, Time+Own),
    Data = make_busy_data(),
    DCtrl = dctrl(Node),
    %% first make port busy
    Flood = fun () ->
                    dctrl_dop_reg_send(Node, '__noone__', Data)
            end,
    Flooder = spawn_link(fun () -> forever(Flood) end),
    receive after Own -> ok end,
    %% The flooder counts as stuck when it is 'suspended' (the controller
    %% is a port) or 'waiting' (the controller is a process).
    Stuck = fun () ->
                    Status = process_info(Flooder, status),
                    (is_port(DCtrl) andalso Status =:= {status, suspended})
                        orelse
                          (is_pid(DCtrl) andalso Status =:= {status, waiting})
            end,
    until(Stuck),
    %% then dist entry
    make_busy(Node, [nosuspend], Data),
    Flooder.
542
%% Keeps sending Data to the name '__noone__' on Node using the send
%% options Opts until erlang:send/3 returns nosuspend.
make_busy(Node, Opts, Data) ->
    Outcome = erlang:send({'__noone__', Node}, Data, Opts),
    if
        Outcome =:= nosuspend ->
            nosuspend;
        true ->
            make_busy(Node, Opts, Data)
    end.
548
%% Stops the flooding process returned by make_busy/2.  The process is
%% unlinked first so that killing it does not affect the caller.
unmake_busy(Pid) ->
    true = unlink(Pid),
    exit(Pid, bang).
552
%% Runs Fun in a new process while the distribution channel to Node is
%% busy.  After 100 ms the process must be suspended inside
%% erlang:bif_return_trap, and once the channel has been released it
%% must terminate with reason normal.  Fails with premature_death if the
%% process is already gone when it is inspected.
do_busy_test(Node, Fun) ->
    Blocker = make_busy(Node, 1000),
    {Worker, Mon} = spawn_monitor(Fun),
    receive after 100 -> ok end,
    Info = process_info(Worker, [status, current_function]),
    unmake_busy(Blocker),
    io:format("~p : ~p~n", [Worker, Info]),
    if
        Info =:= undefined ->
            receive
                {'DOWN', Mon, process, Worker, Early} ->
                    io:format("~p died with exit reason ~p~n",
                              [Worker, Early])
            end,
            ct:fail(premature_death);
        true ->
            %% Don't match arity; it is different in debug and
            %% optimized emulator
            [{status, suspended},
             {current_function, {erlang, bif_return_trap, _}}] = Info,
            receive
                {'DOWN', Mon, process, Worker, Final} ->
                    io:format("~p died with exit reason ~p~n",
                              [Worker, Final]),
                    normal = Final
            end
    end.
578
%% Asks the node that Pid belongs to whether Pid is alive, using
%% rpc:call/4; the result of the rpc call is returned as is.
remote_is_process_alive(Pid) ->
    Home = node(Pid),
    rpc:call(Home, erlang, is_process_alive, [Pid]).
582
%% Kills Pid with a direct call to exit/2 and then waits until the
%% process is reported as no longer alive.
joey_killer(Pid) ->
    exit(Pid, joey_said_die),
    Gone = fun () -> remote_is_process_alive(Pid) == false end,
    until(Gone).
586
%% Kills Pid with exit/2 called in tail position.
tail_joey_killer(Pid) ->
    erlang:exit(Pid, joey_said_die).
589
%% Kills Pid with exit/2 invoked through apply/3 and then waits until
%% the process is reported as no longer alive.
applied_joey_killer(Pid) ->
    apply(erlang, exit, [Pid, joey_said_die]),
    Gone = fun () -> remote_is_process_alive(Pid) == false end,
    until(Gone).
593
%% Kills Pid with exit/2 invoked through apply/3 in tail position.
tail_applied_joey_killer(Pid) ->
    Args = [Pid, joey_said_die],
    apply(erlang, exit, Args).
596
%% Registers the calling process as Name and then discards every
%% message it receives, forever.
sink(Name) ->
    true = register(Name, self()),
    sink1().
600
%% Message-discarding loop; never returns.
sink1() ->
    receive _ -> sink1() end.
605
%% Test that EXIT and DOWN messages are not lost if the distribution
%% port is busy.
%%
%% One process is linked to the process that is going to die and
%% another one monitors it.  The dying process is told to exit while the
%% distribution channel to Node is busy; after the channel has been
%% released, both the link and the monitor must have reported the exit
%% reason controlled_suicide.
lost_exit(Config) when is_list(Config) ->
    {ok, Node} = start_node(lost_exit),

    %% Tracing is opt-in through the environment.
    Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
                 "true" -> start_busy_dist_port_tracer();
                 _ -> false
             end,

    Tester = self(),
    Die = make_ref(),

    %% Process that the victim links to; it does nothing by itself.
    Linked = spawn(fun () -> receive after infinity -> ok end end),
    LinkedMon = erlang:monitor(process, Linked),

    %% The victim: links to Linked, reports back and waits for Die.
    {Victim, VictimMon} =
        spawn_monitor(fun() ->
                              link(Linked),
                              Tester ! {self(), linked},
                              receive
                                  Die ->
                                      exit(controlled_suicide)
                              end
                      end),

    %% Process that monitors the victim and forwards the DOWN reason.
    Watcher = spawn(fun () ->
                            Mon = erlang:monitor(process, Victim),
                            receive
                                {'DOWN', Mon, process, Victim, Why} ->
                                    Tester ! {self(), got_down_message,
                                              Victim, Why}
                            end
                    end),

    receive {Victim, linked} -> ok end,

    Busy = make_busy(Node, 2000),
    receive after 100 -> ok end,
    Victim ! Die,
    receive
        {'DOWN', VictimMon, process, Victim, VictimReason} ->
            controlled_suicide = VictimReason
    end,
    receive after 500 -> ok end,
    unmake_busy(Busy),

    receive
        {'DOWN', LinkedMon, process, Linked, LinkedReason} ->
            controlled_suicide = LinkedReason
    end,

    receive
        {Watcher, got_down_message, Victim, WatchedReason} ->
            controlled_suicide = WatchedReason
    end,

    %% Done.
    stop_busy_dist_port_tracer(Tracer),
    stop_node(Node),
    ok.
664
%% Blocks forever without consuming any messages.
dummy_waiter() ->
    receive after infinity -> ok end.
670
%% Test that linking to a dead remote process gives an EXIT message
%% AND that the link is torn down.
link_to_dead(Config) when is_list(Config) ->
    process_flag(trap_exit, true),
    {ok, Node} = start_node(link_to_dead),
    net_adm:ping(Node), %% Ts_cross_server workaround.
    Dead = spawn(Node, ?MODULE, dead_process, []),
    %% Give the remote process plenty of time to crash.
    receive after 5000 -> ok end,
    link(Dead),
    receive
        {'EXIT', Dead, noproc} ->
            ok;
        Unexpected ->
            ct:fail({unexpected_message, Unexpected})
    after 5000 ->
            ct:fail(nothing_received)
    end,

    %% The link must not linger on our side.
    {links, Linked} = process_info(self(), links),
    io:format("Pid=~p, links=~p", [Dead, Linked]),
    false = lists:member(Dead, Linked),
    stop_node(Node),

    %% Stopping the node must not produce any further message.
    receive
        Late ->
            ct:fail({unexpected_message, Late})
    after 3000 ->
            ok
    end,
    ok.
702
%% Entry point for processes that are supposed to crash right away.
dead_process() ->
    error(die).
705
%% Test that linking to a pid on a node that has gone and restarted
%% gives the correct EXIT message (OTP-2304).
link_to_dead_new_node(Config) when is_list(Config) ->
    process_flag(trap_exit, true),

    %% Start the node, get a Pid and stop the node again.
    {ok, Node} = start_node(link_to_dead_new_node),
    Stale = spawn(Node, ?MODULE, dead_process, []),
    stop_node(Node),

    %% Start a new node with the same name.
    {ok, Node} = start_node(link_to_dead_new_node),
    link(Stale),
    receive
        {'EXIT', Stale, noproc} ->
            ok;
        Unexpected ->
            ct:fail({unexpected_message, Unexpected})
    after 5000 ->
            ct:fail(nothing_received)
    end,

    %% Make sure that the link wasn't created.
    {links, Linked} = process_info(self(), links),
    io:format("Pid=~p, links=~p", [Stale, Linked]),
    false = lists:member(Stale, Linked),
    stop_node(Node),

    %% Stopping the node must not produce any further message.
    receive
        Late ->
            ct:fail({unexpected_message, Late})
    after 3000 ->
            ok
    end,
    ok.
740
%% Test that sending a port or reference to another node and back again
%% doesn't corrupt them in any way.
%%
%% A {Port, Ref} tuple is handed to a linked process on another node,
%% which immediately exits with the tuple as exit reason.  The EXIT
%% message that comes back must carry exactly the same port and
%% reference.
ref_port_roundtrip(Config) when is_list(Config) ->
    process_flag(trap_exit, true),
    Port = make_port(),
    Ref = make_ref(),
    {ok, Node} = start_node(ref_port_roundtrip),
    net_adm:ping(Node),
    Term = {Port, Ref},
    io:format("Term before: ~p", [show_term(Term)]),
    Pid = spawn_link(Node, ?MODULE, roundtrip, [Term]),
    receive after 5000 -> ok end,
    stop_node(Node),
    receive
        {'EXIT', Pid, {Port, Ref}} ->
            %% The pattern only matches when the returned term equals
            %% Term, so Term is what came back.
            io:format("Term after: ~p", [show_term(Term)]),
            ok;
        Other ->
            %% Log the bytes of what was actually received; printing
            %% Term here would just repeat the "before" line.
            io:format("Term after: ~p", [show_term(Other)]),
            ct:fail({unexpected, Other})
    after 10000 ->
            ct:fail(timeout)
    end,
    ok.
765
%% Returns a port to use as test data: the first of the ports that
%% currently exist on the local node.
make_port() ->
    Ports = erlang:ports(),
    hd(Ports).
768
%% Terminates the calling process with Term as exit reason, which hands
%% Term back to whoever is linked to it.
roundtrip(Term) ->
    erlang:exit(Term).
771
%% Test that the smallest external term, [] aka NIL, can be sent to
%% another node and back again, both as a message and as an exit
%% reason.
nil_roundtrip(Config) when is_list(Config) ->
    process_flag(trap_exit, true),
    {ok, Node} = start_node(nil_roundtrip),
    net_adm:ping(Node),
    Bouncer = spawn_link(Node, ?MODULE, bounce, [self()]),
    Bouncer ! [],
    %% First the bounced message ...
    receive [] -> ok end,
    %% ... then the exit signal carrying the same term.
    receive {'EXIT', Bouncer, []} -> ok end,
    stop_node(Node),
    ok.
788
%% Waits for one message, forwards it to Dest and then exits with the
%% message as exit reason.
bounce(Dest) ->
    Msg = receive Any -> Any end,
    Dest ! Msg,
    exit(Msg).
794
%% Returns the external format encoding of Term as a list of bytes,
%% suitable for printing.
show_term(Term) ->
    Encoded = term_to_binary(Term),
    binary_to_list(Encoded).
797
%% Tests behaviour after net_kernel:stop (OTP-2586).
%%
%% Starts a separate emulator with the data directory in its code path
%% and "-s run", and expects the output to start with a line that says
%% "true".
stop_dist(Config) when is_list(Config) ->
    Command = lists:append([ct:get_progname(),
                            " -noshell -pa ",
                            proplists:get_value(data_dir, Config),
                            " -s run"]),
    Output = os:cmd(Command),
    %% The "true" may be followed by an error report, so ignore anything that
    %% follows it.
    "true\n" ++ _ = Output,

    %% "May fail on FreeBSD due to differently configured name lookup - ask Arndt",
    %% if you can find him.

    ok.
812
813
%% monitor_node/2 on a non-existing node, called directly.
trap_bif_1(Config) when is_list(Config) ->
    Result = tr1(),
    {true} = Result,
    ok.
817
%% monitor_node/2 on a non-existing node, called through apply/3.
trap_bif_2(Config) when is_list(Config) ->
    Result = tr2(),
    {true} = Result,
    ok.
821
%% Send to a registered name on a non-existing node.
trap_bif_3(Config) when is_list(Config) ->
    Result = tr3(),
    {hoo} = Result,
    ok.
825
%% Calls monitor_node/2 for a node that does not exist and returns the
%% result wrapped in a one-tuple, so that the BIF call is not in tail
%% position.
tr1() ->
    Result = erlang:monitor_node('abc@boromir', true),
    {Result}.
830
%% Same as tr1/0, but monitor_node/2 is invoked through apply/3.
tr2() ->
    Args = ['abc@boromir', true],
    Result = apply(erlang, monitor_node, Args),
    {Result}.
835
%% Sends a message to a registered name on a node that does not exist
%% and returns the value of the send expression wrapped in a one-tuple.
tr3() ->
    Dest = {'abc@boromir', glirp},
    Result = Dest ! hoo,
    {Result}.
840
841
842
843
% This has to be done by nodes with different cookies, otherwise global
% will connect nodes, which is correct, but makes it hard to test.
846% * Start two nodes, n1 and n2. n2 with the dist_auto_connect once parameter
847% * n2 pings n1 -> connection
848% * check that they now know each other
849% * Kill n1
850% * Make sure n2 gets pang when pinging n1
851% * restart n1
852% * Make sure n2 *still gets pang*!
853% * Ping n2 from n1 -> pong
854% * n2 now also gets pong when pinging n1
855% * disconnect n2 from n1
856% * n2 gets pang when pinging n1
% * n2 forces connection by using net_kernel:connect_node (overrides)
858% * n2 gets pong when pinging n1.
859
%% Test the dist_auto_connect once kernel parameter.
%%
%% A plain relay node and a node started with
%% "-kernel dist_auto_connect once" are connected to each other.  After
%% the connection has been lost, the "once" node must not be able to
%% reconnect by itself with a ping, but must become connected again when
%% the other node pings it, and must be able to force a connection with
%% net_kernel:connect_node/1.
dist_auto_connect_once(Config) when is_list(Config) ->
    RelaySock = start_relay_node(dist_auto_connect_relay_node,[]),
    Relay = inet_rpc_nodename(RelaySock),
    OnceSock = start_relay_node(dist_auto_connect_once_node,
                                "-kernel dist_auto_connect once"),
    Once = inet_rpc_nodename(OnceSock),

    %% First connection: initiated by the "once" node.
    {ok,[]} = do_inet_rpc(RelaySock,erlang,nodes,[]),
    {ok, pong} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]),
    {ok,[Once]} = do_inet_rpc(RelaySock,erlang,nodes,[]),
    {ok,[Relay]} = do_inet_rpc(OnceSock,erlang,nodes,[]),
    [_,RelayHost] = string:lexemes(atom_to_list(Relay),"@"),
    [_,LocalHost] = string:lexemes(atom_to_list(node()),"@"),
    %% Give net_kernel a chance to change the state of the node to up.
    receive after 1000 -> ok end,

    %% Break the connection: stop the relay node if it runs on our own
    %% host, otherwise just disconnect it.
    SameHost = RelayHost =:= LocalHost,
    case SameHost of
        true ->
            ok = stop_relay_node(RelaySock),
            {ok,pang} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]);
        false ->
            {ok, true} = do_inet_rpc(RelaySock,net_kernel,disconnect,[Once]),
            receive after 500 -> ok end
    end,
    {ok, []} = do_inet_rpc(OnceSock,erlang,nodes,[]),
    RelaySock2 = case SameHost of
                     true ->
                         start_relay_node(dist_auto_connect_relay_node,[]);
                     false ->
                         RelaySock
                 end,

    %% A ping from the "once" node must fail, and quickly.
    PingStart = timestamp(),
    {ok, pang} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]),
    PingEnd = timestamp(),
    RefT = net_kernel:connecttime() - 1000,
    true = ((PingEnd - PingStart) < RefT),

    %% A passive connect, on the other hand, has to wait.
    MonStart = timestamp(),
    {ok, true} = do_inet_rpc(OnceSock,erlang,monitor_node,
                             [Relay,true,[allow_passive_connect]]),
    MonEnd = timestamp(),
    true = ((MonEnd - MonStart) > RefT),

    %% Connection set up from the relay side works in both directions.
    {ok, pong} = do_inet_rpc(RelaySock2,net_adm,ping,[Once]),
    {ok, pong} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]),
    {ok, true} = do_inet_rpc(RelaySock2,net_kernel,disconnect,[Once]),
    receive after 500 -> ok end,

    %% Still no automatic reconnect, but an explicit connect works.
    {ok, pang} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]),
    {ok, true} = do_inet_rpc(OnceSock,net_kernel,connect_node,[Relay]),
    {ok, pong} = do_inet_rpc(OnceSock,net_adm,ping,[Relay]),
    stop_relay_node(RelaySock2),
    stop_relay_node(OnceSock).
913
914
915
%% Test case for `-kernel dist_auto_connect never'.
%%
%% A relay node is started, and a process registered on it as
%% dist_auto_connect_relay forwards everything it receives to this test
%% process. A second ("lonely") node is then started with
%% dist_auto_connect set to never. It pings the relay node (which should
%% fail), connects explicitly with net_kernel:connect_node/1 and reports
%% its verdict here through the relay.
%%
%% The test passes when the verdict is `ok', the lonely node could be
%% stopped, and no further report is left in the mailbox.
dist_auto_connect_never(Config) when is_list(Config) ->
    Tester = self(),
    {ok, Relay} = start_node(dist_auto_connect_relay),
    Forwarder = fun() ->
                        register(dist_auto_connect_relay, self()),
                        dist_auto_connect_relay(Tester)
                end,
    spawn(Relay, Forwarder),
    {ok, Lonely} = dist_auto_connect_start(dist_auto_connect, never),
    Verdict =
        receive
            {do_dist_auto_connect, ok} -> ok;
            {do_dist_auto_connect, Failure} -> {error, Failure};
            Unexpected -> {error, Unexpected}
        after 32000 ->
                timeout
        end,
    stop_node(Relay),
    StopResult = dist_auto_connect_stop(Lonely),
    %% Non-blocking check for a report that arrived after the first one.
    Leftover =
        receive
            {do_dist_auto_connect, _} = Extra -> Extra
        after 0 ->
                ok
        end,
    {ok, ok, ok} = {Verdict, StopResult, Leftover},
    ok.
947
948
%% Entry point executed on the node started by dist_auto_connect_start/2
%% (through `-s ?MODULE do_dist_auto_connect Value').
%%
%% [never]: ping the relay node. With auto-connect disabled the ping is
%% expected to give `pang', which is reported as `ok'; any other answer
%% is reported as {error, Ping}. The node is then connected explicitly
%% with net_kernel:connect_node/1 and the verdict is sent to the process
%% registered as dist_auto_connect_relay on the relay node.
%%
%% Any other argument: log it, wait ten seconds and halt the node.
do_dist_auto_connect([never] = Arg) ->
    Node = list_to_atom("dist_auto_connect_relay@" ++ hostname()),
    %% The log lines show the argument actually received; they used to
    %% print a stale hard-coded "[false]".
    io:format("~p:do_dist_auto_connect(~p) Node=~p~n", [?MODULE, Arg, Node]),
    Ping = net_adm:ping(Node),
    io:format("~p:do_dist_auto_connect(~p) Ping=~p~n", [?MODULE, Arg, Ping]),
    Result = case Ping of
                 pang -> ok;
                 _ -> {error, Ping}
             end,
    io:format("~p:do_dist_auto_connect(~p) Result=~p~n", [?MODULE, Arg, Result]),
    net_kernel:connect_node(Node),
    %% This clause does not halt the node itself; dist_auto_connect_stop/1
    %% halts it remotely with rpc:call(Node, erlang, halt, []).
    catch {dist_auto_connect_relay, Node} ! {do_dist_auto_connect, Result};

do_dist_auto_connect(Arg) ->
    io:format("~p:do_dist_auto_connect(~p)~n", [?MODULE, Arg]),
    receive after 10000 -> ok end,
    halt().
968
969
%% Starts a detached Erlang node called Name on this host with
%% `-kernel dist_auto_connect Value'. The new node shares our cookie,
%% gets the directory of this module on its code path and runs
%% ?MODULE:do_dist_auto_connect/1 with Value as its argument.
%%
%% Name may be an atom or a string; Value must be an atom.
%% Returns {ok, {Port, Node}}, the handle expected by
%% dist_auto_connect_stop/1.
dist_auto_connect_start(Name, Value) when is_atom(Name) ->
    dist_auto_connect_start(atom_to_list(Name), Value);
dist_auto_connect_start(Name, Value) when is_list(Name), is_atom(Value) ->
    Node = list_to_atom(Name ++ "@" ++ hostname()),
    BeamDir = filename:dirname(code:which(?MODULE)),
    Setting = atom_to_list(Value),
    CookieStr = atom_to_list(erlang:get_cookie()),
    Cmd = ct:get_progname()
        ++ " -detached "
        ++ long_or_short() ++ " " ++ Name
        ++ " -setcookie " ++ CookieStr
        ++ " -pa " ++ BeamDir
        ++ " -s " ++ atom_to_list(?MODULE)
        ++ " do_dist_auto_connect " ++ Setting
        ++ " -kernel dist_auto_connect " ++ Setting,
    io:format("~p:dist_auto_connect_start() cmd: ~p~n", [?MODULE, Cmd]),
    Port = open_port({spawn, Cmd}, [stream]),
    {ok, {Port, Node}}.
991
992
%% Stops a node started with dist_auto_connect_start/2.
%% A linked helper process asks the node to halt through rpc, and
%% dist_auto_connect_stop/4 then waits with a budget of 5000 for the
%% node to stop answering pings.
dist_auto_connect_stop({Port, Node}) ->
    HaltRemote = fun() -> rpc:call(Node, erlang, halt, []) end,
    Halter = spawn_link(HaltRemote),
    dist_auto_connect_stop(Port, Node, Halter, 5000).
996
%% Waits for Node to go down by polling it with net_adm:ping/1.
%%
%%   Port - port to close once waiting is over (errors from closing it
%%          are ignored)
%%   Node - node being polled
%%   Pid  - helper process that requested the halt; it is always
%%          terminated before this function returns
%%   N    - remaining wait budget; it is reduced by 100 after each
%%          100 ms pause, so time spent in the ping itself is not counted
%%
%% Returns ok when a ping gives `pang', or {error, node_not_down} when
%% the budget is used up first.
dist_auto_connect_stop(Port, _Node, Pid, N) when is_integer(N), N =< 0 ->
    %% exit(Pid, normal) is ignored by a process that does not trap
    %% exits, so it never stopped the helper. Unlink first so that the
    %% resulting `killed' exit cannot propagate to the caller.
    unlink(Pid),
    exit(Pid, kill),
    catch erlang:port_close(Port),
    Result = {error, node_not_down},
    io:format("~p:dist_auto_connect_stop() ~p~n", [?MODULE, Result]),
    Result;
dist_auto_connect_stop(Port, Node, Pid, N) when is_integer(N) ->
    case net_adm:ping(Node) of
        pong ->
            receive after 100 -> ok end,
            dist_auto_connect_stop(Port, Node, Pid, N-100);
        pang ->
            %% Same reasoning as above: make sure the helper really goes
            %% away without taking the caller with it.
            unlink(Pid),
            exit(Pid, kill),
            catch erlang:port_close(Port),
            io:format("~p:dist_auto_connect_stop() ok~n", [?MODULE]),
            ok
    end.
1014
1015
%% Endless loop that forwards every message it receives, unchanged, to
%% Parent. Any exception raised by the send is swallowed by `catch'.
dist_auto_connect_relay(Parent) ->
    Msg = receive Any -> Any end,
    catch Parent ! Msg,
    dist_auto_connect_relay(Parent).
1021
1022
%% Lets 64 processes on a sender node send 1000 messages each, in
%% parallel, to a single process on a receiver node. This is done
%% twice: first against dist_parallel_receiver/0, which just consumes
%% the messages, then against dist_evil_parallel_receiver/0, which
%% disconnects from the sender's node for every message it gets.
%%
%% A watchdog process calls abort(timeout) on both nodes if the whole
%% thing has not finished within two minutes.
dist_parallel_send(Config) when is_list(Config) ->
    {ok, RNode} = start_node(dist_parallel_receiver),
    {ok, SNode} = start_node(dist_parallel_sender),
    Tester = self(),
    Guard = fun () ->
                    Timer = erlang:start_timer((2*60*1000), self(), oops),
                    receive
                        {timeout, Timer, _} ->
                            spawn(SNode, fun () -> abort(timeout) end),
                            spawn(RNode, fun () -> abort(timeout) end)
                    end
            end,
    WatchDog = spawn_link(Guard),

    %% One complete round against the receiver implemented by the
    %% exported function ?MODULE:ReceiverFun/0.
    Round =
        fun (ReceiverFun) ->
                Receiver = spawn_link(RNode, ?MODULE, ReceiverFun, []),
                Senders = [spawn_link(SNode,
                                      ?MODULE,
                                      dist_parallel_sender,
                                      [Tester, Receiver, 1000])
                           || _ <- lists:seq(1, 64)],
                %% The go signal is issued from a process on the sender
                %% node itself.
                spawn_link(SNode,
                           fun () ->
                                   _ = [Sender ! {go, Tester}
                                        || Sender <- Senders],
                                   ok
                           end),
                %% Wait until every sender has reported done.
                _ = [receive {Sender, done} -> ok end || Sender <- Senders],
                unlink(Receiver),
                exit(Receiver, bang)
        end,

    Round(dist_parallel_receiver),
    Round(dist_evil_parallel_receiver),

    unlink(WatchDog),
    exit(WatchDog, bang),

    stop_node(RNode),
    stop_node(SNode),

    ok.
1082
%% Sends {self(), "Some data"} to Receiver once per remaining round and,
%% when the counter reaches 0, tells Parent {self(), done}.
do_dist_parallel_sender(Parent, Receiver, Remaining) ->
    case Remaining of
        0 ->
            Parent ! {self(), done};
        _ ->
            Receiver ! {self(), "Some data"},
            do_dist_parallel_sender(Parent, Receiver, Remaining - 1)
    end.
1088
%% Sender process body: blocks until Parent says {go, Parent}, then
%% sends N messages to Receiver and reports back to Parent.
dist_parallel_sender(Parent, Receiver, N) ->
    receive
        {go, Parent} ->
            do_dist_parallel_sender(Parent, Receiver, N)
    end.
1092
%% Well-behaved receiver: consumes two-element tuples forever and
%% ignores their content.
dist_parallel_receiver() ->
    receive
        {_From, _Payload} ->
            dist_parallel_receiver()
    end.
1096
%% Misbehaving receiver: for every two-element tuple it gets, it asks
%% net_kernel to disconnect from the node of the first element (the
%% sender), and then carries on.
dist_evil_parallel_receiver() ->
    receive
        {From, _Payload} ->
            net_kernel:disconnect(node(From)),
            dist_evil_parallel_receiver()
    end.
1101
%% Sends the atom_data/0 list to a freshly started node and back again,
%% checking that it arrives unchanged.
atom_roundtrip(Config) when is_list(Config) ->
    Atoms = atom_data(),
    verify_atom_data(Atoms),
    {ok, Peer} = start_node(Config),
    do_atom_roundtrip(Peer, Atoms),
    stop_node(Peer),
    ok.
1109
%% Same round trip as unicode_atom_roundtrip/1, but against a node
%% started with the "r16b" release. The case is skipped when that
%% release is not available or the node cannot be started in time.
atom_roundtrip_r16b(Config) when is_list(Config) ->
    case test_server:is_release_available("r16b") of
        false ->
            {skip,"No OTP R16B available"};
        true ->
            ct:timetrap({minutes, 6}),
            Atoms = unicode_atom_data(),
            verify_atom_data(Atoms),
            Started = start_node(Config, [], "r16b"),
            case Started of
                {error, timeout} ->
                    {skip,"Unable to start OTP R16B release"};
                {ok, Peer} ->
                    do_atom_roundtrip(Peer, Atoms),
                    stop_node(Peer)
            end
    end.
1126
%% Sends the unicode_atom_data/0 list to a freshly started node and
%% back again, checking that it arrives unchanged.
unicode_atom_roundtrip(Config) when is_list(Config) ->
    Atoms = unicode_atom_data(),
    verify_atom_data(Atoms),
    {ok, Peer} = start_node(Config),
    do_atom_roundtrip(Peer, Atoms),
    stop_node(Peer),
    ok.
1134
%% Spawns an echo process (verify_atom_data_loop/1) on Node and bounces
%% AtomData off it three times: twice as is and once reversed. Each
%% reply must be identical to what was sent, otherwise the match fails.
%% The echo process is killed afterwards.
do_atom_roundtrip(Node, AtomData) ->
    Tester = self(),
    Echo = spawn_link(Node, fun () -> verify_atom_data_loop(Tester) end),
    RoundTrip = fun (Sent) ->
                        Echo ! {Tester, Sent},
                        receive
                            {Echo, Returned} -> Sent = Returned
                        end
                end,
    RoundTrip(AtomData),
    RoundTrip(AtomData),
    RoundTrip(lists:reverse(AtomData)),
    unlink(Echo),
    exit(Echo, bang),
    ok.
1148
%% Echo loop: waits for {From, AtomData}, verifies the data locally
%% with verify_atom_data/1 and sends it back as {self(), AtomData}.
verify_atom_data_loop(From) ->
    Data = receive {From, Incoming} -> Incoming end,
    verify_atom_data(Data),
    From ! {self(), Data},
    verify_atom_data_loop(From).
1156
%% Returns [{a1, "a1"}, ..., {a2000, "a2000"}]: 2000 atoms, each paired
%% with the text it was created from.
atom_data() ->
    [begin
         Text = "a" ++ integer_to_list(N),
         {list_to_atom(Text), Text}
     end || N <- lists:seq(1, 2000)].
1163
%% Checks every {Term, Text} pair in AtomData. When Term is an atom its
%% text must equal Text; otherwise Term is taken to be a pid, port or
%% reference, and the name of its node must equal Text. A mismatch
%% makes the match fail; returns ok otherwise.
verify_atom_data(AtomData) ->
    Check = fun ({Term, Expected}) when is_atom(Term) ->
                    Expected = atom_to_list(Term);
                ({NodeBound, Expected}) ->
                    %% pid, port or reference
                    Expected = atom_to_list(node(NodeBound))
            end,
    lists:foreach(Check, AtomData).
1172
%% Builds an atom from the code point list ATxt with string_to_atom/1,
%% asserts that it converts back to ATxt, and returns {Atom, ATxt}.
uc_atom_tup(ATxt) ->
    Created = string_to_atom(ATxt),
    ATxt = atom_to_list(Created),
    {Created, ATxt}.
1177
%% Builds a pid whose node name is ATxt (via string_to_atom_ext/1 and
%% mk_pid/3), asserts that node/1 of it converts back to ATxt, and
%% returns {Pid, ATxt}.
uc_pid_tup(ATxt) ->
    Pid = mk_pid({string_to_atom_ext(ATxt), 1}, 4711,17),
    true = is_pid(Pid),
    NodeName = node(Pid),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Pid, ATxt}.
1186
%% Builds a port whose node name is ATxt (via string_to_atom_ext/1 and
%% mk_port/2), asserts that node/1 of it converts back to ATxt, and
%% returns {Port, ATxt}.
uc_port_tup(ATxt) ->
    Port = mk_port({string_to_atom_ext(ATxt), 2}, 4711),
    true = is_port(Port),
    NodeName = node(Port),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Port, ATxt}.
1195
%% Builds a reference whose node name is ATxt (via string_to_atom_ext/1
%% and mk_ref/2), asserts that node/1 of it converts back to ATxt, and
%% returns {Ref, ATxt}.
uc_ref_tup(ATxt) ->
    Ref = mk_ref({string_to_atom_ext(ATxt), 3}, [4711,17, 4711]),
    true = is_reference(Ref),
    NodeName = node(Ref),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Ref, ATxt}.
1204
1205
%% Test data for the unicode atom round trips, in this order:
%%   * a pid, a port and a reference for each of two node names built
%%     from code points starting at 16#1f600 (250 and 31 of them,
%%     followed by "@host");
%%   * atoms built from ten fixed code point ranges;
%%   * 2000 atoms of 255 code points each, from a window that slides
%%     one step at a time starting at 64001.
%% Every element is a {Term, Text} pair accepted by verify_atom_data/1.
unicode_atom_data() ->
    Base = 16#1f600,
    LongNode = lists:seq(Base, Base + 249) ++ "@host",
    ShortNode = lists:seq(Base, Base + 30) ++ "@host",
    NodeBound = [Make(NodeName)
                 || Make <- [fun uc_pid_tup/1,
                             fun uc_port_tup/1,
                             fun uc_ref_tup/1],
                    NodeName <- [LongNode, ShortNode]],
    Ranges = [{Base, Base + 254},
              {Base, Base + 63},
              {0, 254},
              {100, 163},
              {200, 354},
              {200, 263},
              {2000, 2254},
              {2000, 2063},
              {65500, 65754},
              {65500, 65563}],
    Fixed = [uc_atom_tup(lists:seq(First, Last)) || {First, Last} <- Ranges],
    Sliding = [uc_atom_tup(lists:seq(64000 + N, 64254 + N))
               || N <- lists:seq(1, 2000)],
    NodeBound ++ Fixed ++ Sliding.
1226
%% Runs contended_atom_cache_entry_test/2 with atoms created by
%% list_to_atom/1 (type latin1).
contended_atom_cache_entry(Config) when is_list(Config) ->
    AtomType = latin1,
    contended_atom_cache_entry_test(Config, AtomType).
1229
%% Runs contended_atom_cache_entry_test/2 with atoms created by
%% string_to_atom/1 (type unicode).
contended_unicode_atom_cache_entry(Config) when is_list(Config) ->
    AtomType = unicode,
    contended_atom_cache_entry_test(Config, AtomType).
1232
%% Starts a sender node and a receiver node and runs ProcessPairs (10)
%% sender/receiver process pairs between them. Each pair is given its
%% own atom, and all atoms are picked so that they map to the same
%% atom cache index (see get_conflicting_atoms/2). Each sender first
%% transfers the atom's text and then sends the atom itself Msgs
%% (100000) times; the receiver checks that every message carries
%% exactly that atom.
%%
%% Type is latin1 or unicode and decides how the atoms are created.
%% The coordinating process runs on the sender node and notifies the
%% test process when every receiver has reported success.
contended_atom_cache_entry_test(Config, Type) ->
    TestServer = self(),
    ProcessPairs = 10,
    Msgs = 100000,
    {ok, SNode} = start_node(Config),
    {ok, RNode} = start_node(Config),
    Success = make_ref(),
    Coordinator =
        fun () ->
                Master = self(),
                CIX = get_cix(),
                TestAtoms =
                    case Type of
                        latin1 ->
                            get_conflicting_atoms(CIX, ProcessPairs);
                        unicode ->
                            get_conflicting_unicode_atoms(CIX, ProcessPairs)
                    end,
                io:format("Testing with the following atoms all using "
                          "cache index ~p:~n ~w~n",
                          [CIX, TestAtoms]),
                StartPair =
                    fun (A) ->
                            Ref = make_ref(),
                            %% The receiver gets the atom as text first
                            %% and creates the atom itself; the closure
                            %% must not capture A.
                            ReceiverBody =
                                fun () ->
                                        ATxt = receive
                                                   {Ref, txt, Txt} -> Txt
                                               end,
                                        Atom = case Type of
                                                   latin1 ->
                                                       list_to_atom(ATxt);
                                                   unicode ->
                                                       string_to_atom(ATxt)
                                               end,
                                        receive_ref_atom(Ref, Atom, Msgs),
                                        Master ! {self(), success}
                                end,
                            R = spawn_link(RNode, ReceiverBody),
                            SenderBody =
                                fun () ->
                                        receive go -> ok end,
                                        R ! {Ref, txt, atom_to_list(A)},
                                        send_ref_atom(R, Ref, A, Msgs)
                                end,
                            S = spawn_link(SNode, SenderBody),
                            {S, R}
                    end,
                Ps = lists:map(StartPair, TestAtoms),
                %% Release all senders, then wait for every receiver.
                _ = [Sender ! go || {Sender, _} <- Ps],
                _ = [receive {Receiver, success} -> ok end
                     || {_, Receiver} <- Ps],
                TestServer ! Success
        end,
    spawn_link(SNode, Coordinator),
    receive
        Success ->
            ok
    end,
    stop_node(SNode),
    stop_node(RNode),
    ok.
1303
%% Sends {Ref, Atom} to To once per remaining round; returns ok when
%% the counter reaches 0.
send_ref_atom(To, Ref, Atom, Remaining) ->
    case Remaining of
        0 ->
            ok;
        _ ->
            To ! {Ref, Atom},
            send_ref_atom(To, Ref, Atom, Remaining - 1)
    end.
1309
%% Receives one {Ref, Value} message per remaining round and asserts
%% that Value is Atom each time; returns ok when the counter reaches 0.
receive_ref_atom(Ref, Atom, Remaining) ->
    case Remaining of
        0 ->
            ok;
        _ ->
            receive
                {Ref, Value} ->
                    Atom = Value
            end,
            receive_ref_atom(Ref, Atom, Remaining - 1)
    end.
1318
%% Picks an atom cache index to test with, starting the search at 1000.
get_cix() ->
    Preferred = 1000,
    get_cix(Preferred).
1321
%% Picks an atom cache index, searching upwards from CIX (a negative
%% CIX is treated as 0). Indices returned by unwanted_cixs/0 are
%% avoided, and the search wraps around at the emulator's
%% max_atom_out_cache_index.
get_cix(CIX) when is_integer(CIX) ->
    Start = erlang:max(CIX, 0),
    Unwanted = unwanted_cixs(),
    MaxCIX = get_internal_state(max_atom_out_cache_index),
    get_cix(Start, Unwanted, MaxCIX).
1328
%% Returns the first index, counting upwards from CIX, that is not a
%% member of Unwanted. The search restarts from 0 when the index
%% exceeds MaxCIX.
get_cix(CIX, Unwanted, MaxCIX) ->
    if
        CIX > MaxCIX ->
            get_cix(0, Unwanted, MaxCIX);
        true ->
            case lists:member(CIX, Unwanted) of
                true -> get_cix(CIX + 1, Unwanted, MaxCIX);
                false -> CIX
            end
    end.
1336
%% Returns the atom cache index of the name of every node in nodes().
unwanted_cixs() ->
    [get_internal_state({atom_out_cache_index, Node}) || Node <- nodes()].
1343
1344
%% Returns N fresh atoms ("atom" followed by a unique positive integer)
%% that all have atom cache index CIX. Candidates with a different
%% index are discarded and a new one is tried.
get_conflicting_atoms(_CIX, 0) ->
    [];
get_conflicting_atoms(CIX, N) ->
    Name = "atom" ++ integer_to_list(erlang:unique_integer([positive])),
    Candidate = list_to_atom(Name),
    Index = get_internal_state({atom_out_cache_index, Candidate}),
    if
        Index =:= CIX ->
            [Candidate | get_conflicting_atoms(CIX, N - 1)];
        true ->
            get_conflicting_atoms(CIX, N)
    end.
1355
%% Returns N fresh atoms (code point 16#1f608, then "atom" and a unique
%% positive integer) that all have atom cache index CIX. Candidates
%% with a different index are discarded and a new one is tried.
get_conflicting_unicode_atoms(_CIX, 0) ->
    [];
get_conflicting_unicode_atoms(CIX, N) ->
    Name = [16#1f608] ++ "atom" ++ integer_to_list(erlang:unique_integer([positive])),
    Candidate = string_to_atom(Name),
    Index = get_internal_state({atom_out_cache_index, Candidate}),
    if
        Index =:= CIX ->
            [Candidate | get_conflicting_unicode_atoms(CIX, N - 1)];
        true ->
            get_conflicting_unicode_atoms(CIX, N)
    end.
1366
%% NOTE(review): ?COOKIE is not referenced in the nearby test cases;
%% presumably it is used by code further down in this file - confirm.
-define(COOKIE, '').
%% Operation tags used as the first element of the control tuples that
%% bad_dist_structure/1 passes to send_bad_structure/4,5.
-define(DOP_LINK,		1).
-define(DOP_SEND,		2).
-define(DOP_EXIT,		3).
-define(DOP_UNLINK,		4).
-define(DOP_REG_SEND,		6).
-define(DOP_GROUP_LEADER,	7).
-define(DOP_EXIT2,		8).

%% The *_TT tags are presumably the trace token variants of the
%% corresponding operations above - TODO confirm against the
%% distribution protocol documentation.
-define(DOP_SEND_TT,		12).
-define(DOP_EXIT_TT,		13).
-define(DOP_REG_SEND_TT,	16).
-define(DOP_EXIT2_TT,		18).

%% Tags for the process monitor operations.
-define(DOP_MONITOR_P,		19).
-define(DOP_DEMONITOR_P,	20).
-define(DOP_MONITOR_P_EXIT,	21).
1384
%% Spawns a process on node Offender that monitors P, reports the
%% monitor reference to the caller and then stays alive until it is
%% sent just_stay_alive. The reference (or `error' if nothing is
%% reported within five seconds) is printed. Always returns ok.
start_monitor(Offender,P) ->
    Caller = self(),
    Watcher = fun () ->
                      MRef = erlang:monitor(process,P),
                      Caller ! {self(),ref,MRef},
                      receive
                          just_stay_alive -> ok
                      end
              end,
    Q = spawn(Offender, Watcher),
    Reported = receive
                   {Q,ref,Ref} -> Ref
               after 5000 ->
                       error
               end,
    io:format("Ref is ~p~n",[Reported]),
    ok.
%% Spawns a process on node Offender that traps exits, links to P,
%% reports P back to the caller and then stays alive until it is sent
%% just_stay_alive. The reported value (or `error' if nothing is
%% reported within five seconds) is printed. Always returns ok.
start_link(Offender,P) ->
    Caller = self(),
    Linker = fun () ->
                     process_flag(trap_exit,true),
                     link(P),
                     Caller ! {self(),ref,P},
                     receive
                         just_stay_alive -> ok
                     end
             end,
    Q = spawn(Offender, Linker),
    Reported = receive
                   {Q,ref,Value} -> Value
               after 5000 ->
                       error
               end,
    io:format("Ref is ~p~n",[Reported]),
    ok.
1422
%% Tests distribution messages whose structure is valid (binary to
%% term works) but whose control tuple is malformed.
%%
%% A process P on the victim node is sent `one', then a long series of
%% malformed control tuples is injected from the offender node with
%% send_bad_structure/4,5, and finally P is sent `two'. P then runs
%% bad_dist_struct_check_msgs([one, two]) and its message queue must be
%% empty afterwards.
%%
%% NOTE(review): the integer passed along with each control tuple is
%% presumably the tuple position that send_bad_structure/4,5 fills in
%% ('replace' is at position 2 wherever 2 is passed) - confirm there.
bad_dist_structure(Config) when is_list(Config) ->
    ct:timetrap({seconds, 15}),

    {ok, Offender} = start_node(bad_dist_structure_offender),
    {ok, Victim} = start_node(bad_dist_structure_victim),
    start_node_monitors([Offender,Victim]),
    Tester = self(),
    VictimBody =
        fun () ->
                process_flag(trap_exit,true),
                Tester ! {self(), started},
                receive check_msgs -> ok end,
                bad_dist_struct_check_msgs([one,
                                            two]),
                Tester ! {self(), messages_checked},
                receive done -> ok end
        end,
    P = spawn(Victim, VictimBody),
    receive {P, started} -> ok end,
    pong = rpc:call(Victim, net_adm, ping, [Offender]),
    verify_up(Offender, Victim),
    true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),

    %% The first ordinary message goes out between setting up the first
    %% monitor and injecting the first malformed control tuple.
    start_monitor(Offender,P),
    P ! one,
    send_bad_structure(Offender, P,{?DOP_MONITOR_P_EXIT,'replace',P,normal},2),

    %% Control tuples that are preceded by a fresh monitor or link.
    Monitor = fun start_monitor/2,
    Link = fun start_link/2,
    Prepared =
        [{Monitor, {?DOP_MONITOR_P_EXIT,'replace',P,normal,normal}, 2},
         {Link,    {?DOP_LINK}, 0},
         {Link,    {?DOP_UNLINK,'replace'}, 2},
         {Link,    {?DOP_UNLINK,'replace',make_ref()}, 2},
         {Link,    {?DOP_UNLINK,make_ref(),P}, 0},
         {Link,    {?DOP_UNLINK,normal,normal}, 0},
         {Monitor, {?DOP_MONITOR_P,'replace',P}, 2},
         {Monitor, {?DOP_MONITOR_P,'replace',P,normal}, 2},
         {Monitor, {?DOP_DEMONITOR_P,'replace',P}, 2},
         {Monitor, {?DOP_DEMONITOR_P,'replace',P,normal}, 2}],
    lists:foreach(fun ({Prepare, Ctl, Idx}) ->
                          Prepare(Offender, P),
                          send_bad_structure(Offender, P, Ctl, Idx)
                  end,
                  Prepared),

    %% Control tuples injected without any preparation.
    Plain =
        [{{?DOP_EXIT,'replace',P}, 2},
         {{?DOP_EXIT,make_ref(),normal,normal}, 0},
         {{?DOP_EXIT_TT,'replace',token,P}, 2},
         {{?DOP_EXIT_TT,make_ref(),token,normal,normal}, 0},
         {{?DOP_EXIT2,'replace',P}, 2},
         {{?DOP_EXIT2,make_ref(),normal,normal}, 0},
         {{?DOP_EXIT2_TT,'replace',token,P}, 2},
         {{?DOP_EXIT2_TT,make_ref(),token,normal,normal}, 0},
         {{?DOP_GROUP_LEADER,'replace'}, 2},
         {{?DOP_GROUP_LEADER,'replace','atomic'}, 2},
         {{?DOP_GROUP_LEADER,'replace',P}, 0}],
    lists:foreach(fun ({Ctl, Idx}) ->
                          send_bad_structure(Offender, P, Ctl, Idx)
                  end,
                  Plain),

    %% Control tuples injected together with the payload {message}.
    WithPayload =
        [{{?DOP_REG_SEND_TT,'replace','',name}, 2},
         {{?DOP_REG_SEND_TT,'replace','',name,token}, 0},
         {{?DOP_REG_SEND,'replace',''}, 2},
         {{?DOP_REG_SEND,'replace','',P}, 0},
         {{?DOP_REG_SEND,'replace','',name}, 0},
         {{?DOP_REG_SEND,'replace','',name,{token}}, 2},
         {{?DOP_SEND_TT,'',P}, 0},
         {{?DOP_SEND_TT,'',name,token}, 0},
         {{?DOP_SEND,''}, 0},
         {{?DOP_SEND,'',name}, 0},
         {{?DOP_SEND,'',P,{token}}, 0}],
    lists:foreach(fun ({Ctl, Idx}) ->
                          send_bad_structure(Offender, P, Ctl, Idx, {message})
                  end,
                  WithPayload),

    P ! two,
    P ! check_msgs,
    receive
        {P, messages_checked} -> ok
    after 5000 ->
            exit(victim_is_dead)
    end,

    {message_queue_len, 0}
        = rpc:call(Victim, erlang, process_info, [P, message_queue_len]),

    unlink(P),
    P ! done,
    stop_node(Offender),
    stop_node(Victim),
    ok.
1517
1518
1519
%% Bad distribution messages sent to a process on the victim node.
%%
%% Part one: P gets `one', a bad message and `two'. The connection is
%% expected to go down and P to have two messages queued.
%% Part two: P is suspended, the nodes reconnect, and ten bad messages
%% plus `three' are sent. While P is suspended the connection must stay
%% up with 13 messages queued. Once the suspending process is killed,
%% the connection is expected to go down and three messages to remain,
%% which P then checks with bad_dist_ext_check_msgs/1.
bad_dist_ext_receive(Config) when is_list(Config) ->
    {ok, Offender} = start_node(bad_dist_ext_receive_offender),
    {ok, Victim} = start_node(bad_dist_ext_receive_victim),
    start_node_monitors([Offender,Victim]),

    Tester = self(),
    Checker = fun () ->
                      Tester ! {self(), started},
                      receive check_msgs -> ok end,
                      bad_dist_ext_check_msgs([one,
                                               two,
                                               three]),
                      Tester ! {self(), messages_checked},
                      receive done -> ok end
              end,
    P = spawn_link(Victim, Checker),

    %% Small helpers for the checks repeated below.
    QueueLen = fun () ->
                       rpc:call(Victim, erlang, process_info,
                                [P, message_queue_len])
               end,
    VictimSeesOffender = fun () ->
                                 lists:member(Offender,
                                              rpc:call(Victim, erlang,
                                                       nodes, []))
                         end,
    Connect = fun () ->
                      pong = rpc:call(Victim, net_adm, ping, [Offender]),
                      verify_up(Offender, Victim),
                      true = VictimSeesOffender()
              end,

    receive {P, started} -> ok end,
    Connect(),
    P ! one,
    send_bad_msg(Offender, P),
    P ! two,
    verify_down(Offender, connection_closed, Victim, killed),
    {message_queue_len, 2} = QueueLen(),

    Suspended = make_ref(),
    Suspender = fun () ->
                        erlang:suspend_process(P),
                        Tester ! Suspended,
                        receive after infinity -> ok end
                end,
    S = spawn(Victim, Suspender),
    MS = erlang:monitor(process, S),
    receive Suspended -> ok end,
    Connect(),
    send_bad_msgs(Offender, P, 5),
    true = VictimSeesOffender(),
    P ! three,
    send_bad_msgs(Offender, P, 5),

    %% Make sure the bad messages have reached Victim
    rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),

    verify_still_up(Offender, Victim),
    {message_queue_len, 13} = QueueLen(),

    %% Killing the suspender lets P run again.
    exit(S, bang),
    receive {'DOWN', MS, process, S, bang} -> ok end,
    verify_down(Offender, connection_closed, Victim, killed),
    {message_queue_len, 3} = QueueLen(),

    P ! check_msgs,
    receive {P, messages_checked} -> ok end,

    {message_queue_len, 0} = QueueLen(),

    P ! done,
    unlink(P),
    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
1590
1591
%% Bad distribution messages discovered through process_info/2.
%%
%% P (on the victim node) is suspended with `one', `two' and ten bad
%% messages queued. Asking only for message_queue_len must report 12
%% and leave the connection up. Asking for the messages themselves must
%% report just [one, two], after which the connection is expected to go
%% down.
bad_dist_ext_process_info(Config) when is_list(Config) ->
    {ok, Offender} = start_node(bad_dist_ext_process_info_offender),
    {ok, Victim} = start_node(bad_dist_ext_process_info_victim),
    start_node_monitors([Offender,Victim]),

    Tester = self(),
    Checker = fun () ->
                      Tester ! {self(), started},
                      receive check_msgs -> ok end,
                      bad_dist_ext_check_msgs([one, two]),
                      Tester ! {self(), messages_checked},
                      receive done -> ok end
              end,
    P = spawn_link(Victim, Checker),
    %% process_info(P, Item) evaluated on the victim node.
    ProcInfo = fun (Item) ->
                       rpc:call(Victim, erlang, process_info, [P, Item])
               end,

    receive {P, started} -> ok end,
    P ! one,

    Suspended = make_ref(),
    Suspender = fun () ->
                        erlang:suspend_process(P),
                        Tester ! Suspended,
                        receive after infinity -> ok end
                end,
    S = spawn(Victim, Suspender),

    receive Suspended -> ok end,
    pong = rpc:call(Victim, net_adm, ping, [Offender]),
    verify_up(Offender, Victim),
    send_bad_msgs(Offender, P, 5),

    P ! two,
    send_bad_msgs(Offender, P, 5),

    %% Make sure the bad messages have reached Victim
    rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),

    verify_still_up(Offender, Victim),
    {message_queue_len, 12} = ProcInfo(message_queue_len),
    verify_still_up(Offender, Victim),
    [{message_queue_len, 2},
     {messages, [one, two]}] = ProcInfo([message_queue_len, messages]),
    verify_down(Offender, connection_closed, Victim, killed),

    P ! check_msgs,
    exit(S, bang),
    receive {P, messages_checked} -> ok end,

    {message_queue_len, 0} = ProcInfo(message_queue_len),

    P ! done,
    unlink(P),
    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
1651
%% Bad distribution header and bad control message.
%%
%% For each of send_bad_dhdr/2 and send_bad_ctl/2 the nodes are
%% connected, the bad data is sent from the offender to the victim, and
%% the connection is expected to go down.
bad_dist_ext_control(Config) when is_list(Config) ->
    {ok, Offender} = start_node(bad_dist_ext_control_offender),
    {ok, Victim} = start_node(bad_dist_ext_control_victim),
    start_node_monitors([Offender,Victim]),

    Attack = fun (SendBad) ->
                     pong = rpc:call(Victim, net_adm, ping, [Offender]),
                     verify_up(Offender, Victim),
                     SendBad(Offender, Victim),
                     verify_down(Offender, connection_closed, Victim, killed)
             end,
    Attack(fun send_bad_dhdr/2),
    Attack(fun send_bad_ctl/2),

    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
1670
%% A bad message that was received over an earlier connection must not
%% take down a later connection between the same two nodes.
%%
%% P (on the victim node) is suspended and sent one bad message. The
%% offender then disconnects and reconnects. When P is resumed, the
%% new connection must stay up and P must end up with an empty queue.
bad_dist_ext_connection_id(Config) when is_list(Config) ->
    {ok, Offender} = start_node(bad_dist_ext_connection_id_offender),
    {ok, Victim} = start_node(bad_dist_ext_connection_id_victim),
    start_node_monitors([Offender,Victim]),

    Tester = self(),
    Checker = fun () ->
                      Tester ! {self(), started},
                      receive check_msgs -> ok end,
                      bad_dist_ext_check_msgs([]),
                      Tester ! {self(), messages_checked},
                      receive done -> ok end
              end,
    P = spawn_link(Victim, Checker),
    QueueLen = fun () ->
                       rpc:call(Victim, erlang, process_info,
                                [P, message_queue_len])
               end,

    receive {P, started} -> ok end,
    Suspended = make_ref(),
    Suspender = fun () ->
                        erlang:suspend_process(P),
                        Tester ! Suspended,
                        receive after infinity -> ok end
                end,
    S = spawn(Victim, Suspender),
    MS = erlang:monitor(process, S),
    receive Suspended -> ok end,
    pong = rpc:call(Victim, net_adm, ping, [Offender]),
    verify_up(Offender, Victim),
    send_bad_msg(Offender, P),

    %% Make sure the bad message has reached Victim
    rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),

    {message_queue_len, 1} = QueueLen(),

    true = rpc:call(Offender, net_kernel, disconnect, [Victim]),
    verify_down(Offender, disconnect, Victim, connection_closed),
    pong = rpc:call(Offender, net_adm, ping, [Victim]),

    verify_up(Offender, Victim),
    %% There is now a new connection between Offender and Victim; the
    %% bad message from the old one should not bring it down.

    {message_queue_len, 1} = QueueLen(),

    exit(S, bang),
    receive {'DOWN', MS, process, S, bang} -> ok end,
    %% Wait a while; if the connection were to be taken down, that
    %% might take some time.
    receive after 2000 -> ok end,
    verify_still_up(Offender, Victim),

    P ! check_msgs,
    receive {P, messages_checked} -> ok end,

    {message_queue_len, 0} = QueueLen(),

    verify_still_up(Offender, Victim),
    P ! done,
    unlink(P),
    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
1736
%% OTP-14661: Bad message is discovered by erts_msg_attached_data_size
%%
%% Scenario:
%%   1. Start an Offender and a Victim node and start node monitors on both.
%%   2. Spawn a linked receiver process on Victim (with on_heap message
%%      queue data), send it the valid message 'one' and suspend it from
%%      a helper process running on Victim.
%%   3. Let Offender send one message with a bad tag to the receiver.
%%   4. Ask Victim for process_info(Receiver, total_heap_size); after this
%%      the connection is expected to go down.
%%   5. Resume the receiver and let it verify that 'one' is the only
%%      message in its mailbox.
bad_dist_ext_size(Config) when is_list(Config) ->
    {ok, Offender} = start_node(bad_dist_ext_process_info_offender),
    %% The victim is started without extra arguments and without a
    %% release specification.
    {ok, Victim} = start_node(bad_dist_ext_process_info_victim, [], []),
    start_node_monitors([Offender, Victim]),

    Parent = self(),
    ReceiverFun =
        fun () ->
                Parent ! {self(), started},
                %% Note kept from the original test: "DID CRASH HERE"
                receive check_msgs -> ok end,
                bad_dist_ext_check_msgs([one]),
                Parent ! {self(), messages_checked}
        end,
    %% on_heap to force total_heap_size to inspect msg queue
    ReceiverOpts = [link, {message_queue_data, on_heap}],
    Receiver = spawn_opt(Victim, ReceiverFun, ReceiverOpts),

    receive {Receiver, started} -> ok end,
    Receiver ! one,

    %% The suspender keeps the receiver suspended for as long as it lives.
    SuspendedTag = make_ref(),
    SuspenderFun =
        fun () ->
                erlang:suspend_process(Receiver),
                Parent ! SuspendedTag,
                receive after infinity -> ok end
        end,
    Suspender = spawn(Victim, SuspenderFun),

    receive SuspendedTag -> ok end,
    pong = rpc:call(Victim, net_adm, ping, [Offender]),
    verify_up(Offender, Victim),
    send_bad_msgs(Offender, Receiver, 1, dmsg_bad_tag()),

    %% Make sure the bad message has reached Victim
    rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),

    verify_still_up(Offender, Victim),

    %% Let process_info(Receiver, total_heap_size) find the bad message
    %% and disconnect
    rpc:call(Victim, erlang, process_info, [Receiver, total_heap_size]),

    %% Offender's monitor must report connection_closed and Victim's
    %% monitor must report killed.
    verify_down(Offender, connection_closed, Victim, killed),

    Receiver ! check_msgs,
    exit(Suspender, bang),  % killing the suspender resumes the receiver
    receive {Receiver, messages_checked} -> ok end,

    unlink(Receiver),
    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
1791
1792
%% Check the mailbox of the calling process against a list of expected
%% messages, in order.
%%
%% While expected messages remain, the function blocks until a message
%% arrives. {'EXIT', _, _} messages are printed and skipped; any other
%% message must match the next expected one (badmatch otherwise).
%% When the list is exhausted the mailbox must be empty; otherwise the
%% process exits with {unexpected_message, Msg}.
bad_dist_struct_check_msgs([Expected | Remaining] = Pending) ->
    receive
        {'EXIT', _, _} = ExitMsg ->
            io:format("Ignoring exit message: ~p~n", [ExitMsg]),
            bad_dist_struct_check_msgs(Pending);
        Received ->
            Expected = Received,
            bad_dist_struct_check_msgs(Remaining)
    end;
bad_dist_struct_check_msgs([]) ->
    receive
        Unexpected ->
            exit({unexpected_message, Unexpected})
    after 0 ->
            ok
    end.
%% Check the mailbox of the calling process against a list of expected
%% messages, in order.
%%
%% While expected messages remain, the function blocks until a message
%% arrives and requires it to match the next expected one (badmatch
%% otherwise). When the list is exhausted the mailbox must be empty;
%% otherwise the process exits with {unexpected_message, Msg}.
bad_dist_ext_check_msgs([Expected | Remaining]) ->
    receive
        Received ->
            Expected = Received,
            bad_dist_ext_check_msgs(Remaining)
    end;
bad_dist_ext_check_msgs([]) ->
    receive
        Unexpected ->
            exit({unexpected_message, Unexpected})
    after 0 ->
            ok
    end.
1822
%% Return the distribution controller for Node as reported by dctrl/1.
%% If dctrl/1 reports 'undefined', Node is pinged first (the ping must
%% answer pong) and dctrl/1 is asked once more.
ensure_dctrl(Node) ->
    Current = dctrl(Node),
    if
        Current =:= undefined ->
            pong = net_adm:ping(Node),
            dctrl(Node);
        true ->
            Current
    end.
1831
%% Hand Data to a distribution controller.
%%
%% A pid controller is sent {send, self(), Ref, Data} and the function
%% waits (without timeout) for the {Ref, Result} answer, returning
%% Result. A port controller gets the data through port_command/2.
dctrl_send(DPid, Data) when is_pid(DPid) ->
    ReplyTag = make_ref(),
    DPid ! {send, self(), ReplyTag, Data},
    receive
        {ReplyTag, Result} ->
            Result
    end;
dctrl_send(DPrt, Data) when is_port(DPrt) ->
    port_command(DPrt, Data).
1838
%% Send Msg to the process registered as Name on Node by writing a
%% hand-built ?DOP_REG_SEND packet (dist header, control tuple with
%% self() as sender, message) directly to Node's distribution controller.
dctrl_dop_reg_send(Node, Name, Msg) ->
    DCtrl = ensure_dctrl(Node),
    Header = dmsg_hdr(),
    Control = dmsg_ext({?DOP_REG_SEND, self(), ?COOKIE, Name}),
    Payload = dmsg_ext(Msg),
    dctrl_send(DCtrl, [Header, Control, Payload]).
1847
%% Send Msg to the process To by writing a hand-built ?DOP_SEND packet
%% (dist header, control tuple, message) directly to the distribution
%% controller of To's node.
dctrl_dop_send(To, Msg) ->
    DCtrl = ensure_dctrl(node(To)),
    Header = dmsg_hdr(),
    Control = dmsg_ext({?DOP_SEND, ?COOKIE, To}),
    Payload = dmsg_ext(Msg),
    dctrl_send(DCtrl, [Header, Control, Payload]).
1854
%% send_bad_structure(Offender, Victim, Bad, WhereToPutSelf [, PayLoad]):
%%
%% Run a linked process on node Offender that sends the tuple Bad as a
%% distribution control message to the node of Victim and then expects
%% that node to go down.
%%
%%   Victim         - passed to node/1 to find the target node
%%   Bad            - control message to send
%%   WhereToPutSelf - 0 to send Bad unchanged, or N > 0 to replace
%%                    element N of Bad with the pid of the sending process
%%   PayLoad        - [] for no payload, otherwise a term appended after
%%                    the control message (defaults to [])
%%
%% The sender exits with "premature nodedown" if the node goes down
%% within 10 ms before anything is sent, and with "missing nodedown" if
%% no nodedown is seen within 5 seconds after sending. The caller prints
%% what was sent and returns ok, or exits with unable_to_send if the
%% sender has not reported within 5 seconds.
send_bad_structure(Offender, Victim, Bad, WhereToPutSelf) ->
    send_bad_structure(Offender, Victim, Bad, WhereToPutSelf, []).

send_bad_structure(Offender, Victim, Bad, WhereToPutSelf, PayLoad) ->
    Parent = self(),
    Done = make_ref(),
    Sender =
        fun () ->
                TargetNode = node(Victim),
                pong = net_adm:ping(TargetNode),
                erlang:monitor_node(TargetNode, true),
                DCtrl = dctrl(TargetNode),
                Control = case WhereToPutSelf of
                              0 ->
                                  Bad;
                              Pos when Pos > 0 ->
                                  setelement(Pos, Bad, self())
                          end,
                Header = dmsg_hdr(),
                ControlExt = dmsg_ext(Control),
                PayloadPart = case PayLoad of
                                  [] -> [];
                                  _NonEmpty -> [dmsg_ext(PayLoad)]
                              end,
                Packet = [Header, ControlExt | PayloadPart],

                %% The connection must still be up before we send.
                receive
                    {nodedown, TargetNode} -> exit("premature nodedown")
                after 10 ->
                        ok
                end,

                dctrl_send(DCtrl, Packet),

                %% The bad control message is expected to take it down.
                receive
                    {nodedown, TargetNode} -> ok
                after 5000 ->
                        exit("missing nodedown")
                end,
                Parent ! {Packet, Done}
        end,
    spawn_link(Offender, Sender),
    receive
        {WhatSent, Done} ->
            io:format("Offender sent ~p~n", [WhatSent]),
            ok
    after 5000 ->
            exit(unable_to_send)
    end.
1897
1898
%% send_bad_msg(BadNode, To)
%% send_bad_msgs(BadNode, To, Repeat)
%% send_bad_msgs(BadNode, To, Repeat, BadTerm):
%% Send a valid distribution header and control message
%% but an invalid message. This invalid message will be
%% enqueued in the receiver's message queue.
%%
%%   BadNode - node from which the packets are sent
%%   To      - pid of the receiving process
%%   Repeat  - number of times the packet is sent (default 1)
%%   BadTerm - raw bytes used as the message part
%%             (default dmsg_bad_atom_cache_ref())
%%
%% Returns ok once the linked sender process on BadNode has reported
%% that all packets were handed to the distribution controller.
send_bad_msg(BadNode, To) ->
    send_bad_msgs(BadNode, To, 1).

send_bad_msgs(BadNode, To, Repeat) ->
    send_bad_msgs(BadNode, To, Repeat, dmsg_bad_atom_cache_ref()).

send_bad_msgs(BadNode, To, Repeat, BadTerm) when is_atom(BadNode),
                                                 is_pid(To),
                                                 is_integer(Repeat) ->
    Parent = self(),
    Done = make_ref(),
    Sender =
        fun () ->
                TargetNode = node(To),
                pong = net_adm:ping(TargetNode),
                DCtrl = dctrl(TargetNode),
                Packet = [dmsg_hdr(),
                          dmsg_ext({?DOP_SEND, ?COOKIE, To}),
                          BadTerm],
                SendOnce = fun () -> dctrl_send(DCtrl, Packet) end,
                repeat(SendOnce, Repeat),
                Parent ! Done
        end,
    spawn_link(BadNode, Sender),
    receive
        Done -> ok
    end.
1926
%% send_bad_ctl(BadNode, ToNode):
%% Send a valid distribution header but an invalid control message.
%%
%% A linked process on BadNode builds a ?DOP_REG_SEND control tuple whose
%% last element is the atom 'replace', cuts the encoded atom off the end
%% and puts dmsg_bad_atom_cache_ref() in its place before sending the
%% packet to ToNode. Returns ok when the sender has reported back.
send_bad_ctl(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) ->
    Parent = self(),
    Done = make_ref(),
    Sender =
        fun () ->
                pong = net_adm:ping(ToNode),
                %% We create a valid ctl msg and replace an
                %% atom with an invalid atom cache reference
                AtomExt = dmsg_ext(replace),
                Ctl = dmsg_ext({?DOP_REG_SEND,
                                self(),
                                ?COOKIE,
                                replace}),
                PrefixLen = byte_size(Ctl) - byte_size(AtomExt),
                %% This match also asserts that Ctl really ends with the
                %% encoded atom.
                <<CtlPrefix:PrefixLen/binary, AtomExt/binary>> = Ctl,
                DCtrl = dctrl(ToNode),
                Packet = [dmsg_fake_hdr2(),
                          CtlPrefix,
                          dmsg_bad_atom_cache_ref(),
                          dmsg_ext({a, message})],
                dctrl_send(DCtrl, Packet),
                Parent ! Done
        end,
    spawn_link(BadNode, Sender),
    receive
        Done -> ok
    end.
1953
%% send_bad_dhdr(BadNode, ToNode):
%% Send an invalid distribution header (dmsg_bad_hdr()) from BadNode to
%% ToNode. Returns ok when the linked sender process on BadNode has
%% reported that the header was handed to the distribution controller.
send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) ->
    Parent = self(),
    Done = make_ref(),
    Sender =
        fun () ->
                pong = net_adm:ping(ToNode),
                DCtrl = dctrl(ToNode),
                dctrl_send(DCtrl, dmsg_bad_hdr()),
                Parent ! Done
        end,
    spawn_link(BadNode, Sender),
    receive
        Done -> ok
    end.
1966
%% Look up the distribution controller for Node through the emulator's
%% internal state ({dist_ctrl, Node}). ensure_dctrl/1 treats the result
%% 'undefined' as "no controller yet".
dctrl(Node) when is_atom(Node) ->
    Query = {dist_ctrl, Node},
    get_internal_state(Query).
1969
%% Read emulator internal state via erts_debug:get_internal_state/1.
%% If the call fails with error:undef, access to the internal state is
%% switched on (available_internal_state = true) and the call is retried
%% once.
get_internal_state(Op) ->
    try
        erts_debug:get_internal_state(Op)
    catch
        error:undef ->
            erts_debug:set_internal_state(available_internal_state, true),
            erts_debug:get_internal_state(Op)
    end.
1978
%% Write emulator internal state via erts_debug:set_internal_state/2.
%% If the call fails with error:undef, access to the internal state is
%% switched on (available_internal_state = true) and the call is retried
%% once.
set_internal_state(Op, Val) ->
    try
        erts_debug:set_internal_state(Op, Val)
    catch
        error:undef ->
            erts_debug:set_internal_state(available_internal_state, true),
            erts_debug:set_internal_state(Op, Val)
    end.
1987
1988
%% Minimal distribution header: version magic, the dist header tag $D
%% and a count of zero atom cache references.
dmsg_hdr() ->
    VersionMagic = 131,
    DistHeaderTag = $D,
    NoOfAtomCacheRefs = 0,
    [VersionMagic, DistHeaderTag, NoOfAtomCacheRefs].
1993
%% Invalid distribution header: version magic and the dist header tag $D
%% followed by a count of 255 atom cache references, with no reference
%% data after it.
dmsg_bad_hdr() ->
    VersionMagic = 131,
    DistHeaderTag = $D,
    NoOfAtomCacheRefs = 255,
    [VersionMagic, DistHeaderTag, NoOfAtomCacheRefs].
1998
1999
2000%% dmsg_fake_hdr1() ->
2001%%     A = <<"fake header atom 1">>,
2002%%     [131, % Version Magic
2003%%      $D, 1, 16#8, 0, size(A), A]. % Fake header
2004
%% Distribution header announcing three atom cache references: version
%% magic, tag $D, the count 3, two flag bytes and then one entry per
%% atom.
%% NOTE(review): each entry presumably is (cache index, length, atom
%% text) - confirm against the distribution header format.
dmsg_fake_hdr2() ->
    Entry = fun (Index, AtomText) ->
                    [Index, byte_size(AtomText), AtomText]
            end,
    Prefix = [131, % Version Magic
              $D,
              3,
              16#88, 16#08], % Flags
    Prefix
        ++ Entry(0, <<"fake header atom 1">>)
        ++ Entry(1, <<"atom 2">>)
        ++ Entry(2, <<"atom 3">>).
2016
%% Encode Term with term_to_binary/1 and return the encoding without the
%% leading version magic byte (131).
dmsg_ext(Term) ->
    Encoded = term_to_binary(Term),
    <<131, WithoutMagic/binary>> = Encoded,
    WithoutMagic.
2020
%% Raw bytes used as an invalid atom cache reference: the tag $R followed
%% by the index 137.
%% NOTE(review): presumably invalid because no header built in this file
%% declares that many cache entries - confirm.
dmsg_bad_atom_cache_ref() ->
    Tag = $R,
    Index = 137,
    [Tag, Index].
2023
%% Raw bytes starting with the tag byte $?, used as a bad message term.
%% Will fail early at heap size calculation.
dmsg_bad_tag() ->
    Tag = $?,
    [Tag, 66].
2026
%% Test case: a node started with "-start_epmd false" can still be
%% pinged from the test node.
start_epmd_false(Config) when is_list(Config) ->
    %% Start a node with the option -start_epmd false.
    {ok, Node} = start_node(start_epmd_false, "-start_epmd false"),
    %% We should be able to ping it, as epmd was started by us:
    pong = net_adm:ping(Node),
    stop_node(Node),
    ok.
2035
%% Test case: use this module as -epmd_module on two relay nodes.
%%
%% The first node only publishes its listen port through the kernel
%% application environment (see register_node/3). The second node is
%% given that port with -other_node_port, so port_please/2 on the second
%% node answers with the first node's port while the first node gets the
%% default port 42. Hence node 1 cannot ping node 2, but node 2 can ping
%% node 1.
epmd_module(Config) when is_list(Config) ->
    EpmdArg = "-epmd_module " ++ ?MODULE_STRING,

    %% We need a relay node to test this, since the test node uses the
    %% standard epmd module.
    Relay1 = start_relay_node(epmd_module_node1, EpmdArg),
    Node1 = inet_rpc_nodename(Relay1),
    %% Ask what port it's listening on - it won't have registered with
    %% epmd.
    {ok, {ok, Port1}} =
        do_inet_rpc(Relay1, application, get_env, [kernel, dist_listen_port]),

    %% Start a second node, passing the port number as a secret
    %% argument.
    Relay2 = start_relay_node(epmd_module_node2,
                              EpmdArg ++ " -other_node_port "
                              ++ integer_to_list(Port1)),
    Node2 = inet_rpc_nodename(Relay2),

    %% Node 1 can't ping node 2
    {ok, pang} = do_inet_rpc(Relay1, net_adm, ping, [Node2]),
    {ok, []} = do_inet_rpc(Relay1, erlang, nodes, []),
    {ok, []} = do_inet_rpc(Relay2, erlang, nodes, []),

    %% But node 2 can ping node 1
    {ok, pong} = do_inet_rpc(Relay2, net_adm, ping, [Node1]),
    {ok, [Node2]} = do_inet_rpc(Relay1, erlang, nodes, []),
    {ok, [Node1]} = do_inet_rpc(Relay2, erlang, nodes, []),

    stop_relay_node(Relay2),
    stop_relay_node(Relay1).
2061
2062%% epmd_module functions:
2063
%% epmd_module callback: this module starts no process of its own and
%% therefore answers 'ignore'.
start_link() -> ignore.
2066
%% epmd_module callback: "register" the node.
%%
%% Nothing is registered anywhere; the listen port is only stored in the
%% kernel application environment (dist_listen_port) where the test case
%% can fetch it. Returns {ok, Creation} with a random creation in 1..3.
%% register_node/2 is register_node/3 with the driver inet_tcp.
register_node(Name, Port) ->
    register_node(Name, Port, inet_tcp).

register_node(_Name, Port, _Driver) ->
    %% Save the port number we're listening on.
    application:set_env(kernel, dist_listen_port, Port),
    {ok, rand:uniform(3)}.
2074
%% epmd_module callback: answer which port another node listens on.
%%
%% The port is taken from the command line flag -other_node_port when it
%% is given (exactly one value), otherwise the port 42 is used. The
%% version in the answer is always 5.
port_please(_Name, _Ip) ->
    Version = 5,
    Port = case init:get_argument(other_node_port) of
               {ok, [[PortS]]} ->
                   %% Port number given on command line.
                   list_to_integer(PortS);
               error ->
                   %% None specified.  Default to 42.
                   42
           end,
    {port, Port, Version}.
2088
%% epmd_module callback: every node is said to live on localhost.
address_please(_Name, _Address, _AddressFamily) ->
    Localhost = {127, 0, 0, 1},
    {ok, Localhost}.
2093
2094%%% Utilities
2095
%% Current Erlang monotonic time in milliseconds.
timestamp() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).
2098
%% start_node(NameOrConfig [, Args [, Rel]]):
%%
%% Start a slave node through test_server:start_node/3. The node gets the
%% cookie of the test node and the directory of this module in its code
%% path, in addition to Args (a string, default "").
%%
%%   NameOrConfig - the node name as an atom, or a test case Config list;
%%                  in the latter case a name is built from the module
%%                  name, the 'testcase' value of Config, the system time
%%                  in seconds and a unique positive integer
%%   Rel          - [] for the current emulator, otherwise a release
%%                  specification passed on as {erl, [{release, Rel}]}
start_node(X) ->
    start_node(X, [], []).

start_node(X, Y) ->
    start_node(X, Y, []).

start_node(Name, Args, Rel) when is_atom(Name), is_list(Rel) ->
    CodeDir = filename:dirname(code:which(?MODULE)),
    Cookie = atom_to_list(erlang:get_cookie()),
    NodeArgs = Args ++ " -setcookie " ++ Cookie ++ " -pa \"" ++ CodeDir ++ "\"",
    Opts = case Rel of
               [] -> [{args, NodeArgs}];
               _ -> [{args, NodeArgs}, {erl, [{release, Rel}]}]
           end,
    test_server:start_node(Name, slave, Opts);
start_node(Config, Args, Rel) when is_list(Config), is_list(Rel) ->
    Parts = [atom_to_list(?MODULE),
             atom_to_list(proplists:get_value(testcase, Config)),
             integer_to_list(erlang:system_time(second)),
             integer_to_list(erlang:unique_integer([positive]))],
    Name = list_to_atom(lists:append(lists:join("-", Parts))),
    start_node(Name, Args, Rel).
2125
%% Stop a node that was started with start_node/1,2,3. The result of
%% test_server:stop_node/1 is returned unchanged.
stop_node(Node) ->
    Result = test_server:stop_node(Node),
    Result.
2128
%% Block Node for MS milliseconds.
%%
%% A linked process on Node first sends a reference back to the caller
%% with dctrl_dop_send/2, waits 300 ms and then sets the internal state
%% 'block' to MS + 300. The caller waits for the reference and then
%% another 300 ms before returning ok.
%% NOTE(review): the extra 300 ms on both sides presumably lets the
%% caller return just as the block begins - confirm.
freeze_node(Node, MS) ->
    Own = 300,
    Freezer = self(),
    DoingIt = make_ref(),
    Blocker =
        fun () ->
                dctrl_dop_send(Freezer, DoingIt),
                receive after Own -> ok end,
                set_internal_state(block, MS + Own)
        end,
    spawn_link(Node, Blocker),
    receive
        DoingIt -> ok
    end,
    receive
    after Own ->
            ok
    end.
2141
%% Node name (Name@Host, as an atom) of a relay node handle as returned
%% by start_relay_node/2.
inet_rpc_nodename({Name, Host, _Sock}) ->
    list_to_atom(lists:append([Name, "@", Host])).
2144
%% Call M:F(A...) on a relay node over its TCP socket.
%%
%% The first argument is the {Name, Host, Socket} handle returned by
%% start_relay_node/2. The request is sent as term_to_binary({M, F, A});
%% the answer is {ok, Term} with the decoded reply, or {error, What}
%% where What is whatever gen_tcp:recv/2 returned instead of {ok, _}.
do_inet_rpc({_Name, _Host, Sock}, M, F, A) ->
    Request = term_to_binary({M, F, A}),
    gen_tcp:send(Sock, Request),
    case gen_tcp:recv(Sock, 0) of
        {ok, Reply} ->
            {ok, binary_to_term(Reply)};
        Failure ->
            {error, Failure}
    end.
2155
%% Entry point of a relay node (see the "-run" argument built in
%% start_relay_node/2): connect to Host on the given port (given as
%% strings) and serve requests on that socket.
inet_rpc_server([Host, PortList]) ->
    Port = list_to_integer(PortList),
    SockOpts = [binary, {packet, 4}, {active, false}],
    {ok, Sock} = gen_tcp:connect(Host, Port, SockOpts),
    inet_rpc_server_loop(Sock).
2161
%% Serve {M, F, A} requests arriving on Sock: apply each one, send back
%% the result (or the caught exception term) and wait for the next
%% request. Anything but a received packet, such as a closed socket,
%% halts the node.
inet_rpc_server_loop(Sock) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, Request} ->
            {M, F, A} = binary_to_term(Request),
            Reply = term_to_binary((catch apply(M, F, A))),
            gen_tcp:send(Sock, Reply),
            inet_rpc_server_loop(Sock);
        _ ->
            erlang:halt()
    end.
2173
2174
%% Start a peer node that cannot be reached over Erlang distribution from
%% the test node (it gets a different cookie) but that serves requests
%% over a TCP socket instead, see inet_rpc_server/1 and do_inet_rpc/4.
%%
%%   Node - name of the node to start
%%   Args - extra command line arguments (a string)
%%
%% Returns the handle {Name, Host, Socket}, where Name and Host are the
%% two parts of the started node's name as strings.
start_relay_node(Node, Args) ->
    Pa = filename:dirname(code:which(?MODULE)),
    Cookie = "NOT"++atom_to_list(erlang:get_cookie()),
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 4}, {active, false}]),
    {ok, Port} = inet:port(LSock),
    {ok, Host} = inet:gethostname(),
    RunArg = "-run " ++ atom_to_list(?MODULE) ++ " inet_rpc_server " ++
        Host ++ " " ++ integer_to_list(Port),
    %% The code path is quoted, as in start_node/3, so that a directory
    %% name containing spaces survives the command line.
    {ok, NN} = test_server:start_node(Node, peer,
                                      [{args, Args ++
                                            " -setcookie "++Cookie++
                                            " -pa \""++Pa++"\" "++
                                            RunArg}]),
    [N,H] = string:lexemes(atom_to_list(NN),"@"),
    {ok, Sock} = gen_tcp:accept(LSock),
    %% Only one connection is ever accepted; do not leave the listen
    %% socket open for the rest of the test case.
    ok = gen_tcp:close(LSock),
    %% The cookies differ, so a distribution ping must fail.
    pang = net_adm:ping(NN),
    {N,H,Sock}.
2191
%% Stop a relay node started with start_relay_node/2: ask it to halt,
%% close the socket and wait (see wait_dead/3) for it to disappear.
%%
%% do_inet_rpc/4 takes the whole {Name, Host, Socket} handle. Passing only
%% the socket, as was done before, raised function_clause (hidden by the
%% catch), so the halt request was never sent.
%% Both steps stay best effort: the node halts on a closed socket as
%% well, and the halt request normally ends with the connection closing.
stop_relay_node({N,H,Sock} = Relay) ->
    catch do_inet_rpc(Relay,erlang,halt,[]),
    catch gen_tcp:close(Sock),
    wait_dead(N,H,10).
2196
%% Wait for node N on host H to stop answering erl_epmd:port_please/2.
%%
%% The third argument is the number of attempts left; one second passes
%% between attempts. Returns ok when the answer is 'noport',
%% {error, {not_dead, N, H}} when no attempts are left, and
%% {error, {unexpected, Answer}} for any other answer.
wait_dead(N, H, 0) ->
    {error, {not_dead, N, H}};
wait_dead(N, H, AttemptsLeft) ->
    case erl_epmd:port_please(N, H) of
        noport ->
            ok;
        {port, _, _} ->
            receive
            after 1000 ->
                    ok
            end,
            wait_dead(N, H, AttemptsLeft - 1);
        Unexpected ->
            {error, {unexpected, Unexpected}}
    end.
2212
2213
%% Start one node_monitor/1 process on each of Nodes. All of them report
%% to the calling process. Returns ok.
start_node_monitors(Nodes) ->
    Master = self(),
    _ = [spawn(Node, fun () -> node_monitor(Master) end) || Node <- Nodes],
    ok.
2224
%% Subscribe to node up/down events on the local node and forward them
%% to Master (see node_monitor_loop/1).
%%
%% The set of connected nodes is read before and after subscribing. If
%% both readings agree, a {nodeup, node(), Node} message is sent to
%% Master for every node already connected and the forwarding loop is
%% entered. If they differ, the subscription is cancelled, pending node
%% change messages are flushed and the whole procedure starts over.
node_monitor(Master) ->
    Opts = [nodedown_reason, {node_type, all}],
    Before = nodes(connected),
    net_kernel:monitor_nodes(true, Opts),
    After = nodes(connected),
    Unchanged = lists:sort(Before) == lists:sort(After),
    if
        Unchanged ->
            _ = [Master ! {nodeup, node(), Node} || Node <- Before],
            io:format("~p ~p: ~p~n",
                      [node(), erlang:system_time(microsecond), Before]),
            node_monitor_loop(Master);
        true ->
            net_kernel:monitor_nodes(false, Opts),
            flush_node_changes(),
            node_monitor(Master)
    end.
2243
%% Remove every {nodeup, _, _} and {nodedown, _, _} message currently in
%% the mailbox of the calling process. Other messages are left alone.
%% Never blocks; returns ok.
flush_node_changes() ->
    receive
        {nodeup, _Node, _InfoList} ->
            flush_node_changes();
        {nodedown, _Node, _InfoList} ->
            flush_node_changes()
    after 0 ->
            ok
    end.
2252
%% Forward node events received by this process to Master, forever.
%%
%%   {nodeup, Node, Info}   is forwarded as {nodeup, node(), Node}
%%   {nodedown, Node, Info} is forwarded as {nodedown, node(), Node, Reason}
%%
%% Reason is the nodedown_reason found in Info, or 'undefined' when Info
%% has no such entry. Every event is also printed together with the local
%% node name and the system time in microseconds.
node_monitor_loop(Master) ->
    Event =
        receive
            {nodeup, UpNode, _InfoList} = Up ->
                Master ! {nodeup, node(), UpNode},
                Up;
            {nodedown, DownNode, InfoList} = Down ->
                Reason = case lists:keyfind(nodedown_reason, 1, InfoList) of
                             {nodedown_reason, R} -> R;
                             _ -> undefined
                         end,
                Master ! {nodedown, node(), DownNode, Reason},
                Down
        end,
    io:format("~p ~p: ~p~n", [node(), erlang:system_time(microsecond), Event]),
    node_monitor_loop(Master).
2268
%% Wait (without timeout) until the node monitors have reported that A
%% sees B come up and, after that, that B sees A come up.
verify_up(A, B) ->
    AwaitUp = fun ({Reporter, Node}) ->
                      receive
                          {nodeup, Reporter, Node} -> ok
                      end
              end,
    lists:foreach(AwaitUp, [{A, B}, {B, A}]).
2272
%% Assert that A and B list each other among their connected nodes and
%% that no nodedown report about the pair is waiting in the mailbox.
verify_still_up(A, B) ->
    ConnectedToA = rpc:call(A, erlang, nodes, [connected]),
    true = lists:member(B, ConnectedToA),
    ConnectedToB = rpc:call(B, erlang, nodes, [connected]),
    true = lists:member(A, ConnectedToB),
    verify_no_down(A, B).
2277
%% Fail the test case (ct:fail/1 with the message as reason) if a
%% nodedown report from A about B, or from B about A, is waiting in the
%% mailbox. Never blocks; returns ok when there is none.
verify_no_down(A, B) ->
    FailIfDown = fun ({Reporter, Node}) ->
                         receive
                             {nodedown, Reporter, Node, _} = Down ->
                                 ct:fail(Down)
                         after 0 ->
                                 ok
                         end
                 end,
    lists:foreach(FailIfDown, [{A, B}, {B, A}]).
2291
2292%% verify_down(A, B) ->
2293%%     receive {nodedown, A, B, _} -> ok end,
2294%%     receive {nodedown, B, A, _} -> ok end.
2295
%% Wait (without timeout) for the nodedown report from A about B and
%% then for the one from B about A, and assert their reasons.
%%
%% The reason is deliberately not part of the receive pattern: a report
%% with another reason is taken out of the mailbox and gives a badmatch
%% instead of leaving the test waiting.
verify_down(A, ReasonA, B, ReasonB) ->
    ExpectDown =
        fun (Reporter, Node, Reason) ->
                receive
                    {nodedown, Reporter, Node, _} = Down ->
                        {nodedown, Reporter, Node, Reason} = Down
                end
        end,
    ExpectDown(A, B, ReasonA),
    ExpectDown(B, A, ReasonB),
    ok.
2306
%% Host part of the local node name: the characters following the first
%% $@ in the name, as a string.
hostname() ->
    NodeName = atom_to_list(node()),
    from($@, NodeName).
2309
%% from(Elem, List): the part of List that follows the first occurrence
%% of Elem, or [] if Elem does not occur in List.
from(_Elem, []) ->
    [];
from(Elem, [Elem | Rest]) ->
    Rest;
from(Elem, [_Other | Rest]) ->
    from(Elem, Rest).
2313
2314%% fun_spawn(Fun) ->
2315%%     fun_spawn(Fun, []).
2316
2317%% fun_spawn(Fun, Args) ->
2318%%     spawn_link(erlang, apply, [Fun, Args]).
2319
2320
%% The command line flag (with surrounding spaces) selecting the same
%% kind of node names as the test node uses: " -name " for long names and
%% " -sname " for short names.
long_or_short() ->
    UsesLongNames = net_kernel:longnames(),
    case UsesLongNames of
        false -> " -sname ";
        true -> " -name "
    end.
2326
%% Call Fun every 10 ms until it returns true; returns ok. Fun must
%% return true or false.
until(Fun) ->
    case Fun() of
        false ->
            receive
            after 10 ->
                    ok
            end,
            until(Fun);
        true ->
            ok
    end.
2335
%% Call Fun over and over again. Never returns; the loop only ends if
%% Fun raises an exception or the process is terminated.
forever(Fun) ->
    _ = Fun(),
    forever(Fun).
2339
%% Set the emulator internal state 'abort' to Why (see
%% set_internal_state/2).
%% NOTE(review): presumably this aborts the emulator - confirm.
abort(Why) ->
    Op = abort,
    set_internal_state(Op, Why).
2342
2343
%% Start a linked process running busy_dist_port_tracer/0 and install it
%% as system monitor for busy_dist_port events. Returns its pid.
start_busy_dist_port_tracer() ->
    Tracer = spawn_link(fun busy_dist_port_tracer/0),
    erlang:system_monitor(Tracer, [busy_dist_port]),
    Tracer.
2348
%% Stop a tracer started with start_busy_dist_port_tracer/0: unlink it
%% and send it the exit signal 'bye'. Any argument that is not a pid is
%% ignored. Returns true in both cases.
stop_busy_dist_port_tracer(Tracer) ->
    case is_pid(Tracer) of
        true ->
            unlink(Tracer),
            exit(Tracer, bye);
        false ->
            true
    end.
2354
%% Loop of the busy_dist_port tracer: show every busy_dist_port monitor
%% message with erlang:display/1. Other messages are left in the mailbox.
busy_dist_port_tracer() ->
    receive
        {monitor, _SuspendedProcess, busy_dist_port, _Port} = Event ->
            erlang:display(Event)
    end,
    busy_dist_port_tracer().
2361
%% repeat(Fun, N): call the zero-arity Fun N times; returns ok.
%%
%% N must be a non-negative integer. A negative or non-integer count can
%% never count down to exactly 0 and used to loop forever; it now fails
%% with function_clause instead.
repeat(_Fun, 0) ->
    ok;
repeat(Fun, N) when is_integer(N), N > 0 ->
    Fun(),
    repeat(Fun, N-1).
2367
%% External term format encoding (without version magic) of the atom
%% whose name is String, a list of Unicode code points. Returned as a
%% list of bytes.
%%
%% Names of fewer than 256 UTF-8 bytes get the small atom tag and a one
%% byte length; longer names get the ordinary UTF-8 atom tag and a two
%% byte big-endian length.
string_to_atom_ext(String) ->
    Utf8Bytes = string_to_utf8_list(String),
    ByteCount = length(Utf8Bytes),
    if
        ByteCount < 256 ->
            [?SMALL_ATOM_UTF8_EXT, ByteCount | Utf8Bytes];
        true ->
            [?ATOM_UTF8_EXT, ByteCount bsr 8, ByteCount band 16#ff | Utf8Bytes]
    end.
2377
%% Create the atom named String (a list of Unicode code points) by
%% decoding its hand-made external term format encoding.
string_to_atom(String) ->
    Encoded = list_to_binary([?VERSION_MAGIC | string_to_atom_ext(String)]),
    binary_to_term(Encoded).
2381
%% Encode a list of Unicode code points as a flat list of UTF-8 bytes.
%%
%% Code points up to 16#7F give one byte, up to 16#7FF two bytes, up to
%% 16#FFFF three bytes and up to 16#10FFFF four bytes. No range inside
%% these limits is excluded. Anything that is not an integer in
%% 0..16#10FFFF fails with function_clause.
string_to_utf8_list(CodePoints) ->
    Encode =
        fun (CP) when is_integer(CP), 0 =< CP, CP =< 16#7F ->
                [CP];
            (CP) when is_integer(CP), 16#80 =< CP, CP =< 16#7FF ->
                [16#C0 bor (CP bsr 6),
                 16#80 bor (16#3F band CP)];
            (CP) when is_integer(CP), 16#800 =< CP, CP =< 16#FFFF ->
                [16#E0 bor (CP bsr 12),
                 16#80 bor (16#3F band (CP bsr 6)),
                 16#80 bor (16#3F band CP)];
            (CP) when is_integer(CP), 16#10000 =< CP, CP =< 16#10FFFF ->
                [16#F0 bor (CP bsr 18),
                 16#80 bor (16#3F band (CP bsr 12)),
                 16#80 bor (16#3F band (CP bsr 6)),
                 16#80 bor (16#3F band CP)]
        end,
    lists:flatmap(Encode, CodePoints).
2409
%% mk_pid({NodeName, Creation}, Number, Serial):
%%
%% Build a pid by decoding a hand-made ?PID_EXT term. NodeName is either
%% an atom or the already encoded node name (external format without the
%% version magic).
%%
%% Exits with {badarg, mk_pid, Args} if decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Result} for any other result
%% that is not a pid.
mk_pid({NodeName, Creation}, Number, Serial) when is_atom(NodeName) ->
    <<?VERSION_MAGIC, NodeNameExt/binary>> = term_to_binary(NodeName),
    mk_pid({NodeNameExt, Creation}, Number, Serial);
mk_pid({NodeNameExt, Creation} = NodeSpec, Number, Serial) ->
    %% The encoding is built inside the catch as well, so that failures
    %% of the uintN helpers are reported through the case below.
    Decoded = (catch binary_to_term(
                       list_to_binary([?VERSION_MAGIC,
                                       ?PID_EXT,
                                       NodeNameExt,
                                       uint32_be(Number),
                                       uint32_be(Serial),
                                       uint8(Creation)]))),
    case Decoded of
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_pid, [NodeSpec, Number, Serial]});
        Pid when is_pid(Pid) ->
            Pid;
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
2427
%% mk_port({NodeName, Creation}, Number):
%%
%% Build a port identifier by decoding a hand-made ?PORT_EXT term.
%% NodeName is either an atom or the already encoded node name (external
%% format without the version magic).
%%
%% Exits with {badarg, mk_port, Args} if decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Result} for any other result
%% that is not a port.
mk_port({NodeName, Creation}, Number) when is_atom(NodeName) ->
    <<?VERSION_MAGIC, NodeNameExt/binary>> = term_to_binary(NodeName),
    mk_port({NodeNameExt, Creation}, Number);
mk_port({NodeNameExt, Creation} = NodeSpec, Number) ->
    %% The encoding is built inside the catch as well, so that failures
    %% of the uintN helpers are reported through the case below.
    Decoded = (catch binary_to_term(
                       list_to_binary([?VERSION_MAGIC,
                                       ?PORT_EXT,
                                       NodeNameExt,
                                       uint32_be(Number),
                                       uint8(Creation)]))),
    case Decoded of
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_port, [NodeSpec, Number]});
        Port when is_port(Port) ->
            Port;
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
2444
%% mk_ref({NodeName, Creation}, Numbers):
%%
%% Build a reference by decoding a hand-made external term. NodeName is
%% either an atom or the already encoded node name (external format
%% without the version magic); Creation must be an integer.
%%
%% A list holding exactly one integer is encoded as ?REFERENCE_EXT, any
%% other list as ?NEW_REFERENCE_EXT.
%%
%% Exits with {badarg, mk_ref, Args} if decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Result} for any other result
%% that is not a reference.
mk_ref({NodeName, Creation}, Numbers) when is_atom(NodeName),
                                           is_integer(Creation),
                                           is_list(Numbers) ->
    %% Encode the node name once, then continue in the clauses below.
    <<?VERSION_MAGIC, NodeNameExt/binary>> = term_to_binary(NodeName),
    mk_ref({NodeNameExt, Creation}, Numbers);
mk_ref({NodeNameExt, Creation} = NodeSpec, [Number]) when is_integer(Creation),
                                                          is_integer(Number) ->
    Decoded = (catch binary_to_term(
                       list_to_binary([?VERSION_MAGIC,
                                       ?REFERENCE_EXT,
                                       NodeNameExt,
                                       uint32_be(Number),
                                       uint8(Creation)]))),
    case Decoded of
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_ref, [NodeSpec, [Number]]});
        Ref when is_reference(Ref) ->
            Ref;
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end;
mk_ref({NodeNameExt, Creation} = NodeSpec, Numbers) when is_integer(Creation),
                                                         is_list(Numbers) ->
    Decoded = (catch binary_to_term(
                       list_to_binary([?VERSION_MAGIC,
                                       ?NEW_REFERENCE_EXT,
                                       uint16_be(length(Numbers)),
                                       NodeNameExt,
                                       uint8(Creation),
                                       [uint32_be(N) || N <- Numbers]]))),
    case Decoded of
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_ref, [NodeSpec, Numbers]});
        Ref when is_reference(Ref) ->
            Ref;
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
2487
2488
%% The four bytes of an unsigned 32-bit integer, most significant byte
%% first. Exits with {badarg, uint32_be, [Uint]} for anything that is not
%% an integer in 0..16#ffffffff.
uint32_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 32 ->
    binary_to_list(<<Uint:32/big-unsigned>>);
uint32_be(Uint) ->
    exit({badarg, uint32_be, [Uint]}).
2496
2497
%% The two bytes of an unsigned 16-bit integer, most significant byte
%% first. Exits with {badarg, uint16_be, [Uint]} for anything that is not
%% an integer in 0..16#ffff.
uint16_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 16 ->
    binary_to_list(<<Uint:16/big-unsigned>>);
uint16_be(Uint) ->
    exit({badarg, uint16_be, [Uint]}).
2503
%% Check that Uint fits in one unsigned byte and return it. Exits with
%% {badarg, uint8, [Uint]} for anything that is not an integer in 0..255.
uint8(Uint) when is_integer(Uint), 0 =< Uint, Uint =< 16#ff ->
    Uint;
uint8(Uint) ->
    exit({badarg, uint8, [Uint]}).
2508