1%% 2%% %CopyrightBegin% 3%% 4%% Copyright Ericsson AB 2018-2019. All Rights Reserved. 5%% 6%% Licensed under the Apache License, Version 2.0 (the "License"); 7%% you may not use this file except in compliance with the License. 8%% You may obtain a copy of the License at 9%% 10%% http://www.apache.org/licenses/LICENSE-2.0 11%% 12%% Unless required by applicable law or agreed to in writing, software 13%% distributed under the License is distributed on an "AS IS" BASIS, 14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15%% See the License for the specific language governing permissions and 16%% limitations under the License. 17%% 18%% %CopyrightEnd% 19%% 20 21%% ========================================================================== 22%% 23%% This is the "simple" client using gen_tcp. The client is supposed to be 24%% as simple as possible in order to incur as little overhead as possible. 25%% 26%% There are three ways to run the client: active, passive or active-once. 27%% 28%% The client is the entity that controls the test, timing and counting. 29%% 30%% ========================================================================== 31%% 32%% Before the actual test starts, the client performs a "warmup". 33%% The warmup has two functions. First, to ensure that everything is "loaded" 34%% and, second, to calculate an approximate roundtrip time, in order to 35%% "know" how many iterations we should make (to run for the expected time). 36%% This is not intended to be exact, but just to ensure that all tests take 37%% approx the same time to run. 38%% 39%% ========================================================================== 40 41-module(socket_test_ttest_tcp_client). 42 43-export([ 44 %% These are for the test suite 45 start_monitor/5, start_monitor/6, start_monitor/8, 46 47 %% These are for starting in a shell when run "manually" 48 start/3, start/4, start/6, start/7, 49 stop/1 50 ]). 

%% Internal exports
-export([
         do_start/9
        ]).

-include_lib("kernel/include/inet.hrl").
-include("socket_test_ttest.hrl").
-include("socket_test_ttest_client.hrl").

-define(RECV_TIMEOUT,              10000).
-define(MAX_OUTSTANDING_DEFAULT_1, 100).
-define(MAX_OUTSTANDING_DEFAULT_2, 10).
-define(MAX_OUTSTANDING_DEFAULT_3, 3).

-define(LIB,            socket_test_ttest_lib).
-define(I(F),           ?LIB:info(F)).
-define(I(F,A),         ?LIB:info(F, A)).
-define(E(F,A),         ?LIB:error(F, A)).
-define(F(F,A),         ?LIB:format(F, A)).
-define(FORMAT_TIME(T), ?LIB:format_time(T)).
-define(T(),            ?LIB:t()).
-define(TDIFF(T1,T2),   ?LIB:tdiff(T1, T2)).

-type active()          :: once | boolean().
-type msg_id()          :: 1..3.
-type max_outstanding() :: pos_integer().
-type runtime()         :: pos_integer().


%% ==========================================================================

%% start_monitor/5
%% Same as start_monitor/6, using the default message id (?MSG_ID_DEFAULT).
start_monitor(Node, Notify, Transport, ServerInfo, Active) ->
    start_monitor(Node, Notify, Transport, ServerInfo, Active, ?MSG_ID_DEFAULT).

%% start_monitor/6
%% Same as start_monitor/8, using the default run time (?RUNTIME_DEFAULT)
%% and the default max outstanding that belongs to the given message id.
%% Only message id 1, 2 and 3 are accepted.
start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID)
  when (MsgID =:= 1) orelse (MsgID =:= 2) orelse (MsgID =:= 3) ->
    MaxOutstanding =
        case MsgID of
            1 -> ?MAX_OUTSTANDING_DEFAULT_1;
            2 -> ?MAX_OUTSTANDING_DEFAULT_2;
            3 -> ?MAX_OUTSTANDING_DEFAULT_3
        end,
    start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID,
                  MaxOutstanding, ?RUNTIME_DEFAULT).

