%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2020. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(pool).

%% The module exports init/1, handle_call/3, handle_cast/2, handle_info/2 and
%% terminate/2 and is started through gen_server:start/4, so declare the
%% behaviour and let the compiler check the callback set.
-behaviour(gen_server).

%% Supplies a computational pool of processors.
%% The chief user interface function here is get_node(),
%% which returns the name of the node in the pool
%% with the least load.
%% This function is callable from any node that is part of the pool,
%% including the master.
%% Nodes are scheduled on a per-usage basis and a per-load basis:
%% whenever we use a node, we put it at the end of the queue, and whenever
%% a node reports a change in load, we insert it accordingly.

%% User interface exports
-export([start/1,
         start/2,
         stop/0,
         get_nodes/0,
         get_nodes_and_load/0,
         get_node/0,
         pspawn/3,
         attach/1,
         pspawn_link/3]).

%% Internal exports (process entry points and gen_server callbacks)
-export([statistic_collector/0,
         do_spawn/4,
         init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2]).

%% User interface

%% Start up using the .hosts.erlang file.
%% start/1 is start/2 with an empty argument string.

-spec start(Name) -> Nodes when
      Name :: atom(),
      Nodes :: [node()].
start(Name) ->
    start(Name, []).

-spec start(Name, Args) -> Nodes when
      Name :: atom(),
      Args :: string(),
      Nodes :: [node()].
%% Starts the globally registered pool master, starts one slave node per host
%% listed by net_adm:host_file/0, attaches every started node to the pool and
%% returns the list of those nodes.
%% The result of gen_server:start/4 is ignored on purpose, so an existing
%% master (presumably {error, {already_started, _}}) does not abort the call.
%% NOTE(review): net_adm:host_file/0 can return {error, Reason}; that value is
%% handed to start_nodes/3 unchanged and will crash there.
start(Name, Args) when is_atom(Name) ->
    _ = gen_server:start({global, pool_master}, pool, [], []),
    Hosts = net_adm:host_file(),
    Nodes = start_nodes(Hosts, Name, Args),
    lists:foreach(fun attach/1, Nodes),
    Nodes.

%%
%% Interface functions ...
%%

%% Returns the node names currently in the pool, in the master's queue order.
-spec get_nodes() -> [node()].
get_nodes() ->
    get_elements(2, get_nodes_and_load()).

%% Asks the master to add Node to the pool.
-spec attach(Node) -> 'already_attached' | 'attached' when
      Node :: node().
attach(Node) ->
    gen_server:call({global, pool_master}, {attach, Node}).

%% Returns the master's queue as {Load, Node} pairs.
-spec get_nodes_and_load() -> [{Load :: integer(), Node :: node()}].
get_nodes_and_load() ->
    gen_server:call({global, pool_master}, get_nodes).

%% Returns the node at the head of the master's queue.
-spec get_node() -> node().
get_node() ->
    gen_server:call({global, pool_master}, get_node).

