1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2019. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20%% -------------------------------------------------------------------------
21%%
22%% Module for encrypted Erlang protocol - a minimal encrypted
23%% distribution protocol based on only a shared secret
24%% and the crypto application
25%%
26-module(inet_crypto_dist).
27-define(DIST_NAME, inet_crypto).
28-define(DIST_PROTO, crypto).
29-define(DRIVER, inet_tcp).
30-define(FAMILY, inet).
31
32-export([listen/1, accept/1, accept_connection/5,
33	 setup/5, close/1, select/1, is_node_name/1]).
34
35%% Generalized dist API, for sibling IPv6 module inet6_crypto_dist
36-export([gen_listen/2, gen_accept/2, gen_accept_connection/6,
37	 gen_setup/6, gen_close/2, gen_select/2]).
38
39-export([nodelay/0]).
40
41%% Debug
42%%%-compile(export_all).
43-export([dbg/0, test_server/0, test_client/1]).
44
45-include_lib("kernel/include/net_address.hrl").
46-include_lib("kernel/include/dist.hrl").
47-include_lib("kernel/include/dist_util.hrl").
48
49-define(PACKET_SIZE, 65536).
50-define(BUFFER_SIZE, (?PACKET_SIZE bsl 4)).
51
52%% -------------------------------------------------------------------------
53
%% Per-connection crypto session parameters.  The record is a plain
%% tuple at runtime, so the field order below is part of its layout.
-record(params,
        {socket,                    % the connection's gen_tcp socket
         dist_handle,               % unset when created by params/1
         hmac_algorithm = sha256,
         aead_cipher = aes_gcm,
         rekey_key,
         iv = 12,                   % presumably IV size in bytes - confirm
         key = 16,                  % presumably key size in bytes - confirm
         tag_len = 16,
         rekey_interval = 262144
        }).

%% Default parameters bound to Socket.
params(Socket) ->
    Defaults = #params{},
    Defaults#params{socket = Socket}.
68
69
%% A node's key pair.  'type' and 'params' are the arguments given to
%% crypto:generate_key/2; 'public' and 'private' receive its result.
%%
%% The curve choice has a large impact on connection setup time.  An
%% Edwards curve would be preferable but needs a very recent OpenSSL.
%% Twisted brainpool curves (*t1) are faster than non-twisted ones
%% (*r1), and 256-bit curves are much faster than 384-bit ones;
%% brainpoolP384t1 is the slower alternative that was considered.
-record(key_pair,
        {type = ecdh,
         params = brainpoolP256t1,
         public,
         private}).

%% The key pair server discards its key pair after this many
%% milliseconds...
-define(KEY_PAIR_LIFE_TIME, 3600000). % 1 hour
%% ...or after handing it out this many times.
-define(KEY_PAIR_LIFE_COUNT, 256). % Number of connection setups
85
86
87%% -------------------------------------------------------------------------
88%% Keep the node's public/private key pair in the process state
89%% of a key pair server linked to the acceptor process.
90%% Create the key pair the first time it is needed
91%% so crypto gets time to start first.
92%%
93
%% Spawn the key pair server linked to the caller, register it as
%% ?MODULE from inside the new process, and pass its pid through
%% monitor_dist_proc/1 (whose result is returned).
start_key_pair_server() ->
    ServerBody =
        fun () ->
                register(?MODULE, self()),
                key_pair_server()
        end,
    monitor_dist_proc(spawn_link(ServerBody)).
101
%% Key pair server loop.  The state is:
%%   KeyPair  - the current #key_pair{}, or 'undefined' until one is asked for
%%   Timer    - discards the key pair after ?KEY_PAIR_LIFE_TIME milliseconds
%%   UsesLeft - how many more get_key_pair requests may reuse the pair
%% Requests are {From, Tag, get_key_pair | get_new_key_pair} and are
%% answered with {Tag, KeyPair}.
key_pair_server() ->
    %% No key pair yet, hence no lifetime timer and no use count
    key_pair_server(undefined, undefined, undefined).
%%
key_pair_server(KeyPair) ->
    %% A fresh key pair: arm its lifetime timer and reset the use count
    LifeTimer = erlang:start_timer(?KEY_PAIR_LIFE_TIME, self(), discard),
    key_pair_server(KeyPair, LifeTimer, ?KEY_PAIR_LIFE_COUNT).
%%
key_pair_server(_UsedUpKeyPair, LifeTimer, 0) ->
    %% Reused too many times: forget it so the next request makes a new one
    cancel_timer(LifeTimer),
    key_pair_server();
key_pair_server(KeyPair, LifeTimer, UsesLeft) ->
    receive
        {timeout, LifeTimer, discard} when is_reference(LifeTimer) ->
            %% Lifetime expired
            key_pair_server();
        {From, Tag, get_new_key_pair} ->
            cancel_timer(LifeTimer),
            NewKeyPair = generate_key_pair(),
            From ! {Tag, NewKeyPair},
            key_pair_server(NewKeyPair);
        {From, Tag, get_key_pair} ->
            case KeyPair of
                #key_pair{} ->
                    From ! {Tag, KeyPair},
                    key_pair_server(KeyPair, LifeTimer, UsesLeft - 1);
                undefined ->
                    %% First request: create the key pair lazily
                    NewKeyPair = generate_key_pair(),
                    From ! {Tag, NewKeyPair},
                    key_pair_server(NewKeyPair)
            end
    end.
