1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2018-2019. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
21-module(socket_test_ttest_tcp_socket).
22
23-export([
24	 accept/1, accept/2,
25	 active/2,
26	 close/1,
27	 connect/1, connect/2, connect/3,
28	 controlling_process/2,
29	 listen/0, listen/1, listen/2,
30	 port/1,
31	 peername/1,
32	 recv/2, recv/3,
33	 send/2,
34	 shutdown/2,
35	 sockname/1
36	]).
37
38
39-define(LIB, socket_test_lib).
40
41-define(READER_RECV_TIMEOUT, 1000).
42
43-define(DATA_MSG(Sock, Method, Data),
44        {socket,
45         #{sock => Sock, reader => self(), method => Method},
46         Data}).
47
48-define(CLOSED_MSG(Sock, Method),
49        {socket_closed,
50         #{sock => Sock, reader => self(), method => Method}}).
51
52-define(ERROR_MSG(Sock, Method, Reason),
53        {socket_error,
54         #{sock => Sock, reader => self(), method => Method},
55         Reason}).
56
57
58%% ==========================================================================
59
60%% This does not really work. Its just a placeholder for the time being...
61
62%% getopt(Sock, Opt) when is_atom(Opt) ->
63%%     socket:getopt(Sock, socket, Opt).
64
65%% setopt(Sock, Opt, Value) when is_atom(Opt) ->
66%%     socket:setopts(Sock, socket, Opt, Value).
67
68
69%% ==========================================================================
70
71%% The way we use server async its no point in doing a async accept call
72%% (we do never actually run the test with more than one client).
73accept(#{sock := LSock, opts := #{async  := Async,
74                                  method := Method} = Opts}) ->
75    case socket:accept(LSock) of
76        {ok, Sock} ->
77	    Self = self(),
78	    Reader = spawn(fun() ->
79                                   reader_init(Self, Sock, Async, false, Method)
80                           end),
81            maybe_start_stats_timer(Opts, Reader),
82	    {ok, #{sock => Sock, reader => Reader, method => Method}};
83	{error, _} = ERROR ->
84	    ERROR
85    end.
86
87%% If a timeout has been explictly specified, then we do not use
88%% async here. We will pass it on to the reader process.
89accept(#{sock := LSock, opts := #{async  := Async,
90                                  method := Method} = Opts}, Timeout) ->
91    case socket:accept(LSock, Timeout) of
92	{ok, Sock} ->
93	    Self = self(),
94	    Reader = spawn(fun() ->
95                                   reader_init(Self, Sock, Async, false, Method)
96                           end),
97            maybe_start_stats_timer(Opts, Reader),
98	    {ok, #{sock => Sock, reader => Reader, method => Method}};
99	{error, _} = ERROR ->
100	    ERROR
101    end.
102
103
104active(#{reader := Pid}, NewActive)
105  when (is_boolean(NewActive) orelse (NewActive =:= once)) ->
106    Pid ! {?MODULE, active, NewActive},
107    ok.
108
109
110close(#{sock := Sock, reader := Pid}) ->
111    Pid ! {?MODULE, stop},
112    Unlink = case socket:sockname(Sock) of
113                 {ok, #{family := local, path := Path}} ->
114                     fun() -> os:cmd("unlink " ++ Path), ok end;
115                 _ ->
116                     fun() -> ok end
117             end,
118    Res = socket:close(Sock),
119    Unlink(),
120    Res.
121
122%% Create a socket and connect it to a peer
123connect(ServerPath) when is_list(ServerPath) ->
124    Domain     = local,
125    ClientPath = mk_unique_path(),
126    LocalSA    = #{family => Domain,
127                   path   => ClientPath},
128    ServerSA   = #{family => Domain, path => ServerPath},
129    Opts       = #{domain => Domain,
130                   proto  => default,
131                   method => plain},
132    Cleanup = fun() -> os:cmd("unlink " ++ ClientPath), ok end,
133    do_connect(LocalSA, ServerSA, Cleanup, Opts).
134
135connect(Addr, Port) when is_tuple(Addr) andalso is_integer(Port) ->
136    Domain   = inet,
137    LocalSA  = any,
138    ServerSA = #{family => Domain,
139                 addr   => Addr,
140                 port   => Port},
141    Opts     = #{domain => Domain,
142                 proto  => tcp,
143                 method => plain},
144    Cleanup  = fun() -> ok end,
145    do_connect(LocalSA, ServerSA, Cleanup, Opts);
146connect(ServerPath,
147        #{domain := local = Domain} = Opts)
148  when is_list(ServerPath) ->
149    ClientPath = mk_unique_path(),
150    LocalSA    = #{family => Domain,
151                   path   => ClientPath},
152    ServerSA   = #{family => Domain,
153                   path   => ServerPath},
154    Cleanup    = fun() -> os:cmd("unlink " ++ ClientPath), ok end,
155    do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => default}).
156
157connect(Addr, Port, #{domain := Domain} = Opts) ->
158    LocalSA  = any,
159    ServerSA = #{family => Domain,
160                 addr   => Addr,
161                 port   => Port},
162    Cleanup  = fun() -> ok end,
163    do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}).
164
165do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain,
166                                         proto  := Proto,
167                                         async  := Async,
168                                         method := Method} = Opts) ->
169    try
170	begin
171	    Sock =
172		case socket:open(Domain, stream, Proto) of
173		    {ok, S} ->
174			S;
175		    {error, OReason} ->
176			throw({error, {open, OReason}})
177		end,
178	    case socket:bind(Sock, LocalSA) of
179		ok ->
180		    ok;
181		{error, BReason} ->
182		    (catch socket:close(Sock)),
183                    Cleanup(),
184		    throw({error, {bind, BReason}})
185	    end,
186	    case socket:connect(Sock, ServerSA) of
187		ok ->
188		    ok;
189		{error, CReason} ->
190		    (catch socket:close(Sock)),
191                    Cleanup(),
192		    throw({error, {connect, CReason}})
193	    end,
194	    Self   = self(),
195	    Reader = spawn(fun() ->
196                                   reader_init(Self, Sock, Async, false, Method)
197                           end),
198            maybe_start_stats_timer(Opts, Reader),
199	    {ok, #{sock => Sock, reader => Reader, method => Method}}
200	end
201    catch
202	throw:ERROR:_ ->
203	    ERROR
204    end.
205
206mk_unique_path() ->
207    [NodeName | _] = string:tokens(atom_to_list(node()), [$@]),
208    ?LIB:f("/tmp/esock_~s_~w", [NodeName, erlang:system_time(nanosecond)]).
209
210maybe_start_stats_timer(#{stats_to       := Pid,
211                          stats_interval := T},
212                        Reader) when is_pid(Pid) ->
213    erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
214maybe_start_stats_timer(_O, _) ->
215    ok.
216
217controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
218    case socket:setopt(Sock, otp, controlling_process, NewPid) of
219	ok ->
220	    Pid ! {?MODULE, self(), controlling_process, NewPid},
221	    receive
222		{?MODULE, Pid, controlling_process} ->
223		    ok
224	    end;
225	{error, _} = ERROR ->
226	    ERROR
227    end.
228
229
230%% Create a listen socket
231listen() ->
232    listen(0).
233
234listen(Port) when is_integer(Port) ->
235    listen(Port, #{domain => inet, async => false, method => plain});
236listen(Path) when is_list(Path) ->
237    listen(Path, #{domain => local, async => false, method => plain}).
238
239listen(0, #{domain := local} = Opts) ->
240    listen(mk_unique_path(), Opts);
241listen(Path, #{domain := local = Domain} = Opts)
242  when is_list(Path) andalso (Path =/= []) ->
243    SA = #{family => Domain,
244           path   => Path},
245    Cleanup = fun() -> os:cmd("unlink " ++ Path), ok end,
246    do_listen(SA, Cleanup, Opts#{proto => default});
247listen(Port, #{domain := Domain} = Opts)
248  when is_integer(Port) andalso (Port >= 0) ->
249    %% Bind fills in the rest
250    case ?LIB:which_local_host_info(Domain) of
251	{ok, #{addr := Addr}} ->
252	    SA = #{family => Domain,
253		   addr   => Addr,
254		   port   => Port},
255	    Cleanup = fun() -> ok end,
256	    do_listen(SA, Cleanup, Opts#{proto => tcp});
257	{error, _} = ERROR ->
258	    ERROR
259    end.
260
261do_listen(SA,
262          Cleanup,
263          #{domain := Domain, proto  := Proto,
264            async  := Async,  method := Method} = Opts)
265  when (Method =:= plain) orelse (Method =:= msg) andalso
266       is_boolean(Async) ->
267    try
268	begin
269	    Sock = case socket:open(Domain, stream, Proto) of
270		       {ok, S} ->
271			   S;
272		       {error, OReason} ->
273			   throw({error, {open, OReason}})
274		   end,
275	    case socket:bind(Sock, SA) of
276		ok ->
277		    ok;
278		{error, BReason} ->
279		    (catch socket:close(Sock)),
280                    Cleanup(),
281		    throw({error, {bind, BReason}})
282	    end,
283	    case socket:listen(Sock) of
284		ok ->
285                    ok;
286                {error, LReason} ->
287		    (catch socket:close(Sock)),
288                    Cleanup(),
289                    throw({error, {listen, LReason}})
290            end,
291	    {ok, #{sock => Sock, opts => Opts}}
292	end
293    catch
294	throw:{error, Reason}:_ ->
295	    {error, Reason}
296    end.
297
298
299port(#{sock := Sock}) ->
300    case socket:sockname(Sock) of
301	{ok, #{family := local, path := Path}} ->
302	    {ok, Path};
303	{ok, #{port := Port}} ->
304	    {ok, Port};
305	{error, _} = ERROR ->
306	    ERROR
307    end.
308
309
310peername(#{sock := Sock}) ->
311    case socket:peername(Sock) of
312	{ok, #{family := local, path := Path}} ->
313	    {ok, Path};
314	{ok, #{addr := Addr, port := Port}} ->
315	    {ok, {Addr, Port}};
316	{error, _} = ERROR ->
317	    ERROR
318    end.
319
320
321recv(#{sock := Sock, method := plain}, Length) ->
322    socket:recv(Sock, Length);
323recv(#{sock := Sock, method := msg}, Length) ->
324    case socket:recvmsg(Sock, Length, 0, [], infinity) of
325        {ok, #{iov := [Bin]}} ->
326            {ok, Bin};
327        {error, _} = ERROR ->
328            ERROR
329    end.
330
331recv(#{sock := Sock, method := plain}, Length, Timeout) ->
332    socket:recv(Sock, Length, Timeout);
333recv(#{sock := Sock, method := msg}, Length, Timeout) ->
334    case socket:recvmsg(Sock, Length, 0, [], Timeout) of
335        {ok, #{iov := [Bin]}} ->
336            {ok, Bin};
337        {error, _} = ERROR ->
338            ERROR
339    end.
340
341
342send(#{sock := Sock, method := plain}, Bin) ->
343    socket:send(Sock, Bin);
344send(#{sock := Sock, method := msg}, Bin) ->
345    socket:sendmsg(Sock, #{iov => [Bin]}).
346
347
348shutdown(#{sock := Sock}, How) ->
349    socket:shutdown(Sock, How).
350
351
352sockname(#{sock := Sock}) ->
353    case socket:sockname(Sock) of
354	{ok, #{addr := Addr, port := Port}} ->
355	    {ok, {Addr, Port}};
356	{error, _} = ERROR ->
357	    ERROR
358    end.
359
360
361%% ==========================================================================
362
363reader_init(ControllingProcess, Sock, Async, Active, Method)
364  when is_pid(ControllingProcess) andalso
365       is_boolean(Async) andalso
366       (is_boolean(Active) orelse (Active =:= once)) andalso
367       ((Method =:= plain) orelse (Method =:= msg)) ->
368    put(verbose, false),
369    MRef = erlang:monitor(process, ControllingProcess),
370    reader_loop(#{ctrl_proc      => ControllingProcess,
371		  ctrl_proc_mref => MRef,
372                  async          => Async,
373                  select_info    => undefined,
374                  select_num     => 0, % Count the number of select messages
375		  active         => Active,
376		  sock           => Sock,
377                  method         => Method}).
378
379
380%% Never read
381reader_loop(#{active    := false,
382	      ctrl_proc := Pid} = State) ->
383    receive
384	{?MODULE, stop} ->
385            reader_exit(State, stop);
386
387	{?MODULE, Pid, controlling_process, NewPid} ->
388	    OldMRef = maps:get(ctrl_proc_mref, State),
389	    erlang:demonitor(OldMRef, [flush]),
390	    NewMRef = erlang:monitor(process, NewPid),
391	    Pid ! {?MODULE, self(), controlling_process},
392	    reader_loop(State#{ctrl_proc      => NewPid,
393			       ctrl_proc_mref => NewMRef});
394
395	{?MODULE, active, NewActive} ->
396	    reader_loop(State#{active => NewActive});
397
398	{'DOWN', MRef, process, Pid, Reason} ->
399	    case maps:get(ctrl_proc_mref, State) of
400		MRef ->
401                    reader_exit(State, {ctrl_exit, Reason});
402		_ ->
403		    reader_loop(State)
404	    end
405    end;
406
407%% Read *once* and then change to false
408reader_loop(#{active    := once,
409	      async     := false,
410              sock      := Sock,
411              method    := Method,
412	      ctrl_proc := Pid} = State) ->
413    case do_recv(Method, Sock) of
414	{ok, Data} ->
415	    Pid ! ?DATA_MSG(Sock, Method, Data),
416	    reader_loop(State#{active => false});
417	{error, timeout} ->
418	    receive
419		{?MODULE, stop} ->
420		    reader_exit(State, stop);
421
422		{?MODULE, Pid, controlling_process, NewPid} ->
423		    OldMRef = maps:get(ctrl_proc_mref, State),
424		    erlang:demonitor(OldMRef, [flush]),
425		    NewMRef = erlang:monitor(process, NewPid),
426		    Pid ! {?MODULE, self(), controlling_process},
427		    reader_loop(State#{ctrl_proc      => NewPid,
428				       ctrl_proc_mref => NewMRef});
429
430		{?MODULE, active, NewActive} ->
431		    reader_loop(State#{active => NewActive});
432
433		{'DOWN', MRef, process, Pid, Reason} ->
434		    case maps:get(ctrl_proc_mref, State) of
435                        MRef ->
436                            reader_exit(State, {ctrl_exit, Reason});
437			_ ->
438			    reader_loop(State)
439		    end
440	    after 0 ->
441		    reader_loop(State)
442	    end;
443
444	{error, closed} = E1 ->
445            Pid ! ?CLOSED_MSG(Sock, Method),
446            reader_exit(State, E1);
447
448	{error, Reason} = E2 ->
449	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
450            reader_exit(State, E2)
451    end;
452reader_loop(#{active      := once,
453	      async       := true,
454              select_info := undefined,
455              sock        := Sock,
456              method      := Method,
457	      ctrl_proc   := Pid} = State) ->
458    case do_recv(Method, Sock, nowait) of
459        {select, SelectInfo} ->
460            reader_loop(State#{select_info => SelectInfo});
461	{ok, Data} ->
462	    Pid ! ?DATA_MSG(Sock, Method, Data),
463	    reader_loop(State#{active => false});
464
465	{error, closed} = E1 ->
466	    Pid ! ?CLOSED_MSG(Sock, Method),
467            reader_exit(State, E1);
468
469	{error, Reason} = E2 ->
470	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
471            reader_exit(State, E2)
472    end;
473reader_loop(#{active      := once,
474	      async       := true,
475              select_info := {select_info, _, Ref},
476              select_num  := N,
477              sock        := Sock,
478              method      := Method,
479	      ctrl_proc   := Pid} = State) ->
480    receive
481        {?MODULE, stop} ->
482            reader_exit(State, stop);
483
484        {?MODULE, Pid, controlling_process, NewPid} ->
485            OldMRef = maps:get(ctrl_proc_mref, State),
486            erlang:demonitor(OldMRef, [flush]),
487            NewMRef = erlang:monitor(process, NewPid),
488            Pid ! {?MODULE, self(), controlling_process},
489            reader_loop(State#{ctrl_proc      => NewPid,
490                               ctrl_proc_mref => NewMRef});
491
492        {?MODULE, active, NewActive} ->
493            reader_loop(State#{active => NewActive});
494
495        {'DOWN', MRef, process, Pid, Reason} ->
496            case maps:get(ctrl_proc_mref, State) of
497                MRef ->
498                    reader_exit(State, {ctrl_exit, Reason});
499                _ ->
500                    reader_loop(State)
501            end;
502
503        {'$socket', Sock, select, Ref} ->
504            case do_recv(Method, Sock, nowait) of
505                {ok, Data} when is_binary(Data) ->
506                    Pid ! ?DATA_MSG(Sock, Method, Data),
507                    reader_loop(State#{active      => false,
508                                       select_info => undefined,
509                                       select_num  => N+1});
510
511                {error, closed} = E1 ->
512                    Pid ! ?CLOSED_MSG(Sock, Method),
513                    reader_exit(State, E1);
514
515                {error, Reason} = E2 ->
516                    Pid ! ?ERROR_MSG(Sock, Method, Reason),
517                    reader_exit(State, E2)
518            end
519    end;
520
521%% Read and forward data continuously
522reader_loop(#{active    := true,
523	      async     := false,
524	      sock      := Sock,
525              method    := Method,
526	      ctrl_proc := Pid} = State) ->
527    case do_recv(Method, Sock) of
528	{ok, Data} ->
529	    Pid ! ?DATA_MSG(Sock, Method, Data),
530	    reader_loop(State);
531	{error, timeout} ->
532	    receive
533		{?MODULE, stop} ->
534                    reader_exit(State, stop);
535
536		{?MODULE, Pid, controlling_process, NewPid} ->
537		    OldMRef = maps:get(ctrl_proc_mref, State),
538		    erlang:demonitor(OldMRef, [flush]),
539		    NewMRef = erlang:monitor(process, NewPid),
540		    Pid ! {?MODULE, self(), controlling_process},
541		    reader_loop(State#{ctrl_proc      => NewPid,
542				       ctrl_proc_mref => NewMRef});
543
544		{?MODULE, active, NewActive} ->
545		    reader_loop(State#{active => NewActive});
546
547		{'DOWN', MRef, process, Pid, Reason} ->
548		    case maps:get(ctrl_proc_mref, State) of
549			MRef ->
550                            reader_exit(State, {ctrl_exit, Reason});
551			_ ->
552			    reader_loop(State)
553		    end
554	    after 0 ->
555		    reader_loop(State)
556	    end;
557
558	{error, closed} = E1 ->
559	    Pid ! ?CLOSED_MSG(Sock, Method),
560            reader_exit(State, E1);
561
562	{error, Reason} = E2 ->
563	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
564            reader_exit(State, E2)
565    end;
566reader_loop(#{active      := true,
567	      async       := true,
568              select_info := undefined,
569	      sock        := Sock,
570              method      := Method,
571	      ctrl_proc   := Pid} = State) ->
572    case do_recv(Method, Sock) of
573        {select, SelectInfo} ->
574            reader_loop(State#{select_info => SelectInfo});
575	{ok, Data} ->
576	    Pid ! ?DATA_MSG(Sock, Method, Data),
577	    reader_loop(State);
578
579	{error, closed} = E1 ->
580	    Pid ! ?CLOSED_MSG(Sock, Method),
581            reader_exit(State, E1);
582
583	{error, Reason} = E2 ->
584	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
585            reader_exit(State, E2)
586    end;
587reader_loop(#{active      := true,
588	      async       := true,
589              select_info := {select_info, _, Ref},
590              select_num  := N,
591	      sock        := Sock,
592              method      := Method,
593	      ctrl_proc   := Pid} = State) ->
594    receive
595        {?MODULE, stop} ->
596            reader_exit(State, stop);
597
598        {?MODULE, Pid, controlling_process, NewPid} ->
599            OldMRef = maps:get(ctrl_proc_mref, State),
600            erlang:demonitor(OldMRef, [flush]),
601            NewMRef = erlang:monitor(process, NewPid),
602            Pid ! {?MODULE, self(), controlling_process},
603            reader_loop(State#{ctrl_proc      => NewPid,
604                               ctrl_proc_mref => NewMRef});
605
606        {?MODULE, active, NewActive} ->
607            reader_loop(State#{active => NewActive});
608
609        {'DOWN', MRef, process, Pid, Reason} ->
610            case maps:get(ctrl_proc_mref, State) of
611                MRef ->
612                    reader_exit(State, {ctrl_exit, Reason});
613                _ ->
614                    reader_loop(State)
615            end;
616
617        {'$socket', Sock, select, Ref} ->
618            case do_recv(Method, Sock, nowait) of
619                {ok, Data} when is_binary(Data) ->
620                    Pid ! ?DATA_MSG(Sock, Method, Data),
621                    reader_loop(State#{select_info => undefined,
622                                       select_num  => N+1});
623
624                {error, closed} = E1 ->
625                    Pid ! ?CLOSED_MSG(Sock, Method),
626                    reader_exit(State, E1);
627
628                {error, Reason} = E2 ->
629                    Pid ! ?ERROR_MSG(Sock, Method, Reason),
630                    reader_exit(State, E2)
631            end
632    end.
633
634
635do_recv(Method, Sock) ->
636    do_recv(Method, Sock, ?READER_RECV_TIMEOUT).
637
638do_recv(plain, Sock, Timeout) ->
639    socket:recv(Sock, 0, Timeout);
640do_recv(msg, Sock, Timeout) ->
641    case socket:recvmsg(Sock, 0, 0, [], Timeout) of
642        {ok, #{iov := [Bin]}} ->
643            {ok, Bin};
644        {select, _} = SELECT ->
645            SELECT;
646        {error, _} = ERROR ->
647            ERROR
648    end.
649
650
651reader_exit(#{async := false, active := Active}, stop) ->
652    vp("reader stopped when active: ~w", [Active]),
653    exit(normal);
654reader_exit(#{async       := true,
655              active      := Active,
656              select_info := SelectInfo,
657              select_num  := N}, stop) ->
658    vp("reader stopped when active: ~w"
659       "~n   Current select info:       ~p"
660       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
661    exit(normal);
662reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) ->
663    vp("reader ctrl exit when active: ~w", [Active]),
664    exit(normal);
665reader_exit(#{async       := true,
666              active      := Active,
667              select_info := SelectInfo,
668              select_num  := N}, {ctrl_exit, normal}) ->
669    vp("reader ctrl exit when active: ~w"
670       "~n   Current select info:       ~p"
671       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
672    exit(normal);
673reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) ->
674    vp("reader exit when ctrl crash when active: ~w", [Active]),
675    exit({controlling_process, Reason});
676reader_exit(#{async       := true,
677              active      := Active,
678              select_info := SelectInfo,
679              select_num  := N}, {ctrl_exit, Reason}) ->
680    vp("reader exit when ctrl crash when active: ~w"
681       "~n   Current select info:       ~p"
682       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
683    exit({controlling_process, Reason});
684reader_exit(#{async := false, active := Active}, {error, closed}) ->
685    vp("reader exit when socket closed when active: ~w", [Active]),
686    exit(normal);
687reader_exit(#{async       := true,
688              active      := Active,
689              select_info := SelectInfo,
690              select_num  := N}, {error, closed}) ->
691    vp("reader exit when socket closed when active: ~w "
692       "~n   Current select info:       ~p"
693       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
694    exit(normal);
695reader_exit(#{async := false, active := Active}, {error, Reason}) ->
696    vp("reader exit when socket error when active: ~w", [Active]),
697    exit(Reason);
698reader_exit(#{async       := true,
699              active      := Active,
700              select_info := SelectInfo,
701              select_num  := N}, {error, Reason}) ->
702    vp("reader exit when socket error when active: ~w: "
703       "~n   Current select info:       ~p"
704       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
705    exit(Reason).
706
707
708
709
710
711
712%% ==========================================================================
713
714vp(F, A) ->
715    vp(get(verbose), F, A).
716
717vp(true, F, A) ->
718    p(F, A);
719vp(_, _, _) ->
720    ok.
721
722p(F, A) ->
723    io:format(F ++ "~n", A).
724
725