%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2019. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%% -------------------------------------------------------------------------
%%
%% Module for encrypted Erlang protocol - a minimal encrypted
%% distribution protocol based on only a shared secret
%% and the crypto application
%%
-module(inet_crypto_dist).
%% DIST_PROTO is stored as the protocol of the returned #net_address{};
%% DRIVER is the transport module used by the non-generalized entry
%% points (listen/1, accept/1, setup/5, ...).
-define(DIST_NAME, inet_crypto).
-define(DIST_PROTO, crypto).
-define(DRIVER, inet_tcp).
-define(FAMILY, inet).

%% Distribution module API called by net_kernel
-export([listen/1, accept/1, accept_connection/5,
         setup/5, close/1, select/1, is_node_name/1]).

%% Generalized dist API, for sibling IPv6 module inet6_crypto_dist
-export([gen_listen/2, gen_accept/2, gen_accept_connection/6,
         gen_setup/6, gen_close/2, gen_select/2]).

-export([nodelay/0]).

%% Debug
%%%-compile(export_all).
-export([dbg/0, test_server/0, test_client/1]).

-include_lib("kernel/include/net_address.hrl").
-include_lib("kernel/include/dist.hrl").
-include_lib("kernel/include/dist_util.hrl").

%% 2^16 bytes; presumably chosen to match the 16 bit length header of
%% the {packet, 2} framing used on the sockets. The socket buffers are
%% sized to 16 such packets.
-define(PACKET_SIZE, 65536).
-define(BUFFER_SIZE, (?PACKET_SIZE bsl 4)).

%% -------------------------------------------------------------------------

%% Per-direction crypto state of a connection. init_msg/6 derives one
%% copy for sending and one for receiving from the same template.
-record(params,
        {socket,                  % gen_tcp socket of the connection
         dist_handle,             % set when the handshake is complete
         hmac_algorithm = sha256, % HMAC used to derive keys and IVs
         aead_cipher = aes_gcm,   % AEAD cipher used for all messages
         %% Send side: the peer's public key.
         %% Receive side: our own #key_pair{}.
         rekey_key,
         %% iv and key start out as lengths in bytes. During the key
         %% exchange they are replaced by the derived IV and key
         %% binaries, and init_recv/5 then turns iv into a
         %% {Salt, 48 bit counter} tuple.
         iv = 12,
         key = 16,
         tag_len = 16,            % AEAD tag length in bytes
         %% Our proposed rekey interval, sent in the start message.
         %% On the receive side it is replaced by the peer's value.
         %% NOTE(review): the unit is not visible in this section.
         rekey_interval = 262144
        }).

%% Initial crypto parameters for a connection socket.
%% Every other field keeps its record default.
params(Sock) ->
    #params{socket = Sock}.


-record(key_pair,
        {type = ecdh,
         %% The curve choice greatly affects setup time,
         %% we really want an Edwards curve but that would
         %% require a very new openssl version.
         %% Twisted brainpool curves (*t1) are faster than
         %% non-twisted (*r1), 256 is much faster than 384,
         %% and so on...
%%%      params = brainpoolP384t1,
         params = brainpoolP256t1,
         public,
         private}).

-define(KEY_PAIR_LIFE_TIME, 3600000). % 1 hour
-define(KEY_PAIR_LIFE_COUNT, 256). % Number of connection setups


%% -------------------------------------------------------------------------
%% Keep the node's public/private key pair in the process state
%% of a key pair server linked to the acceptor process.
%% Create the key pair the first time it is needed
%% so crypto gets time to start first.
%%

%% Spawn the key pair server linked to the caller and register it
%% under ?MODULE.
start_key_pair_server() ->
    ServerPid =
        spawn_link(
          fun () ->
                  register(?MODULE, self()),
                  key_pair_server()
          end),
    monitor_dist_proc(ServerPid).

%% Empty state: no key pair, no lifetime timer and no use budget yet.
key_pair_server() ->
    key_pair_server(undefined, undefined, undefined).
%%
%% A fresh key pair with a full lifetime timer and use budget.
key_pair_server(KeyPair) ->
    LifeTimer = erlang:start_timer(?KEY_PAIR_LIFE_TIME, self(), discard),
    key_pair_server(KeyPair, LifeTimer, ?KEY_PAIR_LIFE_COUNT).
%%
%% Serve requests until the use budget is spent, the lifetime timer
%% fires, or a new key pair is explicitly requested.
key_pair_server(_KeyPair, LifeTimer, 0) ->
    %% Budget exhausted: forget the pair; the next request creates one.
    cancel_timer(LifeTimer),
    key_pair_server();
key_pair_server(KeyPair, LifeTimer, UsesLeft) ->
    receive
        {From, Tag, get_key_pair} ->
            key_pair_server_get(KeyPair, From, Tag, LifeTimer, UsesLeft);
        {From, Tag, get_new_key_pair} ->
            cancel_timer(LifeTimer),
            key_pair_server_renew(From, Tag);
        {timeout, LifeTimer, discard} when is_reference(LifeTimer) ->
            %% Lifetime expired: forget the pair.
            key_pair_server()
    end.

%% Answer get_key_pair. A missing pair is created lazily; otherwise the
%% current pair is handed out and one use is taken from the budget.
key_pair_server_get(undefined, From, Tag, _LifeTimer, _UsesLeft) ->
    key_pair_server_renew(From, Tag);
