1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2017-2019. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(gen_tcp_dist).
21
22%%
23%% This is an example of how to plug in an arbitrary distribution
24%% carrier for Erlang using distribution processes.
25%%
26%% This example uses gen_tcp for transportation of data, but
27%% you can use whatever underlying protocol you want as long
28%% as your implementation reliably delivers data chunks to the
29%% receiving VM in the order they were sent from the sending
30%% VM.
31%%
32%% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl
33%% distribution implementation for TCP used by default. The default
34%% implementation uses distribution ports instead of distribution
35%% processes and is more efficient compared to this implementation.
36%% This example more or less gets the distribution processes
37%% in between the VM and the ports without any specific gain.
38%%
39
40-export([listen/1, accept/1, accept_connection/5,
41	 setup/5, close/1, select/1, is_node_name/1]).
42
43%% Optional
44-export([setopts/2, getopts/2]).
45
46%% internal exports
47
48-export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3,
49         dist_cntrlr_tick_handler/1]).
50
51-export([accept_loop/2,do_accept/6,do_setup/6]).
52
53-import(error_logger,[error_msg/2]).
54
55-include_lib("kernel/include/net_address.hrl").
56
57-include_lib("kernel/include/dist.hrl").
58-include_lib("kernel/include/dist_util.hrl").
59
60%% ------------------------------------------------------------
61%%  Select this protocol based on node name
62%%  select(Node) => Bool
63%% ------------------------------------------------------------
64
%% Decide whether this carrier handles Node. It does when the node
%% name contains exactly one '@' and the host part resolves to an IPv4
%% address with inet:getaddr/2. Returns true or false.
select(Node) ->
    case split_node(atom_to_list(Node), $@, []) of
        [_Name, Host] ->
            host_has_ipv4_address(Host);
        _NoOrManyAt ->
            false
    end.

%% True if Host resolves to an IPv4 address, false on any lookup failure.
host_has_ipv4_address(Host) ->
    case inet:getaddr(Host, inet) of
        {ok, _Ip} -> true;
        _LookupError -> false
    end.