%% start_monitor/8
%% Starts the client and sets up a monitor to it (from the calling process).
%% If Node is not the local node, the client is started there via rpc,
%% otherwise it is started locally.
%% Returns {ok, {Pid, MRef}} on success and {error, Reason} otherwise
%% (where Reason is the {badrpc, _} tuple if the rpc call itself failed).
start_monitor(Node, Notify, Transport, ServerInfo, Active,
              MsgID, MaxOutstanding, RunTime)
  when (Node =/= node()) ->
    Args = [false,
            self(), Notify,
            Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime],
    case rpc:call(Node, ?MODULE, do_start, Args) of
        {badrpc, _} = Reason ->
            {error, Reason};
        {ok, Pid} when is_pid(Pid) ->
            MRef = erlang:monitor(process, Pid),
            {ok, {Pid, MRef}};
        {error, _} = ERROR ->
            ERROR
    end;
start_monitor(_, Notify, Transport, ServerInfo, Active,
              MsgID, MaxOutstanding, RunTime) ->
    %% The argument order must be the one of do_start/9
    %% (Transport, ServerInfo, Active), same as in the rpc case above.
    case do_start(false,
                  self(), Notify,
                  Transport, ServerInfo, Active,
                  MsgID, MaxOutstanding, RunTime) of
        {ok, Pid} ->
            MRef = erlang:monitor(process, Pid),
            {ok, {Pid, MRef}};
        {error, _} = ERROR ->
            ERROR
    end.


%% start/3
%% Same as start/4, using the default message id (?MSG_ID_DEFAULT).
start(Transport, ServerInfo, Active) ->
    start(Transport, ServerInfo, Active, ?MSG_ID_DEFAULT).

%% start/4
%% Same as start/7: not quiet, default run time (?RUNTIME_DEFAULT) and
%% the default max outstanding that belongs to the given message id.
start(Transport, ServerInfo, Active, 1 = MsgID) ->
    start(false,
          Transport, ServerInfo, Active, MsgID,
          ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT);
start(Transport, ServerInfo, Active, 2 = MsgID) ->
    start(false,
          Transport, ServerInfo, Active, MsgID,
          ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT);
start(Transport, ServerInfo, Active, 3 = MsgID) ->
    start(false,
          Transport, ServerInfo, Active, MsgID,
          ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT).

%% start/6
%% Same as start/7, but never quiet.
start(Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) ->
    start(false,
          Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime).

%% start/7
%% Starts the client with the calling process as parent. The results
%% are presented (printed) by the client itself, see present_results/1.
start(Quiet, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) ->
    Notify = fun present_results/1,
    do_start(Quiet,
             self(), Notify,
             Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime).


-spec do_start(Quiet,
               Parent,
               Notify,
               Transport,
               ServerInfo,
               Active,
               MsgID,
               MaxOutstanding,
               RunTime) -> {ok, Pid} | {error, Reason} when
      Quiet          :: boolean(),
      Parent         :: pid(),
      Notify         :: function(),
      Transport      :: atom() | tuple(),
      ServerInfo     :: {inet:ip_address(), inet:port_number()} | string(),
      Active         :: active(),
      MsgID          :: msg_id(),
      MaxOutstanding :: max_outstanding(),
      RunTime        :: runtime(),
      Pid            :: pid(),
      Reason         :: term().

