1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2017-2019. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20-module(gen_tcp_dist). 21 22%% 23%% This is an example of how to plug in an arbitrary distribution 24%% carrier for Erlang using distribution processes. 25%% 26%% This example uses gen_tcp for transportation of data, but 27%% you can use whatever underlying protocol you want as long 28%% as your implementation reliably delivers data chunks to the 29%% receiving VM in the order they were sent from the sending 30%% VM. 31%% 32%% This code is a rewrite of the lib/kernel/src/inet_tcp_dist.erl 33%% distribution implementation for TCP used by default. The default 34%% implementation uses distribution ports instead of distribution 35%% processes and is more efficient compared to this implementation. 36%% This example more or less gets the distribution processes 37%% in between the VM and the ports without any specific gain. 38%% 39 40-export([listen/1, accept/1, accept_connection/5, 41 setup/5, close/1, select/1, is_node_name/1]). 42 43%% Optional 44-export([setopts/2, getopts/2]). 45 46%% internal exports 47 48-export([dist_cntrlr_setup/1, dist_cntrlr_input_setup/3, 49 dist_cntrlr_tick_handler/1]). 50 51-export([accept_loop/2,do_accept/6,do_setup/6]). 52 53-import(error_logger,[error_msg/2]). 54 55-include_lib("kernel/include/net_address.hrl"). 56 57-include_lib("kernel/include/dist.hrl"). 
-include_lib("kernel/include/dist_util.hrl").

%% ------------------------------------------------------------
%%  Select this protocol based on node name
%%  select(Node) => Bool
%%
%%  Returns true when Node has the form name@host (exactly one
%%  '@') and host resolves to an IPv4 address; false otherwise.
%% ------------------------------------------------------------

select(Node) ->
    case split_node(atom_to_list(Node), $@, []) of
        [_, Host] ->
            case inet:getaddr(Host, inet) of
                {ok,_} -> true;
                _ -> false
            end;
        _ -> false
    end.

%% ------------------------------------------------------------
%% Create the listen socket, i.e. the port that this erlang
%% node is accessible through.
%%
%% Returns {ok, {Socket, TcpAddress, Creation}} once the node
%% has been registered with the EPMD module, or the error
%% returned by do_listen/1 or by the EPMD registration.
%% ------------------------------------------------------------

listen(Name) ->
    case do_listen([binary, {active, false}, {packet,2}, {reuseaddr, true}]) of
        {ok, Socket} ->
            TcpAddress = get_tcp_address(Socket),
            {_,Port} = TcpAddress#net_address.address,
            ErlEpmd = net_kernel:epmd_module(),
            case ErlEpmd:register_node(Name, Port) of
                {ok, Creation} ->
                    {ok, {Socket, TcpAddress, Creation}};
                Error ->
                    %% Registration failed: the caller only gets the
                    %% error term back and has no handle on the
                    %% socket, so close it here instead of leaving
                    %% the port bound.
                    gen_tcp:close(Socket),
                    Error
            end;
        Error ->
            Error
    end.

%% Determine the port range to try from the kernel parameters
%% inet_dist_listen_min / inet_dist_listen_max and open the
%% listen socket. Without a configured minimum, port 0 is used;
%% without a configured maximum, only the minimum is tried.
do_listen(Options) ->
    {First,Last} = case application:get_env(kernel,inet_dist_listen_min) of
                       {ok,N} when is_integer(N) ->
                           case application:get_env(kernel,
                                                    inet_dist_listen_max) of
                               {ok,M} when is_integer(M) ->
                                   {N,M};
                               _ ->
                                   {N,N}
                           end;
                       _ ->
                           {0,0}
                   end,
    do_listen(First, Last, listen_options([{backlog,128}|Options])).

%% Try each port in First..Last in turn, moving on to the next
%% one only when the current port is already in use. Any other
%% result (success or error) is returned as is.
do_listen(First,Last,_) when First > Last ->
    {error,eaddrinuse};
do_listen(First,Last,Options) ->
    case gen_tcp:listen(First, Options) of
        {error, eaddrinuse} ->
            do_listen(First+1,Last,Options);
        Other ->
            Other
    end.