74
75%% ------------------------------------------------------------
76%% Create the listen socket, i.e. the port that this erlang
77%% node is accessible through.
78%% ------------------------------------------------------------
79
%% Open the distribution listen socket and register this node with
%% EPMD under Name.
%%
%% On success returns {ok, {ListenSocket, #net_address{}, Creation}}.
%% If the socket cannot be opened, the error from do_listen/1 is
%% returned unchanged. If EPMD registration fails, the listen socket
%% that was already opened is closed before the EPMD error is
%% returned, so the port is not left bound by a socket nobody uses.
listen(Name) ->
    case do_listen([binary, {active, false}, {packet, 2}, {reuseaddr, true}]) of
        {ok, Socket} ->
            TcpAddress = get_tcp_address(Socket),
            {_, Port} = TcpAddress#net_address.address,
            ErlEpmd = net_kernel:epmd_module(),
            case ErlEpmd:register_node(Name, Port) of
                {ok, Creation} ->
                    {ok, {Socket, TcpAddress, Creation}};
                EpmdError ->
                    %% Best-effort cleanup: the caller only cares about
                    %% the registration error, not the close result.
                    _ = gen_tcp:close(Socket),
                    EpmdError
            end;
        ListenError ->
            ListenError
    end.
95
%% Open the distribution listen socket with Options. Each port of the
%% range given by the kernel parameters inet_dist_listen_min and
%% inet_dist_listen_max is tried in turn. Without an integer minimum
%% the range is {0, 0}, i.e. port 0. A backlog of 128 is prepended, and
%% listen_options/1 adds the configured interface and extra options.
do_listen(Options) ->
    {MinPort, MaxPort} = dist_listen_port_range(),
    do_listen(MinPort, MaxPort, listen_options([{backlog, 128} | Options])).

%% Port range {Min, Max} to try. Only an integer minimum enables a
%% range. Without an integer maximum the range is the single port Min.
dist_listen_port_range() ->
    case application:get_env(kernel, inet_dist_listen_min) of
        {ok, Min} when is_integer(Min) ->
            case application:get_env(kernel, inet_dist_listen_max) of
                {ok, Max} when is_integer(Max) -> {Min, Max};
                _NoMax -> {Min, Min}
            end;
        _NoMin ->
            {0, 0}
    end.
110
%% Try gen_tcp:listen/2 on Port, Port+1, ..., MaxPort, moving on only
%% while the port is already in use. Any other result (success or a
%% different error) is returned as is. Returns {error, eaddrinuse} once
%% the range is exhausted or empty.
do_listen(Port, MaxPort, Options) when Port =< MaxPort ->
    case gen_tcp:listen(Port, Options) of
        {error, eaddrinuse} ->
            do_listen(Port + 1, MaxPort, Options);
        Result ->
            Result
    end;
do_listen(_Port, _MaxPort, _Options) ->
    {error, eaddrinuse}.
120
%% Complete the listen options. If kernel's inet_dist_use_interface is
%% set, {ip, Ip} is prepended. If kernel's inet_dist_listen_options is
%% set, that list is put in front of everything else.
listen_options(BaseOpts) ->
    WithInterface =
        case application:get_env(kernel, inet_dist_use_interface) of
            {ok, Ip} -> [{ip, Ip} | BaseOpts];
            _ -> BaseOpts
        end,
    UserOpts =
        case application:get_env(kernel, inet_dist_listen_options) of
            {ok, ListenOpts} -> ListenOpts;
            _ -> []
        end,
    UserOpts ++ WithInterface.
135
136
137%% ------------------------------------------------------------
138%% Accepts new connection attempts from other Erlang nodes.
139%% ------------------------------------------------------------
140
%% Start the acceptor process for the Listen socket. The process is
%% linked to the caller, runs at max priority, and executes
%% accept_loop/2 with the caller as its Kernel process.
accept(Listen) ->
    Kernel = self(),
    SpawnOpts = [link, {priority, max}],
    spawn_opt(?MODULE, accept_loop, [Kernel, Listen], SpawnOpts).
143
%% Acceptor loop. It accepts connections on Listen forever and hands
%% each one to a new distribution controller, announcing it to the
%% Kernel process. It exits with the error if gen_tcp:accept/1 fails.
accept_loop(Kernel, Listen) ->
    ?trace("~p~n", [{?MODULE, accept_loop, self()}]),
    case gen_tcp:accept(Listen) of
        {ok, Socket} ->
            hand_over_connection(Kernel, Socket),
            accept_loop(Kernel, Listen);
        Error ->
            exit(Error)
    end.

%% Move Socket to a fresh controller process and tell Kernel about it.
%% Socket messages already delivered to us are forwarded around the
%% ownership change. The controller is then attached to the handshake
%% process Pid that Kernel answers with. Exits with
%% unsupported_protocol if Kernel rejects the carrier.
hand_over_connection(Kernel, Socket) ->
    DistCtrl = spawn_dist_cntrlr(Socket),
    ?trace("~p~n", [{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]),
    flush_controller(DistCtrl, Socket),
    gen_tcp:controlling_process(Socket, DistCtrl),
    flush_controller(DistCtrl, Socket),
    Kernel ! {accept, self(), DistCtrl, inet, tcp},
    receive
        {Kernel, controller, Pid} ->
            call_ctrlr(DistCtrl, {supervisor, Pid}),
            Pid ! {self(), controller};
        {Kernel, unsupported_protocol} ->
            exit(unsupported_protocol)
    end.
165
%% Forward to Pid, in mailbox order and without blocking, every
%% {tcp, Socket, _} and {tcp_closed, Socket} message already queued for
%% the calling process. Used around gen_tcp:controlling_process/2 so
%% that no socket message is stranded while ownership moves.
%% Returns ok.
flush_controller(Pid, Socket) ->
    receive
        {tcp, Socket, _Data} = Msg ->
            Pid ! Msg,
            flush_controller(Pid, Socket);
        {tcp_closed, Socket} = Msg ->
            Pid ! Msg,
            flush_controller(Pid, Socket)
    after 0 ->
        ok
    end.
177
178%% ------------------------------------------------------------
179%% Accepts a new connection attempt from another Erlang node.
180%% Performs the handshake with the other side.
181%% ------------------------------------------------------------
182
%% Spawn the accepting-side handshake process (do_accept/6). It is
%% linked to the caller, runs at max priority, and gets the caller as
%% its Kernel.
accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
    Args = [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime],
    spawn_opt(?MODULE, do_accept, Args, [link, {priority, max}]).
187
%% Body of the accepting-side handshake process. It waits until
%% AcceptPid signals that the controller is attached, starts the setup
%% timer, applies the optional check_ip restriction, and then runs the
%% handshake with the remote side as initiator.
do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
    ?trace("~p~n", [{?MODULE, do_accept, self(), MyNode}]),
    receive
        {AcceptPid, controller} ->
            Timer = dist_util:start_timer(SetupTime),
            IpVerdict = check_ip(DistCtrl),
            accept_handshake(IpVerdict, Kernel, DistCtrl, MyNode, Allowed, Timer)
    end.

%% Continue the accept according to the check_ip/1 verdict. With true,
%% the handshake runs. With {false, IP}, the attempt is logged and the
%% setup is shut down.
accept_handshake(true, Kernel, DistCtrl, MyNode, Allowed, Timer) ->
    Common = hs_data_common(DistCtrl),
    dist_util:handshake_other_started(
      Common#hs_data{kernel_pid = Kernel,
                     this_node = MyNode,
                     socket = DistCtrl,
                     timer = Timer,
                     this_flags = 0,
                     allowed = Allowed});
accept_handshake({false, IP}, _Kernel, _DistCtrl, _MyNode, _Allowed, _Timer) ->
    error_msg("** Connection attempt from "
              "disallowed IP ~w ** ~n", [IP]),
    ?shutdown(no_node).
209
210%% we may not always want the nodelay behaviour
211%% for performance reasons
212
%% Socket option for TCP_NODELAY on distribution sockets. It is on
%% unless the kernel parameter dist_nodelay is explicitly false.
nodelay() ->
    Enabled =
        case application:get_env(kernel, dist_nodelay) of
            {ok, false} -> false;
            _ -> true
        end,
    {nodelay, Enabled}.
224
225%% ------------------------------------------------------------
226%% Setup a new connection to another Erlang node.
227%% Performs the handshake with the other side.
228%% ------------------------------------------------------------
229
%% Spawn the connecting-side setup process (do_setup/6). It is linked
%% to the caller, runs at max priority, and gets the caller as its
%% Kernel.
setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
    Args = [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
    spawn_opt(?MODULE, do_setup, Args, [link, {priority, max}]).
234
%% Body of the connecting-side setup process. Steps:
%%   1. Validate Node and resolve its host to an IPv4 address.
%%   2. Ask EPMD for the port of its listener.
%%   3. Connect, then run the handshake with us as initiator.
%% Every failure on the way ends in ?shutdown(Node).
do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
    ?trace("~p~n", [{?MODULE, do_setup, self(), Node}]),
    [Name, Address] = splitnode(Node, LongOrShortNames),
    case inet:getaddr(Address, inet) of
        {ok, Ip} ->
            Timer = dist_util:start_timer(SetupTime),
            setup_lookup_port(Kernel, Node, Type, MyNode, Name, Ip, Timer);
        _Other ->
            ?trace("inet_getaddr(~p) "
                   "failed (~p).~n", [Node, _Other]),
            ?shutdown(Node)
    end.

%% Ask EPMD on Ip for Name's listening port, then connect to it.
setup_lookup_port(Kernel, Node, Type, MyNode, Name, Ip, Timer) ->
    ErlEpmd = net_kernel:epmd_module(),
    case ErlEpmd:port_please(Name, Ip) of
        {port, TcpPort, Version} ->
            ?trace("port_please(~p) -> version ~p~n",
                   [Node, Version]),
            dist_util:reset_timer(Timer),
            ConnectOpts = connect_options([binary, {active, false}, {packet, 2}]),
            case gen_tcp:connect(Ip, TcpPort, ConnectOpts) of
                {ok, Socket} ->
                    setup_handshake(Kernel, Node, Type, MyNode, Version, Timer, Socket);
                _ ->
                    %% Other Node may have closed since
                    %% port_please !
                    ?trace("other node (~p) "
                           "closed since port_please.~n",
                           [Node]),
                    ?shutdown(Node)
            end;
        _ ->
            ?trace("port_please (~p) "
                   "failed.~n", [Node]),
            ?shutdown(Node)
    end.

%% Give Socket to a new distribution controller linked to this process
%% as its supervisor, then start the handshake as the connecting side.
setup_handshake(Kernel, Node, Type, MyNode, Version, Timer, Socket) ->
    DistCtrl = spawn_dist_cntrlr(Socket),
    call_ctrlr(DistCtrl, {supervisor, self()}),
    flush_controller(DistCtrl, Socket),
    gen_tcp:controlling_process(Socket, DistCtrl),
    flush_controller(DistCtrl, Socket),
    Common = hs_data_common(DistCtrl),
    HSData = Common#hs_data{kernel_pid = Kernel,
                            other_node = Node,
                            this_node = MyNode,
                            socket = DistCtrl,
                            timer = Timer,
                            this_flags = 0,
                            other_version = Version,
                            request_type = Type},
    dist_util:handshake_we_started(HSData).
286
%% Prefix Opts with the kernel parameter inet_dist_connect_options,
%% when set.
connect_options(Opts) ->
    Extra =
        case application:get_env(kernel, inet_dist_connect_options) of
            {ok, ConnectOpts} -> ConnectOpts;
            _ -> []
        end,
    Extra ++ Opts.
294
295%%
296%% Close a socket.
297%%
%% Close the listen socket. Returns whatever gen_tcp:close/1 returns.
close(ListenSocket) ->
    gen_tcp:close(ListenSocket).
300
301
302%% If Node is illegal terminate the connection setup!!
%% Split Node into [Name, Host] for connection setup.
%%
%% Node must contain exactly one '@'. This matches select/1 and
%% is_node_name/1, which reject names with zero or several '@'.
%% Host rules depend on the name mode:
%%   - longnames: Host must contain a '.' or parse as an IP address.
%%   - shortnames: Host must not contain a '.'.
%% Any violation is logged and the setup is terminated with
%% ?shutdown(Node).
%%
%% A name with several '@' is reported as illegal. It is not accepted
%% with its host parts glued together, which would turn 'a@b@host'
%% into host "bhost" and possibly connect to the wrong machine.
splitnode(Node, LongOrShortNames) ->
    case split_node(atom_to_list(Node), $@, []) of
        [Name, Host] ->
            check_host_name(Node, Name, Host, LongOrShortNames);
        [_] ->
            error_msg("** Nodename ~p illegal, no '@' character **~n",
                      [Node]),
            ?shutdown(Node);
        _ ->
            error_msg("** Nodename ~p illegal **~n", [Node]),
            ?shutdown(Node)
    end.

%% Validate Host against the long/short name mode. Returns
%% [Name, Host], or logs and shuts down the setup.
check_host_name(Node, Name, Host, LongOrShortNames) ->
    case {split_node(Host, $., []), LongOrShortNames} of
        {[_], longnames} ->
            %% An unqualified host is only acceptable as a literal IP
            %% address.
            case inet:parse_address(Host) of
                {ok, _} ->
                    [Name, Host];
                _ ->
                    error_msg("** System running to use "
                              "fully qualified "
                              "hostnames **~n"
                              "** Hostname ~ts is illegal **~n",
                              [Host]),
                    ?shutdown(Node)
            end;
        {[_, _ | _], shortnames} ->
            error_msg("** System NOT running to use fully qualified "
                      "hostnames **~n"
                      "** Hostname ~ts is illegal **~n",
                      [Host]),
            ?shutdown(Node);
        _ ->
            [Name, Host]
    end.
337
%% Split the character list Chars at every occurrence of Sep, keeping
%% empty segments. RevPrefix holds already-consumed characters of the
%% first segment in reverse order (normally []). Always returns at
%% least one segment.
split_node(Chars, Sep, RevPrefix) ->
    {Segment, Rest} = lists:splitwith(fun (C) -> C =/= Sep end, Chars),
    First = lists:reverse(RevPrefix, Segment),
    case Rest of
        [] -> [First];
        [Sep | Tail] -> [First | split_node(Tail, Sep, [])]
    end.
341
342%% ------------------------------------------------------------
343%% Fetch local information about a Socket.
344%% ------------------------------------------------------------
%% Describe the local end of Socket as a tcp/inet #net_address{}: the
%% socket name from inet:sockname/1 plus this machine's host name.
%% Crashes with badmatch if either lookup fails.
get_tcp_address(Socket) ->
    {ok, SockName} = inet:sockname(Socket),
    {ok, HostName} = inet:gethostname(),
    #net_address{address = SockName,
                 host = HostName,
                 protocol = tcp,
                 family = inet}.
354
355%% ------------------------------------------------------------
%% Only accept new connection attempts from nodes on our
%% own LAN, if the check_ip environment parameter is true.
358%% ------------------------------------------------------------
%% When the application environment has check_ip = true, only accept a
%% peer that shares a subnet with one of our interfaces. Returns true
%% if allowed (or the check is off) and {false, PeerIP} if not. Shuts
%% down the setup if the address information cannot be obtained.
check_ip(DistCtrl) ->
    case application:get_env(check_ip) =:= {ok, true} of
        false ->
            true;
        true ->
            case get_ifs(DistCtrl) of
                {ok, IFs, PeerIP} -> check_ip(IFs, PeerIP);
                _LookupFailed -> ?shutdown(no_node)
            end
    end.
371
%% Fetch the controller's socket, then the peer IP (inet:peername/1)
%% and the local interface list (inet:getif/1). Returns
%% {ok, IFs, PeerIP}. Otherwise the first non-matching result is
%% returned unchanged.
get_ifs(DistCtrl) ->
    Socket = call_ctrlr(DistCtrl, socket),
    case inet:peername(Socket) of
        {ok, {PeerIP, _PeerPort}} ->
            local_ifs(Socket, PeerIP);
        PeerResult ->
            PeerResult
    end.

%% Pair the interface list of Socket's host with PeerIP.
local_ifs(Socket, PeerIP) ->
    case inet:getif(Socket) of
        {ok, IFs} -> {ok, IFs, PeerIP};
        IfResult -> IfResult
    end.
383
%% True if PeerIP lies in the subnet (own address masked by netmask) of
%% any interface in IFs. Otherwise returns {false, PeerIP}. Interfaces
%% are checked in order, stopping at the first match.
check_ip(IFs, PeerIP) ->
    SameSubnet =
        fun ({OwnIP, _Broadcast, Netmask}) ->
                inet_tcp:mask(Netmask, PeerIP) =:= inet_tcp:mask(Netmask, OwnIP)
        end,
    case lists:any(SameSubnet, IFs) of
        true -> true;
        false -> {false, PeerIP}
    end.
391
%% True if Node is an atom whose name contains exactly one '@'.
is_node_name(Node) when is_atom(Node) ->
    Parts = split_node(atom_to_list(Node), $@, []),
    length(Parts) =:= 2;
is_node_name(_NotAnAtom) ->
    false.
399
%% Build the #hs_data{} fields shared by the accepting and connecting
%% sides: every callback fun, bound to DistCtrl and to the socket and
%% tick handler it reports, plus the reject flags.
hs_data_common(DistCtrl) ->
    TickHandler = call_ctrlr(DistCtrl, tick_handler),
    Socket = call_ctrlr(DistCtrl, socket),
    RejectFlags = dist_reject_flags(),
    #hs_data{f_send = send_fun(),
             f_recv = recv_fun(),
             f_setopts_pre_nodeup = setopts_pre_nodeup_fun(),
             f_setopts_post_nodeup = setopts_post_nodeup_fun(),
             f_getll = getll_fun(),
             f_handshake_complete = handshake_complete_fun(),
             f_address = address_fun(),
             mf_setopts = setopts_fun(DistCtrl, Socket),
             mf_getopts = getopts_fun(DistCtrl, Socket),
             mf_getstat = getstat_fun(DistCtrl, Socket),
             mf_tick = tick_fun(DistCtrl, TickHandler),
             reject_flags = RejectFlags}.