key_pair_server_get(#key_pair{} = KeyPair, From, Tag, LifeTimer, UsesLeft) ->
    From ! {Tag, KeyPair},
    key_pair_server(KeyPair, LifeTimer, UsesLeft - 1).

%% Generate a fresh key pair and send it to the requester.
%% Then restart the loop with a new lifetime and budget.
key_pair_server_renew(From, Tag) ->
    NewKeyPair = generate_key_pair(),
    From ! {Tag, NewKeyPair},
    key_pair_server(NewKeyPair).

%% Generate a new key pair using the key type and curve given by the
%% #key_pair{} record defaults.
generate_key_pair() ->
    Defaults = #key_pair{},
    {PublicKey, PrivateKey} =
        crypto:generate_key(
          Defaults#key_pair.type, Defaults#key_pair.params),
    #key_pair{public = PublicKey, private = PrivateKey}.

%% Cancel a timer started with erlang:start_timer/3 and make sure no
%% stale {timeout, Timer, _} message is left behind in the mailbox.
%% 'undefined' stands for "no timer".
cancel_timer(undefined) ->
    ok;
cancel_timer(Timer) ->
    cancel_timer_flush(erlang:cancel_timer(Timer), Timer).

%% erlang:cancel_timer/1 returned false: the timer has already fired,
%% so consume its timeout message. Any other result means it was
%% cancelled in time.
cancel_timer_flush(false, Timer) ->
    receive
        {timeout, Timer, _} -> ok
    end;
cancel_timer_flush(_RemainingTime, _Timer) ->
    ok.

%% Fetch the current key pair from the key pair server.
get_key_pair() ->
    call_key_pair_server(get_key_pair).

%% Ask the key pair server for a freshly generated key pair.
get_new_key_pair() ->
    call_key_pair_server(get_new_key_pair).

%% Synchronous request to the key pair server registered as ?MODULE.
%% Raises error(Reason) if the server goes down before it replies.
call_key_pair_server(Request) ->
    Server = whereis(?MODULE),
    MRef = erlang:monitor(process, Server),
    Server ! {self(), MRef, Request},
    receive
        {MRef, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Server, Reason} ->
            error(Reason)
    end.

%% Combine our private key with the peer's public key into the shared
%% secret, using the key type and curve recorded in OwnKeyPair.
compute_shared_secret(#key_pair{} = OwnKeyPair, PeerPubKey) ->
    #key_pair{type = Type, params = Curve, private = PrivKey} = OwnKeyPair,
    crypto:compute_key(Type, PeerPubKey, PrivKey, Curve).

%% -------------------------------------------------------------------------
%% Erlang distribution plugin structure explained to myself
%% -------
%% These are the processes involved in the distribution:
%% * net_kernel
%% * The Acceptor
%% * The Controller | Handshaker | Ticker
%% * The DistCtrl process that may be split into:
%%   + The Output controller
%%   + The Input controller
%% For the regular inet_tcp_dist distribution module, DistCtrl
%% is not one or two processes, but one port - a gen_tcp socket
%%
%% When the VM is started with the argument "-proto_dist inet_crypto"
%% net_kernel registers the module inet_crypto_dist as its distribution
%% module.  net_kernel calls listen/1 to create a listen socket
%% and then accept/1 with the listen socket as argument to spawn
%% the Acceptor process, which is linked to net_kernel.
%% Apparently the listen socket is owned by net_kernel - I wonder if it could
%% be owned by the Acceptor process instead...
%%
%% The Acceptor process calls blocking accept on the listen socket
%% and when an incoming socket is returned it spawns the DistCtrl
%% process as linked to the Acceptor.  The ownership of the accepted
%% socket is transferred to the DistCtrl process.
%% A message is sent to net_kernel to inform it that an incoming
%% connection has appeared and the Acceptor awaits a reply from net_kernel.
%%
%% net_kernel then calls accept_connection/5 to spawn the Controller |
%% Handshaker | Ticker process that is linked to net_kernel.
%% The Controller then awaits a message from the Acceptor process.
%%
%% When net_kernel has spawned the Controller it replies with a message
%% to the Acceptor, which then calls DistCtrl to change its links
%% so DistCtrl ends up linked to the Controller and not to the Acceptor.
%% The Acceptor then sends a message to the Controller.  The Controller
%% then changes role into the Handshaker, creates a #hs_data{} record
%% and calls dist_util:handshake_other_started/1.  After this
%% the Acceptor goes back into a blocking accept on the listen socket.
%%
%% For the regular distribution module inet_tcp_dist, DistCtrl is a
%% gen_tcp socket, and when it is a process it also acts as a socket.
%% The #hs_data{} record used by dist_util presents a set of funs that
%% are used by dist_util to perform the distribution handshake.  These
%% funs make sure to transfer the handshake messages through the DistCtrl
%% "socket".
%%
%% When the handshake is finished a fun for this purpose in #hs_data{}
%% is called, which tells DistCtrl that it does not need to be prepared
%% for any more #hs_data{} handshake calls.  The DistCtrl process in this
%% module then spawns the Input controller process that gets ownership
%% of the connection's gen_tcp socket and changes into {active, N} mode
%% so now it gets all incoming traffic and delivers that to the VM.
%% The original DistCtrl process changes role into the Output controller
%% process and starts asking the VM for outbound messages and transfers
%% them on the connection socket.
%%
%% The Handshaker now changes into the Ticker role, and uses only two
%% functions in the #hs_data{} record; one to get socket statistics
%% and one to send a tick.  None of these may block for any reason,
%% in particular not for a congested socket, since that would destroy
%% connection supervision.
%%
%%
%% For an outgoing connection net_kernel calls setup/5 which spawns the
%% Controller process as linked to net_kernel.  This Controller process
%% connects to the other node's listen socket and when that is successful
%% spawns the DistCtrl process as linked to the Controller and transfers
%% socket ownership to it.
%%
%% Then the Controller creates the #hs_data{} record and calls
%% dist_util:handshake_we_started/1 which changes the process role
%% into Handshaker.
%%
%% When the distribution handshake is finished the procedure is just
%% as for an incoming connection above.
%%
%%
%% To sum it up.
%%
%% There is an Acceptor process that is linked to net_kernel and
%% informs it when new connections arrive.
%%
%% net_kernel spawns Controllers for incoming and for outgoing connections.
%% These Controllers use the DistCtrl processes to do the distribution
%% handshake and after that become Tickers that supervise the connection.
%%
%% The Controller | Handshaker | Ticker is linked to net_kernel, and to
%% DistCtrl, one or both.  If any of these connection processes would die
%% all others should be killed by the links.
%% Therefore none of them may terminate with reason 'normal'.
%% -------------------------------------------------------------------------

%% Options for listen and connection sockets: binary, passive,
%% {packet, 2} framing, nodelay and ?BUFFER_SIZE sized buffers.
-compile({inline, [socket_options/0]}).
socket_options() ->
    BufferSize = ?BUFFER_SIZE,
    [binary, {active, false}, {packet, 2}, {nodelay, true},
     {sndbuf, BufferSize}, {recbuf, BufferSize}, {buffer, BufferSize}].

%% -------------------------------------------------------------------------
%% select/1 is called by net_kernel to ask if this distribution protocol
%% is willing to handle Node
%%

select(Node) ->
    gen_select(Node, ?DRIVER).

%% Accept Node only if it is a Name@Host node name whose Host can be
%% resolved by Driver:getaddr/1.
gen_select(Node, Driver) ->
    case dist_util:split_node(Node) of
        {node, _Name, Host} ->
            host_resolvable(Driver, Host);
        _NotANodeName ->
            false
    end.

%% true if Driver can resolve Host to an address, false otherwise.
host_resolvable(Driver, Host) ->
    case Driver:getaddr(Host) of
        {ok, _Address} -> true;
        _Error -> false
    end.

%% -------------------------------------------------------------------------

%% Node name validation is delegated to dist_util.
is_node_name(Node) ->
    dist_util:is_node_name(Node).

%% -------------------------------------------------------------------------
%% Called by net_kernel to create a listen socket for this
%% distribution protocol. This listen socket is used by
%% the Acceptor process.
%%

listen(Name) ->
    gen_listen(Name, ?DRIVER).

%% Create the listen socket through inet_tcp_dist, apply our socket
%% options and tag the returned address with this protocol.
%% Errors from inet_tcp_dist:gen_listen/2 are passed through unchanged.
gen_listen(Name, Driver) ->
    case inet_tcp_dist:gen_listen(Driver, Name) of
        {ok, {ListenSocket, NetAddress, Creation}} ->
            %% NOTE(review): the setopts result is not checked
            inet:setopts(ListenSocket, socket_options()),
            CryptoAddress = NetAddress#net_address{protocol = ?DIST_PROTO},
            {ok, {ListenSocket, CryptoAddress, Creation}};
        Error ->
            Error
    end.

%% -------------------------------------------------------------------------
%% Called by net_kernel to spawn the Acceptor process that awaits
%% new connection in a blocking accept and informs net_kernel
%% when a new connection has appeared, and starts the DistCtrl
%% "socket" process for the connection.
%%

accept(Listen) ->
    gen_accept(Listen, ?DRIVER).

%% Spawn the Acceptor process, linked to the calling net_kernel and
%% running at max priority. It starts the key pair server and then
%% loops accepting connections on Listen.
gen_accept(Listen, Driver) ->
    NetKernel = self(),
    AcceptorFun =
        fun () ->
                start_key_pair_server(),
                accept_loop(Listen, Driver, NetKernel)
        end,
    Acceptor = spawn_opt(AcceptorFun, [link, {priority, max}]),
    monitor_dist_proc(Acceptor).

%% Block in accept. Each accepted socket is handed over to net_kernel,
%% then the loop continues. Any accept error terminates the Acceptor
%% with {accept, Error}.
accept_loop(Listen, Driver, NetKernel) ->
    case Driver:accept(trace(Listen)) of
        {ok, Socket} ->
            accept_handover(Socket, Driver, NetKernel),
            accept_loop(Listen, Driver, NetKernel);
        AcceptError ->
            exit({accept, AcceptError})
    end.

%% Start a DistCtrl for Socket and announce it to net_kernel.
%% Once net_kernel names the Controller, relink DistCtrl to the
%% Controller and pass the socket on to it.
accept_handover(Socket, Driver, NetKernel) ->
    Acceptor = self(),
    wait_for_code_server(),
    Timeout = net_kernel:connecttime(),
    DistCtrl = start_dist_ctrl(trace(Socket), Timeout),
    %% DistCtrl is a "socket"
    NetKernel !
        trace({accept, Acceptor, DistCtrl, Driver:family(), ?DIST_PROTO}),
    receive
        {NetKernel, controller, Controller} ->
            call_dist_ctrl(DistCtrl, {controller, Controller, Acceptor}),
            Controller ! {Acceptor, controller, Socket};
        {NetKernel, unsupported_protocol} ->
            exit(unsupported_protocol)
    end.

wait_for_code_server() ->
    %% This is an ugly hack.  Starting encryption on a connection
    %% requires the crypto module to be loaded.  Loading the crypto
    %% module triggers its on_load function, which calls
    %% code:priv_dir/1 to find the directory where its NIF library is.
    %% However, distribution is started earlier than the code server,
    %% so the code server is not necessarily started yet, and
    %% code:priv_dir/1 might fail because of that, if we receive
    %% an incoming connection on the distribution port early enough.
    %%
    %% If the on_load function of a module fails, the module is
    %% unloaded, and the function call that triggered loading it fails
    %% with 'undef', which is rather confusing.
    %%
    %% So let's avoid that by waiting for the code server to start,
    %% polling every 10 ms.
    %%
    case whereis(code_server) of
        CodeServer when is_pid(CodeServer) ->
            ok;
        undefined ->
            timer:sleep(10),
            wait_for_code_server()
    end.

%% -------------------------------------------------------------------------
%% Called by net_kernel when a new connection has appeared, to spawn
%% a Controller process that performs the handshake with the new node,
%% and then becomes the Ticker connection supervisor.
%% -------------------------------------------------------------------------

accept_connection(Acceptor, DistCtrl, MyNode, Allowed, SetupTime) ->
    gen_accept_connection(
      Acceptor, DistCtrl, MyNode, Allowed, SetupTime, ?DRIVER).

gen_accept_connection(
  Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver) ->
    NetKernel = self(),
    ControllerFun =
        fun () ->
                do_accept(
                  Acceptor, DistCtrl,
                  trace(MyNode), Allowed, SetupTime, Driver, NetKernel)
        end,
    %% Spawn Controller/handshaker/ticker process, linked to net_kernel
    Controller = spawn_opt(ControllerFun, [link, {priority, max}]),
    monitor_dist_proc(Controller).

%% Wait for the Acceptor to hand over the socket, then run the
%% "other started" side of the distribution handshake.
do_accept(
  Acceptor, DistCtrl, MyNode, Allowed, SetupTime, Driver, NetKernel) ->
    receive
        {Acceptor, controller, Socket} ->
            Timer = dist_util:start_timer(SetupTime),
            Family = Driver:family(),
            BaseHSData =
                hs_data_common(
                  NetKernel, MyNode, DistCtrl, Timer, Socket, Family),
            HSData =
                BaseHSData#hs_data{
                  this_node = MyNode,
                  this_flags = 0,
                  allowed = Allowed},
            dist_util:handshake_other_started(trace(HSData))
    end.

%% -------------------------------------------------------------------------
%% Called by net_kernel to spawn a Controller process that sets up
%% a new connection to another Erlang node, performs the handshake
%% with the other node, and then becomes the Ticker process
%% that supervises the connection.
%% -------------------------------------------------------------------------

setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
    gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, ?DRIVER).

%% Spawn the Controller/handshaker/ticker process for an outgoing
%% connection, linked to the calling net_kernel.
gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, Driver) ->
    NetKernel = self(),
    SetupFun =
        setup_fun(
          Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel),
    Controller = spawn_opt(SetupFun, [link, {priority, max}]),
    monitor_dist_proc(Controller).