134
%% Generate a key pair using the type and curve defaults of #key_pair{}.
generate_key_pair() ->
    Template = #key_pair{},
    {Public, Private} =
        crypto:generate_key(Template#key_pair.type, Template#key_pair.params),
    Template#key_pair{public = Public, private = Private}.
140
%% Cancel a timer started with erlang:start_timer/3, making sure its
%% {timeout, Timer, _} message is not left behind in the mailbox.
%% 'undefined' means there is no timer and is a no-op.  Returns ok.
cancel_timer(undefined) ->
    ok;
cancel_timer(Timer) ->
    case erlang:cancel_timer(Timer) =:= false of
        true ->
            %% Timer not found: here that means it has already fired,
            %% so its message is queued or on its way - consume it
            receive
                {timeout, Timer, _} -> ok
            end;
        false ->
            ok
    end.
152
%% Ask the key pair server for its current key pair; it creates
%% one if it has none.
get_key_pair() ->
    Request = get_key_pair,
    call_key_pair_server(Request).

%% Ask the key pair server to replace its key pair with a fresh one
%% and return it.
get_new_key_pair() ->
    Request = get_new_key_pair,
    call_key_pair_server(Request).
158
%% Synchronous request to the server registered as ?MODULE.  Returns the
%% reply.  Raises error(Reason) if the server dies first, and badarg
%% (from erlang:monitor/2) if no such server is registered.
call_key_pair_server(Request) ->
    Server = whereis(?MODULE),
    MRef = erlang:monitor(process, Server),
    Server ! {self(), MRef, Request},
    receive
        {'DOWN', MRef, process, Server, Why} ->
            error(Why);
        {MRef, Answer} ->
            erlang:demonitor(MRef, [flush]),
            Answer
    end.
170
%% Derive the shared secret from our own key pair and the peer's
%% public key PubKey, using the key pair's type and curve parameters.
compute_shared_secret(#key_pair{} = OwnKeyPair, PubKey) ->
    #key_pair{type = Type, params = Curve, private = PrivKey} = OwnKeyPair,
    crypto:compute_key(Type, PubKey, PrivKey, Curve).
178
179%% -------------------------------------------------------------------------
180%% Erlang distribution plugin structure explained to myself
181%% -------
182%% These are the processes involved in the distribution:
183%% * net_kernel
184%% * The Acceptor
185%% * The Controller | Handshaker | Ticker
186%% * The DistCtrl process that may be split into:
187%%   + The Output controller
188%%   + The Input controller
189%%   For the regular inet_tcp_dist distribution module, DistCtrl
190%%   is not one or two processes, but one port - a gen_tcp socket
191%%
192%% When the VM is started with the argument "-proto_dist inet_crypto"
%% net_kernel registers the module inet_crypto_dist as its distribution
194%% module.  net_kernel calls listen/1 to create a listen socket
195%% and then accept/1 with the listen socket as argument to spawn
196%% the Acceptor process, which is linked to net_kernel.  Apparently
197%% the listen socket is owned by net_kernel - I wonder if it could
198%% be owned by the Acceptor process instead...
199%%
200%% The Acceptor process calls blocking accept on the listen socket
201%% and when an incoming socket is returned it spawns the DistCtrl
%% process as linked to the Acceptor.  The ownership of the accepted
203%% socket is transferred to the DistCtrl process.
204%% A message is sent to net_kernel to inform it that an incoming
205%% connection has appeared and the Acceptor awaits a reply from net_kernel.
206%%
207%% net_kernel then calls accept_connection/5 to spawn the Controller |
208%% Handshaker | Ticker process that is linked to net_kernel.
209%% The Controller then awaits a message from the Acceptor process.
210%%
211%% When net_kernel has spawned the Controller it replies with a message
%% to the Acceptor that then calls DistCtrl to change its links
213%% so DistCtrl ends up linked to the Controller and not to the Acceptor.
214%% The Acceptor then sends a message to the Controller.  The Controller
%% then changes role into the Handshaker, creates a #hs_data{} record
216%% and calls dist_util:handshake_other_started/1.  After this
217%% the Acceptor goes back into a blocking accept on the listen socket.
218%%
219%% For the regular distribution inet_tcp_dist DistCtrl is a gen_tcp socket
220%% and when it is a process it also acts as a socket.  The #hs_data{}
221%% record used by dist_util presents a set of funs that are used
222%% by dist_util to perform the distribution handshake.  These funs
223%% make sure to transfer the handshake messages through the DistCtrl
224%% "socket".
225%%
226%% When the handshake is finished a fun for this purpose in #hs_data{}
227%% is called, which tells DistCtrl that it does not need to be prepared
228%% for any more #hs_data{} handshake calls.  The DistCtrl process in this
229%% module then spawns the Input controller process that gets ownership
230%% of the connection's gen_tcp socket and changes into {active, N} mode
231%% so now it gets all incoming traffic and delivers that to the VM.
232%% The original DistCtrl process changes role into the Output controller
233%% process and starts asking the VM for outbound messages and transfers
234%% them on the connection socket.
235%%
236%% The Handshaker now changes into the Ticker role, and uses only two
237%% functions in the #hs_data{} record; one to get socket statistics
238%% and one to send a tick.  None of these may block for any reason
239%% in particular not for a congested socket since that would destroy
240%% connection supervision.
241%%
242%%
%% For an outgoing connection net_kernel calls setup/5 which spawns the
244%% Controller process as linked to net_kernel.  This Controller process
%% connects to the other node's listen socket and when that is successful
246%% spawns the DistCtrl process as linked to the controller and transfers
247%% socket ownership to it.
248%%
249%% Then the Controller creates the #hs_data{} record and calls
250%% dist_util:handshake_we_started/1 which changes the process role
251%% into Handshaker.
252%%
253%% When the distribution handshake is finished the procedure is just
254%% as for an incoming connection above.
255%%
256%%
257%% To sum it up.
258%%
259%% There is an Acceptor process that is linked to net_kernel and
260%% informs it when new connections arrive.
261%%
262%% net_kernel spawns Controllers for incoming and for outgoing connections.
%% These Controllers use the DistCtrl processes to do the distribution
%% handshake and after that become Tickers that supervise the connection.
265%%
266%% The Controller | Handshaker | Ticker is linked to net_kernel, and to
267%% DistCtrl, one or both.  If any of these connection processes would die
268%% all others should be killed by the links.  Therefore none of them may
269%% terminate with reason 'normal'.
270%% -------------------------------------------------------------------------
271
-compile({inline, [socket_options/0]}).
%% Options for distribution sockets: binary, passive, 2-byte length
%% framing, no Nagle delay, and ?BUFFER_SIZE for all kernel/user buffers.
socket_options() ->
    BufSize = ?BUFFER_SIZE,
    [binary,
     {active, false},
     {packet, 2},
     {nodelay, true},
     {sndbuf, BufSize},
     {recbuf, BufSize},
     {buffer, BufSize}].
277
278%% -------------------------------------------------------------------------
279%% select/1 is called by net_kernel to ask if this distribution protocol
280%% is willing to handle Node
281%%
282
select(Node) ->
    %% Default carrier: the ?DRIVER (inet_tcp) driver
    Driver = ?DRIVER,
    gen_select(Node, Driver).
285
%% True when Node has a host part (name@host) that Driver can resolve
%% to an address; false otherwise.
gen_select(Node, Driver) ->
    case dist_util:split_node(Node) of
        {node, _Name, Host} ->
            Resolution = Driver:getaddr(Host),
            case Resolution of
                {ok, _Address} -> true;
                _Unresolved -> false
            end;
        _NoHostPart ->
            false
    end.
296
297%% -------------------------------------------------------------------------
298
%% Delegates node-name validation to dist_util.
is_node_name(NodeName) ->
    Valid = dist_util:is_node_name(NodeName),
    Valid.
301
302%% -------------------------------------------------------------------------
303%% Called by net_kernel to create a listen socket for this
304%% distribution protocol.  This listen socket is used by
305%% the Acceptor process.
306%%
307
listen(Name) ->
    gen_listen(Name, ?DRIVER).

%% Let inet_tcp_dist:gen_listen/2 create the listen socket, then apply
%% this module's socket options and tag the address with ?DIST_PROTO.
%% Any other result from inet_tcp_dist is returned unchanged.
gen_listen(Name, Driver) ->
    case inet_tcp_dist:gen_listen(Driver, Name) of
        {ok, {Socket, Address, Creation}} ->
            %% Best effort: the setopts result is not checked
            _ = inet:setopts(Socket, socket_options()),
            CryptoAddress = Address#net_address{protocol = ?DIST_PROTO},
            {ok, {Socket, CryptoAddress, Creation}};
        NotOk ->
            NotOk
    end.
320
321%% -------------------------------------------------------------------------
322%% Called by net_kernel to spawn the Acceptor process that awaits
%% new connections in a blocking accept and informs net_kernel
324%% when a new connection has appeared, and starts the DistCtrl
325%% "socket" process for the connection.
326%%
327
accept(Listen) ->
    gen_accept(Listen, ?DRIVER).

%% Spawn the Acceptor, linked to the caller (net_kernel) at max
%% priority.  It starts the key pair server and then runs accept_loop/3.
gen_accept(Listen, Driver) ->
    NetKernel = self(),
    AcceptorBody =
        fun () ->
                start_key_pair_server(),
                accept_loop(Listen, Driver, NetKernel)
        end,
    Acceptor = spawn_opt(AcceptorBody, [link, {priority, max}]),
    monitor_dist_proc(Acceptor).
343
%% The Acceptor loop.  It blocks in accept, wraps each accepted socket in
%% a DistCtrl process and announces it to net_kernel.  Once net_kernel
%% names the Controller, DistCtrl is handed over to it and the socket is
%% passed on.  Exits when accept fails or net_kernel rejects the protocol.
accept_loop(Listen, Driver, NetKernel) ->
    case Driver:accept(trace(Listen)) of
        {ok, Socket} ->
            wait_for_code_server(),
            ConnectTime = net_kernel:connecttime(),
            %% DistCtrl acts as the "socket" towards net_kernel/dist_util
            DistCtrl = start_dist_ctrl(trace(Socket), ConnectTime),
            Family = Driver:family(),
            NetKernel ! trace({accept, self(), DistCtrl, Family, ?DIST_PROTO}),
            receive
                {NetKernel, unsupported_protocol} ->
                    exit(unsupported_protocol);
                {NetKernel, controller, Controller} ->
                    call_dist_ctrl(DistCtrl, {controller, Controller, self()}),
                    Controller ! {self(), controller, Socket}
            end,
            accept_loop(Listen, Driver, NetKernel);
        AcceptError ->
            exit({accept, AcceptError})
    end.
365
%% Block until the code_server process is registered, polling every 10 ms.
%%
%% Why: encrypting a connection needs the crypto module.  Loading crypto
%% runs its on_load function, which uses code:priv_dir/1 to find its NIF
%% library.  Distribution can start, and accept a connection, before the
%% code server is up.  In that case on_load fails, crypto is unloaded and
%% the triggering call fails with a confusing 'undef'.  Waiting here
%% avoids that.
wait_for_code_server() ->
    case whereis(code_server) of
        Pid when is_pid(Pid) ->
            ok;
        undefined ->
            receive after 10 -> ok end,
            wait_for_code_server()
    end.
389
390%% -------------------------------------------------------------------------
391%% Called by net_kernel when a new connection has appeared, to spawn
392%% a Controller process that performs the handshake with the new node,
393%% and then becomes the Ticker connection supervisor.
394%% -------------------------------------------------------------------------
395
accept_connection(Acceptor, DistCtrl, MyNode, Allowed, SetupTime) ->
    gen_accept_connection(
      Acceptor, DistCtrl, MyNode, Allowed, SetupTime, ?DRIVER).

%% Spawn the Controller/Handshaker/Ticker for an accepted connection,
%% linked to the caller (net_kernel) at max priority.
gen_accept_connection(
  Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver) ->
    NetKernel = self(),
    ControllerBody =
        fun () ->
                do_accept(
                  Acceptor, DistCtrl,
                  trace(MyNode), Allowed, SetupTime, Driver, NetKernel)
        end,
    monitor_dist_proc(spawn_opt(ControllerBody, [link, {priority, max}])).
414
%% Controller body for an accepted connection.  It waits for the
%% Acceptor's go-ahead, which carries the raw socket.  It then runs the
%% distribution handshake as the side that did not initiate it.
do_accept(
  Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver, NetKernel) ->
    receive
        {Acceptor, controller, Socket} ->
            SetupTimer = dist_util:start_timer(SetupTime),
            Family = Driver:family(),
            Common =
                hs_data_common(
                  NetKernel, MyNode, DistCtrl, SetupTimer, Socket, Family),
            HSData =
                Common#hs_data{
                  this_node = MyNode,
                  this_flags = 0,
                  allowed = Allowed},
            dist_util:handshake_other_started(trace(HSData))
    end.
432
433%% -------------------------------------------------------------------------
434%% Called by net_kernel to spawn a Controller process that sets up
435%% a new connection to another Erlang node, performs the handshake
%% with the other node, and then becomes the Ticker process
437%% that supervises the connection.
438%% -------------------------------------------------------------------------
439
setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
    gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, ?DRIVER).