%% Build the final listen option list. Options from the kernel
%% parameter inet_dist_listen_options come first, followed by an
%% {ip, Ip} entry when inet_dist_use_interface is set, followed
%% by the options passed in.
listen_options(BaseOpts) ->
    IfaceOpts =
        case application:get_env(kernel, inet_dist_use_interface) of
            {ok, Ip} -> [{ip, Ip}];
            _ -> []
        end,
    UserOpts =
        case application:get_env(kernel, inet_dist_listen_options) of
            {ok, ListenOpts} -> ListenOpts;
            _ -> []
        end,
    UserOpts ++ IfaceOpts ++ BaseOpts.


%% ------------------------------------------------------------
%% Accepts new connection attempts from other Erlang nodes.
%% ------------------------------------------------------------

accept(Listen) ->
    AcceptArgs = [self(), Listen],
    spawn_opt(?MODULE, accept_loop, AcceptArgs, [link, {priority, max}]).

%% Accept connections forever. Each accepted socket gets its own
%% distribution controller; any accept failure terminates this
%% process with the failure as exit reason.
accept_loop(Kernel, Listen) ->
    ?trace("~p~n",[{?MODULE, accept_loop, self()}]),
    case gen_tcp:accept(Listen) of
        {ok, Socket} ->
            accept_loop_handover(Kernel, Socket),
            accept_loop(Kernel, Listen);
        Error ->
            exit(Error)
    end.

%% Spawn a controller for Socket, make it the socket owner,
%% announce the connection to Kernel and wait for its verdict.
accept_loop_handover(Kernel, Socket) ->
    DistCtrl = spawn_dist_cntrlr(Socket),
    ?trace("~p~n",[{?MODULE, accept_loop, accepted, Socket, DistCtrl, self()}]),
    %% Forward socket messages already in our mailbox both
    %% before and after the ownership change.
    flush_controller(DistCtrl, Socket),
    gen_tcp:controlling_process(Socket, DistCtrl),
    flush_controller(DistCtrl, Socket),
    Kernel ! {accept,self(),DistCtrl,inet,tcp},
    receive
        {Kernel, controller, Pid} ->
            call_ctrlr(DistCtrl, {supervisor, Pid}),
            Pid ! {self(), controller};
        {Kernel, unsupported_protocol} ->
            exit(unsupported_protocol)
    end.

%% Forward every {tcp, Socket, _} and {tcp_closed, Socket}
%% message currently in the mailbox to Pid, without blocking.
flush_controller(Pid, Socket) ->
    receive
        {tcp, Socket, _Data} = Msg ->
            Pid ! Msg,
            flush_controller(Pid, Socket);
        {tcp_closed, Socket} = Msg ->
            Pid ! Msg,
            flush_controller(Pid, Socket)
    after 0 ->
            ok
    end.

%% ------------------------------------------------------------
%% Accepts a new connection attempt from another Erlang node.
%% Performs the handshake with the other side.
%% ------------------------------------------------------------

accept_connection(AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
    Args = [self(), AcceptPid, DistCtrl, MyNode, Allowed, SetupTime],
    spawn_opt(?MODULE, do_accept, Args, [link, {priority, max}]).

%% Wait for the accept process to hand over the controller, start
%% the setup timer and, if the peer passes the IP check, run the
%% accepting side of the handshake.
do_accept(Kernel, AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) ->
    ?trace("~p~n",[{?MODULE, do_accept, self(), MyNode}]),
    receive
        {AcceptPid, controller} ->
            Timer = dist_util:start_timer(SetupTime),
            do_accept_checked(check_ip(DistCtrl), Kernel, DistCtrl,
                              MyNode, Allowed, Timer)
    end.

%% Continue do_accept/6 depending on the result of check_ip/1.
do_accept_checked(true, Kernel, DistCtrl, MyNode, Allowed, Timer) ->
    Common = hs_data_common(DistCtrl),
    HSData = Common#hs_data{kernel_pid = Kernel,
                            this_node = MyNode,
                            socket = DistCtrl,
                            timer = Timer,
                            this_flags = 0,
                            allowed = Allowed},
    dist_util:handshake_other_started(HSData);