-spec setup_fun(_,_,_,_,_,_,_) -> fun(() -> no_return()).
setup_fun(
  Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) ->
    fun () ->
            do_setup(
              trace(Node), Type, MyNode, LongOrShortNames, SetupTime,
              Driver, NetKernel)
    end.

%% Resolve Node's address. If the address resolver does not also
%% return a port, ask EPMD for it. Then connect. Any resolution
%% failure is traced and shuts the setup down.
-spec do_setup(_,_,_,_,_,_,_) -> no_return().
do_setup(
  Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel) ->
    {Name, Address} = split_node(Driver, Node, LongOrShortNames),
    ErlEpmd = net_kernel:epmd_module(),
    {ARMod, ARFun} = get_address_resolver(ErlEpmd, Driver),
    Timer = trace(dist_util:start_timer(SetupTime)),
    Family = Driver:family(),
    Connect =
        fun (Ip, TcpPort, Version) ->
                do_setup_connect(
                  Node, Type, MyNode, Timer, Driver, NetKernel,
                  Ip, TcpPort, Version)
        end,
    case ARMod:ARFun(Name, Address, Family) of
        {ok, Ip, TcpPort, Version} ->
            Connect(Ip, TcpPort, Version);
        {ok, Ip} ->
            case ErlEpmd:port_please(Name, Ip) of
                {port, TcpPort, Version} ->
                    Connect(Ip, TcpPort, trace(Version));
                PortError ->
                    _ = trace({ErlEpmd, port_please, [Name, Ip], PortError}),
                    ?shutdown(Node)
            end;
        ResolveError ->
            _ = trace({ARMod, ARFun, [Name, Address, Family], ResolveError}),
            ?shutdown(Node)
    end.

-spec do_setup_connect(_,_,_,_,_,_,_,_,_) -> no_return().

%% Connect to the resolved address and run the "we started" side of the
%% distribution handshake over a freshly started DistCtrl process.
%% A connect error or a DistCtrl start failure shuts the setup down.
do_setup_connect(
  Node, Type, MyNode, Timer, Driver, NetKernel,
  Ip, TcpPort, Version) ->
    %% Reset the setup timer that do_setup/7 started
    dist_util:reset_timer(Timer),
    ConnectOpts = trace(connect_options(socket_options())),
    case Driver:connect(Ip, TcpPort, ConnectOpts) of
        {ok, Socket} ->
            DistCtrl =
                try start_dist_ctrl(Socket, net_kernel:connecttime())
                catch error : {dist_ctrl, _} = DistCtrlError ->
                        _ = trace(DistCtrlError),
                        ?shutdown(Node)
                end,
            %% DistCtrl is a "socket"
            HSData =
                hs_data_common(
                  NetKernel, MyNode, DistCtrl, Timer,
                  Socket, Driver:family()),
            HSData_1 =
                HSData#hs_data{
                  other_node = Node,
                  this_flags = 0,
                  other_version = Version,
                  request_type = Type},
            dist_util:handshake_we_started(trace(HSData_1));
        ConnectError ->
            _ = trace(
                  {Driver, connect, [Ip, TcpPort, ConnectOpts],
                   ConnectError}),
            ?shutdown(Node)
    end.

%% -------------------------------------------------------------------------
%% close/1 is only called by net_kernel on the socket returned by listen/1.

close(Socket) ->
    gen_close(Socket, ?DRIVER).

gen_close(Socket, Driver) ->
    Driver:close(trace(Socket)).

%% -------------------------------------------------------------------------


%% Build the #hs_data{} record shared by the accepting and connecting sides.
%% The handshake funs (f_*) talk to DistCtrl through call_dist_ctrl.
%% The ticker funs mf_getstat/mf_setopts/mf_getopts act on the raw
%% gen_tcp Socket directly, and mf_tick just sends dist_tick to DistCtrl.
%% Family ends up in the #net_address{} returned by f_address.
hs_data_common(NetKernel, MyNode, DistCtrl, Timer, Socket, Family) ->
    %% Field 'socket' below is set to DistCtrl, which makes
    %% the distribution handshake process (ticker) call
    %% the funs below with DistCtrl as the S argument.
    %% So, S =:= DistCtrl below...
    #hs_data{
       kernel_pid = NetKernel,
       this_node = MyNode,
       socket = DistCtrl,
       timer = Timer,
       %%
       f_send = % -> ok | {error, closed}=>?shutdown()
           fun (S, Packet) when S =:= DistCtrl ->
                   try call_dist_ctrl(S, {send, Packet})
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           {error, closed}
                   end
           end,
       f_recv = % -> {ok, List} | Other=>?shutdown()
           fun (S, 0, infinity) when S =:= DistCtrl ->
                   try call_dist_ctrl(S, recv) of
                       {ok, Bin} when is_binary(Bin) ->
                           %% The handshake packet is returned as a list
                           {ok, binary_to_list(Bin)};
                       Error ->
                           Error
                   catch error : {dist_ctrl, Reason} ->
                           {error, trace(Reason)}
                   end
           end,
       f_setopts_pre_nodeup =
           fun (S) when S =:= DistCtrl ->
                   ok
           end,
       f_setopts_post_nodeup =
           fun (S) when S =:= DistCtrl ->
                   ok
           end,
       f_getll =
           fun (S) when S =:= DistCtrl ->
                   {ok, S} %% DistCtrl is the distribution port
           end,
       f_address = % -> #net_address{} | ?shutdown()
           fun (S, Node) when S =:= DistCtrl ->
                   %% The peer address comes from DistCtrl and the host
                   %% from the node name.
                   try call_dist_ctrl(S, peername) of
                       {ok, Address} ->
                           case dist_util:split_node(Node) of
                               {node, _, Host} ->
                                   #net_address{
                                      address = Address,
                                      host = Host,
                                      protocol = ?DIST_PROTO,
                                      family = Family};
                               _ ->
                                   ?shutdown(Node)
                           end;
                       Error ->
                           _ = trace(Error),
                           ?shutdown(Node)
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           ?shutdown(Node)
                   end
           end,
       f_handshake_complete = % -> ok | ?shutdown()
           fun (S, Node, DistHandle) when S =:= DistCtrl ->
                   try call_dist_ctrl(S, {handshake_complete, DistHandle})
                   catch error : {dist_ctrl, Reason} ->
                           _ = trace(Reason),
                           ?shutdown(Node)
                   end
           end,
       %%
       %% mf_tick/1, mf_getstat/1, mf_setopts/2 and mf_getopts/2
       %% are called by the ticker any time after f_handshake_complete/3
       %% so they may not block the caller even for congested socket
       mf_tick =
           fun (S) when S =:= DistCtrl ->
                   %% Asynchronous: never waits for DistCtrl
                   S ! dist_tick
           end,
       mf_getstat = % -> {ok, RecvCnt, SendCnt, SendPend} | Other=>ignore_it
           fun (S) when S =:= DistCtrl ->
                   case
                       inet:getstat(Socket, [recv_cnt, send_cnt, send_pend])
                   of
                       {ok, Stat} ->
                           split_stat(Stat, 0, 0, 0);
                       Error ->
                           trace(Error)
                   end
           end,
       mf_setopts =
           fun (S, Opts) when S =:= DistCtrl ->
                   inet:setopts(Socket, setopts_filter(Opts))
           end,
       mf_getopts =
           fun (S, Opts) when S =:= DistCtrl ->
                   inet:getopts(Socket, Opts)
           end}.