%% Spawns (and temporarily monitors) the client process and then waits
%% for it to either report that it has started or to terminate.
%% If the client terminates before reporting, its exit reason is returned
%% as {error, Reason}.
do_start(Quiet,
         Parent, Notify,
         Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime)
  when is_boolean(Quiet) andalso
       is_pid(Parent) andalso
       is_function(Notify) andalso
       (is_atom(Transport) orelse is_tuple(Transport)) andalso
       (is_boolean(Active) orelse (Active =:= once)) andalso
       (is_tuple(ServerInfo) orelse is_list(ServerInfo)) andalso
       (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso
       (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso
       (is_integer(RunTime) andalso (RunTime > 0)) ->
    Starter = self(),
    %% Note that init/10 takes Active *before* ServerInfo
    Init = fun() -> put(sname, "client"),
                    init(Quiet,
                         Starter,
                         Parent,
                         Notify,
                         Transport, Active, ServerInfo,
                         MsgID, MaxOutstanding, RunTime)
           end,
    {Pid, MRef} = spawn_monitor(Init),
    receive
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, Reason};
        {?MODULE, Pid, ok} ->
            %% Flush, so that no 'DOWN' message is left behind in our
            %% message queue if the client happens to terminate just
            %% after it reported that it had started.
            erlang:demonitor(MRef, [flush]),
            {ok, Pid};
        {?MODULE, Pid, {error, _} = ERROR} ->
            erlang:demonitor(MRef, [flush]),
            ERROR
    end.


%% We should not normally stop this (it terminates when its done).
stop(Pid) when is_pid(Pid) ->
    req(Pid, stop).


%% ==========================================================================

%% init/10
%% Body of the client process: connect to the server, tell the starter
%% that we are up, run the message exchange loop, hand the results to
%% Notify, close the socket and terminate (normal).
%% If the connect fails, the process exits with {connect, Reason, ServerInfo}.
init(Quiet,
     Starter,
     Parent, Notify,
     Transport, Active, ServerInfo,
     MsgID, MaxOutstanding, RunTime) ->
    case Quiet of
        false ->
            print_init_info(Transport, Active, ServerInfo,
                            MsgID, MaxOutstanding, RunTime);
        _ ->
            ok
    end,
    {Mod, Connect} = process_transport(Transport),
    case Connect(ServerInfo) of
        {error, Reason} ->
            ?E("connect failed: ~p"
               "~n ~p", [Reason, ServerInfo]),
            exit({connect, Reason, ServerInfo});
        {ok, Sock} ->
            case Quiet of
                false -> ?I("connected");
                _     -> ok
            end,
            Starter ! {?MODULE, self(), ok},
            initial_activation(Mod, Sock, Active),
            %% The start time is taken *after* the socket has been activated
            InitialState = #{quiet           => Quiet,
                             slogan          => run,
                             runtime         => RunTime,
                             start           => ?T(),
                             parent          => Parent,
                             mod             => Mod,
                             sock            => Sock,
                             active          => Active,
                             msg_data        => which_msg_data(MsgID),
                             outstanding     => 0,
                             max_outstanding => MaxOutstanding,
                             sid             => 1,
                             rid             => 1,
                             scnt            => 0,
                             rcnt            => 0,
                             bcnt            => 0,
                             num             => undefined,
                             acc             => <<>>},
            Results = loop(InitialState),
            Notify(Results),
            (catch Mod:close(Sock)),
            exit(normal)
    end.

%% Prints the start configuration of the client.
print_init_info(Transport, Active, ServerInfo,
                MsgID, MaxOutstanding, RunTime) ->
    DataSz = byte_size(which_msg_data(MsgID)),
    ?I("init with"
       "~n Transport: ~p"
       "~n Active: ~p"
       "~n ServerInfo: ~s"
       "~n Msg ID: ~p (=> 16 + ~w bytes)"
       "~n Max Outstanding: ~p"
       "~n (Suggested) Run Time: ~p ms",
       [Transport, Active, format_server_info(ServerInfo),
        MsgID, DataSz, MaxOutstanding, RunTime]).

%% An {Addr, Port} tuple is formatted, anything else (a path) is used as is.
format_server_info({Addr, Port}) ->
    ?F("Addr: ~s, Port: ~w", [inet:ntoa(Addr), Port]);
format_server_info(Path) ->
    Path.

%% process_transport/1
%% Translates the transport into {Mod, Connect}, where Connect is
%% a fun of arity one that takes the server info.
process_transport(Mod) when is_atom(Mod) ->
    %% In this case we assume it to be a plain tcp socket
    {Mod, fun({A, P}) -> Mod:connect(A, P) end};
