1%%
2%% %CopyrightBegin%
3%%
4%% Copyright Ericsson AB 1996-2020. All Rights Reserved.
5%%
6%% Licensed under the Apache License, Version 2.0 (the "License");
7%% you may not use this file except in compliance with the License.
8%% You may obtain a copy of the License at
9%%
10%%     http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing, software
13%% distributed under the License is distributed on an "AS IS" BASIS,
14%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15%% See the License for the specific language governing permissions and
16%% limitations under the License.
17%%
18%% %CopyrightEnd%
19%%
20-module(pool).
21
%% Supplies a computational pool of processors.
%% The chief user interface function here is get_node(), which returns
%% the name of the node in the pool that is expected to have the least load.
%% It is callable from any node that is part of the pool, including the
%% master.
%% Nodes are scheduled on both a per-usage and a per-load basis: whenever
%% a node is used, it is moved to the end of the queue, and whenever a node
%% reports a change in load, it is reinserted accordingly.
31
32% User interface Exports ...
33-export([start/1,
34	 start/2,
35	 stop/0,
36	 get_nodes/0,
37	 get_nodes_and_load/0,
38	 get_node/0,
39	 pspawn/3,
40	 attach/1,
41	 pspawn_link/3]).
42
43%% Internal Exports
44-export([statistic_collector/0,
45	 do_spawn/4,
46	 init/1,
47	 handle_call/3,
48	 handle_cast/2,
49	 handle_info/2,
50	 terminate/2]).
51
52%% User interface
53
54%% Start up using the .hosts.erlang file
55
%% Starts a pool whose nodes are all named Name, using the hosts listed in
%% .hosts.erlang and no extra node arguments. Returns the pool nodes.
-spec start(Name) -> Nodes when
      Name :: atom(),
      Nodes :: [node()].
start(Name) ->
    NoExtraArgs = "",
    start(Name, NoExtraArgs).
61
%% Starts a pool: boots one node named Name on every host listed in
%% .hosts.erlang (Args is passed on to slave:start/3), makes sure the
%% globally registered pool master is running, attaches every resulting
%% node to it and returns those nodes.
%%
%% The host file is read first, so that an unreadable or malformed
%% .hosts.erlang fails with error({hosts_file, Reason}) before any pool
%% master has been started. Previously the {error, Reason} value reached
%% start_nodes/3 and crashed with an uninformative function_clause.
-spec start(Name, Args) -> Nodes when
      Name :: atom(),
      Args :: string(),
      Nodes :: [node()].
start(Name, Args) when is_atom(Name) ->
    Hosts = case net_adm:host_file() of
                {error, Reason} ->
                    erlang:error({hosts_file, Reason});
                HostList ->
                    HostList
            end,
    %% The result is deliberately ignored: {error, {already_started, _}}
    %% just means an existing pool master is reused.
    _ = gen_server:start({global, pool_master}, pool, [], []),
    Nodes = start_nodes(Hosts, Name, Args),
    lists:foreach(fun attach/1, Nodes),
    Nodes.
72
73%%
74%% Interface functions ...
75%%
%% Returns just the node names of the pool, in the master's current
%% scheduling order (loads are dropped).
-spec get_nodes() -> [node()].
get_nodes() ->
    NodesAndLoad = get_nodes_and_load(),
    get_elements(2, NodesAndLoad).
79
%% Asks the pool master to add Node to the pool. Returns already_attached
%% if Node is already in the pool, attached otherwise.
-spec attach(Node) -> 'already_attached' | 'attached' when
      Node :: node().
attach(Node) ->
    Master = {global, pool_master},
    gen_server:call(Master, {attach, Node}).
84
%% Returns the pool master's scheduling queue as {Load, Node} tuples. The
%% head of the list is the node that get_node/0 will hand out next. Load is
%% the last reported run-queue length of the node (plus one per request
%% handed to it since).
-spec get_nodes_and_load() -> [{Load, Node}] when
      Load :: non_neg_integer(),
      Node :: node().
get_nodes_and_load() ->
    gen_server:call({global, pool_master}, get_nodes).
87
%% Returns the pool node at the head of the master's queue (the one expected
%% to have the least load). The master then moves that node to the back.
-spec get_node() -> node().
get_node() ->
    Master = {global, pool_master},
    gen_server:call(Master, get_node).