%% Drop options that would change the socket mode or framing:
%% active, deliver, packet, list/binary and inet/inet6.
%% All other options are kept.
setopts_filter(Opts) ->
    [Opt ||
        Opt <- Opts,
        case Opt of
            {K, _} when K =:= active; K =:= deliver; K =:= packet -> false;
            K when K =:= list; K =:= binary -> false;
            K when K =:= inet; K =:= inet6 -> false;
            _ -> true
        end].

%% Collect an inet:getstat/2 result into {ok, RecvCnt, SendCnt, SendPend}.
%% Keys missing from Stat keep the initial values passed in.
%% Any other key raises function_clause.
split_stat([{recv_cnt, R}|Stat], _, W, P) ->
    split_stat(Stat, R, W, P);
split_stat([{send_cnt, W}|Stat], R, _, P) ->
    split_stat(Stat, R, W, P);
split_stat([{send_pend, P}|Stat], R, W, _) ->
    split_stat(Stat, R, W, P);
split_stat([], R, W, P) ->
    {ok, R, W, P}.

%% ------------------------------------------------------------
%% Determine if EPMD module supports address resolving.
%% Use EpmdModule:address_please/3 if it is exported, otherwise
%% fall back to erl_epmd:address_please/3.
%% NOTE(review): erlang:function_exported/3 returns false for a module
%% that is not loaded yet.
%% ------------------------------------------------------------
get_address_resolver(EpmdModule, _Driver) ->
    case erlang:function_exported(EpmdModule, address_please, 3) of
        true -> {EpmdModule, address_please};
        _    -> {erl_epmd, address_please}
    end.


%% If Node is illegal terminate the connection setup!!
%% Split Node into {Name, Host} and validate the host part against the
%% long/short names mode. Illegal node names are logged and the
%% connection setup is shut down through ?shutdown2.
split_node(Driver, Node, LongOrShortNames) ->
    case dist_util:split_node(Node) of
        {node, Name, Host} ->
            check_node(Driver, Node, Name, Host, LongOrShortNames);
        {host, _Host} ->
            error_logger:error_msg(
              "** Nodename ~p illegal, no '@' character **~n", [Node]),
            ?shutdown2(Node, trace({illegal_node_n@me, Node}));
        _Unparsable ->
            error_logger:error_msg("** Nodename ~p illegal **~n", [Node]),
            ?shutdown2(Node, trace({illegal_node_name, Node}))
    end.

%% In longnames mode, a dotless host is accepted only if
%% Driver:parse_address/1 accepts it (presumably a literal IP address).
%% In shortnames mode, a dotted host is rejected.
%% Everything else is returned as {Name, Host}.
check_node(Driver, Node, Name, Host, LongOrShortNames) ->
    case {LongOrShortNames, string:split(Host, ".", all)} of
        {longnames, [_OnlyLabel]} ->
            case Driver:parse_address(Host) of
                {ok, _Address} ->
                    {Name, Host};
                _NotAnAddress ->
                    error_logger:error_msg(
                      "** System running to use "
                      "fully qualified hostnames **~n"
                      "** Hostname ~s is illegal **~n",
                      [Host]),
                    ?shutdown2(Node, trace({not_longnames, Host}))
            end;
        {shortnames, [_, _ | _]} ->
            error_logger:error_msg(
              "** System NOT running to use "
              "fully qualified hostnames **~n"
              "** Hostname ~s is illegal **~n",
              [Host]),
            ?shutdown2(Node, trace({not_shortnames, Host}));
        _Acceptable ->
            {Name, Host}
    end.

%% -------------------------------------------------------------------------

%% Append the kernel inet_dist_connect_options to Opts, minus the
%% options that setopts_filter/1 removes.
connect_options(Opts) ->
    case application:get_env(kernel, inet_dist_connect_options) of
        {ok, UserOpts} -> Opts ++ setopts_filter(UserOpts);
        _Unset -> Opts
    end.

%% we may not always want the nodelay behaviour
%% for performance reasons: only an explicit kernel
%% dist_nodelay = false turns nodelay off.
nodelay() ->
    case application:get_env(kernel, dist_nodelay) of
        {ok, false} -> {nodelay, false};
        _ -> {nodelay, true}
    end.

%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% The DistCtrl process(es).
%%
%% When DistCtrl receives the handshake_complete request it spawns off
%% the input controller, which takes over the socket ownership, and
%% itself becomes the output controller.
%%
%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%%% XXX Missing to be "productified":
%%%   * Cryptoanalysis by experts, this is crypto amateur work.
%%%   * Is it useful over inet_tls_dist; i.e to not have to bother
%%%     with certificates but instead manage a secret cluster cookie?
%%%   * An application to belong to (kernel)
%%%   * Restart and/or code reload policy (not needed in kernel)
%%%   * Fitting into the epmd/Erlang distro protocol version framework
%%%     (something needs to be created for multiple protocols, epmd,
%%%      multiple address families, fallback to previous version, etc)


%% Debug client and server

%% Listen on an ephemeral port, print the matching test_client/1 call,
%% then run the crypto handshake on the first accepted connection.
test_server() ->
    {ok, ListenSocket} = gen_tcp:listen(0, socket_options()),
    {ok, PortNo} = inet:port(ListenSocket),
    io:format(?MODULE_STRING":test_client(~w).~n", [PortNo]),
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    test(Socket).

%% Connect to a test_server/0 on localhost and run the crypto handshake.
test_client(Port) ->
    {ok, Socket} = gen_tcp:connect(localhost, Port, socket_options()),
    test(Socket).

test(Socket) ->
    start_dist_ctrl(Socket, 10000).

%% -------------------------------------------------------------------------

%% Spawn the DistCtrl process for Socket, linked to the caller, and make
%% it the socket's controlling process. Then ask it to run the crypto
%% init exchange, waiting at most Timeout ms.
%% Returns the DistCtrl pid, which is what the server replies with.
start_dist_ctrl(Socket, Timeout) ->
    Secret = atom_to_binary(auth:get_cookie(), latin1),
    Controller = self(),
    ServerFun =
        fun () ->
                receive
                    {?MODULE, From, start} ->
                        {SendParams, RecvParams} = init(Socket, Secret),
                        reply(From, self()),
                        handshake(SendParams, 1, RecvParams, 1, Controller)
                end
        end,
    SpawnOpts =
        [link,
         {priority, max},
         {message_queue_data, off_heap},
         {fullsweep_after, 0}],
    Server = monitor_dist_proc(spawn_opt(ServerFun, SpawnOpts)),
    ok = gen_tcp:controlling_process(Socket, Server),
    call_dist_ctrl(Server, start, Timeout).


%% Synchronous request to a DistCtrl process, with no timeout.
call_dist_ctrl(Server, Msg) ->
    call_dist_ctrl(Server, Msg, infinity).