process_transport({Mod, #{domain := local} = Opts}) ->
    %% For the local domain the server info is a path
    {Mod, fun(Path) -> Mod:connect(Path, Opts) end};
process_transport({Mod, #{domain := _} = Opts}) ->
    {Mod, fun({A, P}) -> Mod:connect(A, P, Opts) end}.


%% Selects the message payload that belongs to the message id.
which_msg_data(1) -> ?MSG_DATA1;
which_msg_data(2) -> ?MSG_DATA2;
which_msg_data(3) -> ?MSG_DATA3.


%% present_results/1
%% Prints the results of a test run; either the summary of a successful
%% run (status = ok) or the counters at the time of the failure.
present_results(#{status  := ok,
                  runtime := RunTime,
                  bcnt    := ByteCnt,
                  cnt     := NumIterations}) ->
    TimeStr  = ?FORMAT_TIME(RunTime),
    BytesStr = format_byte_rate(ByteCnt, RunTime),
    ItersStr = format_iteration_rate(NumIterations, RunTime),
    ?I("Results: "
       "~n Run Time: ~s"
       "~n ByteCnt: ~s"
       "~n NumIterations: ~s",
       [TimeStr, BytesStr, ItersStr]),
    ok;
present_results(#{status  := Failure,
                  runtime := RunTime,
                  sid     := SID,
                  rid     := RID,
                  scnt    := SCnt,
                  rcnt    := RCnt,
                  bcnt    := BCnt,
                  num     := Num}) ->
    TimeStr = ?FORMAT_TIME(RunTime),
    ?I("Time Test failed: "
       "~n ~p"
       "~n"
       "~nwhen"
       "~n"
       "~n Run Time: ~s"
       "~n Send ID: ~p"
       "~n Recv ID: ~p"
       "~n Send Count: ~p"
       "~n Recv Count: ~p"
       "~n Byte Count: ~p"
       "~n Num Iterations: ~p",
       [Failure, TimeStr, SID, RID, SCnt, RCnt, BCnt, Num]).

%% No rate can be calculated if nothing was received or no time has passed.
format_byte_rate(ByteCnt, RunTime)
  when (ByteCnt =:= 0) orelse (RunTime =:= 0) ->
    ?F("~w, ~w", [ByteCnt, RunTime]);
format_byte_rate(ByteCnt, RunTime) ->
    ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]).

format_iteration_rate(_NumIterations, RunTime) when (RunTime =:= 0) ->
    "-";
format_iteration_rate(NumIterations, RunTime) ->
    ?F("~p => ~p iterations / ms",
       [NumIterations, NumIterations div RunTime]).



%% loop/1
%% Starts the (stop) timer and then runs the message exchange until
%% the results are thrown (see finish/2), which are then returned.
loop(#{runtime := RunTime} = State) ->
    erlang:start_timer(RunTime, self(), stop),
    try do_loop(State)
    catch
        throw:Outcome ->
            Outcome
    end.

%% One lap: exchange messages, then check for "control" messages.
do_loop(State) ->
    Exchanged = msg_exchange(State),
    Handled   = handle_message(Exchanged),
    do_loop(Handled).

%% msg_exchange/1
%% 1) All replies received         => we are done
%% 2) All requests sent            => only await the outstanding replies
%% 3) Room for more outstanding    => send another request
%% 4) Otherwise                    => await one reply, then send a request
msg_exchange(#{rcnt := RCnt, num := Num} = State) when (RCnt =:= Num) ->
    finish(ok, State);
msg_exchange(#{scnt := SCnt, num := Num} = State) when (SCnt =:= Num) ->
    %% We are done sending more requests - now we will just await
    %% the replies for the (still) outstanding replies.
    Received = recv_reply(State),
    msg_exchange(Received);
msg_exchange(#{outstanding     := Outstanding,
               max_outstanding := MaxOutstanding} = State)
  when (Outstanding < MaxOutstanding) ->
    Sent = send_request(State),
    msg_exchange(Sent);
msg_exchange(State) ->
    Received = recv_reply(State),
    send_request(Received).


%% finish/2
%% Ends the test by throwing the results (caught in loop/1).
%% For a successful run only run time, byte count and number of iterations
%% are reported; for a failure all the counters are included.
finish(ok,
       #{start := Start, bcnt := BCnt, num := Num}) ->
    RunTime = ?TDIFF(Start, ?T()),
    throw(#{status  => ok,
            runtime => RunTime,
            bcnt    => BCnt,
            cnt     => Num});
finish(Reason,
       #{start := Start,
         sid   := SID,  rid  := RID,
         scnt  := SCnt, rcnt := RCnt, bcnt := BCnt,
         num   := Num}) ->
    RunTime = ?TDIFF(Start, ?T()),
    throw(#{status  => Reason,
            runtime => RunTime,
            sid     => SID,
            rid     => RID,
            scnt    => SCnt,
            rcnt    => RCnt,
            bcnt    => BCnt,
            num     => Num}).