%% Spawn the Controller/Handshaker/Ticker for an outgoing connection,
%% linked to the caller (net_kernel) at max priority.
gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, Driver) ->
    ControllerBody =
        setup_fun(
          Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, self()),
    monitor_dist_proc(spawn_opt(ControllerBody, [link, {priority, max}])).
453
%% Build the body of an outgoing-connection Controller process.
-spec setup_fun(_,_,_,_,_,_,_) -> fun(() -> no_return()).
setup_fun(
  Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) ->
    fun () ->
            TracedNode = trace(Node),
            do_setup(
              TracedNode, Type, MyNode, LongOrShortNames, SetupTime,
              Driver, NetKernel)
    end.
463
%% Controller body for an outgoing connection.  It validates the node
%% name, resolves the peer address (and port via EPMD if the resolver
%% did not return one), then connects.  Any resolution failure shuts
%% the setup down.
-spec do_setup(_,_,_,_,_,_,_) -> no_return().
do_setup(
  Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) ->
    {Name, Address} = split_node(Driver, Node, LongOrShortNames),
    ErlEpmd = net_kernel:epmd_module(),
    {ResolverMod, ResolverFun} = get_address_resolver(ErlEpmd, Driver),
    Timer = trace(dist_util:start_timer(SetupTime)),
    Connect =
        fun (PeerIp, PeerPort, PeerVersion) ->
                do_setup_connect(
                  Node, Type, MyNode, Timer, Driver, NetKernel,
                  PeerIp, PeerPort, PeerVersion)
        end,
    case ResolverMod:ResolverFun(Name, Address, Driver:family()) of
        {ok, Ip, TcpPort, Version} ->
            Connect(Ip, TcpPort, Version);
        {ok, Ip} ->
            case ErlEpmd:port_please(Name, Ip) of
                {port, TcpPort, Version} ->
                    Connect(Ip, TcpPort, trace(Version));
                PortError ->
                    _ = trace({ErlEpmd, port_please, [Name, Ip], PortError}),
                    ?shutdown(Node)
            end;
        ResolveError ->
            _ = trace(
                  {ResolverMod, ResolverFun, [Name, Address, Driver:family()],
                   ResolveError}),
            ?shutdown(Node)
    end.
494
%% Connect to the peer and start a DistCtrl on the new socket.  Then run
%% the distribution handshake as the initiating side.  A failed connect
%% or a DistCtrl start failure ({dist_ctrl, _}) shuts the setup down.
-spec do_setup_connect(_,_,_,_,_,_,_,_,_) -> no_return().

do_setup_connect(
  Node, Type, MyNode, Timer, Driver, NetKernel,
  Ip, TcpPort, Version) ->
    dist_util:reset_timer(Timer),
    ConnectOpts = trace(connect_options(socket_options())),
    case Driver:connect(Ip, TcpPort, ConnectOpts) of
        {ok, Socket} ->
            %% DistCtrl is a "socket"
            DistCtrl =
                try
                    start_dist_ctrl(Socket, net_kernel:connecttime())
                catch
                    error:{dist_ctrl, _} = StartError ->
                        _ = trace(StartError),
                        ?shutdown(Node)
                end,
            Family = Driver:family(),
            Common =
                hs_data_common(
                  NetKernel, MyNode, DistCtrl, Timer, Socket, Family),
            HSData =
                Common#hs_data{
                  other_node = Node,
                  this_flags = 0,
                  other_version = Version,
                  request_type = Type},
            dist_util:handshake_we_started(trace(HSData));
        ConnectError ->
            _ = trace(
                  {Driver, connect, [Ip, TcpPort, ConnectOpts],
                   ConnectError}),
            ?shutdown(Node)
    end.
528
529%% -------------------------------------------------------------------------
530%% close/1 is only called by net_kernel on the socket returned by listen/1.
531
close(Socket) ->
    gen_close(Socket, ?DRIVER).

%% Close the listen socket through the given driver.
gen_close(Socket, Driver) ->
    Traced = trace(Socket),
    Driver:close(Traced).
537
538%% -------------------------------------------------------------------------
539
540
%% Build the #hs_data{} record shared by accepted and outgoing connections.
%%
%% NetKernel - the net_kernel pid (kernel_pid)
%% MyNode    - this node's name
%% DistCtrl  - the DistCtrl process, used as the handshake "socket"
%% Timer     - the setup timer from dist_util:start_timer/1
%% Socket    - the underlying gen_tcp socket
%% Family    - the address family from Driver:family()
%%
%% The f_* handshake funs talk to DistCtrl through synchronous
%% call_dist_ctrl/2 requests.  The mf_* funs used later by the ticker
%% either send DistCtrl a plain dist_tick message or act directly on
%% Socket.  None of them makes a call to DistCtrl.
hs_data_common(NetKernel, MyNode, DistCtrl, Timer, Socket, Family) ->
    %% Field 'socket' below is set to DistCtrl, which makes
    %% the distribution handshake process (ticker) call
    %% the funs below with DistCtrl as the S argument.
    %% So, S =:= DistCtrl below...
    #hs_data{
       kernel_pid = NetKernel,
       this_node = MyNode,
       socket = DistCtrl,
       timer = Timer,
       %%
       f_send = % -> ok | {error, closed}=>?shutdown()
           fun (S, Packet) when S =:= DistCtrl ->
                   %% A dead DistCtrl is reported as a closed socket
                   try call_dist_ctrl(S, {send, Packet})
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           {error, closed}
                   end
           end,
       f_recv = % -> {ok, List} | Other=>?shutdown()
           fun (S, 0, infinity) when S =:= DistCtrl ->
                   %% dist_util expects the packet as a list, DistCtrl
                   %% returns a binary
                   try call_dist_ctrl(S, recv) of
                       {ok, Bin} when is_binary(Bin) ->
                           {ok, binary_to_list(Bin)};
                       Error ->
                           Error
                   catch error : {dist_ctrl, Reason} ->
                           {error, trace(Reason)}
                   end
           end,
       %% No socket options to change around nodeup for this carrier
       f_setopts_pre_nodeup =
           fun (S) when S =:= DistCtrl ->
                   ok
           end,
       f_setopts_post_nodeup =
           fun (S) when S =:= DistCtrl ->
                   ok
           end,
       f_getll =
           fun (S) when S =:= DistCtrl ->
                   {ok, S} %% DistCtrl is the distribution port
           end,
       f_address = % -> #net_address{} | ?shutdown()
           fun (S, Node) when S =:= DistCtrl ->
                   %% Peer address from DistCtrl, host part from the
                   %% node name; anything unexpected aborts the setup
                   try call_dist_ctrl(S, peername) of
                       {ok, Address} ->
                           case dist_util:split_node(Node) of
                               {node, _, Host} ->
                                   #net_address{
                                      address = Address,
                                      host = Host,
                                      protocol = ?DIST_PROTO,
                                      family = Family};
                               _ ->
                                   ?shutdown(Node)
                           end;
                       Error ->
                           _ = trace(Error),
                           ?shutdown(Node)
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           ?shutdown(Node)
                   end
           end,
       f_handshake_complete = % -> ok | ?shutdown()
           fun (S, Node, DistHandle) when S =:= DistCtrl ->
                   %% Hands DistHandle to DistCtrl so it can leave
                   %% handshake mode
                   try call_dist_ctrl(S, {handshake_complete, DistHandle})
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           ?shutdown(Node)
                   end
           end,
       %%
       %% mf_tick/1, mf_getstat/1, mf_setopts/2 and mf_getopts/2
       %% are called by the ticker any time after f_handshake_complete/3
       %% so they may not block the caller even for congested socket
       mf_tick =
           fun (S) when S =:= DistCtrl ->
                   %% Asynchronous: just a message to DistCtrl
                   S ! dist_tick
           end,
       mf_getstat = % -> {ok, RecvCnt, SendCnt, SendPend} | Other=>ignore_it
           fun (S) when S =:= DistCtrl ->
                   %% Read counters from the raw socket, not via DistCtrl
                   case
                       inet:getstat(Socket, [recv_cnt, send_cnt, send_pend])
                   of
                       {ok, Stat} ->
                           split_stat(Stat, 0, 0, 0);
                       Error ->
                           trace(Error)
                   end
           end,
       mf_setopts =
           fun (S, Opts) when S =:= DistCtrl ->
                   %% Options that would break the framing/mode are dropped
                   inet:setopts(Socket, setopts_filter(Opts))
           end,
       mf_getopts =
           fun (S, Opts) when S =:= DistCtrl ->
                   inet:getopts(Socket, Opts)
           end}.
640
%% Remove options that must not be changed on a distribution socket:
%% the {active, _}, {deliver, _} and {packet, _} tuples and the
%% list/binary/inet/inet6 atoms.  All other options are kept in order.
setopts_filter(Opts) ->
    lists:filter(
      fun ({active, _}) -> false;
          ({deliver, _}) -> false;
          ({packet, _}) -> false;
          (list) -> false;
          (binary) -> false;
          (inet) -> false;
          (inet6) -> false;
          (_) -> true
      end,
      Opts).
650
%% Fold an inet:getstat/2 result ([{recv_cnt, _}, {send_cnt, _},
%% {send_pend, _}] in any order) into {ok, RecvCnt, SendCnt, SendPend}.
%% The three trailing arguments are accumulators; start them at 0.
split_stat([], RecvCnt, SendCnt, SendPend) ->
    {ok, RecvCnt, SendCnt, SendPend};