do_accept_checked({false,IP}, _Kernel, _DistCtrl, _MyNode, _Allowed, _Timer) ->
    error_msg("** Connection attempt from "
              "disallowed IP ~w ** ~n", [IP]),
    ?shutdown(no_node).

%% we may not always want the nodelay behaviour
%% for performance reasons
%%
%% nodelay is switched off only when the kernel parameter
%% dist_nodelay is explicitly false; any other setting, or
%% none at all, leaves it on.

nodelay() ->
    case application:get_env(kernel, dist_nodelay) of
        {ok, false} ->
            {nodelay, false};
        _ ->
            {nodelay, true}
    end.

%% ------------------------------------------------------------
%% Setup a new connection to another Erlang node.
%% Performs the handshake with the other side.
%% ------------------------------------------------------------

setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
    Args = [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
    spawn_opt(?MODULE, do_setup, Args, [link, {priority, max}]).

%% Connecting side of a connection setup: resolve the host part
%% of Node, ask the EPMD module for the port, connect, and run
%% the initiating side of the handshake. Every failure ends in
%% ?shutdown(Node).
do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) ->
    ?trace("~p~n",[{?MODULE, do_setup, self(), Node}]),
    [Name, Address] = splitnode(Node, LongOrShortNames),
    case inet:getaddr(Address, inet) of
        {ok, Ip} ->
            Timer = dist_util:start_timer(SetupTime),
            ErlEpmd = net_kernel:epmd_module(),
            PortReply = ErlEpmd:port_please(Name, Ip),
            do_setup_port(PortReply, Ip, Kernel, Node, Type, MyNode, Timer);
        _Other ->
            ?trace("inet_getaddr(~p) "
                   "failed (~p).~n", [Node,_Other]),
            ?shutdown(Node)
    end.

%% Second step of do_setup/6: act on the port_please reply.
do_setup_port({port, TcpPort, Version}, Ip, Kernel, Node, Type, MyNode, Timer) ->
    ?trace("port_please(~p) -> version ~p~n",
           [Node,Version]),
    dist_util:reset_timer(Timer),
    ConnOpts = connect_options([binary, {active, false}, {packet, 2}]),
    ConnReply = gen_tcp:connect(Ip, TcpPort, ConnOpts),
    do_setup_connected(ConnReply, Kernel, Node, Type, MyNode, Timer, Version);
do_setup_port(_NoPort, _Ip, _Kernel, Node, _Type, _MyNode, _Timer) ->
    ?trace("port_please (~p) "
           "failed.~n", [Node]),
    ?shutdown(Node).

%% Third step of do_setup/6: act on the connect result.
do_setup_connected({ok, Socket}, Kernel, Node, Type, MyNode, Timer, Version) ->
    DistCtrl = spawn_dist_cntrlr(Socket),
    call_ctrlr(DistCtrl, {supervisor, self()}),
    %% Forward socket messages already in our mailbox both
    %% before and after the ownership change.
    flush_controller(DistCtrl, Socket),
    gen_tcp:controlling_process(Socket, DistCtrl),
    flush_controller(DistCtrl, Socket),
    Common = hs_data_common(DistCtrl),
    HSData = Common#hs_data{kernel_pid = Kernel,
                            other_node = Node,
                            this_node = MyNode,
                            socket = DistCtrl,
                            timer = Timer,
                            this_flags = 0,
                            other_version = Version,
                            request_type = Type},
    dist_util:handshake_we_started(HSData);
do_setup_connected(_NotConnected, _Kernel, Node, _Type, _MyNode, _Timer, _Version) ->
    %% Other Node may have closed since
    %% port_please !
    ?trace("other node (~p) "
           "closed since port_please.~n",
           [Node]),
    ?shutdown(Node).

%% Prepend the options from the kernel parameter
%% inet_dist_connect_options, if set, to Opts.
connect_options(Opts) ->
    case application:get_env(kernel, inet_dist_connect_options) of
        {ok,ConnectOpts} ->
            ConnectOpts ++ Opts;
        _ ->
            Opts
    end.

%%
%% Close a socket.
%%
close(Listen) ->
    gen_tcp:close(Listen).