%% Reject flags from a single -gen_tcp_dist_reject_flags Value
%% command-line argument (decimal integer), else the #hs_data{} default.
dist_reject_flags() ->
    case init:get_argument(gen_tcp_dist_reject_flags) of
        {ok, [[Flags]]} -> list_to_integer(Flags);
        _ -> #hs_data{}#hs_data.reject_flags
    end.
419
420%%% ------------------------------------------------------------
421%%% Distribution controller processes
422%%% ------------------------------------------------------------
423
424%%
425%% There will be five parties working together when the
426%% connection is up:
427%% - The gen_tcp socket. Providing a tcp/ip connection
428%%   to the other node.
429%% - The output handler. It will dispatch all outgoing
430%%   traffic from the VM to the gen_tcp socket. This
431%%   process is registered as distribution controller
432%%   for this channel with the VM.
433%% - The input handler. It will dispatch all incoming
434%%   traffic from the gen_tcp socket to the VM. This
435%%   process is also the socket owner and receives
436%%   incoming traffic using active-N.
437%% - The tick handler. Dispatches asynchronous tick
438%%   requests to the socket. It executes on max priority
439%%   since it is important to get ticks through to the
440%%   other end.
%% - The channel supervisor (provided by dist_util). It
%%   monitors traffic, issues tick requests to the tick
%%   handler when no outgoing traffic is seen, and brings
%%   the connection down if no incoming traffic is seen.
%%   This process also executes on max priority.
%%
%%   These parties are linked together, so should one
%%   of them fail, all of them are terminated and the
%%   connection is taken down.
450%%
451
%% Spawn options shared by the distribution controller processes.
%% Message queue data is kept off-heap and every garbage collection is
%% a fullsweep ({fullsweep_after, 0}), so signal binaries do not linger.
%% The fullsweeps are cheap because these processes hold next to no
%% live data.
-define(DIST_CNTRL_COMMON_SPAWN_OPTS,
        [{message_queue_data, off_heap}, {fullsweep_after, 0}]).