%% send_request/1
%% Sends one request, provided there is room for another outstanding
%% request; otherwise the state is returned as is.
%% A failed send terminates the process, with reason {send, Reason}.
send_request(#{mod             := Mod,
               sock            := Sock,
               sid             := ID,
               scnt            := SCnt,
               outstanding     := Outstanding,
               max_outstanding := MaxOutstanding,
               msg_data        := Data} = State)
  when (Outstanding < MaxOutstanding) ->
    DataSz  = byte_size(Data),
    %% Header (tag, type, id and size; 32 bits each) followed by the data
    Request = <<?TTEST_TAG:32,
                ?TTEST_TYPE_REQUEST:32,
                ID:32,
                DataSz:32,
                Data/binary>>,
    case Mod:send(Sock, Request) of
        {error, Reason} ->
            ?E("Failed sending request: ~p", [Reason]),
            exit({send, Reason});
        ok ->
            State#{sid         => next_id(ID),
                   scnt        => SCnt + 1,
                   outstanding => Outstanding + 1}
    end;
send_request(State) ->
    State.



%% recv_reply/1
%% Attempts to receive one reply and, if successful, updates the counters.
%% Passive mode (active = false): reads exactly one reply from the socket.
%% Active mode (true | once): uses already accumulated data and/or data
%% delivered as messages; an incomplete reply leaves the state unchanged.
%% Any other error than stop and timeout ends the test (see finish/2).
recv_reply(#{active      := false,
             mod         := Mod,
             sock        := Sock,
             rid         := ID,
             bcnt        := _,
             rcnt        := _,
             outstanding := _} = State) ->
    case recv_reply_message1(Mod, Sock, ID) of
        {ok, MsgSz} ->
            reply_received(State, MsgSz);

        %% NOTE(review): recv_reply_message1/3 wraps its recv errors
        %% ({recv_hdr, _} / {recv_body, _}), so this looks unreachable.
        {error, timeout} ->
            ?I("receive timeout"),
            State;

        {error, Reason} ->
            finish(Reason, State)
    end;
recv_reply(#{active      := Active,
             mod         := Mod,
             sock        := Sock,
             rid         := ID,
             bcnt        := _,
             scnt        := SCnt,
             rcnt        := _,
             outstanding := _,
             acc         := Acc} = State) ->
    case recv_reply_message2(Mod, Sock, ID, Acc) of
        {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso
                                   is_binary(NewAcc) ->
            maybe_activate(Mod, Sock, Active),
            Counted = reply_received(State, MsgSz),
            Counted#{acc => NewAcc};

        ok ->
            State;

        {error, stop} ->
            ?I("receive [~w] -> stop", [Active]),
            %% This will have the effect that no more requests are sent...
            State#{num => SCnt, stop_started => ?T()};

        {error, timeout} ->
            ?I("receive[~w] -> timeout", [Active]),
            State;

        {error, Reason} ->
            finish(Reason, State)
    end.

%% One (complete) reply of MsgSz bytes has been received; step the
%% receive id and update the counters accordingly.
reply_received(#{rid         := ID,
                 bcnt        := BCnt,
                 rcnt        := RCnt,
                 outstanding := Outstanding} = State, MsgSz) ->
    State#{rid         => next_id(ID),
           bcnt        => BCnt + MsgSz,
           rcnt        => RCnt + 1,
           outstanding => Outstanding - 1}.