%%
%% Send {?MODULE, {Ref, self()}, Msg} to Server and wait for {Ref, Res}.
%% Returns Res.
%% Raises error({dist_ctrl, Reason}) if Server dies before replying.
%% Raises error({dist_ctrl, timeout}) if no reply arrives within Timeout ms.
%% On timeout, Server is killed explicitly. It may be stuck for good,
%% e.g. in the blocking gen_tcp:recv/2 of init_recv/5 on a silent peer.
%% Waiting for it to go down on its own could then block the caller
%% (the Acceptor or a setup Controller) forever.
call_dist_ctrl(Server, Msg, Timeout) ->
    Ref = erlang:monitor(process, Server),
    Server ! {?MODULE, {Ref, self()}, Msg},
    receive
        {Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            Res;
        {'DOWN', Ref, process, Server, Reason} ->
            error({dist_ctrl, Reason})
    after Timeout -> % Timeout < infinity is only used by start_dist_ctrl/2
            %% Unlink first, so that killing Server does not kill the
            %% caller through the link before the error below can be
            %% raised. do_setup_connect/9 catches that error.
            %% Killing Server also closes the socket it owns.
            true = unlink(Server),
            exit(Server, kill),
            receive
                {'DOWN', Ref, process, Server, _} ->
                    %% Drop a reply that raced with the timeout
                    receive {Ref, _} -> ok after 0 -> ok end,
                    error({dist_ctrl, timeout})
            end
    end.

%% Reply to a call_dist_ctrl/2,3 request; From is its {Ref, Pid} tag.
reply({Ref, Pid}, Msg) ->
    Pid ! {Ref, Msg},
    ok.

%% -------------------------------------------------------------------------

%% N for the {active, N} socket mode of the input controller
-define(TCP_ACTIVE, 16).
%% Payload chunk size; presumably leaves room for per-message overhead
%% within one ?PACKET_SIZE packet.
-define(CHUNK_SIZE, (?PACKET_SIZE - 512)).

%% Message type tags
-define(HANDSHAKE_CHUNK, 1).
-define(DATA_CHUNK, 2).
-define(TICK_CHUNK, 3).
-define(REKEY_CHUNK, 4).

%% -------------------------------------------------------------------------
%% Crypto strategy
%% -------
%% The crypto strategy is as simple as possible to get an encrypted
%% connection as benchmark reference.  It is geared around AEAD
%% ciphers, in particular AES-GCM.
%%
%% The init message and the start message must fit in the TCP buffers
%% since both sides start by sending the init message, wait
%% for the other end's init message, send the start message
%% and wait for the other end's start message.  So if the send
%% blocks we have a deadlock.
%%
%% The init + start sequence tries to implement Password Encrypted
%% Key Exchange using a node public/private key pair and the
%% shared secret (the Cookie) to create session encryption keys
%% that can not be re-created if the shared secret is compromised,
%% which should create forward secrecy.
%% You need both nodes' key pairs and the shared secret to decrypt the
%% traffic between the nodes.
%%
%% All exchanged messages use {packet, 2}, i.e. a 16 bit size header.
%%
%% The init message contains a random number in cleartext and, encrypted,
%% the public key and two more random numbers.  The encryption is done
%% with a Key and IV hashed from the unencrypted random number and the
%% shared secret.
%%
%% The other node's public key is used with the own node's private
%% key to create a shared key that is hashed with one of the encrypted
%% random numbers from each side to create Key and IV for the session.
%%
%% The start message contains the two encrypted random numbers,
%% this time encrypted with the session keys for verification
%% by the other side, plus the rekey interval.  The rekey interval
%% is just there to get an early check of whether the other side's
%% maximum rekey interval is acceptable; it is just an embryo
%% of some better check.  Any side may rekey earlier, but if the
%% rekey interval is exceeded the connection fails.
%%
%% Subsequent encrypted messages have the sequence number and the length
%% of the message as AAD data, and an incrementing IV.  These messages
%% have a message type that differentiates data from ticks and rekeys.
%% Ticks have a random size in an attempt to make them less obvious to spot.
%%
%% Rekeying is done by the sender, which creates a new key pair and
%% a new shared secret from the other end's public key, and with
%% this and the current key and iv hashes a new key and iv.
%% The new public key is sent to the other end, which uses it
%% and its old private key to create the same new shared
%% secret and from that a new key and iv.
%% So the receiver keeps its private key, and the sender keeps
%% the receiver's public key for the connection's lifetime.
%% The sender, however, generates a new key pair at every rekey,
%% which changes the shared secret at every rekey.
%%
%% The only reaction to errors is to crash noisily (?), which will bring
%% down the connection and hopefully produce something useful
%% in the local log, but all the other end sees is a closed connection.
%% -------------------------------------------------------------------------

%% Run the init + start exchange on Socket.
%% Returns {SendParams, RecvParams}, each with its IV split into a
%% {Salt, Counter} tuple.
init(Socket, Secret) ->
    KeyPair = get_key_pair(),
    #key_pair{public = OwnPublicKey} = KeyPair,
    Params = params(Socket),
    {R2, R3, OwnInitMsg} = init_msg(Params, OwnPublicKey, Secret),
    ok = gen_tcp:send(Socket, OwnInitMsg),
    init_recv(Params, Secret, KeyPair, R2, R3).

%% Receive the peer's init message, derive the session parameters, and
%% exchange and verify start messages. The initial IV length is split
%% into a salt of IVLen - 6 bytes plus a 48 bit counter. Any error
%% raised in the exchange after the first recv is traced and turned
%% into exit(connection_closed).
init_recv(
  #params{socket = Socket, iv = IVLen} = Params, Secret, KeyPair, R2, R3) ->
    {ok, PeerInitMsg} = gen_tcp:recv(Socket, 0),
    SaltLen = IVLen - 6,
    try
        case init_msg(Params, Secret, KeyPair, R2, R3, PeerInitMsg) of
            {#params{iv = <<SendSalt:SaltLen/binary, SendNo:48>>} =
                 SendParams,
             RecvParams, OwnStartMsg} ->
                ok = gen_tcp:send(Socket, OwnStartMsg),
                {ok, PeerStartMsg} = gen_tcp:recv(Socket, 0),
                #params{iv = <<RecvSalt:SaltLen/binary, RecvNo:48>>} =
                    VerifiedRecvParams =
                    start_msg(RecvParams, R2, R3, PeerStartMsg),
                {SendParams#params{iv = {SendSalt, SendNo}},
                 VerifiedRecvParams#params{iv = {RecvSalt, RecvNo}}}
        end
    catch
        error:Reason:Stacktrace ->
            _ = trace({Reason, Stacktrace}),
            exit(connection_closed)
    end.



%% Build our init message.
%% Three random values R1A, R2A, R3A are generated, each KeyLen + IVLen
%% bytes. R1A is sent in cleartext and is used, as the HMAC key with
%% Secret as data, to derive the Key and IV that encrypt
%% [R2A, R3A, PubKeyA]. The AAD is the total message length (32 bits)
%% followed by R1A.
%% Returns {R2A, R3A, Msg}, where Msg = [R1A, Tag, Ciphertext].
init_msg(
  #params{
     hmac_algorithm = HmacAlgo,
     aead_cipher = AeadCipher,
     key = KeyLen,
     iv = IVLen,
     tag_len = TagLen}, PubKeyA, Secret) ->
    %%
    RLen = KeyLen + IVLen,
    <<R1A:RLen/binary, R2A:RLen/binary, R3A:RLen/binary>> =
        crypto:strong_rand_bytes(3 * RLen),
    {Key1A, IV1A} = hmac_key_iv(HmacAlgo, R1A, Secret, KeyLen, IVLen),
    Plaintext = [R2A, R3A, PubKeyA],
    %% Ciphertext is as long as Plaintext, so this equals the length of
    %% the message that is sent. The receiver computes it as byte_size/1
    %% of what it receives.
    MsgLen = byte_size(R1A) + TagLen + iolist_size(Plaintext),
    AAD = [<<MsgLen:32>>, R1A],
    {Ciphertext, Tag} =
        crypto:block_encrypt(AeadCipher, Key1A, IV1A, {AAD, Plaintext, TagLen}),
    Msg = [R1A, Tag, Ciphertext],
    {R2A, R3A, Msg}.
%%
%% Process the peer's init message Msg (a binary).
%% The message is decrypted with a Key/IV derived from its cleartext R1B
%% and Secret, yielding the peer's R2B, R3B and public key. The ECDH
%% shared secret from our KeyPair and that public key is then used to
%% derive both session keys:
%%   send key/IV    = HMAC(SharedSecret, [R2A, R3B])
%%   receive key/IV = HMAC(SharedSecret, [R2B, R3A])
%% Our start message carries [R2B, R3B, RekeyInterval], encrypted with
%% the send key.
%% Returns {SendParams, RecvParams, StartMsg}.
%% Any mismatch (short message, or block_decrypt not returning the
%% expected binary) raises case_clause.
init_msg(
  #params{
     hmac_algorithm = HmacAlgo,
     aead_cipher = AeadCipher,
     key = KeyLen,
     iv = IVLen,
     tag_len = TagLen,
     rekey_interval = RekeyInterval} = Params,
  Secret, KeyPair, R2A, R3A, Msg) ->
    %%
    RLen = KeyLen + IVLen,
    case Msg of
        <<R1B:RLen/binary, Tag:TagLen/binary, Ciphertext/binary>> ->
            {Key1B, IV1B} = hmac_key_iv(HmacAlgo, R1B, Secret, KeyLen, IVLen),
            MsgLen = byte_size(Msg),
            AAD = [<<MsgLen:32>>, R1B],
            case
                crypto:block_decrypt(
                  AeadCipher, Key1B, IV1B, {AAD, Ciphertext, Tag})
            of
                <<R2B:RLen/binary, R3B:RLen/binary, PubKeyB/binary>> ->
                    SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
                    %%
                    {Key2A, IV2A} =
                        hmac_key_iv(
                          HmacAlgo, SharedSecret, [R2A, R3B], KeyLen, IVLen),
                    %% The sender keeps the peer's public key for rekeying
                    SendParams =
                        Params#params{
                          rekey_key = PubKeyB,
                          key = Key2A, iv = IV2A},
                    %%
                    StartCleartext = [R2B, R3B, <<RekeyInterval:32>>],
                    StartMsgLen = TagLen + iolist_size(StartCleartext),
                    StartAAD = <<StartMsgLen:32>>,
                    {StartCiphertext, StartTag} =
                        crypto:block_encrypt(
                          AeadCipher, Key2A, IV2A,
                          {StartAAD, StartCleartext, TagLen}),
                    StartMsg = [StartTag, StartCiphertext],
                    %%
                    {Key2B, IV2B} =
                        hmac_key_iv(
                          HmacAlgo, SharedSecret, [R2B, R3A], KeyLen, IVLen),
                    %% The receiver keeps its own key pair for rekeying
                    RecvParams =
                        Params#params{
                          rekey_key = KeyPair,
                          key = Key2B, iv = IV2B},
                    %%
                    {SendParams, RecvParams, StartMsg}
            end
    end.

