1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 2021-2021. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20
%% =======================================================================
%% This is just a simple utility that sets up several socket connections
%% and socket monitors. Data is exchanged over the connections.
%% This is done to make it possible to see that the socket sections of
%% observer work.
%% =======================================================================
27
%% Test utility module; start/0 is the only exported entry point.
-module(observer_socket_test).

-export([start/0]).

%% Number of acceptor processes the listener creates on its listen socket.
-define(NUM_ACCEPTORS, 5).

%% Messages exchanged between clients and handlers have the layout
%%   <<?MTAG:32, ID:32, Type:32, Size:32, Data:Size/binary>>
%% ?MTAG is the fixed value that starts every message, while ?PRINT,
%% ?REQUEST and ?REPLY are the values used in the Type field.
-define(MTAG,    42).
-define(PRINT,   01).
-define(REQUEST, 02).
-define(REPLY,   03).

%% Looks up the process registered as 'socket_monitor' each time it is used
%% (whereis/1 yields 'undefined' when no such process is registered).
-define(SOCKET_MONITOR, whereis(socket_monitor)).

%% Basic 50 byte chunk of test data.
-define(MSG_DATA1, <<"This is test data 0123456789 0123456789 0123456789">>).
%% Medium sized test data: the ?MSG_DATA1 chunk repeated 10 times
%% (500 bytes).
%% NOTE: this expands to a function call, so it can only be used where an
%% expression is allowed (not in a pattern or in a guard).
-define(MSG_DATA2, binary:copy(?MSG_DATA1, 10)).
%% Large test data: the ?MSG_DATA1 chunk repeated 100 times (5000 bytes).
%% NOTE: this expands to a function call, so it can only be used where an
%% expression is allowed (not in a pattern or in a guard).
-define(MSG_DATA3, binary:copy(?MSG_DATA1, 100)).
%% The data carried by the requests built from ?DATA; point this at one of
%% the larger ?MSG_DATA macros to exchange bigger messages.
-define(DATA, ?MSG_DATA1).
153
154
%% start() -> {SocketMonitor, Listener, Client}
%%
%% Entry point. Starts, in order, the socket monitor process, the server
%% (listener) and the client side, all for inet/stream/tcp, and returns what
%% each of the three start functions returned.
%% (Add "put(debug, true)" after the sname is set to enable debug for the
%% starter process.)
start() ->
    put(sname, "starter"),
    i("try start socket-monitor"),
    Monitor = start_socket_monitor(),
    i("try start server"),
    {Listener, ServerSockAddr} = start_server(inet, stream, tcp),
    i("try start client"),
    ClientCtrl = start_client(inet, stream, tcp, ServerSockAddr),
    i("done"),
    {Monitor, Listener, ClientCtrl}.
169
170
171
172%% =======================================================================
173%% Socket Monitor
174%% =======================================================================
175
%% start_socket_monitor() -> pid()
%%
%% Spawns (and monitors) the socket monitor process and waits until it
%% either reports 'ready' or terminates. In the latter case the caller
%% exits with {socket_monitor, Reason}.
start_socket_monitor() ->
    Starter = self(),
    Boot = fun() -> socket_monitor_start(Starter) end,
    {Pid, Ref} = spawn_monitor(Boot),
    receive
        {Pid, ready} ->
            Pid;
        {'DOWN', Ref, process, Pid, Why} ->
            e("received unexpected down message from socket-monitor:~n   ~p",
              [Why]),
            exit({socket_monitor, Why})
    end.
187
%% Body of the socket monitor process: sets up the process dictionary,
%% registers itself as 'socket_monitor', tells the parent it is ready and
%% enters the loop with an empty table of monitored sockets.
socket_monitor_start(Parent) ->
    Me = self(),
    put(sname, "socket-monitor"),
    put(debug, true),
    true = erlang:register(socket_monitor, Me),
    Parent ! {Me, ready},
    socket_monitor_loop(maps:new()).
195
196
%% Main loop of the socket monitor.
%% The argument is a map: MonitorRef => {Socket, SocketString, OwnerPid}.
%%
%% {monitor, Sock, Name, Fun, Owner} - Fun(Sock) is called to create the
%%     monitor, and the reference it returns becomes the key of a new entry.
%% {'DOWN', Ref, socket | port, Sock, Info} - the entry for Ref is removed
%%     (it must exist and must be for Sock) and the event is printed.
socket_monitor_loop(Monitored) ->
    receive
        {monitor, Sock, Name, MonitorFun, Owner} ->
            i("request to monitor socket: ~s", [Name]),
            Ref = MonitorFun(Sock),
            socket_monitor_loop(maps:put(Ref, {Sock, Name, Owner}, Monitored));

        {'DOWN', Ref, Kind, Sock, Info} when Kind =:= socket; Kind =:= port ->
            {{Sock, Name, _Owner}, Remaining} = maps:take(Ref, Monitored),
            i("received (socket) DOWN message: ~s~n   ~p", [Name, Info]),
            socket_monitor_loop(Remaining)
    end.
211
212
213
214%% =======================================================================
215%% Server
216%% The server(s) processes are implemented on plain socket.
217%% =======================================================================
218
%% start_server(Domain, Type, Proto) -> {Listener, SockAddr}
%%
%% Spawns (and monitors) the listener process and waits until it either
%% reports 'ready', together with the socket address it listens on, or
%% terminates. In the latter case the caller exits with {listener, Reason}.
start_server(Domain, Type, Proto) ->
    Starter = self(),
    Boot = fun() -> listener_start(Starter, Domain, Type, Proto) end,
    {Pid, Ref} = spawn_monitor(Boot),
    receive
        {Pid, ready, SockAddr} ->
            {Pid, SockAddr};
        {'DOWN', Ref, process, Pid, Why} ->
            e("received unexpected down message from listener:~n   ~p",
              [Why]),
            exit({listener, Why})
    end.
233
234
%% Body of the listener process (only stream/tcp is accepted).
%% Creates the listen socket, monitors it (both locally and via the socket
%% monitor process), creates the acceptors, reports 'ready' with the socket
%% address to the parent and then enters the listener loop.
listener_start(Parent, Domain, stream = Type, tcp = Proto) ->
    put(sname, "listener"),
    i("starting"),
    ListenSock = listener_create_lsock(Domain, Type, Proto),
    ListenMon  = socket:monitor(ListenSock),
    SockMon    = ?SOCKET_MONITOR,
    SockMon ! {monitor,
               ListenSock,
               string_of(socket, ListenSock),
               sockmon_fun(socket),
               self()},
    AcceptorMap = listener_create_acceptors(ListenSock),
    {ok, SockAddr} = socket:sockname(ListenSock),
    Parent ! {self(), ready, SockAddr},
    i("started"),
    listener_loop(#{lsock => ListenSock,
                    lmon  => ListenMon,
                    accs  => AcceptorMap}).