91
%% Spawns M:F(A) on the pool node chosen by the pool master. The request
%% carries the caller's group leader, and the new process adopts it (see
%% do_spawn/4). Returns the pid of the new process.
-spec pspawn(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn(M, F, A) ->
    Request = {spawn, group_leader(), M, F, A},
    gen_server:call({global, pool_master}, Request).
98
%% Like pspawn/3, but the new process is linked to the caller. The process
%% is spawned directly by the caller with spawn_link/4 on the node returned
%% by get_node/0, so it inherits the caller's group leader.
-spec pspawn_link(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn_link(M, F, A) ->
    Target = get_node(),
    spawn_link(Target, M, F, A).
105
%% Calls slave:start(Host, Name, Args) for each host, in order. Returns the
%% nodes that end up in the pool: freshly started ones, plus ones reported
%% as {already_running, Node}. Every error result, including
%% already_running, is printed with io:format/2.
start_nodes([], _Name, _Args) ->
    [];
start_nodes([Host | Hosts], Name, Args) ->
    case node_to_keep(Host, slave:start(Host, Name, Args)) of
        {keep, Node} -> [Node | start_nodes(Hosts, Name, Args)];
        skip -> start_nodes(Hosts, Name, Args)
    end.

%% Classifies one slave:start/3 result and prints a diagnostic on error.
node_to_keep(_Host, {ok, Node}) ->
    {keep, Node};
node_to_keep(Host, {error, Reason}) ->
    io:format("Can't start node on host ~w due to ~w~n", [Host, Reason]),
    case Reason of
        {already_running, Node} -> {keep, Node};
        _ -> skip
    end.
118
%% Stops the pool master. Its terminate/2 halts all other pool nodes.
%% Returns stopped.
-spec stop() -> 'stopped'.
stop() ->
    Master = {global, pool_master},
    gen_server:call(Master, stop).
122
%% Projects element Pos out of every tuple in the list, preserving order.
get_elements(Pos, Tuples) ->
    [element(Pos, Tuple) || Tuple <- Tuples].
125
%% Fires an asynchronous rpc:cast/4 of erlang:halt() at every node in the
%% list, without waiting for any confirmation. Always returns stopped.
stop_em(Nodes) ->
    lists:foreach(fun(Node) -> rpc:cast(Node, erlang, halt, []) end, Nodes),
    stopped.
130
%% gen_server callback. Traps exits, so exit signals from linked processes
%% arrive as messages for handle_info/2. Starts a statistic_collector on the
%% local node, linked to the master. The queue is seeded with the master's
%% own node at load 0.
init([]) ->
    process_flag(trap_exit, true),
    _Collector = spawn_link(pool, statistic_collector, []),
    InitialQueue = [{0, node()}],
    {ok, InitialQueue}.
135
%% gen_server callback for synchronous requests. The state is the pool's
%% queue, a list of {Load, Node} tuples whose head is handed out next.
%%   get_nodes            -> reply with the whole queue.
%%   get_node             -> reply with the head node; it is re-queued at the
%%                           back with its load bumped by one.
%%   {attach, Node}       -> if Node is not queued yet: monitor it, spawn a
%%                           linked statistic_collector there and append it
%%                           with load 999999 until its collector reports.
%%   {spawn, Gl, M, F, A} -> pick a node exactly as for get_node and spawn
%%                           do_spawn(Gl, M, F, A) there; reply with the pid.
%%   stop                 -> stop with reason normal; cleanup is done in
%%                           terminate/2.
handle_call(get_nodes, _From, Nodes) ->
    {reply, Nodes, Nodes};
handle_call(get_node, _From, Nodes) ->
    {Node, NewNodes} = take_node(Nodes),
    {reply, Node, NewNodes};
handle_call({attach, Node}, _From, Nodes) ->
    case lists:keymember(Node, 2, Nodes) of
        true ->
            {reply, already_attached, Nodes};
        false ->
            erlang:monitor_node(Node, true),
            spawn_link(Node, pool, statistic_collector, []),
            {reply, attached, Nodes ++ [{999999, Node}]}
    end;
handle_call({spawn, Gl, M, F, A}, _From, Nodes) ->
    %% Shares the node-selection step with get_node through take_node/1
    %% instead of re-entering this callback and matching its reply tuple.
    {Node, NewNodes} = take_node(Nodes),
    Pid = spawn(Node, pool, do_spawn, [Gl, M, F, A]),
    {reply, Pid, NewNodes};
handle_call(stop, _From, Nodes) ->
    %% clean up in terminate/2
    {stop, normal, stopped, Nodes}.

%% Takes the node at the head of the queue and re-queues it at the back
%% with its load incremented, so successive requests rotate through the pool.
take_node([{Load, Node} | Rest]) ->
    {Node, Rest ++ [{Load + 1, Node}]}.
156
%% gen_server callback. Casts are not part of the pool protocol, so any
%% cast is ignored and the queue is left unchanged.
handle_cast(_Ignored, Queue) ->
    {noreply, Queue}.
159
%% gen_server callback for plain messages.
%%   {Node, load, Load} -> a statistic_collector reported a new run-queue
%%                         length; re-position Node in the queue with
%%                         insert_node/2. Reports from nodes that are not
%%                         (or no longer) in the queue are dropped.
%%                         Inserting them would schedule work on a node the
%%                         master does not monitor. A report whose load
%%                         exceeds every queued entry would also reach
%%                         insert_node/2's exit(crash) clause and take down
%%                         the global master.
%%   {nodedown, Node}    -> remove Node from the queue.
%%   anything else       -> ignored (e.g. 'EXIT' messages, since the master
%%                         traps exits).
handle_info({Node, load, Load}, Nodes) ->
    case lists:keymember(Node, 2, Nodes) of
        true ->
            {noreply, insert_node({Load, Node}, Nodes)};
        false ->
            {noreply, Nodes}
    end;
handle_info({nodedown, Node}, Nodes) ->
    {noreply, lists:keydelete(Node, 2, Nodes)};
handle_info(_, Nodes) ->  %% The EXIT signals etc.etc
    {noreply, Nodes}.
167
%% gen_server callback. On shutdown, asks every queued node except the
%% master's own node to halt (via stop_em/1), then returns ok.
terminate(_Reason, Nodes) ->
    Master = node(),
    Remote = lists:delete(Master, get_elements(2, Nodes)),
    stop_em(Remote),
    ok.
172
%% Entry function of processes started through pspawn/3. The calling
%% process adopts the group leader Gl supplied by the requester, then runs
%% M:F(A) and returns its result.
-spec do_spawn(pid(), module(), atom(), [term()]) -> term().
do_spawn(GroupLeader, Mod, Fun, Args) ->
    Me = self(),
    group_leader(GroupLeader, Me),
    erlang:apply(Mod, Fun, Args).
177
%% Re-positions Node in the queue after it reported load Load. Walks the
%% queue from the head:
%%   - the node's own entry is reached first with a lower load
%%     (load raised): drop it and pure_insert/2 the new entry into the rest;
%%   - an entry (any node) with load >= Load is reached first: put the new
%%     entry right there and delete Node's old entry from the remainder;
%%   - otherwise keep the current entry and continue.
%% Running off the end is treated as impossible: it is logged and the
%% calling process exits with reason crash.
insert_node(Entry, []) ->
    error_logger:error_msg("Pool_master: Bad node list X=~w\n", [Entry]),
    exit(crash);
insert_node({Load, Node} = Entry, [{L, N} = Head | Rest]) ->
    if
        N =:= Node, Load > L ->
            %% We have a raised load here
            pure_insert(Entry, Rest);
        Load =< L ->
            %% Move forward in the list
            [Entry | lists:keydelete(Node, 2, [Head | Rest])];
        true ->
            [Head | insert_node(Entry, Rest)]
    end;
insert_node(Entry, [Head | Rest]) ->
    [Head | insert_node(Entry, Rest)].
190
%% Inserts Entry = {Load, Node} just before the first queue entry whose load
%% is strictly greater than Load, i.e. after all entries with a lower or
%% equal load. An empty queue yields [Entry].
pure_insert({Load, _Node} = Entry, Queue) ->
    {NotHeavier, Heavier} = lists:splitwith(fun({L, _}) -> L =< Load end, Queue),
    NotHeavier ++ [Entry | Heavier].
196
%% Ideally the load measured here would exclude the contributions of the
%% background processes; currently it does not.
199%% We don't have to monitor the master, since we're slaves anyway
200
%% Entry point of the load collector the master spawns on each pool node.
%% Looks up the globally registered pool_master up to 5 times, 300 ms apart.
%% If it is found, the collector enters stat_loop/2. Otherwise it exits
%% normally.
statistic_collector() ->
    statistic_collector(5).

statistic_collector(0) ->
    exit(normal);
statistic_collector(Retries) ->
    ok = timer:sleep(300),
    await_master(global:whereis_name(pool_master), Retries).

%% Acts on one global:whereis_name/1 lookup: retry while the name is not
%% registered, start reporting once it is.
await_master(undefined, Retries) ->
    statistic_collector(Retries - 1);
await_master(Master, _Retries) ->
    stat_loop(Master, 999999).
213
214%% Do not tell the master about our load if it has not changed
215
%% Load-reporting loop of a statistic_collector, once the master M is known.
%% Every 2 seconds it samples statistics(run_queue). It sends
%% {node(), load, NewLoad} to M only when the value differs from the last
%% one sent (Old).
%%
%% The collector monitors M and exits normally when M goes down. Its link to
%% the master (spawn_link in init/1 and in the attach handler) is not
%% enough: pool:stop/0 stops the master with reason normal, which a linked
%% process that does not trap exits ignores. terminate/2 halts only remote
%% nodes, so the collector on the master's own node used to keep running
%% forever, sending to a dead pid.
stat_loop(M, Old) ->
    Ref = erlang:monitor(process, M),
    stat_loop(M, Ref, Old).

stat_loop(M, Ref, Old) ->
    receive
        {'DOWN', Ref, process, _, _Reason} ->
            exit(normal)
    after 2000 ->
            case statistics(run_queue) of
                Old ->
                    stat_loop(M, Ref, Old);
                NewLoad ->
                    M ! {node(), load, NewLoad}, %% async
                    stat_loop(M, Ref, NewLoad)
            end
    end.
225