%% Verify the peer's start message Msg with the receive key/IV.
%% R2A and R3A are already bound, so the decrypted plaintext must
%% begin with exactly our own random values. The peer's rekey interval
%% must be within a factor of 4 of ours. On success, returns RecvParams
%% with rekey_interval set to the peer's value. Otherwise raises
%% case_clause.
start_msg(
  #params{
     aead_cipher = AeadCipher,
     key = Key2B,
     iv = IV2B,
     tag_len = TagLen,
     rekey_interval = RekeyIntervalA} = RecvParams, R2A, R3A, Msg) ->
    %%
    case Msg of
        <<Tag:TagLen/binary, Ciphertext/binary>> ->
            KeyLen = byte_size(Key2B),
            IVLen = byte_size(IV2B),
            RLen = KeyLen + IVLen,
            MsgLen = byte_size(Msg),
            AAD = <<MsgLen:32>>,
            case
                crypto:block_decrypt(
                  AeadCipher, Key2B, IV2B, {AAD, Ciphertext, Tag})
            of
                <<R2A:RLen/binary, R3A:RLen/binary, RekeyIntervalB:32>>
                  when RekeyIntervalA =< (RekeyIntervalB bsl 2),
                       RekeyIntervalB =< (RekeyIntervalA bsl 2) ->
                    RecvParams#params{rekey_interval = RekeyIntervalB}
            end
    end.

%% Derive a {Key, IV} pair from HMAC(HmacAlgo, MacKey, Data), truncated
%% to KeyLen + IVLen bytes. Presumably KeyLen + IVLen must not exceed
%% the HMAC output size; otherwise the binary match fails with badmatch.
hmac_key_iv(HmacAlgo, MacKey, Data, KeyLen, IVLen) ->
    <<Key:KeyLen/binary, IV:IVLen/binary>> =
        crypto:hmac(HmacAlgo, MacKey, Data, KeyLen + IVLen),
    {Key, IV}.

%% -------------------------------------------------------------------------
%% net_kernel distribution handshake in progress
%%
%% During the handshake this process owns the socket and serves
%% {?MODULE, From, Request} messages:
%%   {send, Data}                     - send Data as an encrypted
%%                                      handshake chunk
%%   recv                             - receive and decrypt one
%%                                      handshake chunk
%%   peername                         - the socket's peer address
%%   {controller, NewCtrl, Parent}    - link to NewCtrl, unlink Parent
%%   {handshake_complete, DistHandle} - hand the socket over to a new
%%                                      input handler and become the
%%                                      output handler
%% Any other message is discarded.

handshake(
  SendParams, SendSeq,
  #params{socket = Socket} = RecvParams, RecvSeq, Controller) ->
    receive
        {?MODULE, From, {send, Data}} ->
            case
                encrypt_and_send_chunk(
                  SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data])
            of
                {NextSendParams, NextSendSeq, ok} ->
                    reply(From, ok),
                    handshake(
                      NextSendParams, NextSendSeq, RecvParams, RecvSeq,
                      Controller);
                {_, _, SendError} ->
                    reply(From, {error, closed}),
                    death_row({send, trace(SendError)})
            end;
        {?MODULE, From, recv} ->
            case recv_and_decrypt_chunk(RecvParams, RecvSeq) of
                {NextRecvParams, NextRecvSeq, {ok, _} = Reply} ->
                    reply(From, Reply),
                    handshake(
                      SendParams, SendSeq, NextRecvParams, NextRecvSeq,
                      Controller);
                {_, _, RecvError} ->
                    reply(From, RecvError),
                    death_row({recv, trace(RecvError)})
            end;
        {?MODULE, From, peername} ->
            reply(From, inet:peername(Socket)),
            handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller);
        {?MODULE, From, {controller, NewController, Parent}} ->
            LinkResult = link(NewController),
            true = unlink(Parent),
            reply(From, LinkResult),
            handshake(
              SendParams, SendSeq, RecvParams, RecvSeq, NewController);
        {?MODULE, From, {handshake_complete, DistHandle}} ->
            InputHandler =
                monitor_dist_proc(
                  spawn_opt(
                    fun () ->
                            link(Controller),
                            %% Wait for the DistHandle sent below, i.e.
                            %% until the socket and the dist input have
                            %% been handed over to this process
                            receive
                                DistHandle ->
                                    ok =
                                        inet:setopts(
                                          Socket,
                                          [{active, ?TCP_ACTIVE},
                                           nodelay()]),
                                    input_handler(
                                      RecvParams#params{
                                        dist_handle = DistHandle},
                                      RecvSeq, empty_q(), infinity)
                            end
                    end,
                    [link,
                     {priority, normal},
                     {message_queue_data, off_heap},
                     {fullsweep_after, 0}])),
            _ = monitor(process, InputHandler), % For the benchmark test
            ok = gen_tcp:controlling_process(Socket, InputHandler),
            ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
            InputHandler ! DistHandle,
            %% This process becomes the output handler, whose tick
            %% padding length is drawn with rand
            crypto:rand_seed_alg(crypto_cache),
            reply(From, ok),
            process_flag(priority, normal),
            erlang:dist_ctrl_get_data_notification(DistHandle),
            output_handler(
              SendParams#params{dist_handle = DistHandle}, SendSeq);
        _Alien ->
            handshake(SendParams, SendSeq, RecvParams, RecvSeq, Controller)
    end.

%% Receive one chunk during the handshake and decrypt it.
%%
%% Returns {RecvParams, RecvSeq, Result} where Result is {ok, Cleartext}
%% for a handshake chunk, {error, decrypt_error} for any other or
%% undecryptable chunk, or the gen_tcp:recv/2 error.  A rekey chunk
%% installs the new parameters, restarts the sequence at 0 and reads on.
recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
    case gen_tcp:recv(Socket, 0) of
        {ok, Chunk} ->
            case decrypt_chunk(RecvParams, RecvSeq, Chunk) of
                #params{} = RekeyedParams ->
                    recv_and_decrypt_chunk(RekeyedParams, 0);
                error ->
                    {RecvParams, RecvSeq, {error, decrypt_error}};
                <<?HANDSHAKE_CHUNK, Cleartext/binary>> ->
                    {RecvParams, RecvSeq + 1, {ok, Cleartext}};
                UnexpectedChunk when is_binary(UnexpectedChunk) ->
                    {RecvParams, RecvSeq + 1, {error, decrypt_error}}
            end;
        RecvError ->
            {RecvParams, RecvSeq, RecvError}
    end.

%% -------------------------------------------------------------------------
%% Output handler process
%%
%% The game here is to flush all dist_data and dist_tick messages,
%% prioritize dist_data over dist_tick, and to not use selective receive.
%% Every receive below ends with a catch-all clause, so it always takes
%% the oldest message in the mailbox and never scans past it.

%% Idle: wait for the next notification.
output_handler(Params, Seq) ->
    receive
        dist_data ->
            output_handler_data(Params, Seq);
        dist_tick ->
            output_handler_tick(Params, Seq);
        Other ->
            %% Ignore
            _ = trace(Other),
            output_handler(Params, Seq)
    end.

%% Data pending: drain the mailbox (dist_data has priority, so a
%% dist_tick seen here is dropped), then send everything the VM has
%% queued and re-arm the data notification.
output_handler_data(Params, Seq) ->
    receive
        dist_data ->
            output_handler_data(Params, Seq);
        dist_tick ->
            output_handler_data(Params, Seq);
        Other ->
            %% Ignore
            _ = trace(Other),
            output_handler_data(Params, Seq)
    after 0 ->
            DistHandle = Params#params.dist_handle,
            {NextParams, NextSeq} =
                output_handler_send(
                  Params, Seq, get_data(DistHandle, empty_q())),
            erlang:dist_ctrl_get_data_notification(DistHandle),
            output_handler(NextParams, NextSeq)
    end.

%% Tick pending: drain the mailbox (switching to the data state if
%% dist_data shows up), then send a tick chunk padded with a random
%% number (8..63) of zero bytes.  A send error ends the process via
%% death_row/0.
output_handler_tick(Params, Seq) ->
    receive
        dist_data ->
            output_handler_data(Params, Seq);
        dist_tick ->
            output_handler_tick(Params, Seq);
        Other ->
            %% Ignore
            _ = trace(Other),
            output_handler_tick(Params, Seq)
    after 0 ->
            Padding = binary:copy(<<0>>, 7 + rand:uniform(56)),
            case encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, Padding]) of
                {NextParams, NextSeq, ok} ->
                    output_handler(NextParams, NextSeq);
                {_, _, SendError} ->
                    _ = trace(SendError),
                    death_row()
            end
    end.