459
%% mf_tick callback: when called with DistCtrl, send the atom tick to
%% the tick handler process. The return value is that of the send.
tick_fun(DistCtrl, TickHandler) ->
    SendTick = fun () -> TickHandler ! tick end,
    fun (Ctrl) when Ctrl == DistCtrl -> SendTick() end.
464
%% mf_getstat callback: when called with DistCtrl, return
%% {ok, RecvCnt, SendCnt, SendPend} for Socket, or the error from
%% inet:getstat/2.
getstat_fun(DistCtrl, Socket) ->
    Wanted = [recv_cnt, send_cnt, send_pend],
    fun (Ctrl) when Ctrl == DistCtrl ->
            case inet:getstat(Socket, Wanted) of
                {ok, Stats} -> split_stat(Stats, 0, 0, 0);
                StatError -> StatError
            end
    end.
474
%% Fold an inet:getstat/2 result list into {ok, Recv, Send, Pend},
%% starting from the given defaults. Later entries override earlier
%% ones. Any entry other than recv_cnt, send_cnt or send_pend raises
%% function_clause.
split_stat(Stats, Recv0, Send0, Pend0) ->
    {Recv, Send, Pend} =
        lists:foldl(fun ({recv_cnt, R}, {_, S, P}) -> {R, S, P};
                        ({send_cnt, S}, {R, _, P}) -> {R, S, P};
                        ({send_pend, P}, {R, S, _}) -> {R, S, P}
                    end,
                    {Recv0, Send0, Pend0},
                    Stats),
    {ok, Recv, Send, Pend}.