%% This function reads exactly one (reply) message. No more no less.
%% First the (fixed size, 4*4 bytes) header is read and verified; tag,
%% type and (expected) id. Then the body, of the size found in the header.
%% Returns {ok, TotalSize} (header + body) or {error, Reason}.
recv_reply_message1(Mod, Sock, ID) ->
    case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
        {ok, <<?TTEST_TAG:32,
               ?TTEST_TYPE_REPLY:32,
               ID:32,
               SZ:32>> = Hdr} ->
            %% Receive the ping-pong reply body
            case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
                {ok, Data} when (byte_size(Data) =:= SZ) ->
                    {ok, byte_size(Hdr) + byte_size(Data)};
                {error, Reason2} ->
                    %% Exactly one format directive for the one argument
                    ?E("Failed reading body: "
                       "~n ~p", [Reason2]),
                    {error, {recv_body, Reason2}}
            end;

        %% A header of the correct size, but with unexpected content
        {ok, <<BadTag:32,
               BadType:32,
               BadID:32,
               BadSZ:32>>} ->
            {error, {invalid_hdr,
                     {?TTEST_TAG,        BadTag},
                     {?TTEST_TYPE_REPLY, BadType},
                     {ID,                BadID},
                     BadSZ}};
        {ok, _InvHdr} ->
            {error, invalid_hdr};

        {error, Reason1} ->
            ?E("Failed reading header: "
               "~n ~p", [Reason1]),
            {error, {recv_hdr, Reason1}}
    end.


%% This function first attempts to process the data we have already
%% accumulated. If that is not enough for a (complete) reply, it
%% will attempt to receive more.
recv_reply_message2(Mod, Sock, ID, Acc) ->
    case process_acc_data(ID, Acc) of
        ok ->
            %% No or insufficient data, so get more
            recv_reply_message3(Mod, Sock, ID, Acc);

        {ok, _} = OK -> % We already had a reply accumulated - no need to read more
            OK;

        {error, _} = ERROR ->
            ERROR
    end.