250
%% Main loop of the listener: it only waits for down messages.
%% A down for the listen socket terminates the listener, a process down is
%% handed over to listener_handle_down/3.
listener_loop(State = #{lsock := LSock}) ->
    receive
        {'DOWN', _, socket, LSock, Why} ->
            e("unexpected (socket) down received: ~n   ~p", [Why]),
            listener_exit(socket_down, Why);

        {'DOWN', _, process, DeadPid, Why} ->
            i("unexpected (process ~p) down received: ~n   ~p",
              [DeadPid, Why]),
            NewState = listener_handle_down(State, DeadPid, Why),
            listener_loop(NewState)
    end.
263
%% Handles a process down message received by the listener.
%% A down from a process that is not one of the acceptors is only printed
%% (the state is returned as is). A down from an acceptor is fatal: the
%% listen socket is closed and the listener exits.
listener_handle_down(State = #{lsock := LSock, accs := Acceptors},
                     Pid, Reason) ->
    case maps:is_key(Pid, Acceptors) of
        false ->
            i("unexpected down from unknown process ~p received: ~n   ~p",
              [Pid, Reason]),
            State;
        true ->
            %% A replacement acceptor could be created here, but then we
            %% would have to guard against a create-die loop. It is simpler
            %% to assume that acceptors never die, and give up if one does.
            e("unexpected down from acceptor process ~p received: ~n   ~p",
              [Pid, Reason]),
            (catch socket:close(LSock)),
            listener_exit(acceptor_down, Reason)
    end.
280
%% listener_create_lsock(Domain, Type, Proto) -> ListenSocket
%%
%% Creates the listen socket: picks a local address, opens the socket,
%% binds it to that address with port 0 and puts it in listen mode
%% (backlog 10). Any failing step makes the listener exit with a tag that
%% identifies the step.
listener_create_lsock(Domain, Type, Proto) ->
    i("try extract local address"),
    LocalAddr =
        case which_local_addr(Domain) of
            {error, AddrErr} -> listener_exit(failed_local_addr, AddrErr);
            {ok, Found}      -> Found
        end,
    i("try create socket"),
    ListenSock =
        case socket:open(Domain, Type, Proto) of
            {error, OpenErr} -> listener_exit(failed_socket_open, OpenErr);
            {ok, Opened}     -> Opened
        end,
    i("try bind socket"),
    BindAddr = #{family => Domain, addr => LocalAddr, port => 0},
    case socket:bind(ListenSock, BindAddr) of
        {error, BindErr} -> listener_exit(failed_socket_bind, BindErr);
        ok               -> ok
    end,
    i("try make listen socket"),
    case socket:listen(ListenSock, 10) of
        {error, ListenErr} -> listener_exit(failed_socket_listen, ListenErr);
        ok                 -> ok
    end,
    i("listen socket created"),
    ListenSock.
314
315
%% listener_create_acceptors(LSock) -> #{AcceptorPid => MonitorRef}
%%
%% Creates acceptors 1..?NUM_ACCEPTORS, one at a time, and collects what
%% listener_create_acceptor/2 returns for each of them into a map.
listener_create_acceptors(LSock) ->
    listener_create_acceptors(LSock, 1, []).

listener_create_acceptors(LSock, ID, Created) when ID =< ?NUM_ACCEPTORS ->
    i("try create acceptor ~w", [ID]),
    New = listener_create_acceptor(LSock, ID),
    listener_create_acceptors(LSock, ID + 1, [New | Created]);
listener_create_acceptors(_LSock, _ID, Created) ->
    maps:from_list(Created).
325
%% listener_create_acceptor(LSock, ID) -> {AcceptorPid, MonitorRef}
%%
%% Spawns (and monitors) acceptor number ID and waits for it to report
%% 'ready'. If the acceptor dies instead, or an abort message for the
%% listen socket arrives while waiting, the listener exits.
listener_create_acceptor(LSock, ID) ->
    Listener = self(),
    Boot = fun() -> acceptor_start(Listener, LSock, ID) end,
    {AccPid, AccRef} = spawn_monitor(Boot),
    receive
        {AccPid, ready} ->
            i("received expected acceptor ~w ready", [ID]),
            {AccPid, AccRef};

        {'DOWN', AccRef, process, AccPid, Why} ->
            e("received unexpected acceptor ~w down: ~n   ~p", [ID, Why]),
            listener_exit(acceptor_start, Why);

        {'$socket', LSock, abort, Info} ->
            e("received unexpected select abort: ~n   ~p", [Info]),
            listener_exit(abort, Info)
    end.
345
%% Terminates the calling (listener) process with the exit reason
%% {listener, Tag, Reason}.
listener_exit(Tag, Reason) ->
    ExitReason = {listener, Tag, Reason},
    exit(ExitReason).
348
349
350%% ---
351
%% Body of an acceptor process: names itself, monitors the listener, reports
%% 'ready' to it and enters the acceptor loop. The initial state has no
%% active select and the first handler will be given id 1.
acceptor_start(Listener, LSock, ID) ->
    put(sname, f("acceptor[~w]", [ID])),
    ListenerRef = erlang:monitor(process, Listener),
    Listener ! {self(), ready},
    i("started"),
    InitialState = #{id       => ID,
                     hid      => 1,
                     listener => {Listener, ListenerRef},
                     lsock    => LSock,
                     select   => undefined,
                     handlers => []},
    acceptor_loop(InitialState).
364
%% Main loop of an acceptor: when no select is active a new accept is
%% attempted, otherwise we wait for the outcome of the active select.
acceptor_loop(State) ->
    Next = case State of
               #{select := undefined} -> acceptor_try_accept(State);
               _                      -> acceptor_try_select(State)
           end,
    acceptor_loop(Next).