split_stat([{send_pend, SendPend} | Rest], RecvCnt, SendCnt, _) ->
    split_stat(Rest, RecvCnt, SendCnt, SendPend);
split_stat([{send_cnt, SendCnt} | Rest], RecvCnt, _, SendPend) ->
    split_stat(Rest, RecvCnt, SendCnt, SendPend);
split_stat([{recv_cnt, RecvCnt} | Rest], _, SendCnt, SendPend) ->
    split_stat(Rest, RecvCnt, SendCnt, SendPend).
659
%% ------------------------------------------------------------
%% Choose the function used to resolve a node's host to an address.
%% If the EPMD module exports address_please/3, use it.  Otherwise
%% fall back to erl_epmd:address_please/3.
%% Returns {Module, Function}.  The Driver argument is not used.
%%
%% NOTE: erlang:function_exported/3 only reports functions of loaded
%% modules.  A custom EPMD module that is not loaded yet therefore
%% falls back to erl_epmd.
%% ------------------------------------------------------------
get_address_resolver(EpmdModule, _Driver) ->
    case erlang:function_exported(EpmdModule, address_please, 3) of
        true  -> {EpmdModule, address_please};
        false -> {erl_epmd, address_please}
    end.
669
670
%% Split Node into {Name, Host} and check the host part against the
%% long/short names mode (check_node/5).  An illegal node name is
%% logged and terminates the connection setup.
split_node(Driver, Node, LongOrShortNames) ->
    case dist_util:split_node(Node) of
        {node, Name, Host} ->
            check_node(Driver, Node, Name, Host, LongOrShortNames);
        NotANodeName ->
            {Format, Reason} =
                case NotANodeName of
                    {host, _} ->
                        {"** Nodename ~p illegal, no '@' character **~n",
                         illegal_node_n@me};
                    _ ->
                        {"** Nodename ~p illegal **~n",
                         illegal_node_name}
                end,
            error_logger:error_msg(Format, [Node]),
            ?shutdown2(Node, trace({Reason, Node}))
    end.
686
%% Check Host against the naming mode and return {Name, Host}.
%% - longnames:  a host without dots is only accepted if Driver can
%%               parse it as an address.
%% - shortnames: a host with dots is rejected.
%% A rejected host is logged and terminates the connection setup.
check_node(Driver, Node, Name, Host, LongOrShortNames) ->
    Labels = string:split(Host, ".", all),
    case {LongOrShortNames, Labels} of
        {longnames, [_SingleLabel]} ->
            case Driver:parse_address(Host) of
                {ok, _} ->
                    {Name, Host};
                _ ->
                    error_logger:error_msg(
                      "** System running to use "
                      "fully qualified hostnames **~n"
                      "** Hostname ~s is illegal **~n",
                      [Host]),
                    ?shutdown2(Node, trace({not_longnames, Host}))
            end;
        {shortnames, [_, _ | _]} ->
            error_logger:error_msg(
              "** System NOT running to use "
              "fully qualified hostnames **~n"
              "** Hostname ~s is illegal **~n",
              [Host]),
            ?shutdown2(Node, trace({not_shortnames, Host}));
        _Acceptable ->
            {Name, Host}
    end.
711
712%% -------------------------------------------------------------------------
713
%% Append the kernel 'inet_dist_connect_options' (after setopts_filter/1)
%% to Opts.  If the variable is not set, Opts is returned as is.
connect_options(Opts) ->
    case application:get_env(kernel, inet_dist_connect_options) of
        {ok, UserOpts} ->
            Filtered = setopts_filter(UserOpts),
            Opts ++ Filtered;
        _NotSet ->
            Opts
    end.
721
%% The nodelay socket option, controlled by the kernel 'dist_nodelay'
%% variable (nodelay is not always wanted, for performance reasons).
%% Only an explicit 'false' disables it.  Unset or any other value
%% gives {nodelay, true}.
nodelay() ->
    case application:get_env(kernel, dist_nodelay) of
        {ok, false} -> {nodelay, false};
        _Default -> {nodelay, true}
    end.
735
736%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
737%%
738%% The DistCtrl process(es).
739%%
%% When net_kernel reports handshake_complete, DistCtrl spawns off the input
%% controller that takes over the socket ownership, and itself becomes the
%% output controller.
742%%
743%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
744
%%% XXX Missing before this can be "productified":
%%% * Cryptanalysis by experts, this is crypto amateur work.
747%%% * Is it useful over inet_tls_dist; i.e to not have to bother
748%%%   with certificates but instead manage a secret cluster cookie?
749%%% * An application to belong to (kernel)
750%%% * Restart and/or code reload policy (not needed in kernel)
751%%% * Fitting into the epmd/Erlang distro protocol version framework
752%%%   (something needs to be created for multiple protocols, epmd,
753%%%    multiple address families, fallback to previous version, etc)
754
755
%% Debug client and server: each side opens a TCP connection with the
%% distribution socket options and starts a DistCtrl process on it.

%% Listen on an ephemeral port, print the test_client/1 call to run on
%% the other side, then accept one connection.
test_server() ->
    {ok, ListenSocket} = gen_tcp:listen(0, socket_options()),
    {ok, PortNo} = inet:port(ListenSocket),
    io:format(?MODULE_STRING":test_client(~w).~n", [PortNo]),
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    test(Socket).

%% Connect to a test_server/0 listening on Port at localhost.
test_client(Port) ->
    Opts = socket_options(),
    {ok, Socket} = gen_tcp:connect(localhost, Port, Opts),
    test(Socket).

%% Start a DistCtrl on Socket with a 10000 ms start timeout.
test(Socket) ->
    StartTimeout = 10000,
    start_dist_ctrl(Socket, StartTimeout).
771
772%% -------------------------------------------------------------------------
773
%% Spawn the DistCtrl process for Socket and make it the socket's
%% controlling process.  Then ask it to start, waiting up to Timeout
%% via call_dist_ctrl/3.
%% On 'start' it runs init/2 with the cookie as secret (init/2 is defined
%% elsewhere, presumably the key exchange) and replies with its own pid,
%% which is what this function returns.  It then enters handshake/5.
%% Failures surface from call_dist_ctrl/3 as error({dist_ctrl, _}).
start_dist_ctrl(Socket, Timeout) ->
    Secret = atom_to_binary(auth:get_cookie(), latin1),
    Controller = self(),
    ServerBody =
        fun () ->
                receive
                    {?MODULE, From, start} ->
                        {SendParams, RecvParams} = init(Socket, Secret),
                        reply(From, self()),
                        handshake(SendParams, 1, RecvParams, 1, Controller)
                end
        end,
    SpawnOpts =
        [link,
         {priority, max},
         {message_queue_data, off_heap},
         {fullsweep_after, 0}],
    Server = monitor_dist_proc(spawn_opt(ServerBody, SpawnOpts)),
    ok = gen_tcp:controlling_process(Socket, Server),
    call_dist_ctrl(Server, start, Timeout).
795
796
%% Synchronous request to a DistCtrl process.  It sends
%% {?MODULE, {Ref, self()}, Msg} and waits for {Ref, Result}.
%%
%% Returns Result.  Raises error({dist_ctrl, Reason}) if the server dies
%% before replying, and error({dist_ctrl, timeout}) if no reply arrives
%% within Timeout milliseconds.  In the timeout case the server has been
%% stopped by the time the error is raised.
call_dist_ctrl(Server, Msg) ->
    call_dist_ctrl(Server, Msg, infinity).