%% If Node is illegal terminate the connection setup!!
%% Returns [Name, Host] for a well-formed node name. Everything
%% after the first '@' is concatenated into Host.
splitnode(Node, LongOrShortNames) ->
    case split_node(atom_to_list(Node), $@, []) of
        [_NoAt] ->
            error_msg("** Nodename ~p illegal, no '@' character **~n",
                      [Node]),
            ?shutdown(Node);
        [Name|HostParts] ->
            Host = lists:append(HostParts),
            Labels = split_node(Host, $., []),
            splitnode_host(Labels, LongOrShortNames, Name, Host, Node);
        _ ->
            error_msg("** Nodename ~p illegal **~n", [Node]),
            ?shutdown(Node)
    end.

%% Check the host part against the naming mode. With longnames a
%% host without dots is accepted only if it parses as an IP
%% address; with shortnames a host containing dots is rejected.
splitnode_host([_], longnames, Name, Host, Node) ->
    case inet:parse_address(Host) of
        {ok, _} ->
            [Name, Host];
        _ ->
            error_msg("** System running to use "
                      "fully qualified "
                      "hostnames **~n"
                      "** Hostname ~ts is illegal **~n",
                      [Host]),
            ?shutdown(Node)
    end;
splitnode_host([_, _ | _], shortnames, _Name, Host, Node) ->
    error_msg("** System NOT running to use fully qualified "
              "hostnames **~n"
              "** Hostname ~ts is illegal **~n",
              [Host]),
    ?shutdown(Node);
splitnode_host(_Labels, _LongOrShortNames, Name, Host, _Node) ->
    [Name, Host].

%% Split a string at every occurrence of Chr. Acc holds, in
%% reverse order, characters belonging to the first piece.
split_node(Str, Chr, Acc) ->
    case lists:splitwith(fun(C) -> C =/= Chr end, Str) of
        {Piece, [Chr|Rest]} ->
            [lists:reverse(Acc, Piece)|split_node(Rest, Chr, [])];
        {Piece, []} ->
            [lists:reverse(Acc, Piece)]
    end.

%% ------------------------------------------------------------
%% Fetch local information about a Socket.
%% ------------------------------------------------------------
get_tcp_address(Socket) ->
    {ok, LocalAddr} = inet:sockname(Socket),
    {ok, LocalHost} = inet:gethostname(),
    #net_address {
       address = LocalAddr,
       host = LocalHost,
       protocol = tcp,
       family = inet
      }.

%% ------------------------------------------------------------
%% Do only accept new connection attempts from nodes at our
%% own LAN, if the check_ip environment parameter is true.
%% ------------------------------------------------------------
%%
%% check_ip(DistCtrl) returns true when the check is disabled or
%% the peer is on one of our interfaces' networks, and
%% {false, PeerIP} otherwise. If the peer or interface
%% information cannot be obtained the setup is shut down.
%%
%% The parameter is read from the kernel application by name,
%% like every other parameter in this module, so the lookup does
%% not depend on which application the calling process belongs
%% to.
check_ip(DistCtrl) ->
    case application:get_env(kernel, check_ip) of
        {ok, true} ->
            case get_ifs(DistCtrl) of
                {ok, IFs, IP} ->
                    check_ip(IFs, IP);
                _ ->
                    ?shutdown(no_node)
            end;
        _ ->
            true
    end.

%% Ask the controller for its socket and return
%% {ok, Interfaces, PeerIP}, or the error from inet:peername/1
%% or inet:getif/1.
get_ifs(DistCtrl) ->
    Socket = call_ctrlr(DistCtrl, socket),
    case inet:peername(Socket) of
        {ok, {IP, _}} ->
            case inet:getif(Socket) of
                {ok, IFs} -> {ok, IFs, IP};
                Error -> Error
            end;
        Error ->
            Error
    end.

%% The peer is accepted as soon as masking it with an interface's
%% netmask gives the same result as masking that interface's own
%% address.
check_ip([{OwnIP, _, Netmask}|IFs], PeerIP) ->
    case {inet_tcp:mask(Netmask, PeerIP), inet_tcp:mask(Netmask, OwnIP)} of
        {M, M} -> true;
        _ -> check_ip(IFs, PeerIP)
    end;