370
%% Attempts a non-blocking (nowait) accept on the listen socket.
%% - connection accepted: the new socket is handed to the socket monitor
%%   process and a handler is created for it,
%% - nothing to accept:   the select info is stored in the state,
%% - error:               the acceptor exits.
acceptor_try_accept(State = #{lsock := LSock}) ->
    i("try accept (nowait)"),
    case socket:accept(LSock, nowait) of
        {select, SelectInfo} ->
            maps:put(select, SelectInfo, State);

        {error, Why} ->
            e("accept failed: ~n   ~p", [Why]),
            acceptor_exit(State, accept, Why);

        {ok, Accepted} ->
            SockMon = ?SOCKET_MONITOR,
            SockMon ! {monitor,
                       Accepted,
                       string_of(socket, Accepted),
                       sockmon_fun(socket),
                       self()},
            acceptor_create_handler(State, Accepted)
    end.
389
390
%% Waits for the outcome of an active select (a previous nowait accept that
%% had nothing to accept).
%%
%% - abort message:     the acceptor exits with the abort reason. The last
%%                      element of an abort message is the tuple
%%                      {SelectHandle, Reason}, not the bare select handle,
%%                      so that is the shape that has to be matched.
%% - select message:    accept is called again; on success the new socket
%%                      is handed to the socket monitor process and a
%%                      handler is created for it (the select is cleared).
%% - listener down:     the select is cancelled and the acceptor exits.
%% - handler down with a 'normal' recv reason: silently ignored.
%% - anything else:     printed and ignored.
acceptor_try_select(
  #{lsock    := LSock,
    listener := {Listener, MRef},
    select   := {select_info, _, Handle} = SelectInfo} = State) ->
    i("await select message"),
    receive
        {'$socket', LSock, abort, {Handle, AbortReason}} ->
            e("received unexpected select abort: "
              "~n   ~p", [AbortReason]),
            acceptor_exit(State, abort, AbortReason);

        {'$socket', LSock, select, Handle} ->
            i("received select message: "
              "~n   ~p", [Handle]),
            case socket:accept(LSock) of
                {ok, ASock} ->
                    ?SOCKET_MONITOR !
                        {monitor, ASock, string_of(socket, ASock),
                         sockmon_fun(socket), self()},
                    acceptor_create_handler(State#{select => undefined},
                                            ASock);
                {error, Reason} ->
                    e("accept failed: "
                      "~n   ~p", [Reason]),
                    %% This is a bit overkill, but just to be on the safe side
                    (catch socket:cancel(LSock, SelectInfo)),
                    acceptor_exit(State, post_select_accept, Reason)
            end;

        {'DOWN', MRef, process, Listener, Reason} ->
            e("listener down received: "
              "~n   ~p", [Reason]),
            (catch socket:cancel(LSock, SelectInfo)),
            acceptor_exit(State, listener, Reason);

        {'DOWN', _MRef, process, _Pid, {{handler,_,_HID},recv,normal}} ->
            %% A handler whose client closed the connection; nothing to do.
            acceptor_try_select(State);

        Any ->
            i("received unexpected message: "
              "~n   ~p"
              "~nwhen"
              "~n   LSock: ~p"
              "~n   Info:  ~p", [Any, LSock, Handle]),
            acceptor_try_select(State)

    end.
440
441
%% acceptor_create_handler(State, ASock) -> NewState
%%
%% Spawns (and monitors) a handler for the accepted socket ASock. Once the
%% handler has reported 'ready' it is made the controlling process of the
%% socket and told to continue; the returned state has the handler id
%% counter incremented. The acceptor exits if the listener or the new
%% handler dies, if an abort message for the socket arrives or if changing
%% the controlling process fails.
acceptor_create_handler(State = #{listener := {Listener, ListenerRef},
                                  hid      := HID},
                        ASock) ->
    Acceptor = self(),
    {Handler, HandlerRef} =
        spawn_monitor(fun() -> handler_start(Acceptor, HID) end),
    receive
        {Handler, ready} ->
            case socket:setopt(ASock, otp, controlling_process, Handler) of
                ok ->
                    Handler ! {continue, ASock},
                    State#{hid => HID + 1};
                {error, Why} ->
                    e("failed changing controlling process: ~n   ~p", [Why]),
                    (catch socket:close(ASock)),
                    exit(Handler, kill),
                    acceptor_exit(State, failed_changing_ctrl_proc, Why)
            end;

        {'DOWN', HandlerRef, process, Handler, Why} ->
            e("new handler (~p) down received: ~n   ~p", [Handler, Why]),
            acceptor_exit(State, handler, Why);

        {'DOWN', ListenerRef, process, Listener, Why} ->
            e("listener down received: ~n   ~p", [Why]),
            exit(Handler, kill),
            acceptor_exit(State, listener, Why);

        {'$socket', ASock, abort, Info} ->
            e("received unexpected select abort: ~n   ~p", [Info]),
            acceptor_exit(State, abort, Info)
    end.
477
478
%% Terminates the calling (acceptor) process with the exit reason
%% {{acceptor, ID}, Tag, Reason}, where ID is taken from the state.
acceptor_exit(#{id := ID}, Tag, Reason) ->
    Who = {acceptor, ID},
    exit({Who, Tag, Reason}).
481
482
483%% ---
484
%% Body of a handler process: names itself, monitors the parent (the
%% acceptor), reports 'ready' and then waits to be handed the socket with a
%% 'continue' message before entering the handler loop. If the parent dies
%% while waiting, the handler exits.
handler_start(Parent, ID) ->
    put(sname, f("handler[~w]", [ID])),
    i("starting"),
    ParentRef = erlang:monitor(process, Parent),
    Parent ! {self(), ready},
    receive
        {continue, Sock} ->
            i("started"),
            InitialState = #{id     => ID,
                             parent => {Parent, ParentRef},
                             sock   => Sock,
                             select => undefined,
                             buf    => <<>>},
            handler_loop(InitialState);

        {'DOWN', ParentRef, process, Parent, Why} ->
            e("parent (~p) down received: ~n   ~p", [Parent, Why]),
            handler_exit(#{id => undefined, parent => {Parent, undefined}},
                         parent, Why)
    end.
506
507
%% Main loop of a handler: when no select is active a read is attempted,
%% otherwise we wait for the outcome of the active select.
handler_loop(State) ->
    Next = case State of
               #{select := undefined} -> handler_try_read(State);
               _                      -> handler_try_select(State)
           end,
    handler_loop(Next).