%%
call_dist_ctrl(Server, Msg, Timeout) ->
    Ref = erlang:monitor(process, Server),
    Server ! {?MODULE, {Ref, self()}, Msg},
    receive
        {Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            Res;
        {'DOWN', Ref, process, Server, Reason} ->
            error({dist_ctrl, Reason})
    after Timeout -> % Timeout < infinity is only used by start_dist_ctrl/2
            %% The server may be stuck indefinitely, e.g. waiting for a
            %% peer that never sends anything.  Waiting for it to die on
            %% its own could block forever and defeat the timeout, so
            %% stop it here.  Unlink first so that killing it does not
            %% kill us through the link; the caller gets a catchable
            %% error instead.
            unlink(Server),
            exit(Server, kill),
            receive
                {'DOWN', Ref, process, Server, _} -> ok
            end,
            %% Drop a reply that may have raced with the timeout
            receive {Ref, _} -> ok after 0 -> ok end,
            error({dist_ctrl, timeout})
    end.
817
%% Answer a call_dist_ctrl/2,3 request whose From is {Ref, Pid}.
reply({Ref, Pid} = _From, Msg) ->
    _ = erlang:send(Pid, {Ref, Msg}),
    ok.
821
822%% -------------------------------------------------------------------------
823
824-define(TCP_ACTIVE, 16).
825-define(CHUNK_SIZE, (?PACKET_SIZE - 512)).
826
827-define(HANDSHAKE_CHUNK, 1).
828-define(DATA_CHUNK, 2).
829-define(TICK_CHUNK, 3).
830-define(REKEY_CHUNK, 4).
831
832%% -------------------------------------------------------------------------
833%% Crypto strategy
834%% -------
835%% The crypto strategy is as simple as possible to get an encrypted
836%% connection as benchmark reference.  It is geared around AEAD
837%% ciphers in particular AES-GCM.
838%%
%% The init message and the start message must fit in the TCP buffers
%% since both sides start by sending the init message, wait
%% for the other end's init message, send the start message
%% and wait for the other end's start message.  So if a send
%% blocks we have a deadlock.
844%%
845%% The init + start sequence tries to implement Password Encrypted
846%% Key Exchange using a node public/private key pair and the
847%% shared secret (the Cookie) to create session encryption keys
%% that can not be re-created if the shared secret is compromised,
849%% which should create forward secrecy.  You need both nodes'
850%% key pairs and the shared secret to decrypt the traffic
851%% between the nodes.
852%%
%% All exchanged messages use {packet, 2}, i.e. a 16-bit size header.
854%%
855%% The init message contains a random number and encrypted: the public key
856%% and two random numbers.  The encryption is done with Key and IV hashed
857%% from the unencrypted random number and the shared secret.
858%%
859%% The other node's public key is used with the own node's private
860%% key to create a shared key that is hashed with one of the encrypted
861%% random numbers from each side to create Key and IV for the session.
862%%
%% The start message contains the two random numbers that the other
%% side sent encrypted in its init message, this time encrypted with
%% the session keys for verification by the other side, plus the
%% rekey interval.  The rekey interval is just there to get an early
%% check of whether the other side's maximum rekey interval is
%% acceptable; it is just an embryo of some better check.  Any side
%% may rekey earlier, but if the rekey interval is exceeded the
%% connection fails.
870%%
%% Subsequent encrypted messages have the sequence number and the length
%% of the message as AAD data, and an incrementing IV.  These messages
%% have a message type that differentiates data from ticks and rekeys.
874%% Ticks have a random size in an attempt to make them less obvious to spot.
875%%
%% Rekeying is done by the sender.  It creates a new key pair and, from
%% that and the other end's public key, a new shared secret; it then
%% hashes a new key and IV from this shared secret together with the
%% current key and IV.
%% The new public key is sent to the other end, which uses it
%% with its old private key to create the same new shared
%% secret, and from that the same new key and IV.
%% So the receiver keeps its private key, and the sender keeps
%% the receiver's public key, for the connection's lifetime,
%% while the sender generates a new key pair at every rekey,
%% which changes the shared secret at every rekey.
886%%
%% The only reaction to errors is to crash noisily (?), which will bring
888%% down the connection and hopefully produce something useful
889%% in the local log, but all the other end sees is a closed connection.
890%% -------------------------------------------------------------------------
891
%% Start the key exchange on Socket.  Send our init message, which
%% carries the public key of our key pair, then continue with
%% init_recv/5 to process the peer's side of the exchange.
init(Socket, Secret) ->
    KeyPair = get_key_pair(),
    #key_pair{public = PublicKey} = KeyPair,
    Params = params(Socket),
    {R2, R3, InitMsg} = init_msg(Params, PublicKey, Secret),
    ok = gen_tcp:send(Socket, InitMsg),
    init_recv(Params, Secret, KeyPair, R2, R3).
898
%% Complete the key exchange after our init message has been sent:
%% receive the peer's init message, answer with our start message,
%% then receive and verify the peer's start message
%% (see init_msg/6 and start_msg/4).
%% Returns {SendParams, RecvParams}.  In each, iv has been split into
%% {IVSalt, IVNo}: the leading IVLen - 6 bytes as a binary salt, and the
%% trailing 48 bits as an integer to which the chunk sequence number is
%% added (see encrypt_chunk/3 and decrypt_chunk/3).
%% Errors raised inside the try (including failed decryption or
%% verification) are traced and turned into exit(connection_closed).
%% The first gen_tcp:recv/2 is outside the try, so its failure
%% surfaces as a plain badmatch.
init_recv(
  #params{socket = Socket, iv = IVLen} = Params, Secret, KeyPair, R2, R3) ->
    %% Here iv still holds the IV length, not an IV
    {ok, InitMsg} = gen_tcp:recv(Socket, 0),
    IVSaltLen = IVLen - 6,
    try
        case init_msg(Params, Secret, KeyPair, R2, R3, InitMsg) of
            {#params{iv = <<IV2ASalt:IVSaltLen/binary, IV2ANo:48>>} =
                 SendParams,
             RecvParams, SendStartMsg} ->
                ok = gen_tcp:send(Socket, SendStartMsg),
                {ok, RecvStartMsg} = gen_tcp:recv(Socket, 0),
                #params{
                   iv = <<IV2BSalt:IVSaltLen/binary, IV2BNo:48>>} =
                    RecvParams_1 =
                    start_msg(RecvParams, R2, R3, RecvStartMsg),
                {SendParams#params{iv = {IV2ASalt, IV2ANo}},
                 RecvParams_1#params{iv = {IV2BSalt, IV2BNo}}}
        end
    catch
        error : Reason : Stacktrace->
            _ = trace({Reason, Stacktrace}),
            exit(connection_closed)
    end.
923
924
925
%% Build our init message.  In Params, key and iv hold the key and IV
%% lengths, not actual keys.
%% Three random values R1A, R2A, R3A of KeyLen + IVLen bytes each are
%% generated.  R1A is sent in the clear.  hmac_key_iv/5, with R1A as
%% the HMAC key over Secret, gives the key and IV that AEAD-encrypt
%% [R2A, R3A, PubKeyA]; the AAD is the 32-bit message length followed
%% by R1A.
%% Returns {R2A, R3A, Msg}, where Msg is the iolist [R1A, Tag, Ciphertext].
init_msg(
  #params{
     hmac_algorithm = HmacAlgo,
     aead_cipher = AeadCipher,
     key = KeyLen,
     iv = IVLen,
     tag_len = TagLen}, PubKeyA, Secret) ->
    %%
    RLen = KeyLen + IVLen,
    <<R1A:RLen/binary, R2A:RLen/binary, R3A:RLen/binary>> =
        crypto:strong_rand_bytes(3 * RLen),
    {Key1A, IV1A} = hmac_key_iv(HmacAlgo, R1A, Secret, KeyLen, IVLen),
    Plaintext = [R2A, R3A, PubKeyA],
    %% Total size of Msg, assuming the ciphertext is as long as the
    %% plaintext (as for AES-GCM); the receiver uses byte_size(Msg)
    MsgLen = byte_size(R1A) + TagLen + iolist_size(Plaintext),
    AAD = [<<MsgLen:32>>, R1A],
    {Ciphertext, Tag} =
        crypto:block_encrypt(AeadCipher, Key1A, IV1A, {AAD, Plaintext, TagLen}),
    Msg = [R1A, Tag, Ciphertext],
    {R2A, R3A, Msg}.
945%%
%% Process the peer's init message Msg (the layout built by init_msg/3)
%% and derive the session parameters.  In Params, key and iv hold the
%% key and IV lengths; the returned params hold actual key/IV binaries.
%% The peer's cleartext R1B gives the key/IV that decrypt R2B, R3B and
%% the peer's public key PubKeyB.  The shared secret of our KeyPair and
%% PubKeyB is HMAC'ed with [R2A, R3B] into the send key/IV, and with
%% [R2B, R3A] into the receive key/IV.
%% Returns {SendParams, RecvParams, StartMsg}:
%%   SendParams keeps PubKeyB as rekey_key, for sender-side rekeying;
%%   RecvParams keeps our KeyPair as rekey_key, for receiver-side rekeying;
%%   StartMsg echoes R2B and R3B plus our rekey interval, encrypted
%%   with the send key/IV.
%% A malformed message or failed decryption crashes with case_clause,
%% which init_recv/5 catches.
init_msg(
  #params{
     hmac_algorithm = HmacAlgo,
     aead_cipher = AeadCipher,
     key = KeyLen,
     iv = IVLen,
     tag_len = TagLen,
     rekey_interval = RekeyInterval} = Params,
  Secret, KeyPair, R2A, R3A, Msg) ->
    %%
    RLen = KeyLen + IVLen,
    case Msg of
        <<R1B:RLen/binary, Tag:TagLen/binary, Ciphertext/binary>> ->
            {Key1B, IV1B} = hmac_key_iv(HmacAlgo, R1B, Secret, KeyLen, IVLen),
            MsgLen = byte_size(Msg),
            AAD = [<<MsgLen:32>>, R1B],
            case
                crypto:block_decrypt(
                  AeadCipher, Key1B, IV1B, {AAD, Ciphertext, Tag})
            of
                <<R2B:RLen/binary, R3B:RLen/binary, PubKeyB/binary>> ->
                    SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
                    %% Send direction: our R2 and the peer's R3
                    {Key2A, IV2A} =
                        hmac_key_iv(
                          HmacAlgo, SharedSecret, [R2A, R3B], KeyLen, IVLen),
                    SendParams =
                        Params#params{
                          rekey_key = PubKeyB,
                          key = Key2A, iv = IV2A},
                    %% Start message: echo the peer's random numbers so it
                    %% can verify that we derived the same session keys
                    StartCleartext = [R2B, R3B, <<RekeyInterval:32>>],
                    StartMsgLen = TagLen + iolist_size(StartCleartext),
                    StartAAD = <<StartMsgLen:32>>,
                    {StartCiphertext, StartTag} =
                        crypto:block_encrypt(
                          AeadCipher, Key2A, IV2A,
                          {StartAAD, StartCleartext, TagLen}),
                    StartMsg = [StartTag, StartCiphertext],
                    %% Receive direction: the peer's R2 and our R3
                    {Key2B, IV2B} =
                        hmac_key_iv(
                          HmacAlgo, SharedSecret, [R2B, R3A], KeyLen, IVLen),
                    RecvParams =
                        Params#params{
                          rekey_key = KeyPair,
                          key = Key2B, iv = IV2B},
                    %%
                    {SendParams, RecvParams, StartMsg}
            end
    end.