483
%% mf_setopts callback: when called with DistCtrl, apply OptList to
%% Socket via setopts/2 (which refuses active, deliver and packet).
setopts_fun(DistCtrl, Socket) ->
    fun (Caller, OptList) when Caller == DistCtrl ->
            setopts(Socket, OptList)
    end.
488
%% mf_getopts callback: when called with DistCtrl, read the options
%% OptNames from Socket via getopts/2.
getopts_fun(DistCtrl, Socket) ->
    fun (Caller, OptNames) when Caller == DistCtrl ->
            getopts(Socket, OptNames)
    end.
493
%% Set socket options on behalf of users of the connection. The
%% options active, deliver and packet are owned by the distribution
%% controller, so any {Key, _} pair using them makes the whole call fail
%% with {error, {badopts, Offending}}. Otherwise inet:setopts/2 is
%% called with Opts.
setopts(S, Opts) ->
    Reserved = [active, deliver, packet],
    IsReserved = fun ({Key, _}) -> lists:member(Key, Reserved);
                     (_) -> false
                 end,
    case lists:filter(IsReserved, Opts) of
        [] -> inet:setopts(S, Opts);
        Offending -> {error, {badopts, Offending}}
    end.
500
%% Read socket options; a direct pass-through to inet:getopts/2.
getopts(Socket, OptNames) ->
    inet:getopts(Socket, OptNames).