512
513
%% Reads from the socket and processes whatever was received.
%% A closed connection means that the client is done, so the handler exits
%% with the reason 'normal'. Any other error closes the socket and makes
%% the handler exit with that error.
handler_try_read(State = #{sock := Sock}) ->
    case socket:recv(Sock) of
        {error, closed} ->
            i("recv got closed => client done"),
            handler_exit(State, recv, normal);

        {error, Why} ->
            e("recv failed: ~n   ~p", [Why]),
            (catch socket:close(Sock)),
            handler_exit(State, recv, Why);

        %% Will never happen with the recv/1 call used above
        {select, SelectInfo} ->
            State#{select => SelectInfo};

        {ok, Bytes} ->
            handler_process_data(State, Bytes)
    end.
529
%% Waits for the outcome of an active select on the handler socket.
%%
%% - abort message:  the handler exits with the abort reason. The last
%%                   element of an abort message is the tuple
%%                   {SelectHandle, Reason}, not the bare select handle,
%%                   so that is the shape that has to be matched.
%% - select message: the socket is read and the data processed (the select
%%                   is cleared). A closed connection is handled as in
%%                   handler_try_read/1: the client is done, so the handler
%%                   exits with the reason 'normal'. Any other error closes
%%                   the socket and makes the handler exit with that error.
%% - parent down:    the select is cancelled, the socket is closed and the
%%                   handler exits.
handler_try_select(
  #{sock   := Sock,
    parent := {Parent, MRef},
    select := {select_info, _, Handle} = SelectInfo} = State) ->
    receive
        {'$socket', Sock, abort, {Handle, AbortReason}} ->
            e("received unexpected select abort: "
              "~n   ~p", [AbortReason]),
            handler_exit(State, abort, AbortReason);

        {'$socket', Sock, select, Handle} ->
            case socket:recv(Sock) of
                {ok, Data} ->
                    handler_process_data(State#{select => undefined}, Data);
                {error, closed} ->
                    i("recv got closed => client done"),
                    handler_exit(State, recv, normal);
                {error, Reason} ->
                    i("recv failed: "
                      "~n   ~p", [Reason]),
                    (catch socket:close(Sock)),
                    handler_exit(State, recv, Reason)
            end;

        {'DOWN', MRef, process, Parent, Reason} ->
            e("parent down received: "
              "~n   ~p", [Reason]),
            (catch socket:cancel(Sock, SelectInfo)),
            (catch socket:close(Sock)),
            handler_exit(State, parent, Reason)
    end.
557
%% handler_process_data(State, NewData) -> NewState
%%
%% Adds newly received data to the buffer and then processes the buffer.
%% With an empty buffer the new data simply becomes the buffer, and with
%% no new data the buffer is left untouched.
handler_process_data(State, NewData) ->
    case {State, NewData} of
        {#{buf := <<>>}, _} ->
            handler_process_data(State#{buf => NewData});
        {_, <<>>} ->
            handler_process_data(State);
        {#{buf := Buffered}, _} ->
            Combined = <<Buffered/binary, NewData/binary>>,
            handler_process_data(State#{buf => Combined})
    end.
564
%% handler_process_data(State) -> NewState
%%
%% Processes every complete message found at the start of the buffer.
%% A message is: <<?MTAG:32, ID:32, Type:32, Size:32, Data:Size/binary>>.
%% When the buffer does not start with a complete message the state is
%% returned as is, so that more data can be added later.
handler_process_data(State) ->
    case State of
        #{buf := <<?MTAG:32,
                   MsgID:32,
                   MsgType:32,
                   Len:32,
                   Payload:Len/binary,
                   Remaining/binary>>} ->
            handler_process_data(State, MsgID, MsgType, Payload),
            handler_process_data(State#{buf => Remaining});
        _ ->
            State
    end.
575
576
%% handler_process_data(State, ID, Type, Data)
%%
%% Processes one message.
%% ?PRINT:   the data is printed (as a list).
%% ?REQUEST: the data is sent back in a ?REPLY message carrying the same
%%           id. If sending fails, the socket is closed and the handler
%%           exits with the tag 'send' (not 'recv': it is the send that
%%           failed).
handler_process_data(_State, ID, ?PRINT, Data) ->
    i("~w: print"
      "~n   ~p", [ID, erlang:binary_to_list(Data)]);
handler_process_data(#{sock := Sock} = State, ID, ?REQUEST, Data) ->
    SZ    = byte_size(Data),
    Reply = <<?MTAG:32, ID:32, ?REPLY:32, SZ:32, Data/binary>>,
    case socket:send(Sock, Reply) of
        ok ->
            ok;
        {error, Reason} ->
            e("failed sending reply for request ~w: "
              "~n   ~p", [ID, Reason]),
            (catch socket:close(Sock)),
            handler_exit(State, send, Reason)
    end.
592
%% Terminates the calling (handler) process with the exit reason
%% {{handler, ParentPid, ID}, Tag, Reason}; the parent pid and the id are
%% taken from the state.
handler_exit(#{id := ID, parent := {ParentPid, _}}, Tag, Reason) ->
    Who = {handler, ParentPid, ID},
    exit({Who, Tag, Reason}).
595
596
%% =======================================================================
%% Client
%% The client controller starts four clients: two implemented on gen_tcp
%% (one with inet_backend = socket and the other with inet_backend = inet)
%% and two implemented directly on socket.
%% Each client runs for a randomly chosen period of between 2 and 5
%% minutes, and then dies. Then a new one is created!
%% Each client sends simple requests (with some dummy data), and
%% also, once every 15 seconds, a 'print' message (which the server
%% is to handle by simply printing the data, which is supposed to be a
%% string).
%% =======================================================================
607
%% start_client(Domain, Type, Proto, ServerSockAddr) -> {Pid, MonitorRef}
%%
%% Spawns (and monitors) the client controller process and waits until it
%% either reports 'ready' or terminates. In the latter case the caller
%% exits with {client, Reason}.
start_client(Domain, Type, Proto, ServerSockAddr) ->
    Starter = self(),
    Boot = fun() ->
                   client_ctrl_start(Starter,
                                     Domain, Type, Proto,
                                     ServerSockAddr)
           end,
    {CtrlPid, CtrlRef} = spawn_monitor(Boot),
    receive
        {CtrlPid, ready} ->
            i("received expected ready from client"),
            {CtrlPid, CtrlRef};

        {'DOWN', CtrlRef, process, CtrlPid, Why} ->
            e("received unexpected down message from client (~p):~n   ~p",
              [CtrlPid, Why]),
            exit({client, Why})
    end.
626
627
%% Body of the client controller process.
%% Starts four clients in sequence - gen (socket backend), gen (inet
%% backend), esock, esock - then reports 'ready' to the parent and enters
%% the controller loop.
client_ctrl_start(Parent, Domain, Type, Proto, ServerSockAddr) ->
    put(sname, "client-ctrl"),
    Initial = #{parent   => Parent,
                domain   => Domain,
                type     => Type,
                protocol => Proto,
                server   => ServerSockAddr,
                cid      => 1,
                clients  => #{}},
    Starters = [fun(S) -> start_gen_client(S, socket) end,
                fun(S) -> start_gen_client(S, inet) end,
                fun start_esock_client/1,
                fun start_esock_client/1],
    Running = lists:foldl(fun(Start, S) -> Start(S) end, Initial, Starters),
    Parent ! {self(), ready},
    i("started"),
    client_ctrl_loop(Running).
647
648
%% Main loop of the client controller: every process down message is passed
%% to client_ctrl_handle_down/4 and the loop continues with its result.
client_ctrl_loop(State) ->
    NewState =
        receive
            {'DOWN', Ref, process, DeadPid, Why} ->
                client_ctrl_handle_down(State, Ref, DeadPid, Why)
        end,
    client_ctrl_loop(NewState).
654
%% client_ctrl_handle_down(State, MRef, Pid, Reason) -> NewState
%%
%% Handles a process down message received by the client controller.
%% A terminated client is removed from the client table and replaced by a
%% new client of the same kind: gen clients are stored as
%% {ID, MRef, Backend} and esock clients as {ID, MRef}. A down from an
%% unknown process is only printed.
client_ctrl_handle_down(#{clients := Clients0} = State,
                        MRef, Pid, Reason) ->
    case maps:take(Pid, Clients0) of
        {{ID, MRef, Backend}, Clients1} ->
            i("received down from (gen) ~w-client ~w (~p): "
              "~n   ~p", [Backend, ID, Pid, Reason]),
            start_gen_client(State#{clients => Clients1}, Backend);
        {{ID, MRef}, Clients1} ->
            %% An esock client has no backend, so the format string must
            %% hold exactly one directive for each of the three arguments.
            i("received down from (esock) client ~w (~p): "
              "~n   ~p", [ID, Pid, Reason]),
            start_esock_client(State#{clients => Clients1});
        error ->
            i("received down from unknown process ~p: "
              "~n   ~p", [Pid, Reason]),
            State
    end.
671
%% Terminates the calling (client controller) process with the exit reason
%% {'client-ctrl', Tag, Reason}.
client_ctrl_exit(Tag, Reason) ->
    ExitReason = {'client-ctrl', Tag, Reason},
    exit(ExitReason).
674
675
%% start_gen_client(State, Backend) -> NewState
%%
%% Spawns (and monitors) a gen_tcp based client using the given inet
%% backend and waits for it to report 'ready'. The client is given a random
%% lifetime: 2 minutes plus a random part of up to 3 minutes.
%% On success the client is stored as Pid => {ID, MRef, Backend} and the
%% client id counter is incremented. If the client dies before it is ready,
%% the client controller exits.
start_gen_client(State = #{domain   := Domain,
                           type     := Type,
                           protocol := Proto,
                           server   := #{addr := Addr, port := Port},
                           cid      := ID,
                           clients  := Clients},
                 Backend) ->
    Ctrl = self(),
    i("try start (gen) client ~w", [ID]),
    LifeTime = timer:minutes(2) + rand:uniform(timer:minutes(3)),
    Boot = fun() ->
                   gen_client_start(Ctrl,
                                    Backend, ID,
                                    LifeTime,
                                    Domain, Type, Proto,
                                    Addr, Port)
           end,
    {ClientPid, ClientRef} = spawn_monitor(Boot),
    receive
        {ClientPid, ready} ->
            i("received expected ready from client ~w (~p)",
              [ID, ClientPid]),
            Entry = {ID, ClientRef, Backend},
            State#{cid     => ID + 1,
                   clients => maps:put(ClientPid, Entry, Clients)};

        {'DOWN', ClientRef, process, ClientPid, Why} ->
            e("received unexpected down message from client ~w (~p):~n   ~p",
              [ID, ClientPid, Why]),
            client_ctrl_exit({client, ID}, Why)
    end.
704
705
%% start_esock_client(State) -> NewState
%%
%% Spawns (and monitors) a socket based client and waits for it to report
%% 'ready'. The client is given a random lifetime: 2 minutes plus a random
%% part of up to 3 minutes.
%% On success the client is stored as Pid => {ID, MRef} and the client id
%% counter is incremented. If the client dies before it is ready, the
%% client controller exits.
start_esock_client(State = #{domain   := Domain,
                             type     := Type,
                             protocol := Proto,
                             server   := #{addr := Addr, port := Port},
                             cid      := ID,
                             clients  := Clients}) ->
    Ctrl = self(),
    i("try start (esock) client ~w", [ID]),
    LifeTime = timer:minutes(2) + rand:uniform(timer:minutes(3)),
    Boot = fun() ->
                   esock_client_start(Ctrl,
                                      ID,
                                      LifeTime,
                                      Domain, Type, Proto,
                                      Addr, Port)
           end,
    {ClientPid, ClientRef} = spawn_monitor(Boot),
    receive
        {ClientPid, ready} ->
            i("received expected ready from client ~w (~p)",
              [ID, ClientPid]),
            Entry = {ID, ClientRef},
            State#{cid     => ID + 1,
                   clients => maps:put(ClientPid, Entry, Clients)};

        {'DOWN', ClientRef, process, ClientPid, Why} ->
            e("received unexpected down message from client ~w (~p):~n   ~p",
              [ID, ClientPid, Why]),
            client_ctrl_exit({client, ID}, Why)
    end.
734
735
%% Body of a gen_tcp based client process (only stream/tcp is accepted).
%% Connects to the server, arms the lifetime timer (a 'terminate' message
%% to itself after LifeTime), monitors the parent, reports 'ready' to it
%% and enters the client loop, starting out by sending a request.
gen_client_start(Parent, Backend, ID,
                 LifeTime,
                 Domain, stream = Type, tcp = Proto,
                 ServerAddr, ServerPort) ->
    put(sname, f("gen-client[~w,~w]", [Backend, ID])),
    i("starting"),
    Connected = gen_client_connect(#{id          => ID,
                                     backend     => Backend,
                                     domain      => Domain,
                                     type        => Type,
                                     protocol    => Proto,
                                     server_addr => ServerAddr,
                                     server_port => ServerPort}),
    erlang:send_after(LifeTime, self(), terminate),
    ParentRef = erlang:monitor(process, Parent),
    Parent ! {self(), ready},
    i("started"),
    LoopFields = #{condition => send_request,
                   mid       => 1,
                   parent    => Parent,
                   mref      => ParentRef,
                   buf       => <<>>},
    gen_client_loop(maps:merge(Connected, LoopFields)).
760
761
%% gen_client_connect(State) -> NewState
%%
%% Connects to the server with gen_tcp (binary, active mode, and the inet
%% backend given by the state). On success the socket is handed to the
%% socket monitor process and stored in the state under 'sock'. On failure
%% the client exits.
gen_client_connect(State = #{backend     := Backend,
                             type        := stream,
                             protocol    := tcp,
                             server_addr := ServerAddr,
                             server_port := ServerPort}) ->
    ConnectOpts = [{inet_backend, Backend}, {active, true}, binary],
    i("try connect to ~s:~w", [inet_parse:ntoa(ServerAddr), ServerPort]),
    case gen_tcp:connect(ServerAddr, ServerPort, ConnectOpts) of
        {error, Why} ->
            e("failed connecting: ~n   ~p", [Why]),
            client_exit(State, connect, Why);

        {ok, Sock} ->
            SockMon = ?SOCKET_MONITOR,
            SockMon ! {monitor,
                       Sock,
                       string_of(inet, Sock),
                       sockmon_fun(inet),
                       self()},
            State#{sock => Sock}
    end.
784
785
%% Body of a socket based client process (only stream/tcp is accepted).
%% Connects to the server, arms the lifetime timer (a 'terminate' message
%% to itself after LifeTime), monitors the parent, reports 'ready' to it
%% and enters the client loop, starting out by sending a request and with
%% no active select.
esock_client_start(Parent, ID,
                   LifeTime,
                   Domain, stream = Type, tcp = Proto,
                   ServerAddr, ServerPort) ->
    put(sname, f("esock-client[~w]", [ID])),
    i("starting"),
    Connected = esock_client_connect(#{id          => ID,
                                       domain      => Domain,
                                       type        => Type,
                                       protocol    => Proto,
                                       server_addr => ServerAddr,
                                       server_port => ServerPort}),
    erlang:send_after(LifeTime, self(), terminate),
    ParentRef = erlang:monitor(process, Parent),
    Parent ! {self(), ready},
    i("started"),
    LoopFields = #{condition => send_request,
                   select    => undefined,
                   mid       => 1,
                   parent    => Parent,
                   mref      => ParentRef,
                   buf       => <<>>},
    esock_client_loop(maps:merge(Connected, LoopFields)).