check_ip([], PeerIP) ->
    {false, PeerIP}.

%% True for atoms containing exactly one '@'; false for anything
%% else, including non-atoms.
is_node_name(Node) when is_atom(Node) ->
    case split_node(atom_to_list(Node), $@, []) of
        [_, _Host] -> true;
        _ -> false
    end;
is_node_name(_Node) ->
    false.

%% Build the #hs_data{} fields shared by the accepting and the
%% connecting side: the callback funs that route handshake
%% operations through the controller process DistCtrl.
%% reject_flags can be overridden with the command line argument
%% -gen_tcp_dist_reject_flags <integer>; otherwise the record
%% default is used.
hs_data_common(DistCtrl) ->
    TickHandler = call_ctrlr(DistCtrl, tick_handler),
    Socket = call_ctrlr(DistCtrl, socket),
    RejectFlags = case init:get_argument(gen_tcp_dist_reject_flags) of
                      {ok,[[Flags]]} -> list_to_integer(Flags);
                      _ -> #hs_data{}#hs_data.reject_flags
                  end,
    #hs_data{f_send = send_fun(),
             f_recv = recv_fun(),
             f_setopts_pre_nodeup = setopts_pre_nodeup_fun(),
             f_setopts_post_nodeup = setopts_post_nodeup_fun(),
             f_getll = getll_fun(),
             f_handshake_complete = handshake_complete_fun(),
             f_address = address_fun(),
             mf_setopts = setopts_fun(DistCtrl, Socket),
             mf_getopts = getopts_fun(DistCtrl, Socket),
             mf_getstat = getstat_fun(DistCtrl, Socket),
             mf_tick = tick_fun(DistCtrl, TickHandler),
             reject_flags = RejectFlags}.

%%% ------------------------------------------------------------
%%% Distribution controller processes
%%% ------------------------------------------------------------

%%
%% There will be five parties working together when the
%% connection is up:
%% - The gen_tcp socket. Providing a tcp/ip connection
%%   to the other node.
%% - The output handler. It will dispatch all outgoing
%%   traffic from the VM to the gen_tcp socket. This
%%   process is registered as distribution controller
%%   for this channel with the VM.
%% - The input handler. It will dispatch all incoming
%%   traffic from the gen_tcp socket to the VM. This
%%   process is also the socket owner and receives
%%   incoming traffic using active-N.
%% - The tick handler. Dispatches asynchronous tick
%%   requests to the socket. It executes on max priority
%%   since it is important to get ticks through to the
%%   other end.
%% - The channel supervisor (provided by dist_util). It
%%   monitors traffic. Issue tick requests to the tick
%%   handler when no outgoing traffic is seen and bring
%%   the connection down if no incoming traffic is seen.
%%   This process also executes on max priority.
%%
%% These parties are linked together so should one
%% of them fail, all of them are terminated and the
%% connection is taken down.
%%

%% In order to avoid issues with lingering signal binaries
%% we enable off-heap message queue data as well as fullsweep
%% after 0. The fullsweeps will be cheap since we have more
%% or less no live data.
-define(DIST_CNTRL_COMMON_SPAWN_OPTS,
        [{message_queue_data, off_heap},
         {fullsweep_after, 0}]).

%% Returns the fun stored in the mf_tick field of #hs_data{}.
%% When called with DistCtrl it sends 'tick' to TickHandler; the
%% guard makes a call with any other argument fail.
tick_fun(DistCtrl, TickHandler) ->
    fun (Ctrl) when Ctrl == DistCtrl ->
            TickHandler ! tick
    end.

%% Fun for the mf_getstat field of #hs_data{}: reads the socket
%% counters and returns {ok, Received, Sent, Pending}, or the
%% error from inet:getstat/2.
getstat_fun(DistCtrl, Socket) ->
    Items = [recv_cnt, send_cnt, send_pend],
    fun (Ctrl) when Ctrl == DistCtrl ->
            case inet:getstat(Socket, Items) of
                {ok, Stat} ->
                    split_stat(Stat,0,0,0);
                Error ->
                    Error
            end
    end.