503
%% f_send callback: ask the controller to write Packet to its socket.
%% During the handshake the controller does this with gen_tcp:send/2.
send_fun() ->
    fun (DistCtrl, Packet) ->
            Request = {send, Packet},
            call_ctrlr(DistCtrl, Request)
    end.
508
%% f_recv callback: receive through the controller. A binary payload
%% is converted to a list. Any other reply is returned unchanged.
recv_fun() ->
    fun (DistCtrl, Length, Timeout) ->
            Reply = call_ctrlr(DistCtrl, {recv, Length, Timeout}),
            case Reply of
                {ok, Bin} when is_binary(Bin) -> {ok, binary_to_list(Bin)};
                _NotBinary -> Reply
            end
    end.
518
%% f_getll callback: ask the controller for its low-level handle. The
%% controller answers {ok, self()}, i.e. its own pid.
getll_fun() ->
    fun (DistCtrl) ->
            call_ctrlr(DistCtrl, getll)
    end.
523
%% f_address callback: ask the controller for the #net_address{} of
%% Node. A reply of {error, no_node} means the name has no '@' or more
%% than one; in that case the setup is shut down.
address_fun() ->
    fun (DistCtrl, Node) ->
            Reply = call_ctrlr(DistCtrl, {address, Node}),
            case Reply =:= {error, no_node} of
                true -> ?shutdown(no_node);
                false -> Reply
            end
    end.
533
%% f_setopts_pre_nodeup callback: let the controller adjust its socket
%% options before nodeup.
setopts_pre_nodeup_fun() ->
    fun (DistCtrl) ->
            call_ctrlr(DistCtrl, pre_nodeup)
    end.
538
%% f_setopts_post_nodeup callback: let the controller adjust its socket
%% options after nodeup.
setopts_post_nodeup_fun() ->
    fun (DistCtrl) ->
            call_ctrlr(DistCtrl, post_nodeup)
    end.
543
%% f_handshake_complete callback: hand the distribution handle DHandle
%% for Node to the controller, which then starts dispatching traffic.
handshake_complete_fun() ->
    fun (DistCtrl, Node, DHandle) ->
            Request = {handshake_complete, Node, DHandle},
            call_ctrlr(DistCtrl, Request)
    end.
548
%% Synchronous request to a controller process. Sends
%% {MonitorRef, self(), Msg} and waits for {MonitorRef, Reply}, which is
%% returned. If the controller dies first, the caller exits with
%% {dist_controller_exit, Reason}.
call_ctrlr(Ctrlr, Msg) ->
    MRef = erlang:monitor(process, Ctrlr),
    Ctrlr ! {MRef, self(), Msg},
    receive
        {'DOWN', MRef, process, Ctrlr, Why} ->
            exit({dist_controller_exit, Why});
        {MRef, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply
    end.
559
560%%
561%% The tick handler process writes a tick to the
562%% socket when it receives a 'tick' message from
563%% the connection supervisor.
564%%
%% We are not allowed to block the connection
%% supervisor when writing a tick, and we also want
%% the tick to go through even on a heavily
%% loaded system. gen_tcp does not have a
%% non-blocking send operation exposed in its API,
%% and we don't want to run the distribution
%% controller under high priority. Therefore this
%% separate process, running at max priority,
%% dispatches the ticks.
574%%
%% Tick handler loop: take the next message in arrival order. For the
%% atom tick, write an empty packet to Socket; drop anything else.
%% Loops forever; sock_send/2 handles send failures.
dist_cntrlr_tick_handler(Socket) ->
    Msg = receive Any -> Any end,
    if
        Msg =:= tick ->
            %% May block due to busy port...
            sock_send(Socket, "");
        true ->
            ok
    end,
    dist_cntrlr_tick_handler(Socket).
584
%% Spawn an (unlinked) distribution controller for Socket at max
%% priority. It is linked to its channel supervisor later, through a
%% {supervisor, Pid} request.
spawn_dist_cntrlr(Socket) ->
    Opts = [{priority, max} | ?DIST_CNTRL_COMMON_SPAWN_OPTS],
    spawn_opt(?MODULE, dist_cntrlr_setup, [Socket], Opts).
588
%% Entry point of a controller process. It starts a linked tick handler
%% at max priority for Socket and enters the handshake loop with no
%% supervisor yet.
dist_cntrlr_setup(Socket) ->
    TickOpts = [link, {priority, max} | ?DIST_CNTRL_COMMON_SPAWN_OPTS],
    TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler, [Socket], TickOpts),
    dist_cntrlr_setup_loop(Socket, TickHandler, undefined).