810
811
%% esock_client_connect(State) -> NewState
%%
%% Opens a socket, binds it to the address of the server (client and server
%% run on the same machine, so that address can be reused) and connects to
%% the server. On success the socket is handed to the socket monitor
%% process and stored in the state under 'sock'. If a step fails, the
%% client exits with a tag that identifies the step (after closing the
%% socket, if it had been opened).
esock_client_connect(State = #{domain      := Domain,
                               type        := stream = Type,
                               protocol    := tcp = Proto,
                               server_addr := ServerAddr,
                               server_port := ServerPort}) ->
    i("try open socket"),
    Sock =
        case socket:open(Domain, Type, Proto) of
            {error, OpenErr} ->
                e("failed open: ~n   ~p", [OpenErr]),
                client_exit(State, open, OpenErr);
            {ok, Opened} ->
                i("opened"),
                Opened
        end,
    i("try bind to ~s", [inet_parse:ntoa(ServerAddr)]),
    LocalAddr = #{family => Domain, addr => ServerAddr},
    case socket:bind(Sock, LocalAddr) of
        {error, BindErr} ->
            e("failed bind: ~n   ~p", [BindErr]),
            (catch socket:close(Sock)),
            client_exit(State, bind, BindErr);
        ok ->
            i("bound"),
            ok
    end,
    i("try connect to ~s:~w", [inet_parse:ntoa(ServerAddr), ServerPort]),
    RemoteAddr = #{family => Domain,
                   addr   => ServerAddr,
                   port   => ServerPort},
    case socket:connect(Sock, RemoteAddr) of
        {error, ConnErr} ->
            e("failed connecting: ~n   ~p", [ConnErr]),
            (catch socket:close(Sock)),
            client_exit(State, connect, ConnErr);
        ok ->
            i("connected"),
            SockMon = ?SOCKET_MONITOR,
            SockMon ! {monitor,
                       Sock,
                       string_of(socket, Sock),
                       sockmon_fun(socket),
                       self()},
            State#{sock => Sock}
    end.