%% Pick recv_cnt, send_cnt and send_pend out of a getstat result
%% list, starting from the given defaults.
split_stat(Stats, Recv0, Sent0, Pend0) ->
    Pick = fun ({recv_cnt, R}, {_, W, P}) -> {R, W, P};
               ({send_cnt, W}, {R, _, P}) -> {R, W, P};
               ({send_pend, P}, {R, W, _}) -> {R, W, P}
           end,
    {Recv, Sent, Pend} = lists:foldl(Pick, {Recv0, Sent0, Pend0}, Stats),
    {ok, Recv, Sent, Pend}.

%% Fun for the mf_setopts field of #hs_data{}.
setopts_fun(DistCtrl, Socket) ->
    fun (Ctrl, Opts) when Ctrl == DistCtrl ->
            setopts(Socket, Opts)
    end.

%% Fun for the mf_getopts field of #hs_data{}.
getopts_fun(DistCtrl, Socket) ->
    fun (Ctrl, Opts) when Ctrl == DistCtrl ->
            getopts(Socket, Opts)
    end.

%% Set socket options, refusing any attempt to change active,
%% deliver or packet; the refused options are returned in the
%% error.
setopts(S, Opts) ->
    IsReserved = fun ({K,_}) ->
                         K =:= active orelse K =:= deliver orelse K =:= packet;
                     (_) ->
                         false
                 end,
    case lists:filter(IsReserved, Opts) of
        [] -> inet:setopts(S,Opts);
        Reserved -> {error, {badopts,Reserved}}
    end.

getopts(S, Opts) ->
    inet:getopts(S, Opts).

%% The funs below are stored in #hs_data{} and forward the
%% operation to the controller process with call_ctrlr/2.

send_fun() ->
    fun (Ctrlr, Packet) ->
            Request = {send, Packet},
            call_ctrlr(Ctrlr, Request)
    end.

%% Received binaries are converted to lists; every other reply
%% is passed through untouched.
recv_fun() ->
    fun (Ctrlr, Length, Timeout) ->
            Reply = call_ctrlr(Ctrlr, {recv, Length, Timeout}),
            case Reply of
                {ok, Bin} when is_binary(Bin) ->
                    {ok, binary_to_list(Bin)};
                _ ->
                    Reply
            end
    end.

getll_fun() ->
    fun (Ctrlr) ->
            call_ctrlr(Ctrlr, getll)
    end.

address_fun() ->
    fun (Ctrlr, Node) ->
            Reply = call_ctrlr(Ctrlr, {address, Node}),
            case Reply of
                {error, no_node} -> %% No '@' or more than one '@' in node name.
                    ?shutdown(no_node);
                _ ->
                    Reply
            end
    end.

setopts_pre_nodeup_fun() ->
    fun (Ctrlr) ->
            call_ctrlr(Ctrlr, pre_nodeup)
    end.

setopts_post_nodeup_fun() ->
    fun (Ctrlr) ->
            call_ctrlr(Ctrlr, post_nodeup)
    end.

handshake_complete_fun() ->
    fun (Ctrlr, Node, DHandle) ->
            Request = {handshake_complete, Node, DHandle},
            call_ctrlr(Ctrlr, Request)
    end.