%% Send the queue Q in encrypted data chunks of at most ?CHUNK_SIZE
%% bytes.  While no more than one chunk is queued, more data is fetched
%% from the VM first.  Returns {Params, Seq} once the queue is empty.
output_handler_send(Params, Seq, {_, Size, _} = Q)
  when ?CHUNK_SIZE < Size ->
    output_handler_send(Params, Seq, Q, ?CHUNK_SIZE);
output_handler_send(Params, Seq, {_, Size, _} = Q) ->
    case get_data(Params#params.dist_handle, Q) of
        {_, 0, _} ->
            {Params, Seq};
        {_, Size, _} = Q_1 -> % Got no more - send what we have
            output_handler_send(Params, Seq, Q_1, Size);
        Q_1 ->
            output_handler_send(Params, Seq, Q_1)
    end.

%% Dequeue Size bytes from Q, send them as one encrypted data chunk and
%% go on with the rest of the queue.  A send error ends the process via
%% death_row/0.
output_handler_send(Params, Seq, Q, Size) ->
    {Cleartext, RestQ} = deq_iovec(Size, Q),
    case encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext]) of
        {NextParams, NextSeq, ok} ->
            output_handler_send(NextParams, NextSeq, RestQ);
        {_, _, SendError} ->
            _ = trace(SendError),
            death_row()
    end.

%% -------------------------------------------------------------------------
%% Input handler process
%%
%% T (0 or infinity) steers whether to wait for more data before
%% delivering: start with infinity, and after a data chunk has been
%% queued poll with 0 to see if more is waiting; when nothing is, the
%% queue is delivered to the VM.  As in the output handler, the receive
%% ends with a catch-all clause so it is never selective.

input_handler(#params{socket = Socket} = Params, Seq, Q, T) ->
    receive
        {tcp, Socket, Chunk} ->
            input_chunk(Params, Seq, Q, T, Chunk);
        {tcp_passive, Socket} ->
            ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]),
            %% Deliver now if we were polling for more data
            NextQ =
                case T of
                    0 -> deliver_data(Params#params.dist_handle, Q);
                    infinity -> Q
                end,
            input_handler(Params, Seq, NextQ, infinity);
        {tcp_closed, Socket} ->
            exit(connection_closed);
        Other ->
            %% Ignore...
            _ = trace(Other),
            input_handler(Params, Seq, Q, T)
    after T ->
            NextQ = deliver_data(Params#params.dist_handle, Q),
            input_handler(Params, Seq, NextQ, infinity)
    end.

%% Handle one received encrypted chunk.  A data chunk is queued and the
%% handler polls for more (T = 0); a tick chunk only advances the
%% sequence number; a rekey chunk switches to the new parameters at
%% sequence 0.  An undecryptable or unknown chunk exits with
%% connection_closed.
input_chunk(Params, Seq, Q, T, Chunk) ->
    case decrypt_chunk(Params, Seq, Chunk) of
        #params{} = RekeyedParams ->
            input_handler(RekeyedParams, 0, Q, T);
        error ->
            _ = trace(decrypt_error),
            exit(connection_closed);
        <<?DATA_CHUNK, Cleartext/binary>> ->
            input_handler(Params, Seq + 1, enq_binary(Cleartext, Q), 0);
        <<?TICK_CHUNK, _/binary>> ->
            input_handler(Params, Seq + 1, Q, T);
        InvalidChunk when is_binary(InvalidChunk) ->
            _ = trace(invalid_chunk),
            exit(connection_closed)
    end.

%% -------------------------------------------------------------------------
%% erlang:dist_ctrl_* helpers

%% Get data for sending from the VM and place it in a queue
%%
%% Every message from erlang:dist_ctrl_get_data/1 is enqueued after a
%% 32-bit length header, until the VM returns none.  Rear is kept in
%% reverse order, so a message's parts are pushed last-first on top of
%% its header.
get_data(DistHandle, {Front, Size, Rear}) ->
    get_data(DistHandle, Front, Size, Rear).
%%
get_data(DistHandle, Front, Size, Rear) ->
    case erlang:dist_ctrl_get_data(DistHandle) of
        none ->
            {Front, Size, Rear};
        Bin when is_binary(Bin) ->
            DataLen = byte_size(Bin),
            get_data(
              DistHandle, Front, Size + 4 + DataLen,
              [Bin, <<DataLen:32>> | Rear]);
        [Bin1, Bin2] ->
            %% Two-part shortcut, avoiding lists:reverse/2
            DataLen = byte_size(Bin1) + byte_size(Bin2),
            get_data(
              DistHandle, Front, Size + 4 + DataLen,
              [Bin2, Bin1, <<DataLen:32>> | Rear]);
        Iovec ->
            DataLen = iolist_size(Iovec),
            get_data(
              DistHandle, Front, Size + 4 + DataLen,
              lists:reverse(Iovec, [<<DataLen:32>> | Rear]))
    end.

%% De-packet and deliver received data to the VM from a queue
%%
%% Returns the queue holding whatever could not be delivered yet, i.e.
%% an incomplete length header or packet.
deliver_data(DistHandle, Q) ->
    case Q of
        {[Bin | Front], Size, Rear} ->
            deliver_data(DistHandle, Front, Size, Rear, Bin);
        {[], Size, []} ->
            Size = 0, % Assert
            Q;
        {[], Size, Rear} ->
            %% Front is exhausted; continue with the reversed Rear
            [Bin | Front] = lists:reverse(Rear),
            deliver_data(DistHandle, Front, Size, [], Bin)
    end.
%%
%% Deliver the packets found in Bin (the head binary of the queue) and,
%% when needed, in Front/Rear.  Size counts all queued bytes including
%% those of Bin.  A packet is a 32-bit length header followed by that
%% many bytes.  Returns the remaining queue when only a partial header or
%% packet is left.
deliver_data(DistHandle, Front, Size, Rear, Bin) ->
    case Bin of
        <<LenA:32, PacketA:LenA/binary,
          LenB:32, PacketB:LenB/binary, Rest/binary>> ->
            %% Two complete packets at once
            erlang:dist_ctrl_put_data(DistHandle, PacketA),
            erlang:dist_ctrl_put_data(DistHandle, PacketB),
            deliver_data(
              DistHandle,
              Front, Size - (4 + LenA + 4 + LenB), Rear,
              Rest);
        <<Len:32, Packet:Len/binary, Rest/binary>> ->
            erlang:dist_ctrl_put_data(DistHandle, Packet),
            deliver_data(DistHandle, Front, Size - (4 + Len), Rear, Rest);
        <<Len:32, Head/binary>> ->
            %% The packet continues in the following queued binaries
            PacketSize = 4 + Len,
            if
                PacketSize =< Size ->
                    BinSize = byte_size(Bin),
                    {Tail, RestQ} =
                        deq_iovec(
                          PacketSize - BinSize,
                          Front, Size - BinSize, Rear),
                    erlang:dist_ctrl_put_data(DistHandle, [Head | Tail]),
                    deliver_data(DistHandle, RestQ);
                true -> % Incomplete data
                    {[Bin | Front], Size, Rear}
            end;
        <<_/binary>> ->
            %% Less than a whole length header in Bin
            BinSize = byte_size(Bin),
            if
                4 =< Size -> % Fragmented header - extract a header bin
                    {HeaderTail, {Front_1, _, Rear_1}} =
                        deq_iovec(4 - BinSize, Front, Size - BinSize, Rear),
                    Header = iolist_to_binary([Bin | HeaderTail]),
                    %% Size stays: Header holds Bin plus the bytes just
                    %% dequeued
                    deliver_data(DistHandle, Front_1, Size, Rear_1, Header);
                true -> % Incomplete header
                    {[Bin | Front], Size, Rear}
            end
    end.

%% -------------------------------------------------------------------------
%% Encryption and decryption helpers

%% Encrypt Cleartext as chunk number Seq and send it.
%%
%% When Seq has reached the rekey interval, a rekey chunk carrying the
%% public key of a fresh key pair is sent first, still under the current
%% key.  Then a new key and IV are derived from the shared secret with
%% the peer's public key, and Cleartext is sent as chunk 0 under them.
%%
%% Returns {Params, NextSeq, SendResult}.
encrypt_and_send_chunk(
  #params{
     socket = Socket,
     rekey_interval = Seq,
     rekey_key = PubKeyB,
     key = Key,
     iv = {IVSalt, IVNo},
     hmac_algorithm = HmacAlgo} = Params,
  Seq, Cleartext) ->
    %%
    KeyLen = byte_size(Key),
    IVSaltLen = byte_size(IVSalt),
    #key_pair{public = PubKeyA} = KeyPair = get_new_key_pair(),
    RekeyChunk = encrypt_chunk(Params, Seq, [?REKEY_CHUNK, PubKeyA]),
    case gen_tcp:send(Socket, RekeyChunk) of
        ok ->
            SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
            %% [Key, IVSalt, IVCounter] is the same HMAC input as the
            %% [Key, IV] the receiver uses in block_decrypt/6
            IVCounter = <<(IVNo + Seq):48>>,
            {NewKey, <<NewIVSalt:IVSaltLen/binary, NewIVNo:48>>} =
                hmac_key_iv(
                  HmacAlgo, SharedSecret, [Key, IVSalt, IVCounter],
                  KeyLen, IVSaltLen + 6),
            NewParams =
                Params#params{key = NewKey, iv = {NewIVSalt, NewIVNo}},
            SendResult =
                gen_tcp:send(Socket, encrypt_chunk(NewParams, 0, Cleartext)),
            {NewParams, 1, SendResult};
        SendError ->
            {Params, Seq + 1, SendError}
    end;
encrypt_and_send_chunk(#params{socket = Socket} = Params, Seq, Cleartext) ->
    SendResult = gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext)),
    {Params, Seq + 1, SendResult}.