858
859
%% Main loop of a gen_tcp based client, driven by the 'condition'
%% entry of the state:
%%   terminate        - close the socket and exit normally
%%   send_request     - send the next request, then await its reply
%%   {await_reply, _} - wait for tcp data, a parent 'DOWN' or the
%%                      'terminate' message
gen_client_loop(#{condition := terminate,
                  sock      := Sock}) ->
    (catch gen_tcp:close(Sock)),
    exit(normal);
gen_client_loop(#{condition := send_request,
                  sock      := Sock,
                  mid       := MID} = State) ->
    Payload = ?DATA,
    Len     = byte_size(Payload),
    %% Message layout: tag, message id, message type, payload size, payload
    Request = <<?MTAG:32, MID:32, ?REQUEST:32, Len:32, Payload/binary>>,
    case gen_tcp:send(Sock, Request) of
        {error, Reason} ->
            e("failed sending request ~w: "
              "~n   ~p", [MID, Reason]),
            client_exit(State, send, Reason);
        ok ->
            gen_client_loop(State#{condition => {await_reply, MID},
                                   mid       => MID + 1})
    end;
gen_client_loop(#{condition := {await_reply, _MID},
                  parent    := Parent,
                  sock      := Sock} = State) ->
    receive
        {tcp, Sock, Bytes} ->
            gen_client_loop(client_process_data(State, Bytes));

        terminate ->
            gen_client_loop(State#{condition => terminate});

        {'DOWN', _MRef, process, Parent, DownReason} ->
            e("unexpected down from parent received: "
              "~n   ~p", [DownReason]),
            client_exit(State, parent_down, DownReason)
    end.
897
898
899
%% Main loop of an esock (socket module based) client, driven by the
%% 'condition' and 'select' entries of the state:
%%   terminate        - close the socket and exit normally
%%   {await_reply, _} - with select = undefined: try a nowait recv;
%%                      with an active select: wait for the select
%%                      message, an abort, a parent 'DOWN' or 'terminate'
%%   send_request     - send the next request, then await its reply
%% Data received together with a select info is appended to 'buf', the
%% rest of the message is picked up when the select message arrives.
esock_client_loop(#{condition := terminate,
                    sock      := Sock}) ->
    (catch socket:close(Sock)),
    exit(normal);