%% Synchronous request to a controller process. Monitors Ctrlr,
%% sends {Ref, self(), Msg} and waits for the matching
%% {Ref, Res} reply, which is returned. If the controller goes
%% down first, the caller exits with
%% {dist_controller_exit, Reason}.
call_ctrlr(Ctrlr, Msg) ->
    Ref = erlang:monitor(process, Ctrlr),
    Ctrlr ! {Ref, self(), Msg},
    receive
        {Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            Res;
        {'DOWN', Ref, process, Ctrlr, Reason} ->
            exit({dist_controller_exit, Reason})
    end.

%%
%% The tick handler process writes a tick to the
%% socket when it receives a 'tick' message from
%% the connection supervisor.
%%
%% We are not allowed to block the connection
%% supervisor when writing a tick and we also want
%% the tick to go through even during a heavily
%% loaded system. gen_tcp does not have a
%% non-blocking send operation exposed in its API
%% and we don't want to run the distribution
%% controller under high priority. Therefore this
%% separate process with max prio that dispatches
%% ticks.
%%
dist_cntrlr_tick_handler(Socket) ->
    receive
        tick ->
            %% May block due to busy port...
            sock_send(Socket, "");
        _ ->
            ok
    end,
    dist_cntrlr_tick_handler(Socket).

%% Spawn the distribution controller for Socket on max priority.
%% The new process is not linked to the caller here.
spawn_dist_cntrlr(Socket) ->
    spawn_opt(?MODULE, dist_cntrlr_setup, [Socket],
              [{priority, max}] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS).

%% Entry point of the controller process: start the linked tick
%% handler and enter the setup loop with no supervisor yet.
dist_cntrlr_setup(Socket) ->
    TickHandler = spawn_opt(?MODULE, dist_cntrlr_tick_handler,
                            [Socket],
                            [link, {priority, max}]
                            ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS),
    dist_cntrlr_setup_loop(Socket, TickHandler, undefined).

%%
%% During the handshake phase we loop in dist_cntrlr_setup().
%% When the connection is up we spawn an input handler and
%% continue as output handler.
%%
%% Requests arrive as {Ref, From, Request} (see call_ctrlr/2)
%% and are answered with {Ref, Result}. Sup is the pid given in
%% the latest {supervisor, Pid} request, or 'undefined'.
%%
dist_cntrlr_setup_loop(Socket, TickHandler, Sup) ->
    receive
        {tcp_closed, Socket} ->
            exit(connection_closed);

        {Ref, From, {supervisor, Pid}} ->
            Res = link(Pid),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Pid);

        {Ref, From, tick_handler} ->
            From ! {Ref, TickHandler},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, socket} ->
            From ! {Ref, Socket},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {send, Packet}} ->
            Res = gen_tcp:send(Socket, Packet),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {recv, Length, Timeout}} ->
            Res = gen_tcp:recv(Socket, Length, Timeout),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, getll} ->
            From ! {Ref, {ok, self()}},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {address, Node}} ->
            %% Only the {ok, Address} result of peername is
            %% matched; any other result fails the case.
            Res = case inet:peername(Socket) of
                      {ok, Address} ->
                          case split_node(atom_to_list(Node), $@, []) of
                              [_,Host] ->
                                  #net_address{address=Address,host=Host,
                                               protocol=tcp, family=inet};
                              _ ->
                                  {error, no_node}
                          end
                  end,
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, pre_nodeup} ->
            Res = inet:setopts(Socket,
                               [{active, false},
                                {packet, 4},
                                nodelay()]),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, post_nodeup} ->
            Res = inet:setopts(Socket,
                               [{active, false},
                                {packet, 4},
                                nodelay()]),
            From ! {Ref, Res},
            dist_cntrlr_setup_loop(Socket, TickHandler, Sup);

        {Ref, From, {handshake_complete, _Node, DHandle}} ->
            From ! {Ref, ok},
            %% Handshake complete! Begin dispatching traffic...

            %% We use separate process for dispatching input. This
            %% is not necessary, but it enables parallel execution
            %% of independent work loads at the same time as it
            %% simplifies the implementation...
            InputHandler = spawn_opt(?MODULE, dist_cntrlr_input_setup,
                                     [DHandle, Socket, Sup],
                                     [link] ++ ?DIST_CNTRL_COMMON_SPAWN_OPTS),

            %% Hand the socket over to the input handler,
            %% forwarding socket messages already in our mailbox
            %% both before and after the ownership change.
            flush_controller(InputHandler, Socket),
            gen_tcp:controlling_process(Socket, InputHandler),
            flush_controller(InputHandler, Socket),

            ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler),

            %% Release the input handler, which waits for this
            %% message in dist_cntrlr_input_setup/3.
            InputHandler ! DHandle,

            %% From now on we execute on normal priority
            process_flag(priority, normal),
            erlang:dist_ctrl_get_data_notification(DHandle),
            case init:get_argument(gen_tcp_dist_output_loop) of
                error ->
                    dist_cntrlr_output_loop(DHandle, Socket);
                {ok, [[ModStr, FuncStr]]} -> % For testing...
                    apply(list_to_atom(ModStr),
                          list_to_atom(FuncStr),
                          [DHandle, Socket])
            end
    end.