997
%% Verify the peer's start message Msg ([Tag, Ciphertext]) using the
%% receive key/IV.  It must decrypt, with the 32-bit message length as
%% AAD, to our own R2A and R3A followed by the peer's 32-bit rekey
%% interval.  R2A and R3A are already bound in the head, so the match is
%% an equality check.  The two rekey intervals must be within a factor
%% of 4 (bsl 2) of each other.
%% Returns RecvParams with rekey_interval set to the peer's interval,
%% i.e. the interval at which the peer, as sender, rekeys.  Any mismatch
%% crashes with case_clause.
start_msg(
  #params{
     aead_cipher = AeadCipher,
     key = Key2B,
     iv = IV2B,
     tag_len = TagLen,
     rekey_interval = RekeyIntervalA} = RecvParams, R2A, R3A, Msg) ->
    %%
    case Msg of
        <<Tag:TagLen/binary, Ciphertext/binary>> ->
            KeyLen = byte_size(Key2B),
            IVLen = byte_size(IV2B),
            RLen = KeyLen + IVLen,
            MsgLen = byte_size(Msg),
            AAD = <<MsgLen:32>>,
            case
                crypto:block_decrypt(
                  AeadCipher, Key2B, IV2B, {AAD, Ciphertext, Tag})
            of
                <<R2A:RLen/binary, R3A:RLen/binary, RekeyIntervalB:32>>
                  when RekeyIntervalA =< (RekeyIntervalB bsl 2),
                       RekeyIntervalB =< (RekeyIntervalA bsl 2) ->
                    RecvParams#params{rekey_interval = RekeyIntervalB}
            end
    end.
1023
%% Derive a {Key, IV} pair as the first KeyLen + IVLen bytes of the
%% HMAC of Data keyed with MacKey.  Crashes (badmatch) if the HMAC
%% output is shorter than KeyLen + IVLen.
hmac_key_iv(HmacAlgo, MacKey, Data, KeyLen, IVLen) ->
    TotalLen = KeyLen + IVLen,
    Mac = crypto:hmac(HmacAlgo, MacKey, Data, TotalLen),
    <<Key:KeyLen/binary, IV:IVLen/binary>> = Mac,
    {Key, IV}.
1028
1029%% -------------------------------------------------------------------------
1030%% net_kernel distribution handshake in progress
1031%%
1032
%% Dist controller loop while net_kernel's distribution handshake is in
%% progress.  It serves requests sent through call_dist_ctrl/2,3:
%%   {controller, NewController, Parent} - link to NewController, unlink
%%       from Parent and reply with the result of link/1
%%   {handshake_complete, DistHandle} - spawn the input handler, hand it
%%       the socket, and continue as the output handler
%%   {send, Data} / recv - exchange encrypted ?HANDSHAKE_CHUNK chunks;
%%       on failure reply with an error and exit via death_row/1
%%   peername - reply with inet:peername/1 of the socket
%% Any other message is passed to the trace/1 trace point, as the other
%% loops in this module do, and is otherwise ignored.
handshake(
  SendParams, SendSeq,
  #params{socket = Socket} = RecvParams, RecvSeq, Controller) ->
    receive
        {?MODULE, From, {controller, Controller_1, Parent}} ->
            %% Move our link from the original parent to the new controller
            Result = link(Controller_1),
            true = unlink(Parent),
            reply(From, Result),
            handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller_1);
        {?MODULE, From, {handshake_complete, DistHandle}} ->
            %% The input handler waits for the DistHandle message before it
            %% activates the socket, i.e. until the socket ownership and
            %% the VM input handler registration below are done
            InputHandler =
                monitor_dist_proc(
                  spawn_opt(
                    fun () ->
                            link(Controller),
                            receive
                                DistHandle ->
                                    ok =
                                        inet:setopts(
                                          Socket,
                                          [{active, ?TCP_ACTIVE},
                                           nodelay()]),
                                    input_handler(
                                      RecvParams#params{
                                        dist_handle = DistHandle},
                                      RecvSeq, empty_q(), infinity)
                            end
                    end,
                    [link,
                     {priority, normal},
                     {message_queue_data, off_heap},
                     {fullsweep_after, 0}])),
            _ = monitor(process, InputHandler), % For the benchmark test
            ok = gen_tcp:controlling_process(Socket, InputHandler),
            ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
            InputHandler ! DistHandle,
            %% From here on this process is the output handler; seed rand
            %% for the random tick sizes in output_handler_tick/2
            crypto:rand_seed_alg(crypto_cache),
            reply(From, ok),
            process_flag(priority, normal),
            erlang:dist_ctrl_get_data_notification(DistHandle),
            output_handler(
              SendParams#params{dist_handle = DistHandle}, SendSeq);
        %%
        {?MODULE, From, {send, Data}} ->
            case
                encrypt_and_send_chunk(
                  SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data])
            of
                {SendParams_1, SendSeq_1, ok} ->
                    reply(From, ok),
                    handshake(
                      SendParams_1, SendSeq_1, RecvParams, RecvSeq,
                      Controller);
                {_, _, Error} ->
                    reply(From, {error, closed}),
                    death_row({send, trace(Error)})
            end;
        {?MODULE, From, recv} ->
            case recv_and_decrypt_chunk(RecvParams, RecvSeq) of
                {RecvParams_1, RecvSeq_1, {ok, _} = Reply} ->
                    reply(From, Reply),
                    handshake(
                      SendParams, SendSeq, RecvParams_1, RecvSeq_1,
                      Controller);
                {_, _, Error} ->
                    reply(From, Error),
                    death_row({recv, trace(Error)})
            end;
        {?MODULE, From, peername} ->
            reply(From, inet:peername(Socket)),
            handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller);
        %%
        Alien ->
            %% Ignore, but make it visible at the trace point
            _ = trace(Alien),
            handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller)
    end.
1108
%% Receive one handshake chunk from the (passive) socket and decrypt it.
%% Returns {RecvParams, NextSeq, {ok, Cleartext} | {error, Reason}}.
%% A rekey chunk replaces the params, restarts numbering at 0, and
%% receives the next chunk.
recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
    case gen_tcp:recv(Socket, 0) of
        {ok, Chunk} ->
            Decrypted = decrypt_chunk(RecvParams, RecvSeq, Chunk),
            case Decrypted of
                #params{} = RekeyedParams ->
                    recv_and_decrypt_chunk(RekeyedParams, 0);
                error ->
                    {RecvParams, RecvSeq, {error, decrypt_error}};
                <<?HANDSHAKE_CHUNK, Cleartext/binary>> ->
                    {RecvParams, RecvSeq + 1, {ok, Cleartext}};
                <<_/binary>> ->
                    %% Decrypted fine, but not a handshake chunk
                    {RecvParams, RecvSeq + 1, {error, decrypt_error}}
            end;
        Error ->
            {RecvParams, RecvSeq, Error}
    end.
1125
1126%% -------------------------------------------------------------------------
1127%% Output handler process
1128%%
1129%% The game here is to flush all dist_data and dist_tick messages,
1130%% prioritize dist_data over dist_tick, and to not use selective receive
1131
output_handler(Params, Seq) ->
    %% The last clause is a catch-all, so every receive takes the oldest
    %% message in the mailbox; no message is ever skipped over.
    receive
        dist_data ->
            output_handler_data(Params, Seq);
        dist_tick ->
            output_handler_tick(Params, Seq);
        Other ->
            %% Ignore
            _ = trace(Other),
            output_handler(Params, Seq)
    end.
1146
%% Data is pending: drain the mailbox, then send everything the VM has
%% queued for this connection.
output_handler_data(Params, Seq) ->
    receive
        Msg ->
            %% Further data notifications and ticks are redundant while
            %% data is pending; anything else goes to the trace point
            _ = case Msg of
                    dist_data -> ok;
                    dist_tick -> ok;
                    Other -> trace(Other)
                end,
            output_handler_data(Params, Seq)
    after 0 ->
            DistHandle = Params#params.dist_handle,
            {NextParams, NextSeq} =
                output_handler_send(
                  Params, Seq, get_data(DistHandle, empty_q())),
            %% Re-arm the notification before waiting again
            erlang:dist_ctrl_get_data_notification(DistHandle),
            output_handler(NextParams, NextSeq)
    end.
1167
%% A tick is pending: drain the mailbox (data takes priority), then send
%% one encrypted tick chunk.
output_handler_tick(Params, Seq) ->
    receive
        Msg ->
            case Msg of
                dist_data ->
                    output_handler_data(Params, Seq);
                dist_tick ->
                    output_handler_tick(Params, Seq);
                _ ->
                    %% Ignore
                    _ = trace(Msg),
                    output_handler_tick(Params, Seq)
            end
    after 0 ->
            %% Random padding of 8..63 zero bytes makes ticks harder to spot
            Padding = binary:copy(<<0>>, 7 + rand:uniform(56)),
            case encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, Padding]) of
                {NextParams, NextSeq, ok} ->
                    output_handler(NextParams, NextSeq);
                {_, _, Reason} ->
                    _ = trace(Reason),
                    death_row()
            end
    end.