%% Spawns M:F(A) on a node chosen by the master. The caller's group leader is
%% sent along so that the master can pass it to the spawned process.
-spec pspawn(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn(M, F, A) ->
    gen_server:call({global, pool_master}, {spawn, group_leader(), M, F, A}).

%% Like pspawn/3, but the process is spawned by, and linked to, the caller on
%% the node returned by get_node/0.
-spec pspawn_link(Mod, Fun, Args) -> pid() when
      Mod :: module(),
      Fun :: atom(),
      Args :: [term()].
pspawn_link(M, F, A) ->
    spawn_link(get_node(), M, F, A).

%% Tries to start a slave node called Name on every host, in order, and
%% returns the nodes that are available afterwards. A node that was already
%% running is reported on stdout but still included; any other failure is
%% reported and the host is skipped.
start_nodes(Hosts, Name, Args) ->
    lists:filtermap(
      fun(Host) ->
              case slave:start(Host, Name, Args) of
                  {ok, Node} ->
                      {true, Node};
                  {error, {already_running, Node} = Reason} ->
                      io:format("Can't start node on host ~w due to ~w~n",
                                [Host, Reason]),
                      {true, Node};
                  {error, Reason} ->
                      io:format("Can't start node on host ~w due to ~w~n",
                                [Host, Reason]),
                      false
              end
      end,
      Hosts).

%% Stops the pool master; node cleanup happens in the master's terminate/2.
-spec stop() -> 'stopped'.
stop() ->
    gen_server:call({global, pool_master}, stop).

%% Returns element number Pos of every tuple in the list, preserving order.
get_elements(Pos, Tuples) ->
    [element(Pos, Tuple) || Tuple <- Tuples].

%% Asynchronously halts the Erlang runtime on every given node and returns
%% 'stopped'.
stop_em(Nodes) ->
    lists:foreach(fun(N) -> rpc:cast(N, erlang, halt, []) end, Nodes),
    stopped.


%% gen_server callbacks.
%% The server state is a list of {Load, Node} tuples. get_node always serves
%% the head of the list, and load reports reposition entries by load, so the
%% head is the preferred node.

%% Traps exits, starts a linked statistic collector on the local node and
%% begins with the local node as the only pool member, at load 0.
init([]) ->
    process_flag(trap_exit, true),
    spawn_link(pool, statistic_collector, []),
    {ok, [{0, node()}]}.

%% get_nodes: reply with the whole {Load, Node} list.
handle_call(get_nodes, _From, Nodes) ->
    {reply, Nodes, Nodes};
%% get_node: reply with the head node, then move it to the back of the queue
%% with its recorded load bumped by one.
handle_call(get_node, _From, [{Load, N} | Tail]) ->
    {reply, N, Tail ++ [{Load + 1, N}]};
%% {attach, Node}: add Node unless it is already a member. A new member is
%% monitored, gets a linked statistic collector, and is queued last with a
%% placeholder load of 999999.
handle_call({attach, Node}, _From, Nodes) ->
    case lists:keymember(Node, 2, Nodes) of
        true ->
            {reply, already_attached, Nodes};
        false ->
            erlang:monitor_node(Node, true),
            spawn_link(Node, pool, statistic_collector, []),
            {reply, attached, Nodes ++ [{999999, Node}]}
    end;
%% {spawn, Gl, M, F, A}: pick a node exactly as get_node does, then start
%% do_spawn/4 there so the new process runs M:F(A) with group leader Gl.
handle_call({spawn, Gl, M, F, A}, From, Nodes) ->
    {reply, N, NewNodes} = handle_call(get_node, From, Nodes),
    Pid = spawn(N, pool, do_spawn, [Gl, M, F, A]),
    {reply, Pid, NewNodes};
handle_call(stop, _From, Nodes) ->
    %% clean up in terminate/2
    {stop, normal, stopped, Nodes}.

%% No casts are part of the protocol; anything received is dropped.
handle_cast(_, Nodes) ->
    {noreply, Nodes}.

%% {Node, load, Load}: a statistic collector reports a new load for Node.
handle_info({Node, load, Load}, Nodes) ->
    Nodes2 = insert_node({Load, Node}, Nodes),
    {noreply, Nodes2};
%% {nodedown, Node}: a monitored node went down; drop it from the pool.
handle_info({nodedown, Node}, Nodes) ->
    {noreply, lists:keydelete(Node, 2, Nodes)};
handle_info(_, Nodes) ->  %% The EXIT signals etc.etc
    {noreply, Nodes}.

%% Hands every pool node except the local one to stop_em/1.
terminate(_Reason, Nodes) ->
    N = lists:delete(node(), get_elements(2, Nodes)),
    stop_em(N),
    ok.

%% Entry point of processes started for a {spawn, ...} request: sets the
%% group leader to Gl and then evaluates M:F(A).
-spec do_spawn(pid(), module(), atom(), [term()]) -> term().
do_spawn(Gl, M, F, A) ->
    group_leader(Gl, self()),
    apply(M, F, A).

%% insert_node({Load, Node}, Nodes) -> NewNodes
%% Repositions Node in the list according to its newly reported Load.
%% The list is walked from the head:
%%  - reaching Node's own entry first, with a lower recorded load, means the
%%    load went up: the old entry is dropped here and the new one is placed
%%    further down by pure_insert/2;
%%  - reaching any entry whose load is >= Load means the new entry belongs
%%    here: it is placed in front and Node's old entry is deleted from the
%%    remainder;
%%  - otherwise the current entry is kept and the walk continues.
insert_node({Load, Node}, [{L, Node} | Tail]) when Load > L ->
    %% We have a raised load here
    pure_insert({Load, Node}, Tail);
insert_node({Load, Node}, [{L, N} | Tail]) when Load =< L ->
    %% Move forward in the list
    T = lists:keydelete(Node, 2, [{L, N} | Tail]),
    [{Load, Node} | T];
insert_node(Ln, [H | T]) ->
    [H | insert_node(Ln, T)];
insert_node(X, []) ->  % Can't happen
    error_logger:error_msg("Pool_master: Bad node list X=~w\n", [X]),
    exit(crash).

%% pure_insert({Load, Node}, Entries) -> NewEntries
%% Places the entry directly in front of the first element whose load is
%% strictly greater than Load, or at the very end when there is none.
%% Elements that are not {Load, Node} pairs are stepped over.
pure_insert({Load, _Node} = Entry, Entries) ->
    NotHeavier = fun({L, _}) -> Load >= L;
                    (_) -> true
                 end,
    {Before, After} = lists:splitwith(NotHeavier, Entries),
    Before ++ [Entry | After].

%% Load reporting process, one per pool node.
%% Really should not measure the contributions from
%% the background processes here .... which we do :-(
%% The master is not monitored from here.

%% Entry point: look for the master at most 5 times.
statistic_collector() ->
    statistic_collector(5).

%% Sleeps 300 ms, then looks up the globally registered pool_master.
%% Gives up with a normal exit once the retry budget is used up; otherwise
%% enters the reporting loop. 999999 is the initial "last reported" value,
%% so the first reading is sent unless it happens to equal 999999.
statistic_collector(0) ->
    exit(normal);
statistic_collector(RetriesLeft) ->
    timer:sleep(300),
    Master = global:whereis_name(pool_master),
    if
        Master =:= undefined ->
            statistic_collector(RetriesLeft - 1);
        true ->
            stat_loop(Master, 999999)
    end.

%% Every 2 seconds, reads the run queue length and sends it to the master.
%% Do not tell the master about our load if it has not changed.
stat_loop(Master, Reported) ->
    timer:sleep(2000),
    Current = statistics(run_queue),
    case Current =:= Reported of
        true ->
            ok;
        false ->
            Master ! {node(), load, Current}  %% async
    end,
    stat_loop(Master, Current).