1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2018-2019. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% ========================================================================== 22%% 23%% This is the "simple" server using gen_tcp. The server is supposed to be 24%% as simple as possible in order to incur as little overhead as possible. 25%% 26%% There are three ways to run the server: active, passive or active-once. 27%% 28%% The server does only two things; accept connnections and then reply 29%% to requests (actually the handler(s) does that). No timing or counting. 30%% That is all done by the clients. 31%% 32%% ========================================================================== 33 34-module(socket_test_ttest_tcp_server). 35 36-export([ 37 %% This are for the test suite 38 start_monitor/3, 39 40 %% This are for starting in a shell when run "manually" 41 start/2, 42 43 stop/1 44 ]). 45 46%% Internal exports 47-export([ 48 do_start/3 49 ]). 50 51-include_lib("kernel/include/inet.hrl"). 52-include("socket_test_ttest.hrl"). 53 54-define(ACC_TIMEOUT, 5000). 55-define(RECV_TIMEOUT, 5000). 56 57-define(LIB, socket_test_ttest_lib). 58-define(I(F), ?LIB:info(F)). 59-define(I(F,A), ?LIB:info(F, A)). 60-define(E(F,A), ?LIB:error(F, A)). 61-define(F(F,A), ?LIB:format(F, A)). 62-define(FORMAT_TIME(T), ?LIB:format_time(T)). 63-define(T(), ?LIB:t()). 64-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). 65 66 67%% ========================================================================== 68 69start_monitor(Node, Transport, Active) when (Node =/= node()) -> 70 case rpc:call(Node, ?MODULE, do_start, [self(), Transport, Active]) of 71 {badrpc, _} = Reason -> 72 {error, Reason}; 73 {ok, {Pid, AddrPort}} -> 74 MRef = erlang:monitor(process, Pid), 75 {ok, {{Pid, MRef}, AddrPort}}; 76 {error, _} = ERROR -> 77 ERROR 78 end; 79start_monitor(_, Transport, Active) -> 80 case do_start(self(), Transport, Active) of 81 {ok, {Pid, AddrPort}} -> 82 MRef = erlang:monitor(process, Pid), 83 {ok, {{Pid, MRef}, AddrPort}}; 84 {error, _} = ERROR -> 85 ERROR 86 end. 87 88 89 90start(Transport, Active) -> 91 do_start(self(), Transport, Active). 92 93%% Note that the Async option is actually only "used" for the 94%% socket transport module (it details how to implement the 95%% active feature). 96do_start(Parent, Transport, Active) 97 when is_pid(Parent) andalso 98 (is_atom(Transport) orelse is_tuple(Transport)) andalso 99 (is_boolean(Active) orelse (Active =:= once)) -> 100 Starter = self(), 101 ServerInit = fun() -> 102 put(sname, "server"), 103 server_init(Starter, Parent, Transport, Active) 104 end, 105 {Pid, MRef} = spawn_monitor(ServerInit), 106 receive 107 {'DOWN', MRef, process, Pid, Reason} -> 108 {error, Reason}; 109 {?MODULE, Pid, {ok, AddrPort}} -> 110 erlang:demonitor(MRef), 111 {ok, {Pid, AddrPort}}; 112 {?MODULE, Pid, {error, _} = ERROR} -> 113 erlang:demonitor(MRef, [flush]), 114 ERROR 115 end. 116 117 118stop(Pid) when is_pid(Pid) -> 119 req(Pid, stop). 120 121 122%% ========================================================================== 123 124server_init(Starter, Parent, Transport, Active) -> 125 ?I("init with" 126 "~n Transport: ~p" 127 "~n Active: ~p", [Transport, Active]), 128 {Mod, Listen, StatsInterval} = process_transport(Transport, Active), 129 case Listen(0) of 130 {ok, LSock} -> 131 case Mod:port(LSock) of 132 {ok, PortOrPath} -> 133 Result = 134 if 135 is_integer(PortOrPath) -> 136 %% This is just for convenience 137 case Mod:sockname(LSock) of 138 {ok, {Addr, _}} -> 139 ?I("listening on:" 140 "~n Addr: ~p (~s)" 141 "~n Port: ~w" 142 "~n", [Addr, 143 inet:ntoa(Addr), 144 PortOrPath]), 145 {Addr, PortOrPath}; 146 {error, SNReason} -> 147 exit({sockname, SNReason}) 148 end; 149 is_list(PortOrPath) -> 150 ?I("listening on:" 151 "~n Path: ~s" 152 "~n", [PortOrPath]), 153 PortOrPath 154 end, 155 Starter ! {?MODULE, self(), {ok, Result}}, 156 server_loop(#{parent => Parent, 157 mod => Mod, 158 active => Active, 159 lsock => LSock, 160 port_or_path => PortOrPath, 161 handlers => [], 162 stats_interval => StatsInterval, 163 %% Accumulation 164 runtime => 0, 165 mcnt => 0, 166 bcnt => 0, 167 hcnt => 0 168 }); 169 {error, PReason} -> 170 (catch Mod:close(LSock)), 171 exit({port, PReason}) 172 end; 173 {error, LReason} -> 174 exit({listen, LReason}) 175 end. 176 177process_transport(Mod, _) when is_atom(Mod) -> 178 {Mod, fun(Port) -> Mod:listen(Port) end, infinity}; 179process_transport({Mod, #{stats_interval := T} = Opts}, Active) 180 when (Active =/= false) -> 181 {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T}; 182process_transport({Mod, #{stats_interval := T} = Opts}, _Active) -> 183 {Mod, fun(Port) -> Mod:listen(Port, Opts) end, T}; 184process_transport({Mod, Opts}, _Active) -> 185 {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}. 186 187 188 189server_loop(State) -> 190 server_loop( server_handle_message( server_accept(State, ?ACC_TIMEOUT), 0) ). 191 192server_accept(#{mod := Mod, lsock := LSock} = State, Timeout) -> 193 case Mod:accept(LSock, Timeout) of 194 {ok, Sock} -> 195 server_handle_accepted(State, Sock); 196 {error, timeout} when (Timeout =/= nowait) -> 197 State; 198 {error, AReason} -> 199 (catch Mod:close(LSock)), 200 exit({accept, AReason}) 201 end. 202 203%% server_accept(#{mod := Mod, 204%% lsock := LSock} = State) -> 205%% case Mod:accept(LSock, ?ACC_TIMEOUT) of 206%% {ok, Sock} -> 207%% server_handle_accepted(State, Sock); 208%% {error, timeout} -> 209%% State; 210%% {error, AReason} -> 211%% (catch Mod:close(LSock)), 212%% exit({accept, AReason}) 213%% end. 214 215server_handle_accepted(#{mod := Mod, 216 lsock := LSock, 217 active := Active, 218 handlers := Handlers} = State, 219 Sock) -> 220 ?I("accepted connection from ~s", 221 [case Mod:peername(Sock) of 222 {ok, Peer} -> 223 format_peername(Peer); 224 {error, _} -> 225 "-" 226 end]), 227 {Pid, _} = handler_start(), 228 ?I("handler ~p started -> try transfer socket control", [Pid]), 229 case Mod:controlling_process(Sock, Pid) of 230 ok -> 231 maybe_start_stats_timer(State, Pid), 232 ?I("server-accept: handler ~p started", [Pid]), 233 handler_continue(Pid, Mod, Sock, Active), 234 Handlers2 = [Pid | Handlers], 235 State#{handlers => Handlers2}; 236 {error, CPReason} -> 237 (catch Mod:close(Sock)), 238 (catch Mod:close(LSock)), 239 exit({controlling_process, CPReason}) 240 end. 241 242 243format_peername({Addr, Port}) -> 244 case inet:gethostbyaddr(Addr) of 245 {ok, #hostent{h_name = N}} -> 246 ?F("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); 247 {error, _} -> 248 ?F("~p, ~p", [Addr, Port]) 249 end; 250format_peername(Path) when is_list(Path) -> 251 Path. 252 253maybe_start_stats_timer(#{active := Active, stats_interval := Time}, Handler) 254 when (Active =/= false) andalso (is_integer(Time) andalso (Time > 0)) -> 255 start_stats_timer(Time, "handler", Handler); 256maybe_start_stats_timer(_, _) -> 257 ok. 258 259start_stats_timer(Time, ProcStr, Pid) -> 260 erlang:start_timer(Time, self(), {stats, Time, ProcStr, Pid}). 261 262server_handle_message(#{mod := Mod, 263 lsock := LSock, 264 parent := Parent, 265 handlers := H} = State, Timeout) -> 266 receive 267 {timeout, _TRef, {stats, Interval, ProcStr, Pid}} -> 268 case server_handle_stats(ProcStr, Pid) of 269 ok -> 270 start_stats_timer(Interval, ProcStr, Pid); 271 skip -> 272 ok 273 end, 274 State; 275 276 {?MODULE, Ref, Parent, stop} -> 277 reply(Parent, Ref, ok), 278 lists:foreach(fun(P) -> handler_stop(P) end, H), 279 (catch Mod:close(LSock)), 280 exit(normal); 281 282 {'DOWN', _MRef, process, Pid, Reason} -> 283 server_handle_down(Pid, Reason, State) 284 285 after Timeout -> 286 State 287 end. 288 289server_handle_stats(ProcStr, Pid) -> 290 case ?LIB:formated_process_stats(Pid) of 291 "" -> 292 skip; 293 FormatedStats -> 294 ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]), 295 ok 296 end. 297 298 299server_handle_down(Pid, Reason, #{handlers := Handlers} = State) -> 300 case lists:delete(Pid, Handlers) of 301 Handlers -> 302 ?I("unknown process ~p died", [Pid]), 303 State; 304 Handlers2 -> 305 server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2}) 306 end. 307 308 309server_handle_handler_down(Pid, 310 {done, RunTime, MCnt, BCnt}, 311 #{runtime := AccRunTime, 312 mcnt := AccMCnt, 313 bcnt := AccBCnt, 314 hcnt := AccHCnt} = State) -> 315 AccRunTime2 = AccRunTime + RunTime, 316 AccMCnt2 = AccMCnt + MCnt, 317 AccBCnt2 = AccBCnt + BCnt, 318 AccHCnt2 = AccHCnt + 1, 319 MsgCount2Str = 320 fun(RT, ART, MC, AMC) when (RT > 0) -> 321 ?F("~w => ~w (~w) msgs / ms", [MC, MC div RT, AMC div ART]); 322 (_, _, MC, AMC) -> 323 ?F("~w (~w)", [MC, AMC]) 324 end, 325 ByteCount2Str = 326 fun(RT, ART, BC, ABC) when (RT > 0) -> 327 ?F("~w => ~w (~w) bytes / ms", [BC, BC div RT, ABC div ART]); 328 (_, _, BC, ABC) -> 329 ?F("~w", [BC, ABC]) 330 end, 331 ?I("handler ~p (~w) done: " 332 "~n Run Time: ~s" 333 "~n Message Count: ~s" 334 "~n Byte Count: ~s", 335 [Pid, AccHCnt2, 336 ?FORMAT_TIME(RunTime), 337 MsgCount2Str(RunTime, AccRunTime2, MCnt, AccMCnt2), 338 ByteCount2Str(RunTime, AccRunTime2, BCnt, AccBCnt2)]), 339 State#{runtime => AccRunTime2, 340 mcnt => AccMCnt2, 341 bcnt => AccBCnt2, 342 hcnt => AccHCnt2}; 343server_handle_handler_down(Pid, Reason, State) -> 344 ?I("handler ~p terminated: " 345 "~n ~p", [Pid, Reason]), 346 State. 347 348 349 350%% ========================================================================== 351 352handler_start() -> 353 Self = self(), 354 HandlerInit = fun() -> put(sname, "handler"), handler_init(Self) end, 355 spawn_monitor(HandlerInit). 356 357handler_continue(Pid, Mod, Sock, Active) -> 358 req(Pid, {continue, Mod, Sock, Active}). 359 360handler_stop(Pid) -> 361 req(Pid, stop). 362 363handler_init(Parent) -> 364 ?I("starting"), 365 receive 366 {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} -> 367 ?I("received continue"), 368 reply(Parent, Ref, ok), 369 handler_initial_activation(Mod, Sock, Active), 370 handler_loop(#{parent => Parent, 371 mod => Mod, 372 sock => Sock, 373 active => Active, 374 start => ?T(), 375 mcnt => 0, 376 bcnt => 0, 377 last_reply => none, 378 acc => <<>>}) 379 380 after 5000 -> 381 ?I("timeout when message queue: " 382 "~n ~p" 383 "~nwhen" 384 "~n Parent: ~p", [process_info(self(), messages), Parent]), 385 handler_init(Parent) 386 end. 387 388handler_loop(State) -> 389 handler_loop( handler_handle_message( handler_recv_message(State) ) ). 390 391%% When passive, we read *one* request and then attempt to reply to it. 392handler_recv_message(#{mod := Mod, 393 sock := Sock, 394 active := false, 395 mcnt := MCnt, 396 bcnt := BCnt, 397 last_reply := LID} = State) -> 398 case handler_recv_message2(Mod, Sock) of 399 {ok, {MsgSz, ID, Body}} -> 400 handler_send_reply(Mod, Sock, ID, Body), 401 State#{mcnt => MCnt + 1, 402 bcnt => BCnt + MsgSz, 403 last_reply => ID}; 404 {error, closed} -> 405 handler_done(State); 406 {error, timeout} -> 407 ?I("timeout when: " 408 "~n MCnt: ~p" 409 "~n BCnt: ~p" 410 "~n LID: ~p", [MCnt, BCnt, LID]), 411 State 412 end; 413 414 415%% When "active" (once or true), we receive one data "message", which may 416%% contain any number of requests or only part of a request. Then we 417%% process this data together with whatever we had "accumulated" from 418%% prevous messages. Each request will be extracted and replied to. If 419%% there is some data left, not enough for a complete request, we store 420%% this in 'acc' (accumulate it). 421handler_recv_message(#{mod := Mod, 422 sock := Sock, 423 active := Active, 424 mcnt := MCnt, 425 bcnt := BCnt, 426 last_reply := LID, 427 acc := Acc} = State) -> 428 case handler_recv_message3(Mod, Sock, Acc, LID) of 429 {ok, {MCnt2, BCnt2, LID2}, NewAcc} -> 430 handler_maybe_activate(Mod, Sock, Active), 431 State#{mcnt => MCnt + MCnt2, 432 bcnt => BCnt + BCnt2, 433 last_reply => LID2, 434 acc => NewAcc}; 435 436 {error, closed} -> 437 if 438 (size(Acc) =:= 0) -> 439 handler_done(State); 440 true -> 441 ?E("client done with partial message: " 442 "~n Last Reply Sent: ~w" 443 "~n Message Count: ~w" 444 "~n Byte Count: ~w" 445 "~n Partial Message: ~w bytes", 446 [LID, MCnt, BCnt, size(Acc)]), 447 exit({closed_with_partial_message, LID}) 448 end; 449 450 {error, timeout} -> 451 ?I("timeout when: " 452 "~n MCnt: ~p" 453 "~n BCnt: ~p" 454 "~n LID: ~p" 455 "~n size(Acc): ~p", [MCnt, BCnt, LID, size(Acc)]), 456 State 457 end. 458 459handler_process_data(Acc, Mod, Sock, LID) -> 460 handler_process_data(Acc, Mod, Sock, 0, 0, LID). 461 462%% Extract each complete request, one at a time. 463handler_process_data(<<?TTEST_TAG:32, 464 ?TTEST_TYPE_REQUEST:32, 465 ID:32, 466 SZ:32, 467 Rest/binary>>, 468 Mod, Sock, 469 MCnt, BCnt, _LID) when (size(Rest) >= SZ) -> 470 <<Body:SZ/binary, Rest2/binary>> = Rest, 471 case handler_send_reply(Mod, Sock, ID, Body) of 472 ok -> 473 handler_process_data(Rest2, Mod, Sock, MCnt+1, BCnt+16+SZ, ID); 474 {error, _} = ERROR -> 475 ERROR 476 end; 477handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) -> 478 {ok, {MCnt, BCnt, LID}, Data}. 479 480 481handler_recv_message2(Mod, Sock) -> 482 case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of 483 {ok, <<?TTEST_TAG:32, 484 ?TTEST_TYPE_REQUEST:32, 485 ID:32, 486 SZ:32>> = Hdr} -> 487 case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of 488 {ok, Body} when (SZ =:= size(Body)) -> 489 {ok, {size(Hdr) + size(Body), ID, Body}}; 490 {error, BReason} -> 491 ?E("failed reading body (~w) of message ~w:" 492 "~n ~p", [SZ, ID, BReason]), 493 exit({recv, body, ID, SZ, BReason}) 494 end; 495 {error, timeout} = ERROR -> 496 ERROR; 497 {error, closed} = ERROR -> 498 ERROR; 499 {error, HReason} -> 500 ?E("Failed reading header of message:" 501 "~n ~p", [HReason]), 502 exit({recv, header, HReason}) 503 end. 504 505 506handler_recv_message3(Mod, Sock, Acc, LID) -> 507 receive 508 {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse 509 (TagClosed =:= socket_closed) -> 510 {error, closed}; 511 512 {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse 513 (TagErr =:= socket_error) -> 514 {error, Reason}; 515 516 {Tag, Sock, Msg} when (Tag =:= tcp) orelse 517 (Tag =:= socket) -> 518 handler_process_data(<<Acc/binary, Msg/binary>>, Mod, Sock, LID) 519 520 after ?RECV_TIMEOUT -> 521 {error, timeout} 522 end. 523 524 525 526handler_send_reply(Mod, Sock, ID, Data) -> 527 SZ = size(Data), 528 Msg = <<?TTEST_TAG:32, 529 ?TTEST_TYPE_REPLY:32, 530 ID:32, 531 SZ:32, 532 Data/binary>>, 533 case Mod:send(Sock, Msg) of 534 ok -> 535 ok; 536 {error, Reason} -> 537 (catch Mod:close(Sock)), 538 exit({send, Reason}) 539 end. 540 541 542handler_done(State) -> 543 handler_done(State, ?T()). 544 545handler_done(#{start := Start, 546 mod := Mod, 547 sock := Sock, 548 mcnt := MCnt, 549 bcnt := BCnt}, Stop) -> 550 (catch Mod:close(Sock)), 551 exit({done, ?TDIFF(Start, Stop), MCnt, BCnt}). 552 553 554handler_handle_message(#{parent := Parent} = State) -> 555 receive 556 {'EXIT', Parent, Reason} -> 557 exit({parent_exit, Reason}) 558 after 0 -> 559 State 560 end. 561 562 563handler_initial_activation(_Mod, _Sock, false = _Active) -> 564 ok; 565handler_initial_activation(Mod, Sock, Active) -> 566 Mod:active(Sock, Active). 567 568 569handler_maybe_activate(Mod, Sock, once = Active) -> 570 Mod:active(Sock, Active); 571handler_maybe_activate(_, _, _) -> 572 ok. 573 574 575 576%% ========================================================================== 577 578%% which_addr() -> 579%% case inet:getifaddrs() of 580%% {ok, IfAddrs} -> 581%% which_addrs(inet, IfAddrs); 582%% {error, Reason} -> 583%% exit({getifaddrs, Reason}) 584%% end. 585 586%% which_addrs(_Family, []) -> 587%% exit({getifaddrs, not_found}); 588%% which_addrs(Family, [{"lo", _} | IfAddrs]) -> 589%% %% Skip 590%% which_addrs(Family, IfAddrs); 591%% which_addrs(Family, [{"docker" ++ _, _} | IfAddrs]) -> 592%% %% Skip docker 593%% which_addrs(Family, IfAddrs); 594%% which_addrs(Family, [{"br-" ++ _, _} | IfAddrs]) -> 595%% %% Skip docker 596%% which_addrs(Family, IfAddrs); 597%% which_addrs(Family, [{"en" ++ _, IfOpts} | IfAddrs]) -> 598%% %% Maybe take this one 599%% case which_addr(Family, IfOpts) of 600%% {ok, Addr} -> 601%% Addr; 602%% error -> 603%% which_addrs(Family, IfAddrs) 604%% end; 605%% which_addrs(Family, [{_IfName, IfOpts} | IfAddrs]) -> 606%% case which_addr(Family, IfOpts) of 607%% {ok, Addr} -> 608%% Addr; 609%% error -> 610%% which_addrs(Family, IfAddrs) 611%% end. 612 613%% which_addr(_, []) -> 614%% error; 615%% which_addr(inet, [{addr, Addr}|_]) 616%% when is_tuple(Addr) andalso (size(Addr) =:= 4) -> 617%% {ok, Addr}; 618%% which_addr(inet6, [{addr, Addr}|_]) 619%% when is_tuple(Addr) andalso (size(Addr) =:= 8) -> 620%% {ok, Addr}; 621%% which_addr(Family, [_|IfOpts]) -> 622%% which_addr(Family, IfOpts). 623 624 625%% ========================================================================== 626 627req(Pid, Req) -> 628 Ref = make_ref(), 629 Pid ! {?MODULE, Ref, self(), Req}, 630 receive 631 {'EXIT', Pid, Reason} -> 632 {error, {exit, Reason}}; 633 {?MODULE, Ref, Reply} -> 634 Reply 635 end. 636 637reply(Pid, Ref, Reply) -> 638 Pid ! {?MODULE, Ref, Reply}. 639 640 641%% ========================================================================== 642 643%% t() -> 644%% os:timestamp(). 645 646%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> 647%% T1 = A1*1000000000+B1*1000+(C1 div 1000), 648%% T2 = A2*1000000000+B2*1000+(C2 div 1000), 649%% T2 - T1. 650 651%% formated_timestamp() -> 652%% format_timestamp(os:timestamp()). 653 654%% format_timestamp({_N1, _N2, N3} = TS) -> 655%% {_Date, Time} = calendar:now_to_local_time(TS), 656%% {Hour,Min,Sec} = Time, 657%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", 658%% [Hour, Min, Sec, round(N3/1000)]), 659%% lists:flatten(FormatTS). 660 661%% %% Time is always in number os ms (milli seconds) 662%% format_time(T) -> 663%% f("~p", [T]). 664 665 666%% ========================================================================== 667 668%% f(F, A) -> 669%% lists:flatten(io_lib:format(F, A)). 670 671%% e(F, A) -> 672%% p(get(sname), "<ERROR> " ++ F, A). 673 674%% i(F) -> 675%% i(F, []). 676 677%% i(F, A) -> 678%% p(get(sname), "<INFO> " ++ F, A). 679 680%% p(undefined, F, A) -> 681%% p("- ", F, A); 682%% p(Prefix, F, A) -> 683%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). 684 685