595
596%%
%% During the handshake phase we loop in dist_cntrlr_setup_loop().
598%% When the connection is up we spawn an input handler and
599%% continue as output handler.
600%%
%% Handshake-phase loop of a distribution controller.
%%
%% Socket is the connection and TickHandler the pid handed out for
%% tick_handler requests. Sup is the channel supervisor: undefined until
%% a {supervisor, Pid} request links us to Pid. Requests arrive as
%% {Ref, From, Request} and are answered with {From ! {Ref, Result}}
%% (see call_ctrlr/2). On {handshake_complete, Node, DHandle} the
%% process spawns the input handler and becomes the output handler. It
%% never comes back to this loop.
dist_cntrlr_setup_loop(Socket, TickHandler, Sup) ->
    receive
        {tcp_closed, Socket} ->
            exit(connection_closed);

        {Ref, From, {supervisor, Pid}} ->
            Res = link(Pid),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Pid);

        {Ref, From, tick_handler} ->
            From ! {Ref, TickHandler},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, socket} ->
            From ! {Ref, Socket},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {send, Packet}} ->
            Res = gen_tcp:send(Socket, Packet),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {recv, Length, Timeout}} ->
            Res = gen_tcp:recv(Socket, Length, Timeout),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, getll} ->
            From ! {Ref, {ok, self()}},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {address, Node}} ->
            Res = dist_cntrlr_address(Socket, Node),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, pre_nodeup} ->
            Res = dist_cntrlr_nodeup_setopts(Socket),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, post_nodeup} ->
            Res = dist_cntrlr_nodeup_setopts(Socket),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {handshake_complete, _Node, DHandle}} ->
            From ! {Ref, ok},
            %% Handshake complete! Begin dispatching traffic...
            dist_cntrlr_start_traffic(DHandle, Socket, Sup)
    end.

%% Build the #net_address{} reported for Node, using the peer address
%% of Socket and the host part of the node name. Returns
%% {error, no_node} when Node does not contain exactly one '@'.
%% If inet:peername/1 fails (the socket is already gone), the
%% controller exits with {peername_failed, Reason}. Previously this
%% case crashed with an anonymous case_clause. The links and the
%% caller's monitor take the connection down either way.
dist_cntrlr_address(Socket, Node) ->
    case inet:peername(Socket) of
        {ok, Address} ->
            case split_node(atom_to_list(Node), $@, []) of
                [_, Host] ->
                    #net_address{address = Address, host = Host,
                                 protocol = tcp, family = inet};
                _ ->
                    {error, no_node}
            end;
        {error, Reason} ->
            exit({peername_failed, Reason})
    end.

%% Socket options applied both before and after nodeup: stay passive
%% (the input handler switches to active-N itself), use 4-byte length
%% headers, and apply the configured nodelay setting.
dist_cntrlr_nodeup_setopts(Socket) ->
    inet:setopts(Socket, [{active, false}, {packet, 4}, nodelay()]).

%% Hand Socket over to a freshly spawned input handler, register that
%% handler with the VM for DHandle, and continue in this process as
%% output handler at normal priority.
dist_cntrlr_start_traffic(DHandle, Socket, Sup) ->
    %% A separate input process is not required, but it lets input and
    %% output run in parallel and keeps each loop simple.
    InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup,
                             [DHandle, Socket, Sup],
                             [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS),

    flush_controller(InputHandler, Socket),
    gen_tcp:controlling_process(Socket, InputHandler),
    flush_controller(InputHandler, Socket),

    ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler),

    %% Go signal: the input handler waits for DHandle so it never puts
    %% data before it is registered.
    InputHandler ! DHandle,

    %% From now on we execute on normal priority
    process_flag(priority, normal),
    erlang:dist_ctrl_get_data_notification(DHandle),
    case init:get_argument(gen_tcp_dist_output_loop) of
        error ->
            dist_cntrlr_output_loop(DHandle, Socket);
        {ok, [[ModStr, FuncStr]]} -> % For testing...
            apply(list_to_atom(ModStr),
                  list_to_atom(FuncStr),
                  [DHandle, Socket])
    end.