esock_client_loop(#{condition := {await_reply, _MID},
                    select    := undefined,
                    sock      := Sock} = State) ->
    %% i("try (nowait) recv"),
    case socket:recv(Sock, 0, nowait) of
        {ok, Data} when is_binary(Data) ->
            %% i("received (~w bytes of) data", [byte_size(Data)]),
            esock_client_loop(client_process_data(State, Data));
        %% This is the "old" style
        {ok, {Data, SelectInfo}} when is_binary(Data) ->
            %% i("partial recv - select"),
            %% Note: maps:get/2, *not* maps:take/2; take returns
            %% {Value, Map}, which cannot be used as a binary segment.
            Buf0 = maps:get(buf, State),
            Buf2 = <<Buf0/binary, Data/binary>>,
            esock_client_loop(State#{buf    => Buf2,
                                     select => SelectInfo});
        %% This is the "new" style
        {select, {Data, SelectInfo}} when is_binary(Data) ->
            %% i("partial recv - select"),
            Buf0 = maps:get(buf, State),
            Buf2 = <<Buf0/binary, Data/binary>>,
            esock_client_loop(State#{buf    => Buf2,
                                     select => SelectInfo});
        {select, SelectInfo} ->
            %% i("select"),
            esock_client_loop(State#{select => SelectInfo});
        {error, Reason} ->
            e("recv failed: "
              "~n   ~p", [Reason]),
            (catch socket:close(Sock)),
            client_exit(State, recv, Reason)
    end;
esock_client_loop(#{condition := {await_reply, _MID},
                    select    := {select_info, _, Handle} = SelectInfo,
                    parent    := Parent,
                    sock      := Sock} = State) ->
    receive
        {'DOWN', _MRef, process, Parent, Reason} ->
            e("unexpected down from parent received: "
              "~n   ~p", [Reason]),
            (catch socket:cancel(Sock, SelectInfo)),
            (catch socket:close(Sock)),
            client_exit(State, parent_down, Reason);

        %% The payload of an abort message is {SelectHandle, Reason},
        %% not the bare select handle.
        {'$socket', Sock, abort, {Handle, Reason}} ->
            e("received unexpected select abort: "
              "~n   ~p", [Reason]),
            (catch socket:close(Sock)),
            client_exit(State, abort, Reason);

        {'$socket', Sock, select, Handle} ->
            %% i("select message received - try recv"),
            case socket:recv(Sock) of
                {ok, Data} ->
                    %% i("recv succeed (~w bytes of data received)",
                    %%   [byte_size(Data)]),
                    esock_client_loop(
                      client_process_data(State#{select => undefined}, Data));
                {error, Reason} ->
                    e("recv failed: "
                      "~n   ~p", [Reason]),
                    (catch socket:close(Sock)),
                    client_exit(State, recv, Reason)
            end;

        terminate ->
            esock_client_loop(State#{condition => terminate})

    end;
esock_client_loop(#{condition := send_request,
                    sock      := Sock,
                    mid       := MID} = State) ->
    %% i("try send request ~w", [MID]),
    Data = ?DATA,
    SZ   = byte_size(Data),
    %% Message layout: tag, message id, message type, payload size, payload
    Req  = <<?MTAG:32, MID:32, ?REQUEST:32, SZ:32, Data/binary>>,
    case socket:send(Sock, Req) of
        ok ->
            esock_client_loop(State#{condition => {await_reply, MID},
                                     mid       => MID + 1});
        {error, Reason} ->
            e("failed sending request ~w: "
              "~n   ~p", [MID, Reason]),
            client_exit(State, send, Reason)
    end.
987
988
989
%% Append the newly received data to the buffered data and check
%% what is now at the head of the buffer:
%%   - a complete reply with the awaited message id: drop it from the
%%     buffer and go on to send the next request
%%   - a complete reply with another message id: exit (unexpected_msg)
%%   - anything else: keep everything buffered and keep waiting
client_process_data(#{condition := {await_reply, MID},
                      buf       := Buf} = State, NewData) ->
    Pending = <<Buf/binary, NewData/binary>>,
    case Pending of
        <<?MTAG:32, ReplyMID:32, ?REPLY:32, SZ:32,
          _Payload:SZ/binary, Rest/binary>> when ReplyMID =:= MID ->
            State#{condition => send_request, buf => Rest};
        <<?MTAG:32, OtherMID:32, ?REPLY:32, SZ:32,
          _Payload:SZ/binary, _Rest/binary>> ->
            client_exit(State, unexpected_msg, OtherMID);
        _NoCompleteReply ->
            State#{buf => Pending}
    end.