1194
%% Send queued data in chunks of at most ?CHUNK_SIZE cleartext bytes.
%% While more than ?CHUNK_SIZE bytes are queued, a full chunk is sent.
%% Otherwise more data is first fetched from the VM with get_data/2.
%% If that added nothing (the queue size is unchanged; note that Size
%% is already bound in the pattern below), the remaining bytes are sent
%% as a final, shorter chunk.
%% Returns {Params, Seq} once the queue is empty.
output_handler_send(Params, Seq, {_, Size, _} = Q) ->
    if
        ?CHUNK_SIZE < Size ->
            output_handler_send(Params, Seq, Q, ?CHUNK_SIZE);
        true ->
            case get_data(Params#params.dist_handle, Q) of
                {_, 0, _} ->
                    {Params, Seq};
                {_, Size, _} = Q_1 -> % Got no more
                    output_handler_send(Params, Seq, Q_1, Size);
                Q_1 ->
                    output_handler_send(Params, Seq, Q_1)
            end
    end.
1209
%% Dequeue Size bytes, send them as one ?DATA_CHUNK, and continue with
%% the rest of the queue.  A send error is traced and the process exits
%% through death_row/0.
output_handler_send(Params, Seq, Q, Size) ->
    {Cleartext, RestQ} = deq_iovec(Size, Q),
    Sent = encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext]),
    case Sent of
        {NextParams, NextSeq, ok} ->
            output_handler_send(NextParams, NextSeq, RestQ);
        {_, _, Reason} ->
            _ = trace(Reason),
            death_row()
    end.
1221
1222%% -------------------------------------------------------------------------
1223%% Input handler process
1224%%
1225%% Here is T = 0|infinity to steer if we should try to receive
1226%% more data or not; start with infinity, and when we get some
1227%% data try with 0 to see if more is waiting
1228
input_handler(
  #params{socket = Socket, dist_handle = DistHandle} = Params, Seq, Q, T) ->
    receive
        Msg ->
            case Msg of
                {tcp, Socket, Chunk} ->
                    input_chunk(Params, Seq, Q, T, Chunk);
                {tcp_passive, Socket} ->
                    %% Re-arm {active, N}.  If data was being held back
                    %% (T =:= 0), deliver it before waiting with infinity
                    ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]),
                    Q_1 =
                        case T of
                            infinity -> Q;
                            0 -> deliver_data(DistHandle, Q)
                        end,
                    input_handler(Params, Seq, Q_1, infinity);
                {tcp_closed, Socket} ->
                    exit(connection_closed);
                Other ->
                    %% Ignore...
                    _ = trace(Other),
                    input_handler(Params, Seq, Q, T)
            end
    after T ->
            %% Nothing more arrived right away: deliver what is queued
            input_handler(Params, Seq, deliver_data(DistHandle, Q), infinity)
    end.
1256
%% Decrypt one received chunk and act on its type.  Data is queued and
%% T is set to 0 to look for more before delivering, ticks are dropped,
%% and a rekey installs new params with numbering restarted at 0.
%% Anything else closes the connection.
input_chunk(Params, Seq, Q, T, Chunk) ->
    case decrypt_chunk(Params, Seq, Chunk) of
        #params{} = RekeyedParams ->
            input_handler(RekeyedParams, 0, Q, T);
        error ->
            _ = trace(decrypt_error),
            exit(connection_closed);
        <<?DATA_CHUNK, Cleartext/binary>> ->
            input_handler(Params, Seq + 1, enq_binary(Cleartext, Q), 0);
        <<?TICK_CHUNK, _/binary>> ->
            input_handler(Params, Seq + 1, Q, T);
        <<_/binary>> ->
            _ = trace(invalid_chunk),
            exit(connection_closed)
    end.
1272
1273%% -------------------------------------------------------------------------
1274%% erlang:dist_ctrl_* helpers
1275
1276%% Get data for sending from the VM and place it in a queue
1277%%
get_data(DistHandle, {Front, Size, Rear}) ->
    get_data(DistHandle, Front, Size, Rear).
%%
%% Pull packets from the VM until it has none left.  Each packet is
%% queued as a 32-bit length header followed by its data.  Rear is kept
%% in reverse order, so the pieces are pushed onto it reversed.
get_data(DistHandle, Front, Size, Rear) ->
    case erlang:dist_ctrl_get_data(DistHandle) of
        none ->
            {Front, Size, Rear};
        Data ->
            {Len, Rear_1} =
                case Data of
                    Bin when is_binary(Bin) ->
                        BinLen = byte_size(Bin),
                        {BinLen, [Bin, <<BinLen:32>>|Rear]};
                    [Bin1, Bin2] ->
                        PairLen = byte_size(Bin1) + byte_size(Bin2),
                        {PairLen, [Bin2, Bin1, <<PairLen:32>>|Rear]};
                    Iovec ->
                        IovLen = iolist_size(Iovec),
                        {IovLen, lists:reverse(Iovec, [<<IovLen:32>>|Rear])}
                end,
            get_data(DistHandle, Front, Size + 4 + Len, Rear_1)
    end.
1301
1302%% De-packet and deliver received data to the VM from a queue
1303%%
%% The queue is {Front, Size, Rear}, where Size is the total byte count
%% of all queued binaries.  The queued bytes form a sequence of packets,
%% each a 32-bit big-endian length followed by that many bytes (the
%% framing get_data/4 produces on the sending side).  Every complete
%% packet is passed to erlang:dist_ctrl_put_data/2; the bytes of an
%% incomplete trailing packet are returned as the new queue.
deliver_data(DistHandle, Q) ->
    case Q of
        {[], Size, []} ->
            Size = 0, % Assert
            Q;
        {[], Size, Rear} ->
            [Bin|Front] = lists:reverse(Rear),
            deliver_data(DistHandle, Front, Size, [], Bin);
        {[Bin|Front], Size, Rear} ->
            deliver_data(DistHandle, Front, Size, Rear, Bin)
    end.
%%
%% Bin is the binary just taken off the front of the queue.  Size still
%% includes Bin's bytes.
deliver_data(DistHandle, Front, Size, Rear, Bin) ->
    case Bin of
        %% Two complete packets in Bin (unrolled fast path)
        <<DataSizeA:32, DataA:DataSizeA/binary,
          DataSizeB:32, DataB:DataSizeB/binary, Rest/binary>> ->
            erlang:dist_ctrl_put_data(DistHandle, DataA),
            erlang:dist_ctrl_put_data(DistHandle, DataB),
            deliver_data(
              DistHandle,
              Front, Size - (4 + DataSizeA + 4 + DataSizeB), Rear,
              Rest);
        %% One complete packet in Bin
        <<DataSize:32, Data:DataSize/binary, Rest/binary>> ->
            erlang:dist_ctrl_put_data(DistHandle, Data),
            deliver_data(DistHandle, Front, Size - (4 + DataSize), Rear, Rest);
        %% Complete header, but the data continues into later binaries
        <<DataSize:32, FirstData/binary>> ->
            TotalSize = 4 + DataSize,
            if
                TotalSize =< Size ->
                    %% The whole packet is queued; collect the missing bytes
                    BinSize = byte_size(Bin),
                    {MoreData, Q} =
                        deq_iovec(
                          TotalSize - BinSize,
                          Front, Size - BinSize, Rear),
                    erlang:dist_ctrl_put_data(DistHandle, [FirstData|MoreData]),
                    deliver_data(DistHandle, Q);
                true -> % Incomplete data
                    {[Bin|Front], Size, Rear}
            end;
        %% Fewer than 4 bytes in Bin (possibly none)
        <<_/binary>> ->
            BinSize = byte_size(Bin),
            if
                4 =< Size -> % Fragmented header - extract a header bin
                    %% Size is unchanged: the 4-byte Header replaces Bin plus
                    %% the header bytes taken from the rest of the queue
                    {RestHeader, {Front_1, _Size_1, Rear_1}} =
                        deq_iovec(4 - BinSize, Front, Size - BinSize, Rear),
                    Header = iolist_to_binary([Bin|RestHeader]),
                    deliver_data(DistHandle, Front_1, Size, Rear_1, Header);
                true -> % Incomplete header
                    {[Bin|Front], Size, Rear}
            end
    end.