%% We use active 10 for good throughput while still
%% maintaining back-pressure if the input controller
%% isn't able to handle all incoming messages...
-define(ACTIVE_INPUT, 10).

%% Entry point of the input handler process: link to the channel
%% supervisor and wait for the DHandle message before entering
%% the input loop.
dist_cntrlr_input_setup(DHandle, Socket, Sup) ->
    link(Sup),
    %% Ensure we don't try to put data before registered
    %% as input handler...
    receive
        DHandle ->
            dist_cntrlr_input_loop(DHandle, Socket, 0)
    end.

%% N is decremented for every {tcp, Socket, Data} message
%% handled. Once it is down to half of ?ACTIVE_INPUT or less,
%% the socket is set to {active, ?ACTIVE_INPUT - N} and the
%% counter is reset to ?ACTIVE_INPUT.
dist_cntrlr_input_loop(DHandle, Socket, N) when N =< ?ACTIVE_INPUT/2 ->
    inet:setopts(Socket, [{active, ?ACTIVE_INPUT - N}]),
    dist_cntrlr_input_loop(DHandle, Socket, ?ACTIVE_INPUT);
dist_cntrlr_input_loop(DHandle, Socket, N) ->
    receive
        {tcp_closed, Socket} ->
            %% Connection to remote node terminated...
            exit(connection_closed);

        {tcp, Socket, Data} ->
            %% Incoming data from remote node...
            try erlang:dist_ctrl_put_data(DHandle, Data)
            catch _ : _ -> death_row()
            end,
            dist_cntrlr_input_loop(DHandle, Socket, N-1);

        _ ->
            %% Ignore...
            dist_cntrlr_input_loop(DHandle, Socket, N)
    end.

%% Write all data the VM currently has queued for this channel
%% to the socket, then ask to be notified when more arrives.
dist_cntrlr_send_data(DHandle, Socket) ->
    Fetched = erlang:dist_ctrl_get_data(DHandle),
    dist_cntrlr_send_fetched(Fetched, DHandle, Socket).

dist_cntrlr_send_fetched(none, DHandle, _Socket) ->
    erlang:dist_ctrl_get_data_notification(DHandle);
dist_cntrlr_send_fetched(Data, DHandle, Socket) ->
    sock_send(Socket, Data),
    dist_cntrlr_send_data(DHandle, Socket).


%% Main loop of the output handler.
dist_cntrlr_output_loop(DHandle, Socket) ->
    receive
        dist_data ->
            %% Outgoing data from this node...
            try
                dist_cntrlr_send_data(DHandle, Socket)
            catch
                _ : _ -> death_row()
            end;

        {send, From, Ref, Data} ->
            %% This is for testing only!
            %%
            %% Needed by some OTP distribution
            %% test suites...
            sock_send(Socket, Data),
            From ! {Ref, ok};

        _ ->
            %% Drop garbage message...
            ok
    end,
    dist_cntrlr_output_loop(DHandle, Socket).

%% Send Data on Socket. An error return or an exception from
%% gen_tcp:send/2 sends the caller to death_row/1.
sock_send(Socket, Data) ->
    try gen_tcp:send(Socket, Data) of
        ok ->
            ok;
        {error, Why} ->
            death_row({send_error, Why})
    catch
        Class : Why ->
            death_row({send_error, {Class, Why}})
    end.

death_row() ->
    death_row(connection_closed).

%% When the connection is on its way down operations
%% begin to fail. We catch the failures and call
%% this function waiting for termination. We should
%% be terminated by one of our links to the other
%% involved parties that began bringing the
%% connection down. By waiting for termination we
%% avoid altering the exit reason for the connection
%% teardown. We however limit the wait to 5 seconds
%% and bring down the connection ourselves if not
%% terminated...
death_row(normal) ->
    %% We do not want to exit with normal
    %% exit reason since it won't bring down
    %% linked processes...
    death_row(connection_closed);
death_row(Reason) ->
    receive
    after 5000 ->
            exit(Reason)
    end.