%% This function receives a "chunk" of data, then it tries to extract
%% one (reply) message from the accumulated and new data (combined).
%% It also handles the (stop) timer message and the closed and error
%% messages for the socket. A receive timeout is only reported (printed),
%% after which we continue to wait.
recv_reply_message3(Mod, Sock, ID, Acc) ->
    receive
        {timeout, _TRef, stop} ->
            {error, stop};

        {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
                               (TagClosed =:= socket_closed) ->
            {error, closed};

        {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
                                    (TagErr =:= socket_error) ->
            {error, Reason};

        {Tag, Sock, Msg} when (Tag =:= tcp) orelse
                              (Tag =:= socket) ->
            process_acc_data(ID, <<Acc/binary, Msg/binary>>)

    after ?RECV_TIMEOUT ->
            ?I("timeout when"
               "~n ID: ~p"
               "~n size(Acc): ~p",
               [ID, size(Acc)]),
            %% {error, timeout}
            recv_reply_message3(Mod, Sock, ID, Acc)
    end.


%% process_acc_data/2
%% Attempts to extract one (complete) reply, with the expected id, from
%% the accumulated data.
%%   {ok, {MsgSz, Rest}} - One reply (of MsgSz bytes, header included)
%%                         extracted; Rest is what remains.
%%   {error, _}          - A (complete) header with unexpected content
%%   ok                  - Not enough data (yet)
process_acc_data(ID, <<?TTEST_TAG:32,
                       ?TTEST_TYPE_REPLY:32,
                       ID:32,
                       SZ:32,
                       _Body:SZ/binary,
                       Rest/binary>>) ->
    {ok, {4*4+SZ, Rest}};
process_acc_data(ID, <<BadTag:32,
                       BadType:32,
                       BadID:32,
                       BadSZ:32,
                       _Data/binary>>)
  when ((BadTag  =/= ?TTEST_TAG) orelse
        (BadType =/= ?TTEST_TYPE_REPLY) orelse
        (BadID   =/= ID)) ->
    {error, {invalid_hdr,
             {?TTEST_TAG,        BadTag},
             {?TTEST_TYPE_REPLY, BadType},
             {ID,                BadID},
             BadSZ}};
%% Not enough for an entire (reply) message
process_acc_data(_ID, _Data) ->
    ok.


%% handle_message/1
%% Checks (without waiting) for a "control" message; the (stop) timer,
%% a stop request from the parent or a closed/error message for the socket.
%% Returns the (possibly updated) state, or terminates the process.
handle_message(#{quiet  := Quiet,
                 parent := Parent, sock := Sock, scnt := SCnt} = State) ->
    receive
        {timeout, _TRef, stop} ->
            if not Quiet -> ?I("STOP");
               true      -> ok
            end,
            %% This will have the effect that no more requests are sent...
            State#{num => SCnt, stop_started => ?T()};

        {?MODULE, Ref, Parent, stop} ->
            %% This *aborts* the test
            reply(Parent, Ref, ok),
            exit(normal);

        %% Only when active
        %% NOTE(review): recv_reply_message3/4 matches the closed message
        %% as a 2-tuple, {TagClosed, Sock}. If that is the actual format,
        %% this clause will never match - confirm.
        {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse
                                       (TagClosed =:= socket_closed) ->
            %% We should never get this (unless the server crashed)
            exit({closed, Reason});

        %% Only when active
        {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
                                    (TagErr =:= socket_error) ->
            exit({error, Reason})

    after 0 ->
            State
    end.


%% Nothing to activate when passive (active = false)
initial_activation(_Mod, _Sock, false = _Active) ->
    ok;
initial_activation(Mod, Sock, Active) ->
    Mod:active(Sock, Active).


%% Only 'once' needs to be (re-)activated
maybe_activate(Mod, Sock, once = Active) ->
    Mod:active(Sock, Active);
maybe_activate(_, _, _) ->
    ok.


%% ==========================================================================

%% req/2
%% Sends a request to the client process and awaits the reply.
%% The request must carry the pid of the *sender* (us), since that is what
%% the client matches on (its parent) and also where it sends the reply.
%% The client is monitored while we wait, so that we do not wait forever
%% for a client that has terminated (or terminates without replying).
req(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Ref  = make_ref(),
    Pid ! {?MODULE, Ref, self(), Req},
    receive
        {?MODULE, Ref, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}};
        {'EXIT', Pid, Reason} ->
            erlang:demonitor(MRef, [flush]),
            {error, {exit, Reason}}
    end.

%% Sends the reply, for the request identified by Ref, to the requester.
reply(Pid, Ref, Reply) ->
    Pid ! {?MODULE, Ref, Reply}.


%% ==========================================================================

%% The (message) id wraps around to 1 once ?MAX_ID has been reached.
next_id(ID) when (ID < ?MAX_ID) ->
    ID + 1;
next_id(_) ->
    1.


%% ==========================================================================

%% t() ->
%%     os:timestamp().

%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
%%     T1 = A1*1000000000+B1*1000+(C1 div 1000),
%%     T2 = A2*1000000000+B2*1000+(C2 div 1000),
%%     T2 - T1.

%% formated_timestamp() ->
%%     format_timestamp(os:timestamp()).
654 655%% format_timestamp({_N1, _N2, N3} = TS) -> 656%% {_Date, Time} = calendar:now_to_local_time(TS), 657%% {Hour,Min,Sec} = Time, 658%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", 659%% [Hour, Min, Sec, round(N3/1000)]), 660%% lists:flatten(FormatTS). 661 662%% %% Time is always in number os ms (milli seconds) 663%% format_time(T) -> 664%% f("~p", [T]). 665 666 667%% ========================================================================== 668 669%% f(F, A) -> 670%% lists:flatten(io_lib:format(F, A)). 671 672%% %% e(F) -> 673%% %% i("<ERROR> " ++ F). 674 675%% e(F, A) -> 676%% p(get(sname), "<ERROR> " ++ F, A). 677 678%% i(F) -> 679%% i(F, []). 680 681%% i(F, A) -> 682%% p(get(sname), "<INFO> " ++ F, A). 683 684%% p(undefined, F, A) -> 685%% p("- ", F, A); 686%% p(Prefix, F, A) -> 687%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). 688