%% Encrypt Cleartext as chunk number Seq.
%%
%% The IV is the salt followed by the 48-bit counter IVNo + Seq.  The AAD
%% binds Seq and the chunk length, computed as cleartext size plus tag
%% length (assuming the ciphertext is as long as the cleartext).
%% Returns the iolist [Ciphertext, Tag].
encrypt_chunk(
  #params{
     aead_cipher = AeadCipher,
     iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, Seq, Cleartext) ->
    %%
    ChunkLen = TagLen + iolist_size(Cleartext),
    AAD = <<Seq:32, ChunkLen:32>>,
    IVBin = <<IVSalt/binary, (IVNo + Seq):48>>,
    {Ciphertext, CipherTag} =
        crypto:block_encrypt(AeadCipher, Key, IVBin, {AAD, Cleartext, TagLen}),
    [Ciphertext, CipherTag].

%% Decrypt chunk number Seq, laid out as [Ciphertext, Tag] with the same
%% IV and AAD construction as encrypt_chunk/3.  Returns the cleartext,
%% new #params{} after a rekey chunk, or error when the chunk is shorter
%% than a tag or rejected by block_decrypt/6.
decrypt_chunk(
  #params{
     aead_cipher = AeadCipher,
     iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen} = Params, Seq, Chunk) ->
    %%
    ChunkLen = byte_size(Chunk),
    if
        ChunkLen < TagLen ->
            error;
        true ->
            AAD = <<Seq:32, ChunkLen:32>>,
            IVBin = <<IVSalt/binary, (IVNo + Seq):48>>,
            {Ciphertext, CipherTag} =
                erlang:split_binary(Chunk, ChunkLen - TagLen),
            block_decrypt(
              Params, Seq, AeadCipher, Key, IVBin,
              {AAD, Ciphertext, CipherTag})
    end.

%% AEAD-decrypt Data for chunk number Seq and interpret the result.
%%
%% A rekey chunk must carry a peer public key exactly as long as our own.
%% The new key and IV are then derived by HMAC over [Key, IV] with the
%% shared secret of our key pair and that public key, and the updated
%% #params{} is returned.  Any other chunk is returned as plaintext,
%% except that chunk number RekeyInterval must be a rekey chunk.
%% Returns error on authentication failure, a malformed rekey chunk or a
%% missed rekey.
block_decrypt(
  #params{
     rekey_key = #key_pair{public = PubKeyA} = KeyPair,
     rekey_interval = RekeyInterval} = Params,
  Seq, AeadCipher, Key, IV, Data) ->
    %%
    case crypto:block_decrypt(AeadCipher, Key, IV, Data) of
        <<?REKEY_CHUNK, Rest/binary>> ->
            PubKeyLen = byte_size(PubKeyA),
            case Rest of
                <<PubKeyB:PubKeyLen/binary>> ->
                    SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
                    KeyLen = byte_size(Key),
                    IVLen = byte_size(IV),
                    %% The IV ends with a 48-bit (6 byte) counter
                    IVSaltLen = IVLen - 6,
                    {NewKey, <<NewIVSalt:IVSaltLen/binary, NewIVNo:48>>} =
                        hmac_key_iv(
                          Params#params.hmac_algorithm,
                          SharedSecret, [Key, IV], KeyLen, IVLen),
                    Params#params{iv = {NewIVSalt, NewIVNo}, key = NewKey};
                _ ->
                    error
            end;
        Chunk when is_binary(Chunk), Seq =:= RekeyInterval ->
            %% This was one chunk too many without rekeying
            error;
        Chunk when is_binary(Chunk) ->
            Chunk;
        error ->
            error
    end.

%% -------------------------------------------------------------------------
%% Queue of binaries i.e. an iovec queue
%%
%% Represented as {Front, Size, Rear}: Front is in queue order, Rear in
%% reverse order, and Size is the total byte count of all queued binaries.

empty_q() ->
    {[], 0, []}.

%% Append Bin at the rear of the queue.
enq_binary(Bin, {Front, Size, Rear}) ->
    NewSize = Size + byte_size(Bin),
    {Front, NewSize, [Bin | Rear]}.

%% Dequeue exactly GetSize bytes (at most the queued Size) as an iovec.
%% Returns {Iovec, RestQ}.
deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size ->
    deq_iovec(GetSize, Front, Size, Rear, []).
%%
%% Dequeue GetSize bytes from the queue given as Front, Size and Rear.
%% Returns {Iovec, {Front, Size, Rear}}.  The caller must ensure that
%% GetSize =< Size, as deq_iovec/2 does; a violation crashes with
%% function_clause.
deq_iovec(GetSize, Front, Size, Rear) when GetSize =< Size ->
    deq_iovec(GetSize, Front, Size, Rear, []).
%%
%% Acc holds the binaries dequeued so far, in reverse order.  When Front
%% runs out, the reversed Rear becomes the new Front.  There is no clause
%% for both being empty: a miscounted Size then crashes with
%% function_clause instead of looping forever on an exhausted queue.
deq_iovec(GetSize, [], Size, [_ | _] = Rear, Acc) ->
    deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc);
deq_iovec(GetSize, [Bin | Front], Size, Rear, Acc) ->
    BinSize = byte_size(Bin),
    if
        BinSize < GetSize ->
            %% Take all of Bin and continue
            deq_iovec(
              GetSize - BinSize, Front, Size - BinSize, Rear, [Bin | Acc]);
        GetSize < BinSize ->
            %% Split Bin; its tail stays at the queue front
            {Bin1, Bin2} = erlang:split_binary(Bin, GetSize),
            {lists:reverse(Acc, [Bin1]), {[Bin2 | Front], Size - GetSize, Rear}};
        true ->
            {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}}
    end.

%% -------------------------------------------------------------------------

%% Exit with Reason after a 5 second delay.  A normal reason is replaced
%% by connection_closed, so these processes never exit normally.
death_row() -> death_row(connection_closed).
%%
death_row(normal) -> death_row(connection_closed);
death_row(Reason) -> receive after 5000 -> exit(Reason) end.

%% -------------------------------------------------------------------------

%% Trace point: the identity function, a hook for the call tracing set up
%% by dbg/0
trace(Term) -> Term.

%% Keep an eye on this Pid (debug)
%%
%% With no macro named undefined defined, this is just the identity.
%% The alternative spawns a watcher that logs when Pid dies.  It reports
%% a normal exit as an error, since death_row/1 never exits with normal.
-ifndef(undefined).
monitor_dist_proc(Pid) ->
    Pid.
-else.
monitor_dist_proc(Pid) ->
    spawn(
      fun () ->
              MRef = erlang:monitor(process, Pid),
              receive
                  {'DOWN', MRef, _, _, normal} ->
                      error_logger:error_report(
                        [dist_proc_died,
                         {reason, normal},
                         {pid, Pid}]);
                  {'DOWN', MRef, _, _, Reason} ->
                      error_logger:info_report(
                        [dist_proc_died,
                         {reason, Reason},
                         {pid, Pid}])
              end
      end),
    Pid.
-endif.

%% Debug helper: restart dbg and call-trace, in all processes, trace/1
%% and the erlang:dist_ctrl_* BIFs used by this module.
dbg() ->
    dbg:stop(),
    dbg:tracer(),
    dbg:p(all, c),
    dbg:tpl(?MODULE, trace, cx),
    dbg:tpl(erlang, dist_ctrl_get_data_notification, cx),
    dbg:tpl(erlang, dist_ctrl_get_data, cx),
    dbg:tpl(erlang, dist_ctrl_put_data, cx),
    ok.