695
%% Size of the {active, N} message budget the input handler keeps on
%% its socket. The handler tops the budget back up to this value once
%% half of it has been consumed. Ten gives good throughput while still
%% keeping back-pressure when the input handler cannot keep up with
%% incoming messages.
-define(ACTIVE_INPUT, 10).
700
%% Entry point of the input handler. It links to the channel supervisor
%% Sup, then waits for the go signal (DHandle itself, sent once we are
%% registered as input handler) so that no data is put before
%% registration. After that it enters the input loop with an empty
%% active budget.
dist_cntrlr_input_setup(DHandle, Socket, Sup) ->
    link(Sup),
    receive DHandle -> ok end,
    dist_cntrlr_input_loop(DHandle, Socket, 0).
709
%% Input handler loop. Budget is the number of {tcp, ...} messages the
%% socket may still deliver in active-N mode. Each packet is handed to
%% the VM with erlang:dist_ctrl_put_data/2; if that fails the process
%% goes to death_row/0. A closed socket exits with connection_closed.
%% Unknown messages are ignored.
dist_cntrlr_input_loop(DHandle, Socket, Budget) when Budget =< ?ACTIVE_INPUT div 2 ->
    %% setopts with {active, N} adds N to the remaining count, so adding
    %% the difference refills the budget to ?ACTIVE_INPUT.
    inet:setopts(Socket, [{active, ?ACTIVE_INPUT - Budget}]),
    dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT);
dist_cntrlr_input_loop(DHandle, Socket, Budget) ->
    receive
        {tcp, Socket, Data} ->
            %% Incoming data from remote node...
            try
                erlang:dist_ctrl_put_data(DHandle, Data)
            catch
                _:_ -> death_row()
            end,
            dist_cntrlr_input_loop(DHandle, Socket, Budget - 1);
        {tcp_closed, Socket} ->
            %% Connection to remote node terminated...
            exit(connection_closed);
        _Ignored ->
            dist_cntrlr_input_loop(DHandle, Socket, Budget)
    end.
730
%% Write every pending outgoing chunk for DHandle to Socket. When the
%% queue is empty, re-arm the dist_data notification and return.
dist_cntrlr_send_data(DHandle, Socket) ->
    Chunk = erlang:dist_ctrl_get_data(DHandle),
    if
        Chunk =:= none ->
            erlang:dist_ctrl_get_data_notification(DHandle);
        true ->
            sock_send(Socket, Chunk),
            dist_cntrlr_send_data(DHandle, Socket)
    end.
739
740
%% Output handler loop, entered once the handshake is complete.
%% Messages handled:
%%   - dist_data: drain all pending outgoing data for DHandle to the
%%     socket; any failure while draining goes to death_row/0.
%%   - {send, From, Ref, Data}: write Data directly and reply
%%     {Ref, ok}. This exists only for some OTP distribution test
%%     suites.
%%   - Anything else is dropped.
dist_cntrlr_output_loop(DHandle, Socket) ->
    receive
        dist_data ->
            try
                dist_cntrlr_send_data(DHandle, Socket)
            catch
                _:_ -> death_row()
            end;
        {send, From, Ref, Data} ->
            sock_send(Socket, Data),
            From ! {Ref, ok};
        _Garbage ->
            ok
    end,
    dist_cntrlr_output_loop(DHandle, Socket).
764
%% Send Data on Socket and return ok. An {error, Reason} result or an
%% exception from gen_tcp:send/2 sends the process to death_row/1 with
%% {send_error, ...}. Results matched in the of-section are not
%% covered by the catch.
sock_send(Socket, Data) ->
    try gen_tcp:send(Socket, Data) of
        {error, Reason} -> death_row({send_error, Reason});
        ok -> ok
    catch
        Class:Exception -> death_row({send_error, {Class, Exception}})
    end.
772
%% Wait for termination using the default reason connection_closed.
death_row() ->
    DefaultReason = connection_closed,
    death_row(DefaultReason).
775
%% Park a process whose operations began failing because the connection
%% is going down. The expectation is that one of the linked parties
%% that started the teardown terminates us first, which preserves the
%% original exit reason. If we are still alive after 5 seconds, we exit
%% with Reason ourselves. A normal reason is replaced via death_row/0,
%% because a normal exit would not bring down the linked processes.
death_row(Reason) when Reason =:= normal ->
    death_row();
death_row(Reason) ->
    receive
    after 5000 ->
            exit(Reason)
    end.
793