1000
%% Terminate the calling client process.
%% The exit reason is {{client, ID}, Tag, Reason}, where ID is taken
%% from the 'id' entry of the client state.
client_exit(#{id := ClientId}, Tag, Reason) ->
    Who = {client, ClientId},
    exit({Who, Tag, Reason}).
1003
1004
1005%% =======================================================================
1006%% Utility
1007%% =======================================================================
1008
%% Get the address of the local host for the given domain: the 'addr'
%% entry of what which_local_host_info/1 finds, that is, not a
%% {127, _}, {0, ...} or {16#fe80, ...} address.
%% Returns {ok, Addr} | {error, Reason}.
%% We should really implement this using the (new) net module,
%% but until that gets the necessary functionality...
which_local_addr(Domain) ->
    case which_local_host_info(Domain) of
        {error, _Reason} = Failure ->
            Failure;
        {ok, #{addr := LocalAddr}} ->
            {ok, LocalAddr}
    end.
1019
1020
%% Fetch the interface list of the host (inet:getifaddrs/0) and pick
%% the first usable interface for the given domain.
%% Returns {ok, Info}, where Info is a map with the interface name,
%% flags and address (not 127...), or {error, Reason}.
which_local_host_info(Domain) ->
    case inet:getifaddrs() of
        {error, _} = Failure ->
            Failure;
        {ok, Interfaces} ->
            which_local_host_info(Domain, Interfaces)
    end.
1030
%% Walk the interface list and return {ok, Info} for the first
%% interface that is running, is not a loopback interface and has a
%% usable address for the domain. Info also gets the interface name
%% (key 'name').
%% Interfaces with a name starting with "docker" or "br-" are skipped.
%% Returns {error, no_address} when the list is exhausted.
which_local_host_info(_Domain, []) ->
    {error, no_address};
which_local_host_info(Domain, [{"docker" ++ _, _} | Rest]) ->
    which_local_host_info(Domain, Rest);
which_local_host_info(Domain, [{"br-" ++ _, _} | Rest]) ->
    which_local_host_info(Domain, Rest);
which_local_host_info(Domain, [{IfName, IfOpts} | Rest]) ->
    case if_is_running_and_not_loopback(IfOpts) of
        false ->
            which_local_host_info(Domain, Rest);
        true ->
            %% which_local_host_info2/2 throws when the interface
            %% lacks something we need; then we try the next one.
            try which_local_host_info2(Domain, IfOpts) of
                Info ->
                    {ok, Info#{name => IfName}}
            catch
                throw:_ ->
                    which_local_host_info(Domain, Rest)
            end
    end;
which_local_host_info(Domain, [_ | Rest]) ->
    which_local_host_info(Domain, Rest).
1052
%% Check the options of one interface (as provided by inet:getifaddrs/0).
%% Returns true if the interface has a 'flags' entry that contains
%% 'running' but not 'loopback', otherwise false (also when there is
%% no 'flags' entry at all).
%% A single lists:keyfind/3 replaces the keymember + keysearch pair,
%% so the option list is only scanned once.
if_is_running_and_not_loopback(If) ->
    case lists:keyfind(flags, 1, If) of
        {flags, Flags} ->
            (not lists:member(loopback, Flags)) andalso
                lists:member(running, Flags);
        false ->
            false
    end.
1060
1061
%% Collect the info map for one interface from its option list.
%%   inet:  flags, addr (first IPv4 address not starting with 127),
%%          netmask and broadaddr
%%   inet6: flags, addr (first IPv6 address not starting with 0 or
%%          16#fe80) and netmask
%% Throws (via which_local_host_info3/3) if any of them is missing.
which_local_host_info2(inet = _Domain, IFO) ->
    IsUsableAddr = fun({A, _, _, _}) when (A =/= 127) -> true;
                      (_) -> false
                   end,
    IsIPv4       = fun({_, _, _, _}) -> true;
                      (_) -> false
                   end,
    Anything     = fun(_) -> true end,
    Addr      = which_local_host_info3(addr,      IFO, IsUsableAddr),
    NetMask   = which_local_host_info3(netmask,   IFO, IsIPv4),
    BroadAddr = which_local_host_info3(broadaddr, IFO, IsIPv4),
    Flags     = which_local_host_info3(flags,     IFO, Anything),
    #{flags     => Flags,
      addr      => Addr,
      broadaddr => BroadAddr,
      netmask   => NetMask};
which_local_host_info2(inet6 = _Domain, IFO) ->
    IsUsableAddr = fun({A, _, _, _, _, _, _, _})
                         when (A =/= 0) andalso (A =/= 16#fe80) -> true;
                      (_) -> false
                   end,
    IsIPv6       = fun({_, _, _, _, _, _, _, _}) -> true;
                      (_) -> false
                   end,
    Anything     = fun(_) -> true end,
    Addr    = which_local_host_info3(addr,    IFO, IsUsableAddr),
    NetMask = which_local_host_info3(netmask, IFO, IsIPv6),
    Flags   = which_local_host_info3(flags,   IFO, Anything),
    #{flags   => Flags,
      addr    => Addr,
      netmask => NetMask}.
1095
%% Return the value of the first {Key, Value} entry of the option list
%% for which Check(Value) returns true.
%% Throws {error, no_address} if there is no such entry.
which_local_host_info3(Key, [Entry | Rest], Check) ->
    case Entry of
        {Key, Candidate} ->
            case Check(Candidate) of
                true ->
                    Candidate;
                false ->
                    which_local_host_info3(Key, Rest, Check)
            end;
        _ ->
            which_local_host_info3(Key, Rest, Check)
    end;
which_local_host_info3(_Key, [], _Check) ->
    throw({error, no_address}).
1107
1108
1109%% ---
1110
%% Produce the string form of a socket, using the conversion function
%% of the module the socket belongs to:
%%   socket - socket:to_list/1
%%   inet   - inet:socket_to_list/1
string_of(socket, ESock) ->
    socket:to_list(ESock);
string_of(inet, InetSock) ->
    inet:socket_to_list(InetSock).
1115
1116
%% Create the fun (of arity 1) that is handed to the socket monitor
%% process: applied to a socket, it calls Module:monitor(Socket).
sockmon_fun(Module) ->
    fun Module:monitor/1.
1119
1120
1121%% ---
1122
1123
1124
1125%% ---
1126
%% Format Args according to Format (see io_lib:format/2) and return
%% the result as a flat string.
f(Format, Args) ->
    Chars = io_lib:format(Format, Args),
    lists:flatten(Chars).
1129
1130
%% The current time (os:timestamp/0), formatted by format_timestamp/1.
formated_timestamp() ->
    Now = os:timestamp(),
    format_timestamp(Now).
1133
%% Format a timestamp of the os:timestamp/0 kind, that is
%% {MegaSecs, Secs, MicroSecs}, as the local time of day with
%% milliseconds: "HH:MM:SS.mmm" (all fields zero padded).
%% The milliseconds are truncated (div), not rounded, so that they
%% always fit in three digits.
format_timestamp({_N1, _N2, N3} = TS) ->
    {_Date, {Hour, Min, Sec}} = calendar:now_to_local_time(TS),
    MilliSec = N3 div 1000,
    FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.~.3.0w",
                             [Hour, Min, Sec, MilliSec]),
    lists:flatten(FormatTS).
1140
1141
%% Error printout: always printed (regardless of the debug flag),
%% with the "<ERROR> " prefix.
e(Format, Args) ->
    Prefix = "<ERROR> ",
    p(true, Prefix, Format, Args).
1144
%% Info printout without format arguments.
%% Only printed when 'debug' is 'true' in the process dictionary.
i(Format) ->
    p(get(debug), "", Format, []).
1147
%% Info printout.
%% Only printed when 'debug' is 'true' in the process dictionary.
i(Format, Args) ->
    Debug = get(debug),
    p(Debug, "", Format, Args).
1150
%% The actual printout function.
%% When the first argument is 'true', one line is printed: the current
%% time and the 'sname' of the process (from the process dictionary),
%% followed by the prefix and the formatted text.
%% For any other first argument nothing is printed.
p(true, Prefix, Format, Args) ->
    FullFormat = "[ ~s, ~s ] ~s" ++ Format ++ "~n",
    FullArgs   = [formated_timestamp(), get(sname), Prefix | Args],
    io:format(FullFormat, FullArgs);
p(_, _, _, _) ->
    ok.
1156
1157