1355
1356%% -------------------------------------------------------------------------
1357%% Encryption and decryption helpers
1358
%% Encrypt Cleartext as chunk number Seq and send it on the socket.
%% Returns {Params_1, NextSeq, SendResult}.
%%
%% First clause: Seq has reached the rekey interval (the record field is
%% matched against the Seq argument), so a rekey is done first.
%%   1. A fresh key pair is obtained.
%%   2. Its public key is sent in a ?REKEY_CHUNK, encrypted under the
%%      current key as chunk Seq.
%%   3. A new shared secret is computed against the peer's public key
%%      (rekey_key).
%%   4. A new key, IV salt and IV number are hashed from that secret
%%      together with the current key and the IV of chunk Seq.
%% Cleartext is then sent as chunk 0 under the new key, and the next
%% Seq is 1.  The receiving side mirrors this in block_decrypt/6.
encrypt_and_send_chunk(
  #params{
     socket = Socket,
     rekey_interval = Seq,
     rekey_key = PubKeyB,
     key = Key,
     iv = {IVSalt, IVNo},
     hmac_algorithm = HmacAlgo} = Params,
  Seq, Cleartext) ->
    %%
    KeyLen = byte_size(Key),
    IVSaltLen = byte_size(IVSalt),
    #key_pair{public = PubKeyA} = KeyPair = get_new_key_pair(),
    case
        gen_tcp:send(
          Socket, encrypt_chunk(Params, Seq, [?REKEY_CHUNK, PubKeyA]))
    of
        ok ->
            SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
            %% The counter part of chunk Seq's IV; [Key, IVSalt, IV] hashes
            %% the same bytes as [Key, IVBin] in block_decrypt/6
            IV = <<(IVNo + Seq):48>>,
            {Key_1, <<IVSalt_1:IVSaltLen/binary, IVNo_1:48>>} =
                hmac_key_iv(
                  HmacAlgo, SharedSecret, [Key, IVSalt, IV],
                  KeyLen, IVSaltLen + 6),
            Params_1 = Params#params{key = Key_1, iv = {IVSalt_1, IVNo_1}},
            Result =
                gen_tcp:send(Socket, encrypt_chunk(Params_1, 0, Cleartext)),
            {Params_1, 1, Result};
        SendError ->
            {Params, Seq + 1, SendError}
    end;
%% Second clause: no rekey due, plain send
encrypt_and_send_chunk(#params{socket = Socket} = Params, Seq, Cleartext) ->
    Result = gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext)),
    {Params, Seq + 1, Result}.
1393
%% AEAD-encrypt Cleartext as chunk number Seq.  The IV is the salt
%% followed by the 48-bit value IVNo + Seq.  The AAD is Seq (32 bits)
%% and the on-wire chunk length, ciphertext plus tag (32 bits).
%% Returns the iolist [Ciphertext, Tag].
encrypt_chunk(
  #params{
     aead_cipher = AeadCipher,
     iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, Seq, Cleartext) ->
    WireLen = TagLen + iolist_size(Cleartext),
    {Ciphertext, Tag} =
        crypto:block_encrypt(
          AeadCipher, Key,
          <<IVSalt/binary, (IVNo + Seq):48>>,
          {<<Seq:32, WireLen:32>>, Cleartext, TagLen}),
    [Ciphertext, Tag].
1406
%% Split a received chunk into ciphertext and trailing AEAD tag, and
%% decrypt it as chunk number Seq with the same IV and AAD construction
%% as encrypt_chunk/3.  A chunk shorter than the tag yields error;
%% otherwise the result of block_decrypt/6 is returned.
decrypt_chunk(
  #params{
     aead_cipher = AeadCipher,
     iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen} = Params, Seq, Chunk) ->
    ChunkLen = byte_size(Chunk),
    CiphertextLen = ChunkLen - TagLen,
    case CiphertextLen < 0 of
        true ->
            error;
        false ->
            case Chunk of
                <<Ciphertext:CiphertextLen/binary,
                  CipherTag:TagLen/binary>> ->
                    block_decrypt(
                      Params, Seq, AeadCipher, Key,
                      <<IVSalt/binary, (IVNo + Seq):48>>,
                      {<<Seq:32, ChunkLen:32>>, Ciphertext, CipherTag});
                _ ->
                    error
            end
    end.
1430
%% AEAD-decrypt one chunk.  Returns one of:
%%   #params{}  - the chunk was a ?REKEY_CHUNK carrying the peer's new
%%                public key, which must be as long as our own public
%%                key.  The returned params hold a new key and IV,
%%                derived from the shared secret of our unchanged
%%                KeyPair and that public key, hashed with the current
%%                key and this chunk's IV.  This mirrors the sender side
%%                in encrypt_and_send_chunk/3.
%%   Cleartext  - any other chunk that decrypted fine, unless Seq has
%%                reached the rekey interval.
%%   error      - authentication failure, a malformed rekey chunk, or
%%                the rekey interval was exceeded.
block_decrypt(
  #params{
     rekey_key = #key_pair{public = PubKeyA} = KeyPair,
     rekey_interval = RekeyInterval} = Params,
  Seq, AeadCipher, Key, IV, Data) ->
    %% IV is the full IV binary (salt + 48-bit counter) of chunk Seq
    case crypto:block_decrypt(AeadCipher, Key, IV, Data) of
        <<?REKEY_CHUNK, Rest/binary>> ->
            PubKeyLen = byte_size(PubKeyA),
            case Rest of
                <<PubKeyB:PubKeyLen/binary>> ->
                    SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
                    KeyLen = byte_size(Key),
                    IVLen = byte_size(IV),
                    IVSaltLen = IVLen - 6,
                    {Key_1, <<IVSalt:IVSaltLen/binary, IVNo:48>>} =
                        hmac_key_iv(
                          Params#params.hmac_algorithm,
                          SharedSecret, [Key, IV], KeyLen, IVLen),
                    Params#params{iv = {IVSalt, IVNo}, key = Key_1};
                _ ->
                    error
            end;
        Chunk when is_binary(Chunk) ->
            case Seq of
                RekeyInterval ->
                    %% This was one chunk too many without rekeying
                    error;
                _ ->
                    Chunk
            end;
        error ->
            error
    end.
1465
1466%% -------------------------------------------------------------------------
1467%% Queue of binaries i.e an iovec queue
1468
%% An empty iovec queue {Front, Size, Rear}.  Size is the total number
%% of queued bytes; Rear holds newly enqueued binaries in reverse order.
empty_q() -> {[], 0, []}.
1471
%% Append Bin at the rear of the queue and add its size to the byte count.
enq_binary(Bin, {Front, Size, Rear}) ->
    NewSize = byte_size(Bin) + Size,
    {Front, NewSize, [Bin|Rear]}.
1474
%% Dequeue exactly GetSize bytes from the front of the queue.
%% Returns {Iovec, Q_1}: the dequeued bytes as a list of binaries, where
%% a binary straddling the boundary is split, and the remaining queue.
%% deq_iovec/2 requires GetSize =< the queued size; the /4 variant takes
%% the queue fields unpacked and does not check this.
deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size ->
    deq_iovec(GetSize, Front, Size, Rear).
%%
deq_iovec(GetSize, Front, Size, Rear) ->
    deq_iovec(GetSize, Front, Size, Rear, []).
%%
deq_iovec(Want, [], Size, Rear, Taken) ->
    %% Front exhausted: continue with the rear list in FIFO order
    deq_iovec(Want, lists:reverse(Rear), Size, [], Taken);
deq_iovec(Want, [Bin|Front], Size, Rear, Taken) ->
    case byte_size(Bin) of
        Want ->
            %% Exact fit
            {lists:reverse(Taken, [Bin]), {Front, Size - Want, Rear}};
        BinSize when BinSize < Want ->
            %% Take the whole binary and keep going
            deq_iovec(
              Want - BinSize, Front, Size - BinSize, Rear, [Bin|Taken]);
        _ ->
            %% Split; the tail stays at the front of the queue
            {Head, Tail} = erlang:split_binary(Bin, Want),
            {lists:reverse(Taken, [Head]), {[Tail|Front], Size - Want, Rear}}
    end.
1495
1496%% -------------------------------------------------------------------------
1497
%% Linger for 5 seconds, then exit.  The reason normal is reported as
%% connection_closed, which is also the default.
death_row() ->
    death_row(connection_closed).
%%
death_row(Reason) ->
    ExitReason =
        case Reason of
            normal -> connection_closed;
            _ -> Reason
        end,
    receive after 5000 -> exit(ExitReason) end.
1502
1503%% -------------------------------------------------------------------------
1504
%% Trace point: the identity function.  Interesting terms are passed
%% through it so they can be captured with dbg (see dbg/0).
trace(Term) ->
    Term.
1507
%% Keep an eye on this Pid (debug)
%%
%% By default (unless the macro 'undefined' is defined at compile time,
%% which presumably nothing does) monitor_dist_proc/1 just returns Pid.
%% The debug variant spawns a watcher that monitors Pid and logs its
%% death, then also returns Pid.  A normal exit is logged as an error
%% report, since these processes are expected to exit with other
%% reasons (e.g. death_row/1 maps normal to connection_closed).  Any
%% other reason is logged as an info report.
-ifndef(undefined).
monitor_dist_proc(Pid) ->
    Pid.
-else.
monitor_dist_proc(Pid) ->
    spawn(
      fun () ->
              MRef = erlang:monitor(process, Pid),
              receive
                  {'DOWN', MRef, _, _, normal} ->
                      error_logger:error_report(
                        [dist_proc_died,
                         {reason, normal},
                         {pid, Pid}]);
                  {'DOWN', MRef, _, _, Reason} ->
                      error_logger:info_report(
                        [dist_proc_died,
                         {reason, Reason},
                         {pid, Pid}])
              end
      end),
    Pid.
-endif.
1532
%% Debug helper: restart dbg with a tracer on all processes' calls, and
%% add call trace patterns (with caller and return info, cx) on this
%% module's trace point and the erlang:dist_ctrl_* data functions.
dbg() ->
    dbg:stop(),
    dbg:tracer(),
    dbg:p(all, c),
    TracePoints =
        [{?MODULE, trace},
         {erlang, dist_ctrl_get_data_notification},
         {erlang, dist_ctrl_get_data},
         {erlang, dist_ctrl_put_data}],
    lists:foreach(
      fun ({Mod, Fun}) -> dbg:tpl(Mod, Fun, cx) end,
      TracePoints),
    ok.
1542