1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2021-2021. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% ======================================================================= 22%% This is just a simple utility that sets op several socket connections 23%% and monitors. Data is exchanged. 24%% This is done to make it possible to see that the socket sections of 25%% observer works. 26%% ======================================================================= 27 28-module(observer_socket_test). 29 30-export([start/0]). 31 32-define(NUM_ACCEPTORS, 5). 33 34-define(MTAG, 42). 35-define(PRINT, 01). 36-define(REQUEST, 02). 37-define(REPLY, 03). 38 39-define(SOCKET_MONITOR, whereis(socket_monitor)). 40 41-define(MSG_DATA1, <<"This is test data 0123456789 0123456789 0123456789">>). 42-define(MSG_DATA2, <<"This is test data 0123456789 0123456789 0123456789" 43 "This is test data 0123456789 0123456789 0123456789" 44 "This is test data 0123456789 0123456789 0123456789" 45 "This is test data 0123456789 0123456789 0123456789" 46 "This is test data 0123456789 0123456789 0123456789" 47 "This is test data 0123456789 0123456789 0123456789" 48 "This is test data 0123456789 0123456789 0123456789" 49 "This is test data 0123456789 0123456789 0123456789" 50 "This is test data 0123456789 0123456789 0123456789" 51 "This is test data 0123456789 0123456789 0123456789">>). 52-define(MSG_DATA3, <<"This is test data 0123456789 0123456789 0123456789" 53 "This is test data 0123456789 0123456789 0123456789" 54 "This is test data 0123456789 0123456789 0123456789" 55 "This is test data 0123456789 0123456789 0123456789" 56 "This is test data 0123456789 0123456789 0123456789" 57 "This is test data 0123456789 0123456789 0123456789" 58 "This is test data 0123456789 0123456789 0123456789" 59 "This is test data 0123456789 0123456789 0123456789" 60 "This is test data 0123456789 0123456789 0123456789" 61 "This is test data 0123456789 0123456789 0123456789" 62 "This is test data 0123456789 0123456789 0123456789" 63 "This is test data 0123456789 0123456789 0123456789" 64 "This is test data 0123456789 0123456789 0123456789" 65 "This is test data 0123456789 0123456789 0123456789" 66 "This is test data 0123456789 0123456789 0123456789" 67 "This is test data 0123456789 0123456789 0123456789" 68 "This is test data 0123456789 0123456789 0123456789" 69 "This is test data 0123456789 0123456789 0123456789" 70 "This is test data 0123456789 0123456789 0123456789" 71 "This is test data 0123456789 0123456789 0123456789" 72 "This is test data 0123456789 0123456789 0123456789" 73 "This is test data 0123456789 0123456789 0123456789" 74 "This is test data 0123456789 0123456789 0123456789" 75 "This is test data 0123456789 0123456789 0123456789" 76 "This is test data 0123456789 0123456789 0123456789" 77 "This is test data 0123456789 0123456789 0123456789" 78 "This is test data 0123456789 0123456789 0123456789" 79 "This is test data 0123456789 0123456789 0123456789" 80 "This is test data 0123456789 0123456789 0123456789" 81 "This is test data 0123456789 0123456789 0123456789" 82 "This is test data 0123456789 0123456789 0123456789" 83 "This is test data 0123456789 0123456789 0123456789" 84 "This is test data 0123456789 0123456789 0123456789" 85 "This is test data 0123456789 0123456789 0123456789" 86 "This is test data 0123456789 0123456789 0123456789" 87 "This is test data 0123456789 0123456789 0123456789" 88 "This is test data 0123456789 0123456789 0123456789" 89 "This is test data 0123456789 0123456789 0123456789" 90 "This is test data 0123456789 0123456789 0123456789" 91 "This is test data 0123456789 0123456789 0123456789" 92 "This is test data 0123456789 0123456789 0123456789" 93 "This is test data 0123456789 0123456789 0123456789" 94 "This is test data 0123456789 0123456789 0123456789" 95 "This is test data 0123456789 0123456789 0123456789" 96 "This is test data 0123456789 0123456789 0123456789" 97 "This is test data 0123456789 0123456789 0123456789" 98 "This is test data 0123456789 0123456789 0123456789" 99 "This is test data 0123456789 0123456789 0123456789" 100 "This is test data 0123456789 0123456789 0123456789" 101 "This is test data 0123456789 0123456789 0123456789" 102 "This is test data 0123456789 0123456789 0123456789" 103 "This is test data 0123456789 0123456789 0123456789" 104 "This is test data 0123456789 0123456789 0123456789" 105 "This is test data 0123456789 0123456789 0123456789" 106 "This is test data 0123456789 0123456789 0123456789" 107 "This is test data 0123456789 0123456789 0123456789" 108 "This is test data 0123456789 0123456789 0123456789" 109 "This is test data 0123456789 0123456789 0123456789" 110 "This is test data 0123456789 0123456789 0123456789" 111 "This is test data 0123456789 0123456789 0123456789" 112 "This is test data 0123456789 0123456789 0123456789" 113 "This is test data 0123456789 0123456789 0123456789" 114 "This is test data 0123456789 0123456789 0123456789" 115 "This is test data 0123456789 0123456789 0123456789" 116 "This is test data 0123456789 0123456789 0123456789" 117 "This is test data 0123456789 0123456789 0123456789" 118 "This is test data 0123456789 0123456789 0123456789" 119 "This is test data 0123456789 0123456789 0123456789" 120 "This is test data 0123456789 0123456789 0123456789" 121 "This is test data 0123456789 0123456789 0123456789" 122 "This is test data 0123456789 0123456789 0123456789" 123 "This is test data 0123456789 0123456789 0123456789" 124 "This is test data 0123456789 0123456789 0123456789" 125 "This is test data 0123456789 0123456789 0123456789" 126 "This is test data 0123456789 0123456789 0123456789" 127 "This is test data 0123456789 0123456789 0123456789" 128 "This is test data 0123456789 0123456789 0123456789" 129 "This is test data 0123456789 0123456789 0123456789" 130 "This is test data 0123456789 0123456789 0123456789" 131 "This is test data 0123456789 0123456789 0123456789" 132 "This is test data 0123456789 0123456789 0123456789" 133 "This is test data 0123456789 0123456789 0123456789" 134 "This is test data 0123456789 0123456789 0123456789" 135 "This is test data 0123456789 0123456789 0123456789" 136 "This is test data 0123456789 0123456789 0123456789" 137 "This is test data 0123456789 0123456789 0123456789" 138 "This is test data 0123456789 0123456789 0123456789" 139 "This is test data 0123456789 0123456789 0123456789" 140 "This is test data 0123456789 0123456789 0123456789" 141 "This is test data 0123456789 0123456789 0123456789" 142 "This is test data 0123456789 0123456789 0123456789" 143 "This is test data 0123456789 0123456789 0123456789" 144 "This is test data 0123456789 0123456789 0123456789" 145 "This is test data 0123456789 0123456789 0123456789" 146 "This is test data 0123456789 0123456789 0123456789" 147 "This is test data 0123456789 0123456789 0123456789" 148 "This is test data 0123456789 0123456789 0123456789" 149 "This is test data 0123456789 0123456789 0123456789" 150 "This is test data 0123456789 0123456789 0123456789" 151 "This is test data 0123456789 0123456789 0123456789">>). 152-define(DATA, ?MSG_DATA1). 153 154 155start() -> 156 put(sname, "starter"), 157 %% put(debug, true), 158 i("try start socket-monitor"), 159 SockMon = start_socket_monitor(), 160 i("try start server"), 161 Domain = inet, 162 Type = stream, 163 Proto = tcp, 164 {Server, SockAddr} = start_server(Domain, Type, Proto), 165 i("try start client"), 166 Client = start_client(Domain, Type, Proto, SockAddr), 167 i("done"), 168 {SockMon, Server, Client}. 169 170 171 172%% ======================================================================= 173%% Socket Monitor 174%% ======================================================================= 175 176start_socket_monitor() -> 177 Self = self(), 178 {SockMon, MRef} = spawn_monitor(fun() -> socket_monitor_start(Self) end), 179 receive 180 {'DOWN', MRef, process, SockMon, Reason} -> 181 e("received unexpected down message from socket-monitor:" 182 "~n ~p", [Reason]), 183 exit({socket_monitor, Reason}); 184 {SockMon, ready} -> 185 SockMon 186 end. 187 188socket_monitor_start(Parent) -> 189 put(sname, "socket-monitor"), 190 put(debug, true), 191 erlang:register(socket_monitor, self()), 192 Parent ! {self(), ready}, 193 %% i("started"), 194 socket_monitor_loop(#{}). 195 196 197socket_monitor_loop(State) -> 198 receive 199 {'DOWN', MRef, Kind, Sock, Info} when (Kind =:= socket) orelse 200 (Kind =:= port) -> 201 {{Sock, SockStr, _Pid}, State2} = maps:take(MRef, State), 202 i("received (socket) DOWN message: ~s" 203 "~n ~p", [SockStr, Info]), 204 socket_monitor_loop(State2); 205 206 {monitor, Sock, SockStr, Fun, Pid} -> 207 i("request to monitor socket: ~s", [SockStr]), 208 MRef = Fun(Sock), 209 socket_monitor_loop(State#{MRef => {Sock, SockStr, Pid}}) 210 end. 211 212 213 214%% ======================================================================= 215%% Server 216%% The server(s) processes are implemented on plain socket. 217%% ======================================================================= 218 219start_server(Domain, Type, Proto) -> 220 Self = self(), 221 {Listener, MRef} = 222 spawn_monitor(fun() -> 223 listener_start(Self, Domain, Type, Proto) 224 end), 225 receive 226 {'DOWN', MRef, process, Listener, Reason} -> 227 e("received unexpected down message from listener:" 228 "~n ~p", [Reason]), 229 exit({listener, Reason}); 230 {Listener, ready, SockAddr} -> 231 {Listener, SockAddr} 232 end. 233 234 235listener_start(Parent, Domain, Type, Proto) 236 when (Type =:= stream) andalso (Proto =:= tcp) -> 237 put(sname, "listener"), 238 %% put(debug, true), 239 i("starting"), 240 LSock = listener_create_lsock(Domain, Type, Proto), 241 LSM = socket:monitor(LSock), 242 ?SOCKET_MONITOR ! 243 {monitor, LSock, string_of(socket, LSock), 244 sockmon_fun(socket), self()}, 245 Acceptors = listener_create_acceptors(LSock), 246 {ok, SockAddr} = socket:sockname(LSock), 247 Parent ! {self(), ready, SockAddr}, 248 i("started"), 249 listener_loop(#{lsock => LSock, lmon => LSM, accs => Acceptors}). 250 251listener_loop(#{lsock := Sock} = State) -> 252 receive 253 {'DOWN', _MRef, socket, Sock, Reason} -> 254 e("unexpected (socket) down received: " 255 "~n ~p", [Reason]), 256 listener_exit(socket_down, Reason); 257 258 {'DOWN', _MRef, process, Pid, Reason} -> 259 i("unexpected (process ~p) down received: " 260 "~n ~p", [Pid, Reason]), 261 listener_loop(listener_handle_down(State, Pid, Reason)) 262 end. 263 264listener_handle_down(#{lsock := LSock, accs := Acceptors0} = State, 265 Pid, Reason) -> 266 case maps:remove(Pid, Acceptors0) of 267 Acceptors0 -> 268 i("unexpected down from unknown process ~p received: " 269 "~n ~p", [Pid, Reason]), 270 State; 271 _Acceptors1 -> 272 %% We could create a new acceptor here, but we need to make 273 %% sure we do not end up in a create-die loop, easier to just 274 %% assume *they should never die*... 275 e("unexpected down from acceptor process ~p received: " 276 "~n ~p", [Pid, Reason]), 277 (catch socket:close(LSock)), 278 listener_exit(acceptor_down, Reason) 279 end. 280 281listener_create_lsock(Domain, Type, Proto) -> 282 i("try extract local address"), 283 Addr = case which_local_addr(Domain) of 284 {ok, A} -> 285 A; 286 {error, Reason} -> 287 listener_exit(failed_local_addr, Reason) 288 end, 289 i("try create socket"), 290 LSock = case socket:open(Domain, Type, Proto) of 291 {ok, S} -> 292 S; 293 {error, Reason1} -> 294 listener_exit(failed_socket_open, Reason1) 295 end, 296 i("try bind socket"), 297 case socket:bind(LSock, #{family => Domain, 298 addr => Addr, 299 port => 0}) of 300 ok -> 301 ok; 302 {error, Reason2} -> 303 listener_exit(failed_socket_bind, Reason2) 304 end, 305 i("try make listen socket"), 306 case socket:listen(LSock, 10) of 307 ok -> 308 ok; 309 {error, Reason3} -> 310 listener_exit(failed_socket_listen, Reason3) 311 end, 312 i("listen socket created"), 313 LSock. 314 315 316listener_create_acceptors(LSock) -> 317 listener_create_acceptors(LSock, 1, []). 318 319listener_create_acceptors(_LSock, ID, Acc) when (ID > ?NUM_ACCEPTORS) -> 320 maps:from_list(Acc); 321listener_create_acceptors(LSock, ID, Acc) -> 322 i("try create acceptor ~w", [ID ]), 323 Acceptor = listener_create_acceptor(LSock, ID), 324 listener_create_acceptors(LSock, ID+1, [Acceptor|Acc]). 325 326listener_create_acceptor(LSock, ID) -> 327 Self = self(), 328 Acceptor = {Pid, MRef} = 329 spawn_monitor(fun() -> acceptor_start(Self, LSock, ID) end), 330 receive 331 {'$socket', LSock, abort, Info} -> 332 e("received unexpected select abort: " 333 "~n ~p", [Info]), 334 listener_exit(abort, Info); 335 336 {'DOWN', MRef, process, Pid, Reason} -> 337 e("received unexpected acceptor ~w down: " 338 "~n ~p", [ID, Reason]), 339 listener_exit(acceptor_start, Reason); 340 341 {Pid, ready} -> 342 i("received expected acceptor ~w ready", [ID]), 343 Acceptor 344 end. 345 346listener_exit(Tag, Reason) -> 347 exit({listener, Tag, Reason}). 348 349 350%% --- 351 352acceptor_start(Listener, LSock, ID) -> 353 put(sname, f("acceptor[~w]", [ID])), 354 %% put(debug, true), 355 MRef = erlang:monitor(process, Listener), 356 Listener ! {self(), ready}, 357 i("started"), 358 acceptor_loop(#{id => ID, 359 hid => 1, 360 listener => {Listener, MRef}, 361 lsock => LSock, 362 select => undefined, 363 handlers => []}). 364 365acceptor_loop(#{select := undefined} = State) -> 366 acceptor_loop(acceptor_try_accept(State)); 367 368acceptor_loop(State) -> 369 acceptor_loop(acceptor_try_select(State)). 370 371acceptor_try_accept(#{lsock := LSock} = State) -> 372 i("try accept (nowait)"), 373 case socket:accept(LSock, nowait) of 374 {ok, ASock} -> 375 %% i("accepted - spawn handler"), 376 ?SOCKET_MONITOR ! 377 {monitor, ASock, string_of(socket, ASock), 378 sockmon_fun(socket), self()}, 379 acceptor_create_handler(State, ASock); 380 {select, SelectInfo} -> 381 %% i("selected:" 382 %% "~n ~p", [SelectInfo]), 383 State#{select => SelectInfo}; 384 {error, Reason} -> 385 e("accept failed: " 386 "~n ~p", [Reason]), 387 acceptor_exit(State, accept, Reason) 388 end. 389 390 391acceptor_try_select( 392 #{lsock := LSock, 393 listener := {Listener, MRef}, 394 select := {select_info, _, Info} = SelectInfo} = State) -> 395 i("await select message"), 396 receive 397 {'$socket', LSock, abort, Info} -> 398 e("received unexpected select abort: " 399 "~n ~p", [Info]), 400 acceptor_exit(State, abort, Info); 401 402 {'$socket', LSock, select, Info} -> 403 i("received select message: " 404 "~n ~p", [Info]), 405 case socket:accept(LSock) of 406 {ok, ASock} -> 407 %% i("accepted - spawn handler"), 408 ?SOCKET_MONITOR ! 409 {monitor, ASock, string_of(socket, ASock), 410 sockmon_fun(socket), self()}, 411 acceptor_create_handler(State#{select => undefined}, 412 ASock); 413 {error, Reason} -> 414 e("accept failed: " 415 "~n ~p", [Reason]), 416 %% This is a bit overkill, but just to be on the safe side 417 (catch socket:cancel(LSock, SelectInfo)), 418 acceptor_exit(State, post_select_accept, Reason) 419 end; 420 421 {'DOWN', MRef, process, Listener, Reason} -> 422 e("listener down received: " 423 "~n ~p", [Reason]), 424 (catch socket:cancel(LSock, SelectInfo)), 425 acceptor_exit(State, listener, Reason); 426 427 {'DOWN', _MRef, process, _Pid, {{handler,_,_HID},recv,normal}} -> 428 %% i("received normal exit from handler ~w (~p)", [_HID, _Pid]), 429 acceptor_try_select(State); 430 431 Any -> 432 i("received unexpected message: " 433 "~n ~p" 434 "~nwhen" 435 "~n LSock: ~p" 436 "~n Info: ~p", [Any, LSock, Info]), 437 acceptor_try_select(State) 438 439 end. 440 441 442acceptor_create_handler(#{listener := {Listener, MRef}, 443 hid := HID} = State, ASock) -> 444 Self = self(), 445 _Handler = {HPid, HMRef} = 446 spawn_monitor(fun() -> handler_start(Self, HID) end), 447 receive 448 {'$socket', ASock, abort, Info} -> 449 e("received unexpected select abort: " 450 "~n ~p", [Info]), 451 acceptor_exit(State, abort, Info); 452 453 {'DOWN', MRef, process, Listener, Reason} -> 454 e("listener down received: " 455 "~n ~p", [Reason]), 456 exit(HPid, kill), 457 acceptor_exit(State, listener, Reason); 458 459 {'DOWN', HMRef, process, HPid, Reason} -> 460 e("new handler (~p) down received: " 461 "~n ~p", [HPid, Reason]), 462 acceptor_exit(State, handler, Reason); 463 464 {HPid, ready} -> 465 case socket:setopt(ASock, otp, controlling_process, HPid) of 466 ok -> 467 HPid ! {continue, ASock}, 468 State#{hid => HID+1}; 469 {error, Reason} -> 470 e("failed changing controlling process: " 471 "~n ~p", [Reason]), 472 (catch socket:close(ASock)), 473 exit(HPid, kill), 474 acceptor_exit(State, failed_changing_ctrl_proc, Reason) 475 end 476 end. 477 478 479acceptor_exit(#{id := ID}, Tag, Reason) -> 480 exit({{acceptor, ID}, Tag, Reason}). 481 482 483%% --- 484 485handler_start(Parent, ID) -> 486 put(sname, f("handler[~w]", [ID])), 487 %% put(debug, true), 488 i("starting"), 489 MRef = erlang:monitor(process, Parent), 490 Parent ! {self(), ready}, 491 receive 492 {'DOWN', MRef, process, Parent, Reason} -> 493 e("parent (~p) down received: " 494 "~n ~p", [Parent, Reason]), 495 handler_exit(#{id => undefined, parent => {Parent, undefined}}, 496 parent, Reason); 497 498 {continue, Sock} -> 499 i("started"), 500 handler_loop(#{id => ID, 501 parent => {Parent, MRef}, 502 sock => Sock, 503 select => undefined, 504 buf => <<>>}) 505 end. 506 507 508handler_loop(#{select := undefined} = State) -> 509 handler_loop(handler_try_read(State)); 510handler_loop(State) -> 511 handler_loop(handler_try_select(State)). 512 513 514handler_try_read(#{sock := Sock} = State) -> 515 case socket:recv(Sock) of 516 {ok, Data} -> 517 handler_process_data(State, Data); 518 {select, SelectInfo} -> %% Will never happen with this, recv/1, call... 519 State#{select => SelectInfo}; 520 {error, closed} -> 521 i("recv got closed => client done"), 522 handler_exit(State, recv, normal); 523 {error, Reason} -> 524 e("recv failed: " 525 "~n ~p", [Reason]), 526 (catch socket:close(Sock)), 527 handler_exit(State, recv, Reason) 528 end. 529 530handler_try_select( 531 #{sock := Sock, 532 parent := {Parent, MRef}, 533 select := {select_info, _, Info} = SelectInfo} = State) -> 534 receive 535 {'$socket', Sock, abort, Info} -> 536 e("received unexpected select abort: " 537 "~n ~p", [Info]), 538 handler_exit(State, abort, Info); 539 540 {'$socket', Sock, select, Info} -> 541 case socket:recv(Sock) of 542 {ok, Data} -> 543 handler_process_data(State#{select => undefined}, Data); 544 {error, Reason} -> 545 i("recv failed: " 546 "~n ~p", [Reason]), 547 (catch socket:close(Sock)), 548 handler_exit(State, recv, Reason) 549 end; 550 {'DOWN', MRef, process, Parent, Reason} -> 551 e("parent down received: " 552 "~n ~p", [Reason]), 553 (catch socket:cancel(Sock, SelectInfo)), 554 (catch socket:close(Sock)), 555 handler_exit(State, parent, Reason) 556 end. 557 558handler_process_data(#{buf := <<>>} = State, NewData) -> 559 handler_process_data(State#{buf => NewData}); 560handler_process_data(State, <<>>) -> 561 handler_process_data(State); 562handler_process_data(#{buf := Buf} = State, NewData) -> 563 handler_process_data(State#{buf => <<Buf/binary, NewData/binary>>}). 564 565handler_process_data(#{buf := <<?MTAG:32, 566 ID:32, 567 TYPE:32, 568 SZ:32, 569 Data:SZ/binary, 570 Rest/binary>>} = State) -> 571 handler_process_data(State, ID, TYPE, Data), 572 handler_process_data(State#{buf => Rest}); 573handler_process_data(State) -> 574 State. 575 576 577handler_process_data(_State, ID, ?PRINT, Data) -> 578 i("~w: print" 579 "~n ~p", [ID, erlang:binary_to_list(Data)]); 580handler_process_data(#{sock := Sock} = State, ID, ?REQUEST, Data) -> 581 SZ = size(Data), 582 Reply = <<?MTAG:32, ID:32, ?REPLY:32, SZ:32, Data/binary>>, 583 case socket:send(Sock, Reply) of 584 ok -> 585 ok; 586 {error, Reason} -> 587 e("failed sending reply for request ~w: " 588 "~n ~p", [ID, Reason]), 589 (catch socket:close(Sock)), 590 handler_exit(State, recv, Reason) 591 end. 592 593handler_exit(#{id := ID, parent := {Pid, _}}, Tag, Reason) -> 594 exit({{handler, Pid, ID}, Tag, Reason}). 595 596 597%% ======================================================================= 598%% Client 599%% Of the client(s), one is implemented on gen_tcp with 600%% inet_backend = socket and the other with inet_backend = inet. 601%% The clients run for a period of 30 sec - 2 min, and then die. 602%% Then a new is created! 603%% Each client sends simple requests (with some dummy date), and 604%% also once every 15 seconds an 'print' message (which the server 605%% is to simply print the data (which is supposed to be a string). 606%% ======================================================================= 607 608start_client(Domain, Type, Proto, ServerSockAddr) -> 609 Self = self(), 610 Client = {Pid, MRef} = 611 spawn_monitor(fun() -> 612 client_ctrl_start(Self, 613 Domain, Type, Proto, 614 ServerSockAddr) 615 end), 616 receive 617 {'DOWN', MRef, process, Pid, Reason} -> 618 e("received unexpected down message from client (~p):" 619 "~n ~p", [Pid, Reason]), 620 exit({client, Reason}); 621 622 {Pid, ready} -> 623 i("received expected ready from client"), 624 Client 625 end. 626 627 628client_ctrl_start(Parent, 629 Domain, Type, Proto, 630 ServerSockAddr) -> 631 put(sname, "client-ctrl"), 632 %% put(debug, true), 633 State0 = #{parent => Parent, 634 domain => Domain, 635 type => Type, 636 protocol => Proto, 637 server => ServerSockAddr, 638 cid => 1, 639 clients => #{}}, 640 State1 = start_gen_client(State0, socket), 641 State2 = start_gen_client(State1, inet), 642 State3 = start_esock_client(State2), 643 State4 = start_esock_client(State3), 644 Parent ! {self(), ready}, 645 i("started"), 646 client_ctrl_loop(State4). 647 648 649client_ctrl_loop(State) -> 650 receive 651 {'DOWN', MRef, process, Pid, Reason} -> 652 client_ctrl_loop(client_ctrl_handle_down(State, MRef, Pid, Reason)) 653 end. 654 655client_ctrl_handle_down(#{clients := Clients0} = State, 656 MRef, Pid, Reason) -> 657 case maps:take(Pid, Clients0) of 658 {{ID, MRef, Backend}, Clients1} -> 659 i("received down from (gen) ~w-client ~w (~p): " 660 "~n ~p", [Backend, ID, Pid, Reason]), 661 start_gen_client(State#{clients => Clients1}, Backend); 662 {{ID, MRef}, Clients1} -> 663 i("received down from (esock) ~w-client ~w (~p): " 664 "~n ~p", [ID, Pid, Reason]), 665 start_esock_client(State#{clients => Clients1}); 666 error -> 667 i("received down from unknown process ~p: " 668 "~n ~p", [Pid, Reason]), 669 State 670 end. 671 672client_ctrl_exit(Tag, Reason) -> 673 exit({'client-ctrl', Tag, Reason}). 674 675 676start_gen_client(#{domain := Domain, 677 type := Type, 678 protocol := Proto, 679 server := #{addr := Addr, port := Port}, 680 cid := ID, 681 clients := Clients0} = State, Backend) -> 682 Self = self(), 683 i("try start (gen) client ~w", [ID]), 684 LifeTime = rand:uniform(timer:minutes(3)) + timer:minutes(2), 685 {Pid, MRef} = 686 spawn_monitor(fun() -> 687 gen_client_start(Self, 688 Backend, ID, 689 LifeTime, 690 Domain, Type, Proto, 691 Addr, Port) 692 end), 693 receive 694 {'DOWN', MRef, process, Pid, Reason} -> 695 e("received unexpected down message from client ~w (~p):" 696 "~n ~p", [ID, Pid, Reason]), 697 client_ctrl_exit({client, ID}, Reason); 698 699 {Pid, ready} -> 700 i("received expected ready from client ~w (~p)", [ID, Pid]), 701 Clients1 = Clients0#{Pid => {ID, MRef, Backend}}, 702 State#{cid => ID + 1, clients => Clients1} 703 end. 704 705 706start_esock_client(#{domain := Domain, 707 type := Type, 708 protocol := Proto, 709 server := #{addr := Addr, port := Port}, 710 cid := ID, 711 clients := Clients0} = State) -> 712 Self = self(), 713 i("try start (esock) client ~w", [ID]), 714 LifeTime = rand:uniform(timer:minutes(3)) + timer:minutes(2), 715 {Pid, MRef} = 716 spawn_monitor(fun() -> 717 esock_client_start(Self, 718 ID, 719 LifeTime, 720 Domain, Type, Proto, 721 Addr, Port) 722 end), 723 receive 724 {'DOWN', MRef, process, Pid, Reason} -> 725 e("received unexpected down message from client ~w (~p):" 726 "~n ~p", [ID, Pid, Reason]), 727 client_ctrl_exit({client, ID}, Reason); 728 729 {Pid, ready} -> 730 i("received expected ready from client ~w (~p)", [ID, Pid]), 731 Clients1 = Clients0#{Pid => {ID, MRef}}, 732 State#{cid => ID + 1, clients => Clients1} 733 end. 734 735 736gen_client_start(Parent, Backend, ID, 737 LifeTime, 738 Domain, Type, Proto, 739 ServerAddr, ServerPort) 740 when (Type =:= stream) andalso (Proto =:= tcp) -> 741 put(sname, f("gen-client[~w,~w]", [Backend, ID])), 742 %% put(debug, true), 743 i("starting"), 744 State = gen_client_connect(#{id => ID, 745 backend => Backend, 746 domain => Domain, 747 type => Type, 748 protocol => Proto, 749 server_addr => ServerAddr, 750 server_port => ServerPort}), 751 erlang:send_after(LifeTime, self(), terminate), 752 MRef = erlang:monitor(process, Parent), 753 Parent ! {self(), ready}, 754 i("started"), 755 gen_client_loop(State#{condition => send_request, 756 mid => 1, 757 parent => Parent, 758 mref => MRef, 759 buf => <<>>}). 760 761 762gen_client_connect(#{backend := Backend, 763 type := Type, 764 protocol := Proto, 765 server_addr := ServerAddr, 766 server_port := ServerPort} = State) 767 when (Type =:= stream) andalso (Proto =:= tcp) -> 768 COpts = [{inet_backend, Backend}, {active, true}, binary], 769 i("try connect to ~s:~w", 770 [inet_parse:ntoa(ServerAddr), ServerPort]), 771 case gen_tcp:connect(ServerAddr, ServerPort, COpts) of 772 {ok, Sock} -> 773 %% i("connected"), 774 ?SOCKET_MONITOR ! 775 {monitor, Sock, 776 string_of(inet, Sock), 777 sockmon_fun(inet), self()}, 778 State#{sock => Sock}; 779 {error, Reason} -> 780 e("failed connecting: " 781 "~n ~p", [Reason]), 782 client_exit(State, connect, Reason) 783 end. 784 785 786esock_client_start(Parent, ID, 787 LifeTime, 788 Domain, Type, Proto, 789 ServerAddr, ServerPort) 790 when (Type =:= stream) andalso (Proto =:= tcp) -> 791 put(sname, f("esock-client[~w]", [ID])), 792 %% put(debug, true), 793 i("starting"), 794 State = esock_client_connect(#{id => ID, 795 domain => Domain, 796 type => Type, 797 protocol => Proto, 798 server_addr => ServerAddr, 799 server_port => ServerPort}), 800 erlang:send_after(LifeTime, self(), terminate), 801 MRef = erlang:monitor(process, Parent), 802 Parent ! {self(), ready}, 803 i("started"), 804 esock_client_loop(State#{condition => send_request, 805 select => undefined, 806 mid => 1, 807 parent => Parent, 808 mref => MRef, 809 buf => <<>>}). 810 811 812esock_client_connect(#{domain := Domain, 813 type := Type, 814 protocol := Proto, 815 server_addr := ServerAddr, 816 server_port := ServerPort} = State) 817 when (Type =:= stream) andalso (Proto =:= tcp) -> 818 i("try open socket"), 819 Sock = 820 case socket:open(Domain, Type, Proto) of 821 {ok, S} -> 822 i("opened"), 823 S; 824 {error, Reason1} -> 825 e("failed open: " 826 "~n ~p", [Reason1]), 827 client_exit(State, open, Reason1) 828 end, 829 %% We are on the same machine (as the server), so just reuse that address 830 i("try bind to ~s", [inet_parse:ntoa(ServerAddr)]), 831 case socket:bind(Sock, #{family => Domain, addr => ServerAddr}) of 832 ok -> 833 i("bound"), 834 ok; 835 {error, Reason2} -> 836 e("failed bind: " 837 "~n ~p", [Reason2]), 838 (catch socket:close(Sock)), 839 client_exit(State, bind, Reason2) 840 end, 841 i("try connect to ~s:~w", [inet_parse:ntoa(ServerAddr), ServerPort]), 842 case socket:connect(Sock, #{family => Domain, 843 addr => ServerAddr, 844 port => ServerPort}) of 845 ok -> 846 i("connected"), 847 ?SOCKET_MONITOR ! 848 {monitor, Sock, 849 string_of(socket, Sock), 850 sockmon_fun(socket), self()}, 851 State#{sock => Sock}; 852 {error, Reason3} -> 853 e("failed connecting: " 854 "~n ~p", [Reason3]), 855 (catch socket:close(Sock)), 856 client_exit(State, connect, Reason3) 857 end. 858 859 860gen_client_loop(#{condition := terminate, 861 sock := Sock}) -> 862 (catch gen_tcp:close(Sock)), 863 exit(normal); 864gen_client_loop(#{condition := {await_reply, _MID}, 865 parent := Parent, 866 sock := Sock} = State) -> 867 receive 868 {'DOWN', _MRef, process, Parent, Reason} -> 869 e("unexpected down from parent received: " 870 "~n ~p", [Reason]), 871 client_exit(State, parent_down, Reason); 872 873 {tcp, Sock, Data} -> 874 %% i("received (~w bytes of) data", [size(Data)]), 875 gen_client_loop(client_process_data(State, Data)); 876 877 terminate -> 878 gen_client_loop(State#{condition => terminate}) 879 880 end; 881gen_client_loop(#{condition := send_request, 882 sock := Sock, 883 mid := MID} = State) -> 884 %% i("try send request ~w", [MID]), 885 Data = ?DATA, 886 SZ = size(Data), 887 Req = <<?MTAG:32, MID:32, ?REQUEST:32, SZ:32, Data/binary>>, 888 case gen_tcp:send(Sock, Req) of 889 ok -> 890 gen_client_loop(State#{condition => {await_reply, MID}, 891 mid => MID + 1}); 892 {error, Reason} -> 893 e("failed sending request ~w: " 894 "~n ~p", [MID, Reason]), 895 client_exit(State, send, Reason) 896 end. 897 898 899 900esock_client_loop(#{condition := terminate, 901 sock := Sock}) -> 902 (catch socket:close(Sock)), 903 exit(normal); 904esock_client_loop(#{condition := {await_reply, _MID}, 905 select := undefined, 906 sock := Sock} = State) -> 907 %% i("try (nowait) recv"), 908 case socket:recv(Sock, 0, nowait) of 909 {ok, Data} when is_binary(Data) -> 910 %% i("received (~w bytes of) data", [size(Data)]), 911 esock_client_loop(client_process_data(State, Data)); 912 %% This is the "old" style 913 {ok, {Data, SelectInfo}} when is_binary(Data) -> 914 %% i("partial recv - select"), 915 Buf0 = maps:take(buf, State), 916 Buf2 = <<Buf0/binary, Data/binary>>, 917 esock_client_loop(State#{buf => Buf2, 918 select => SelectInfo}); 919 %% This is the "new" style 920 {select, {Data, SelectInfo}} when is_binary(Data) -> 921 %% i("partial recv - select"), 922 Buf0 = maps:take(buf, State), 923 Buf2 = <<Buf0/binary, Data/binary>>, 924 esock_client_loop(State#{buf => Buf2, 925 select => SelectInfo}); 926 {select, SelectInfo} -> 927 %% i("select"), 928 esock_client_loop(State#{select => SelectInfo}); 929 {error, Reason} -> 930 e("recv failed: " 931 "~n ~p", [Reason]), 932 (catch socket:close(Sock)), 933 client_exit(State, recv, Reason) 934 end; 935esock_client_loop(#{condition := {await_reply, _MID}, 936 select := {select_info, _, Info} = SelectInfo, 937 parent := Parent, 938 sock := Sock} = State) -> 939 receive 940 {'DOWN', _MRef, process, Parent, Reason} -> 941 e("unexpected down from parent received: " 942 "~n ~p", [Reason]), 943 (catch socket:cancel(Sock, SelectInfo)), 944 (catch socket:close(Sock)), 945 client_exit(State, parent_down, Reason); 946 947 {'$socket', Sock, abort, Info} -> 948 e("received unexpected select abort: " 949 "~n ~p", [Info]), 950 (catch socket:close(Sock)), 951 client_exit(State, abort, Info); 952 953 {'$socket', Sock, select, Info} -> 954 %% i("select message received - try recv"), 955 case socket:recv(Sock) of 956 {ok, Data} -> 957 %% i("recv succeed (~w bytes of data received)", [size(Data)]), 958 esock_client_loop( 959 client_process_data(State#{select => undefined}, Data)); 960 {error, Reason} -> 961 e("recv failed: " 962 "~n ~p", [Reason]), 963 (catch socket:close(Sock)), 964 client_exit(State, recv, Reason) 965 end; 966 967 terminate -> 968 esock_client_loop(State#{condition => terminate}) 969 970 end; 971esock_client_loop(#{condition := send_request, 972 sock := Sock, 973 mid := MID} = State) -> 974 %% i("try send request ~w", [MID]), 975 Data = ?DATA, 976 SZ = size(Data), 977 Req = <<?MTAG:32, MID:32, ?REQUEST:32, SZ:32, Data/binary>>, 978 case socket:send(Sock, Req) of 979 ok -> 980 esock_client_loop(State#{condition => {await_reply, MID}, 981 mid => MID + 1}); 982 {error, Reason} -> 983 e("failed sending request ~w: " 984 "~n ~p", [MID, Reason]), 985 client_exit(State, send, Reason) 986 end. 987 988 989 990client_process_data(#{condition := {await_reply, MID}, 991 buf := Buf} = State, NewData) -> 992 case <<Buf/binary, NewData/binary>> of 993 <<?MTAG:32, MID:32, ?REPLY:32, SZ:32, _Data:SZ/binary, Rest/binary>> -> 994 State#{condition => send_request, buf => Rest}; 995 <<?MTAG:32, MID2:32, ?REPLY:32, SZ:32, _Data:SZ/binary, _Rest/binary>> -> 996 client_exit(State, unexpected_msg, MID2); 997 NewBuf -> 998 State#{buf => NewBuf} 999 end. 1000 1001client_exit(#{id := ID}, Tag, Reason) -> 1002 exit({{client, ID}, Tag, Reason}). 1003 1004 1005%% ======================================================================= 1006%% Utility 1007%% ======================================================================= 1008 1009%% This gets the local address (not {127, _} or {0, ...} or {16#fe80, ...}) 1010%% We should really implement this using the (new) net module, 1011%% but until that gets the necessary functionality... 1012which_local_addr(Domain) -> 1013 case which_local_host_info(Domain) of 1014 {ok, #{addr := Addr}} -> 1015 {ok, Addr}; 1016 {error, _Reason} = ERROR -> 1017 ERROR 1018 end. 1019 1020 1021%% Returns the interface (name), flags and address (not 127...) 1022%% of the local host. 1023which_local_host_info(Domain) -> 1024 case inet:getifaddrs() of 1025 {ok, IFL} -> 1026 which_local_host_info(Domain, IFL); 1027 {error, _} = ERROR -> 1028 ERROR 1029 end. 1030 1031which_local_host_info(_Domain, []) -> 1032 {error, no_address}; 1033which_local_host_info(Domain, [{"docker" ++ _, _}|IFL]) -> 1034 which_local_host_info(Domain, IFL); 1035which_local_host_info(Domain, [{"br-" ++ _, _}|IFL]) -> 1036 which_local_host_info(Domain, IFL); 1037which_local_host_info(Domain, [{Name, IFO}|IFL]) -> 1038 case if_is_running_and_not_loopback(IFO) of 1039 true -> 1040 try which_local_host_info2(Domain, IFO) of 1041 Info -> 1042 {ok, Info#{name => Name}} 1043 catch 1044 throw:_:_ -> 1045 which_local_host_info(Domain, IFL) 1046 end; 1047 false -> 1048 which_local_host_info(Domain, IFL) 1049 end; 1050which_local_host_info(Domain, [_|IFL]) -> 1051 which_local_host_info(Domain, IFL). 1052 1053if_is_running_and_not_loopback(If) -> 1054 lists:keymember(flags, 1, If) andalso 1055 begin 1056 {value, {flags, Flags}} = lists:keysearch(flags, 1, If), 1057 (not lists:member(loopback, Flags)) andalso 1058 lists:member(running, Flags) 1059 end. 1060 1061 1062which_local_host_info2(inet = _Domain, IFO) -> 1063 Addr = which_local_host_info3(addr, IFO, 1064 fun({A, _, _, _}) when (A =/= 127) -> true; 1065 (_) -> false 1066 end), 1067 NetMask = which_local_host_info3(netmask, IFO, 1068 fun({_, _, _, _}) -> true; 1069 (_) -> false 1070 end), 1071 BroadAddr = which_local_host_info3(broadaddr, IFO, 1072 fun({_, _, _, _}) -> true; 1073 (_) -> false 1074 end), 1075 Flags = which_local_host_info3(flags, IFO, fun(_) -> true end), 1076 #{flags => Flags, 1077 addr => Addr, 1078 broadaddr => BroadAddr, 1079 netmask => NetMask}; 1080which_local_host_info2(inet6 = _Domain, IFO) -> 1081 Addr = which_local_host_info3(addr, IFO, 1082 fun({A, _, _, _, _, _, _, _}) 1083 when (A =/= 0) andalso 1084 (A =/= 16#fe80) -> true; 1085 (_) -> false 1086 end), 1087 NetMask = which_local_host_info3(netmask, IFO, 1088 fun({_, _, _, _, _, _, _, _}) -> true; 1089 (_) -> false 1090 end), 1091 Flags = which_local_host_info3(flags, IFO, fun(_) -> true end), 1092 #{flags => Flags, 1093 addr => Addr, 1094 netmask => NetMask}. 1095 1096which_local_host_info3(_Key, [], _) -> 1097 throw({error, no_address}); 1098which_local_host_info3(Key, [{Key, Val}|IFO], Check) -> 1099 case Check(Val) of 1100 true -> 1101 Val; 1102 false -> 1103 which_local_host_info3(Key, IFO, Check) 1104 end; 1105which_local_host_info3(Key, [_|IFO], Check) -> 1106 which_local_host_info3(Key, IFO, Check). 1107 1108 1109%% --- 1110 1111string_of(socket = _Module, Socket) -> 1112 socket:to_list(Socket); 1113string_of(inet = _Module, Socket) -> 1114 inet:socket_to_list(Socket). 1115 1116 1117sockmon_fun(Module) -> 1118 fun(Sock) -> Module:monitor(Sock) end. 1119 1120 1121%% --- 1122 1123 1124 1125%% --- 1126 1127f(F, A) -> 1128 lists:flatten(io_lib:format(F, A)). 1129 1130 1131formated_timestamp() -> 1132 format_timestamp(os:timestamp()). 1133 1134format_timestamp({_N1, _N2, N3} = TS) -> 1135 {_Date, Time} = calendar:now_to_local_time(TS), 1136 {Hour,Min,Sec} = Time, 1137 FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", 1138 [Hour, Min, Sec, round(N3/1000)]), 1139 lists:flatten(FormatTS). 1140 1141 1142e(F, A) -> 1143 p(true, "<ERROR> ", F, A). 1144 1145i(F) -> 1146 i(F, []). 1147 1148i(F, A) -> 1149 p(get(debug), "", F, A). 1150 1151p(true, PRE, F, A) -> 1152 io:format("[ ~s, ~s ] ~s" ++ F ++ "~n", 1153 [formated_timestamp(), get(sname), PRE | A]); 1154p(_, _, _, _) -> 1155 ok. 1156 1157