%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(dist_ac).

-behaviour(gen_server).

%% External exports
-export([start_link/0,
	 load_application/2,
	 takeover_application/2,
	 permit_application/2,
	 permit_only_loaded_application/2]).

-export([get_known_nodes/0]).

%% Internal exports
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2,
	 code_change/3, send_timeout/3]).
-export([info/0]).

-import(lists, [zf/2, filter/2, map/2, foreach/2, foldl/3, mapfoldl/3,
		keysearch/3, keydelete/3, keyreplace/4, member/2]).

-define(AC, application_controller).
-define(DIST_AC, ?MODULE).
-define(LOCK_ID, ?MODULE).

%% This is the protocol version for the dist_ac protocol (between nodes)
-define(vsn, 1).

%%%-----------------------------------------------------------------
%%% This module implements the default Distributed Applications
%%% Controller.  It is possible to write other controllers, if the
%%% functionality in this module is not sufficient.
%%% The process cooperates with the application_controller.
%%%-----------------------------------------------------------------

%%-----------------------------------------------------------------
%% Naming conventions:
%%   Appl = #appl
%%   AppName = atom()
%%-----------------------------------------------------------------
-record(state, {appls = [], tmp_locals = [], remote_started = [],
		known = [], started = [], tmp_weights = [],
		dist_loaded = [], t_reqs = [], s_reqs = [], p_reqs = []}).
%%-----------------------------------------------------------------
%% appls           = [#appl()] - these are the applications we control
%% tmp_locals      = [{AppName, Weight, node()}] - tmp, info part of
%%                   application startup for some distrib appls,
%%                   not yet handled.
%% remote_started  = [{Node, AppName}] - info on apps started before
%%                   we were started
%% known           = [Node] - These are the nodes known to us
%% started         = [AppName] - An ordered list of started applications
%%                   (reversed start order)
%% tmp_weights     = [{AppName, MyWeight}] - tmp, if we're forced to
%%                   send a dist_ac_weight message before we're prepared to,
%%                   we remember the weight we sent here, so we can use
%%                   it in the dist_ac_weight msgs later.
%% dist_loaded     = [{{Name, Node}, HisNodes, Permission}] - info on
%%                   applications loaded on other nodes (and our own node)
%% t_reqs          = [{AppName, From}] - processes waiting for takeover
%%                   to complete.
%% s_reqs          = [{AppName, From}] - processes waiting for stop
%%                   to complete.
%% p_reqs          = [{From, AppName, Bool, [Node]}] - outstanding permit
%%                   requests.  [Node] is the list of nodes we're still
%%                   waiting for.
%%-----------------------------------------------------------------

-record(appl, {name, id, restart_time = 0,  nodes = [], run = []}).

%%-----------------------------------------------------------------
%% id = local | undefined | {distributed, node()} | waiting | run_waiting |
%%      {failover, Node} | {takeover, Node}
%%      local : local application
%%      undefined : not yet started
%%      {distributed, Node} : running on another node, we're standby
%%      {failover, Node} : failover from Node
%%      {takeover, Node} : takeover from Node
%%      waiting : another node went down, we're waiting for a timeout
%%                before we take the application over.  From = pid() | undefined
%%      run_waiting : we have decided to start the app; wait for the
%%                    AC result
%%-----------------------------------------------------------------
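%%
%% For illustration (a sketch; node names are hypothetical): an #appl
%% entry for an application myapp that is configured on cp1@host and
%% cp2@host, permitted on both, and currently running locally on this
%% node could look like:
%%
%%   #appl{name = myapp, id = local, restart_time = 5000,
%%         nodes = [cp1@host, cp2@host],
%%         run = [{cp1@host, true}, {cp2@host, true}]}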

start_link() ->
    case gen_server:start_link({local, ?DIST_AC}, ?MODULE, [], []) of
	{ok, Pid} ->
	    gen_server:cast(?DIST_AC, init_sync),
	    {ok, Pid};
	Else ->
	    Else
    end.


%%-----------------------------------------------------------------
%% Func: load_application(AppName, DistNodes)
%% Args: AppName = atom()
%%       DistNodes = default | {AppName, Time, [node() | {node()...}]}
%% Purpose: Notifies the dist_ac about distributed nodes for an
%%          application.  DistNodes overrides the kernel 'distributed'
%%          parameter.
%% Returns: ok | {error, Reason}
%%-----------------------------------------------------------------
load_application(AppName, DistNodes) ->
    gen_server:call(?DIST_AC, {load_application, AppName, DistNodes}, infinity).
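
%% For example (a sketch; node names are hypothetical), overriding the
%% 'distributed' parameter so that myapp may run on cp1@host, or on
%% either of cp2@host and cp3@host, with a 5000 ms failover delay:
%%
%%   ok = dist_ac:load_application(myapp,
%%            {myapp, 5000, [cp1@host, {cp2@host, cp3@host}]}).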

takeover_application(AppName, RestartType) ->
    case valid_restart_type(RestartType) of
	true ->
	    wait_for_sync_dacs(),
	    Nodes = get_nodes(AppName),
	    global:trans(
	      {?LOCK_ID, self()},
	      fun() ->
		      gen_server:call(
			?DIST_AC,
			{takeover_application, AppName, RestartType},
			infinity)
	      end,
	      Nodes);
	false ->
	    {error, {invalid_restart_type, RestartType}}
    end.
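
%% For example (a sketch), moving myapp to the local node; RestartType
%% must be permanent, transient or temporary:
%%
%%   ok = dist_ac:takeover_application(myapp, permanent).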

%%-----------------------------------------------------------------
%% This function controls which applications are permitted to run.  If
%% an application X runs when this function is called as
%% permit_application(X, false), it is moved to another node where it
%% is permitted to run (distributed applications only).  If there is
%% no such node, the application is stopped. (I.e. local applications
%% are always stopped, and distributed applications with no other node
%% alive are stopped as well.)  If a call to
%% permit_application(X, true) is made later, X is restarted.
%% For example, suppose applications app1 and app2 are started and
%% running.
%% If we evaluate
%%   permit_application(app2, false)
%% app2 is stopped and only app1 is running.
%% If we now evaluate
%%   permit_application(app2, true),
%%   permit_application(app3, true)
%% app2 is restarted, but not app3, since it hasn't been started by a
%% call to start_application.
%%-----------------------------------------------------------------
permit_application(AppName, Bool) ->
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    global:trans(
      LockId,
      fun() ->
	      gen_server:call(?DIST_AC,
			      {permit_application, AppName, Bool, LockId, started},
			      infinity)
      end).

permit_only_loaded_application(AppName, Bool) ->
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    global:trans(
      LockId,
      fun() ->
	      gen_server:call(?DIST_AC,
			      {permit_application, AppName, Bool, LockId, only_loaded},
			      infinity)
      end).
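
%% A shell sketch of the example in the comment above (normally reached
%% through application:permit/2 rather than by calling dist_ac directly):
%%
%%   1> dist_ac:permit_application(app2, false).
%%   ok   % app2 moved to another permitted node, or stopped
%%   2> dist_ac:permit_application(app2, true).
%%   ok   % app2 restarted, since it was started before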

get_nodes(AppName) ->
    gen_server:call(?DIST_AC, {get_nodes, AppName}, infinity).

get_known_nodes() ->
    gen_server:call(?DIST_AC, get_known_nodes).

%%%-----------------------------------------------------------------
%%% call-back functions from gen_server
%%%-----------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    {ok, #state{}}.

sync_dacs(Appls) ->
    Res = global:trans({?LOCK_ID, sync_dacs},
		       fun() ->
			       Nodes = introduce_me(nodes(), Appls),
			       wait_dacs(Nodes, [node()], Appls, [])
		       end),
    ets:insert(ac_tab, {sync_dacs, ok}),
    Res.

introduce_me(Nodes, Appls) ->
    Msg = {dist_ac_new_node, ?vsn, node(), Appls, []},
    filter(fun(Node) ->
		   %% This handles nodes without DACs
		   case rpc:call(Node, erlang, whereis, [?DIST_AC]) of
		       Pid when is_pid(Pid) ->
			   Pid ! Msg,
			   true;
		       _ ->
			   false
		   end
	   end, Nodes).

wait_dacs([Node | Nodes], KnownNodes, Appls, RStarted) ->
    monitor_node(Node, true),
    receive
	%% HisAppls =/= [] is the case when our node connects to a running system
	%%
	%% It is always the responsibility of newer versions to understand
	%% older versions of the protocol.  As we don't have any older
	%% versions (that are supposed to work with this version), we
	%% don't handle version mismatch here.
	{dist_ac_new_node, _Vsn, Node, HisAppls, HisStarted} ->
	    monitor_node(Node, false),
	    NRStarted = RStarted ++ HisStarted,
	    NAppls = dist_merge(Appls, HisAppls, Node),
	    wait_dacs(Nodes, [Node | KnownNodes], NAppls, NRStarted);
	{nodedown, Node} ->
	    monitor_node(Node, false),
	    wait_dacs(Nodes, KnownNodes, Appls, RStarted)
    end;
wait_dacs([], KnownNodes, Appls, RStarted) ->
    {KnownNodes, Appls, RStarted}.


info() ->
    gen_server:call(?DIST_AC, info).


%%-----------------------------------------------------------------
%% All functions that can affect which applications are running
%% execute within a global lock, to ensure that they are not
%% executing at the same time as sync_dacs.  However, to avoid a
%% deadlock situation where e.g. permit_application gets the lock
%% before sync_dacs, this function is used to ensure that the local
%% sync_dacs always gets the lock first.  The lock is still
%% used to not interfere with sync_dacs on other nodes.
%%-----------------------------------------------------------------
wait_for_sync_dacs() ->
    case catch ets:lookup(ac_tab, sync_dacs) of
	[{sync_dacs, ok}] -> ok;
	_ ->
	    receive after 100 -> ok end,
	    wait_for_sync_dacs()
    end.

handle_cast(init_sync, _S) ->
    %% When the dist_ac is started, it receives this msg, and gets into
    %% the receive loop.  'go' is sent from the kernel_config proc when
    %% all nodes that should be pinged have been pinged.  The reason for this
    %% is that dist_ac syncs with the other nodes at start-up.  That is,
    %% it does _not_ handle partitioned nets!  The other nodes try to call
    %% the local name dist_ac, which means that this name must be registered
    %% before the distribution.  But it can't sync until after the distribution
    %% is started.  Hence this 'go' message.
    receive
	{go, KernelConfig} ->
	    Appls = case application:get_env(kernel, distributed) of
			{ok, D} -> dist_check(D);
			undefined -> []
		    end,

	    dist_take_control(Appls),
	    %% kernel_config waits for dist_ac to take control over its
	    %% applications. By this we can be sure that the kernel
	    %% application hasn't completed its start before dist_ac has
	    %% taken control over its applications. (OTP-3509)
	    KernelConfig ! dist_ac_took_control,

	    %% we're really just interested in nodedowns.
	    ok = net_kernel:monitor_nodes(true),

	    {Known, NAppls, RStarted} = sync_dacs(Appls),

	    {noreply,
	     #state{appls = NAppls, known = Known, remote_started = RStarted}}
    end.
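
%% For illustration (a sketch; node and application names are
%% hypothetical), the kernel 'distributed' parameter read above could be
%% given in sys.config as:
%%
%%   [{kernel,
%%     [{distributed,
%%       [{myapp, 5000, [cp1@host, {cp2@host, cp3@host}]}]},
%%      {sync_nodes_optional, [cp2@host, cp3@host]},
%%      {sync_nodes_timeout, 10000}]}].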


handle_call(info, _From, S) ->
    {reply, S, S};



handle_call({load_application, AppName, DistNodes}, _From, S) ->
    Appls = S#state.appls,
    case catch dist_replace(DistNodes, AppName, Appls) of
	{error, Error} ->
	    {reply, {error, Error}, S};
	{'EXIT', R} ->
	    {stop, R, {error, R}, S};
	NAppls ->
	    NewS = case dist_find_nodes(NAppls, AppName) of
		       [] -> % No distrib nodes; we ignore it
			   S;
		       _Nodes ->
			   ensure_take_control(AppName, Appls),
			   {ok, S2} = load(AppName, S#state{appls = NAppls}),
			   S2
		   end,
	    {reply, ok, NewS}
    end;

handle_call({takeover_application, AppName, RestartType}, From, S) ->
    Appls = S#state.appls,
    case keysearch(AppName, #appl.name, Appls) of
	{value, Appl} when element(1, Appl#appl.id) =:= distributed ->
	    {distributed, Node} = Appl#appl.id,
	    _ = ac_takeover(req, AppName, Node, RestartType),
	    NAppl = Appl#appl{id = takeover},
	    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
	    TR = S#state.t_reqs,
	    {noreply, S#state{appls = NAppls,
			      t_reqs = [{AppName, From} | TR]}};
	{value, #appl{id = local}} ->
	    {reply, {error, {already_running_locally, AppName}}, S};
	_ ->
	    {reply, {error, {not_running_distributed, AppName}}, S}
    end;

handle_call({permit_application, AppName, Bool, LockId, StartInfo}, From, S) ->
    case lists:keymember(AppName, #appl.name, S#state.appls) of
	false ->
	    %% This covers the case of permit for non-distributed
	    %% applications.  This shouldn't be handled like this, or
	    %% here, but we have to be backwards-compatible.
	    case application_controller:get_loaded(AppName) of
		{true, _} when not Bool ->
		    _ = ac_stop_it(AppName),
		    {reply, ok, S};
		{true, _} when Bool ->
		    _ = ac_start_it(req, AppName),
		    {reply, ok, S};
		false ->
		    {reply, {error, {not_loaded, AppName}}, S}
	    end;
	true ->
	    NAppls = dist_update_run(S#state.appls, AppName, node(), Bool),
	    NewS = S#state{appls = NAppls},
	    %% Check if the application is running
	    IsRunning = keysearch(AppName, #appl.name, NAppls),
	    IsMyApp = case IsRunning of
			  {value, #appl{id = local}} -> true;
			  _ -> false
		      end,
	    %% Tell everyone about the new permission
	    Nodes = dist_flat_nodes(NAppls, AppName),
	    Msg = {dist_ac_new_permission, node(), AppName, Bool, IsMyApp},
	    send_msg(Msg, Nodes),
	    case StartInfo of
		only_loaded ->
		    {reply, ok, NewS};
		started ->
		    permit(Bool, IsRunning, AppName, From, NewS, LockId)
	    end
    end;

%%-----------------------------------------------------------------
%% The 'distributed' parameter has changed.  Update the parameters,
%% but note that the applications are not actually moved to other
%% nodes, even if they should be.
%%-----------------------------------------------------------------
handle_call({distribution_changed, NewDistribution}, _From, S) ->
    Appls = S#state.appls,
    NewAppls = dist_change_update(Appls, NewDistribution),
    NewS = S#state{appls = NewAppls},
    {reply, ok, NewS};


handle_call({get_nodes, AppName}, _From, S) ->
    Alive = intersection(dist_flat_nodes(S#state.appls, AppName),
			 S#state.known),
    {reply, Alive, S};

handle_call(get_known_nodes, _From, S) ->
    {reply, S#state.known, S}.


handle_info({ac_load_application_req, AppName}, S) ->
    {ok, NewS} = load(AppName, S),
    ?AC ! {ac_load_application_reply, AppName, ok},
    {noreply, NewS};

handle_info({ac_application_unloaded, AppName}, S) ->
    {ok, NewS} = unload(AppName, S),
    {noreply, NewS};

handle_info({ac_start_application_req, AppName}, S) ->
    %% We must decide if we or another node should start the application
    Lock = {?LOCK_ID, self()},
    case global:set_lock(Lock, [node()], 0) of
	true ->
	    S2 = case catch start_appl(AppName, S, reply) of
		     {ok, NewS, _} ->
			 NewS;
		     {error, R} ->
			 ?AC ! {ac_start_application_reply, AppName, {error,R}},
			 S
		 end,
	    global:del_lock(Lock),
	    {noreply, S2};
	false ->
	    send_after(100, {ac_start_application_req, AppName}),
	    {noreply, S}
    end;

handle_info({ac_application_run, AppName, Res}, S) ->
    %% We ordered a start, and here's the result.  Tell all other nodes.
    Appls = S#state.appls,
    Nodes = S#state.known,
    %% Send this to _all_ known nodes, as any node could sync
    %% on this app (not only nodes that can run it).
    send_msg({dist_ac_app_started, node(), AppName, Res}, Nodes),
    NId = case Res of
	       ok -> local;
	       {error, _R} -> undefined
	  end,
    {value, Appl} = keysearch(AppName, #appl.name, Appls),
    %% Check if we have somebody waiting for the takeover result
    NTReqs = del_t_reqs(AppName, S#state.t_reqs, Res),
    NAppl = Appl#appl{id = NId},
    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
    {noreply, S#state{appls = NAppls, t_reqs = NTReqs}};


handle_info({ac_application_not_run, AppName}, S) ->
    %% We ordered a stop, and now it has stopped
    {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls),
    %% Check if we have somebody waiting for the takeover result;
    %% if somebody called stop just before the takeover was handled,
    %% they are answered with {error, stopped}.
    NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
    %% Check if we have somebody waiting for stop to return
    SReqs = filter(fun({Name, From2}) when Name =:= AppName ->
			   gen_server:reply(From2, ok),
			   false;
		      (_) ->
			   true
		   end, S#state.s_reqs),
    RS = case Appl#appl.id of
	     local ->
		 send_msg({dist_ac_app_stopped, AppName}, S#state.known),
		 S#state.remote_started;
	     {distributed, Node} ->
		 [{Node, AppName} | S#state.remote_started];
	     _ ->
		 S#state.remote_started
	 end,
    NAppl = Appl#appl{id = undefined},
    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
    {noreply, S#state{appls = NAppls, t_reqs = NTReqs, s_reqs = SReqs,
		      remote_started = RS}};

handle_info({ac_application_stopped, AppName}, S) ->
    %% Somebody called application:stop - reset state as it was before
    %% the application was started.
    {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls),
    %% Check if we have somebody waiting for the takeover result;
    %% if somebody called stop just before the takeover was handled,
    %% they are answered with {error, stopped}.
    NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
    %% Check if we have somebody waiting for stop to return
    SReqs = filter(fun({Name, From2}) when Name =:= AppName ->
			   gen_server:reply(From2, ok),
			   false;
		      (_) ->
			   true
		   end, S#state.s_reqs),
    RS = case Appl#appl.id of
	     local ->
		 send_msg({dist_ac_app_stopped, AppName}, S#state.known),
		 S#state.remote_started;
	     {distributed, Node} ->
		 [{Node, AppName} | S#state.remote_started];
	     _ ->
		 S#state.remote_started
	 end,
    NAppl = Appl#appl{id = undefined},
    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
    Started = lists:delete(AppName, S#state.started),
    {noreply, S#state{appls = NAppls, started = Started,
		      t_reqs = NTReqs, s_reqs = SReqs,
		      remote_started = RS}};


%%-----------------------------------------------------------------
%% A new node has come up.
%% Send him info about our started distributed applications.
%%-----------------------------------------------------------------
handle_info({dist_ac_new_node, _Vsn, Node, HisAppls, []}, S) ->
    Appls = S#state.appls,
    MyStarted = zf(fun(Appl) when Appl#appl.id =:= local ->
			   {true, {node(), Appl#appl.name}};
		      (_) ->
			   false
		   end, Appls),
    {?DIST_AC, Node} ! {dist_ac_new_node, ?vsn, node(), Appls, MyStarted},
    NAppls = dist_merge(Appls, HisAppls, Node),
    {noreply, S#state{appls = NAppls, known = [Node | S#state.known]}};

handle_info({dist_ac_app_started, Node, Name, Res}, S) ->
    case {keysearch(Name, #appl.name, S#state.appls), lists:member(Name, S#state.started)} of
	{{value, Appl}, true} ->
	    Appls = S#state.appls,
	    NId = case Appl#appl.id of
		      _ when element(1, Res) =:= error ->
			  %% Start of appl on some node failed.
			  %% Set Id to undefined.  That node will have
			  %% to take some actions, e.g. reboot
			  undefined;
		      {distributed, _} ->
			  %% Another node took over from some node.  Update
			  %% appl list.
			  {distributed, Node};
		      local ->
			  %% Another node took over from me; stop my application
			  %% and update the running list.
			  {distributed, Node};
		      _ ->
			  %% Another node started the appl.  Update appl list.
			  {distributed, Node}
		  end,
	    _ = ac_started(req, Name, Node),
	    NAppl = Appl#appl{id = NId},
	    NAppls = keyreplace(Name, #appl.name, Appls, NAppl),
	    TmpWeights = keydelete_all(Name, 1, S#state.tmp_weights),
	    NewS = S#state{appls = NAppls, tmp_weights = TmpWeights},
	    NPermitReq = req_del_permit_false(NewS#state.p_reqs, Name),
	    case catch req_start_app(NewS#state{p_reqs = NPermitReq}, Name) of
		{error, R} ->
		    {stop, R, NewS};
		{ok, NewS2} ->
		    {noreply, NewS2}
	    end;
	{_, _} ->
	    %% The app has not been started at this node yet; remember this in
	    %% remote started.
	    NRStarted = [{Node, Name} | S#state.remote_started],
	    {noreply, S#state{remote_started = NRStarted}}
    end;

handle_info({dist_ac_app_stopped, AppName}, S) ->
    Appls = S#state.appls,
    case keysearch(AppName, #appl.name, Appls) of
	false ->
	    RStarted = keydelete(AppName, 2, S#state.remote_started),
	    {noreply, S#state{remote_started = RStarted}};
	{value, Appl} ->
	    NAppl = Appl#appl{id = undefined},
	    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
	    RStarted = keydelete(AppName, 2, S#state.remote_started),
	    {noreply, S#state{appls = NAppls, remote_started = RStarted}}
    end;

handle_info({dist_ac_weight, Name, Weight, Node}, S) ->
    %% This means another node starts up, and will eventually take over
    %% this appl.  We have a situation like: {Name, [{Node}, node()]}
    %% Node sends us this msg, and we must respond.  It doesn't really
    %% matter what we send him; but it must be a dist_ac_weight msg.
    %% Another situation is {Name, [RNode, {node()}, Node]}.
    %%
    %% Yet another situation is that the node where Name was running crashed,
    %% and Node has got the nodedown message, but we haven't.  In this case,
    %% we must send a correct weight to Node, i.e. the same weight that
    %% we'll send to him later, when we get the nodedown message.
    case keysearch(Name, #appl.name, S#state.appls) of
	{value, Appl} ->
	    Id = Appl#appl.id,
	    case Id of
		run_waiting ->
		    {?DIST_AC, Node} ! {dist_ac_weight, Name, 0, node()},
		    {noreply, S};
		undefined ->
		    {noreply,
		     S#state{tmp_locals = [{Name, Weight, Node} |
					   S#state.tmp_locals]}};
		{takeover, _} ->
		    {noreply,
		     S#state{tmp_locals = [{Name, Weight, Node} |
					   S#state.tmp_locals]}};
		{failover, _} ->
		    {noreply,
		     S#state{tmp_locals = [{Name, Weight, Node} |
					   S#state.tmp_locals]}};
		_ ->
		    MyWeight = get_cached_weight(Name, S),
		    {?DIST_AC, Node} ! {dist_ac_weight, Name, MyWeight, node()},
		    NTWs = keyreplaceadd(Name, 1, S#state.tmp_weights,
					 {Name, MyWeight}),
		    {noreply, S#state{tmp_weights = NTWs}}
	    end;
	_ ->
	    {noreply,
	     S#state{tmp_locals = [{Name, Weight, Node} | S#state.tmp_locals]}}
    end;

%%-----------------------------------------------------------------
%% A node died.  Check if we should take over some applications.
%%-----------------------------------------------------------------
handle_info({nodedown, Node}, S) ->
    AppNames = dist_get_runnable(S#state.appls),
    HisAppls = filter(fun(#appl{name = Name, id = {distributed, N}})
			 when Node =:= N -> lists:member(Name, AppNames);
			 (_) -> false
		      end,
		      S#state.appls),
    Appls2 = zf(fun(Appl) when Appl#appl.id =:= {distributed, Node} ->
			case lists:member(Appl#appl.name, AppNames) of
			    true ->
				{true, Appl#appl{id = {failover, Node}}};
			    false ->
				_ = ac_not_running(Appl#appl.name),
				{true, Appl#appl{id = undefined}}
			end;
		   (_) ->
			true
		end,
		S#state.appls),
    RStarted = filter(fun({Node2, _Name}) when Node2 =:= Node -> false;
			 (_) -> true
		      end,
		      S#state.remote_started),
    Appls3 = dist_del_node(Appls2, Node),
    {NPermitReq, Appls4, SReqs} = req_del_node(S, Node, Appls3),
    NKnown = lists:delete(Node, S#state.known),
    NewS = S#state{appls = Appls4, p_reqs = NPermitReq, known = NKnown,
		   s_reqs = SReqs,
		   remote_started = RStarted},
    restart_appls(HisAppls),
    {noreply, NewS};

handle_info({dist_ac_app_loaded, Node, Name, HisNodes, Permission, HeKnowsMe},
	    S) ->
    Nodes = dist_find_nodes(Appls = S#state.appls, Name),
    case is_loaded(Name, S) of
	true ->
	    case equal_nodes(Nodes, HisNodes) of
		true ->
		    NAppls = dist_update_run(Appls, Name, Node, Permission),
		    if
			not HeKnowsMe ->
			    %% We've got it loaded, but he doesn't know -
			    %% he's a new node connecting to us.
			    Msg = {dist_ac_app_loaded, node(), Name,
				   Nodes, dist_is_runnable(Appls, Name), true},
			    {?DIST_AC, Node} ! Msg,
			    ok;
			true ->
			    ok
		    end,
		    {noreply, S#state{appls = NAppls}};
		false ->
		    dist_mismatch(Name, Node)
	    end;
	false ->
	    Load = [{{Name, Node}, HisNodes, Permission} | S#state.dist_loaded],
	    {noreply, S#state{dist_loaded = Load}}
    end;

handle_info({dist_ac_app_unloaded, Node, Name}, S) ->
    Appls = dist_update_run(S#state.appls, Name, Node, undefined),
    Load = keydelete({Name, Node}, 1, S#state.dist_loaded),
    {noreply, S#state{appls = Appls, dist_loaded = Load}};


handle_info({dist_ac_new_permission, Node, AppName, false, IsHisApp}, S) ->
    Appls = dist_update_run(S#state.appls, AppName, Node, false),
    NewS = S#state{appls = Appls},
    case dist_is_runnable(Appls, AppName) of
	true when IsHisApp ->
	    case catch start_appl(AppName, NewS, req) of
		{ok, NewS2, _}  ->
		    {noreply, NewS2};
		{error, _R} -> % if app was permanent, AC will shutdown the node
		    {noreply, NewS}
	    end;
	_ ->
	    {noreply, NewS}
    end;
handle_info({dist_ac_new_permission, Node, AppName, true, _IsHisApp}, S) ->
    Appls = dist_update_run(S#state.appls, AppName, Node, true),
    {noreply, S#state{appls = Appls}};

handle_info({internal_restart_appl, Name}, S) ->
    case restart_appl(Name, S) of
	{error, R} ->
	    {stop, {error, R}, S};
	NewS ->
	    {noreply, NewS}
    end;

handle_info(_, S) ->
    {noreply, S}.

terminate(_Reason, _S) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
load(AppName, S) ->
    Appls0 = S#state.appls,
    %% Get the dist specification for the app on other nodes
    DistLoaded = get_dist_loaded(AppName, Load1 = S#state.dist_loaded),
    %% Get the local dist specification
    Nodes = dist_find_nodes(Appls0, AppName),
    FNodes = flat_nodes(Nodes),
    %% Update the dist spec with our local permission
    Permission = get_default_permission(AppName),
    Appls1 = dist_update_run(Appls0, AppName, node(), Permission),
    %% Compare the local spec with the other nodes' specs.
    %% If equal, update our spec with his current permission.
    {LoadedNodes, Appls2} =
	mapfoldl(
	  fun({Node, HisNodes, HisPermission}, Appls) ->
		  case equal_nodes(Nodes, HisNodes) of
		      true ->
			  {Node, dist_update_run(Appls, AppName,
						 Node, HisPermission)};
		      _ ->
			  dist_mismatch(AppName, Node)
		  end
	  end, Appls1, DistLoaded),
    Load2 = del_dist_loaded(AppName, Load1),
    %% Tell all Nodes about the new appl loaded, and its permission.
    foreach(fun(Node) when Node =/= node() ->
		    Msg = {dist_ac_app_loaded, node(), AppName,
			   Nodes, Permission, member(Node, LoadedNodes)},
		    {?DIST_AC, Node} ! Msg;
	       (_) -> ok
	    end, FNodes),
    {ok, S#state{appls = Appls2, dist_loaded = Load2}}.

ensure_take_control(AppName, Appls) ->
    %% Check if this is a new application that we don't control yet
    case lists:keymember(AppName, #appl.name, Appls) of
	true ->       % we have control
	    ok;
	false ->      % take control!
	    %% Note: this works because this is executed within a
	    %% synchronous call. I.e. we get the control *before*
	    %% application:load returns. (otherwise application:start
	    %% could be called before we got the chance to take control)
	    %% The only reason we have to bother about this is because
	    %% we have to be backwards compatible in the sense that all
	    %% apps don't have to be specified in the 'distributed' parameter,
	    %% but may be implicitly 'distributed' by a call to
	    %% application:load.
	    application_controller:control_application(AppName)
    end.

unload(AppName, S) ->
    Appls = S#state.appls,
    Nodes = dist_flat_nodes(Appls, AppName),
    %% Tell all ACs in DistNodes about the unloaded appl
    Msg = {dist_ac_app_unloaded, node(), AppName},
    send_msg(Msg, Nodes),
    {value, Appl} = keysearch(AppName, #appl.name, Appls),
    NAppl = Appl#appl{id = undefined, run = []},
    {ok, S#state{appls = keyreplace(AppName, #appl.name, Appls, NAppl)}}.

start_appl(AppName, S, Type) ->
    %% Get nodes, and check if App is loaded on all involved nodes.
    %% If it is loaded everywhere, we know that we have the same picture
    %% of the nodes; otherwise the load wouldn't have succeeded.
    Appl = case keysearch(AppName, #appl.name, Appls = S#state.appls) of
	       {value, A} -> A;
	       _ -> throw({error, {unknown_application, AppName}})
	   end,
    case Appl#appl.id of
	local ->
	    %% UW 990913: we've already started the app
	    %% this could happen if ac_start_application_req was resent.
	    {ok, S, false};
	_ ->
	    {Id, IsWaiting} = case dist_get_all_nodes(Appl) of
				  {ok, DistNodes, PermittedNodes} ->
				      start_distributed(Appl, AppName, DistNodes,
							PermittedNodes, S, Type);
				  Error -> throw(Error)
			      end,
	    NAppl = Appl#appl{id = Id},
	    NAppls = keyreplaceadd(AppName, #appl.name, Appls, NAppl),
	    {ok, NewS} = req_start_app(S#state{appls = NAppls}, AppName),
	    TmpLocals = keydelete_all(AppName, 1, NewS#state.tmp_locals),
	    TmpWeights = keydelete_all(AppName, 1, NewS#state.tmp_weights),
	    RStarted = keydelete(AppName, 2, S#state.remote_started),
	    Started = replaceadd(AppName, NewS#state.started),
	    {ok,
	     NewS#state{started = Started, tmp_locals = TmpLocals,
			tmp_weights = TmpWeights, remote_started = RStarted},
	     IsWaiting}
    end.


start_distributed(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    case find_start_node(Nodes, PermittedNodes, Name, S) of
	{ok, Node} when Node =:= node() ->
	    _ = case Appl#appl.id of
		    {failover, FoNode} when Type =:= req ->
			ac_failover(Name, FoNode, undefined);
		    {distributed, Node2} when Type =:= req ->
			ac_takeover(req, Name, Node2, undefined);
		    _ when Type =:= reply ->
			case lists:keysearch(Name, 2, S#state.remote_started) of
			    {value, {Node3, _}} ->
				ac_takeover(reply, Name, Node3, undefined);
			    _ ->
				ac_start_it(Type, Name)
			end;
		    _ ->
			ac_start_it(Type, Name)
		end,
	    {run_waiting, true};
	{already_started, Node} ->
	    _ = ac_started(Type, Name, Node),
	    {{distributed, Node}, false};
	{ok, Node} ->
	    case keysearch(Name, #appl.name, S#state.appls) of
		{value, #appl{id = {distributed, Node}}} ->
		    _ = ac_started(Type, Name, Node),
		    {{distributed, Node}, false};
		_ ->
		    wait_dist_start(Node, Appl, Name, Nodes,
				    PermittedNodes, S, Type)
	    end;
	not_started ->
	    wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type);
	no_permission ->
	    _ = ac_not_started(Type, Name),
	    {undefined, false}
    end.

wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type) ->
    monitor_node(Node, true),
    receive
	{dist_ac_app_started, Node, Name, ok} ->
	    _ = ac_started(Type, Name, Node),
	    monitor_node(Node, false),
	    {{distributed, Node}, false};
	{dist_ac_app_started, Node, Name, {error, R}} ->
	    _ = ac_error(Type, Name, {Node, R}),
	    monitor_node(Node, false),
	    {Appl#appl.id, false};
	{dist_ac_weight, Name, _Weight, Node} ->
	    %% This is the situation: {Name, [RNode, {Node}, node()]}
	    %% and permit(false) is called on RNode, and we sent the
	    %% weight first.  Node handled it in handle_info, and
	    %% now we must send him a weight msg.  We can use any weight;
	    %% he wins anyway.
	    monitor_node(Node, false),
	    {?DIST_AC, Node} !
		{dist_ac_weight, Name, get_cached_weight(Name, S), node()},
	    wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type);
	{nodedown, Node} ->
	    monitor_node(Node, false),
	    TmpLocals =
		filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
							 Name2 =:= Name -> false;
			  (_) -> true
		       end,
		       S#state.tmp_locals),
	    NewS = S#state{tmp_locals = TmpLocals},
	    start_distributed(Appl, Name, Nodes,
			      lists:delete(Node, PermittedNodes), NewS, Type)
    end.

wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    receive
	{dist_ac_app_started, Node, Name, ok} ->
	    _ = ac_started(Type, Name, Node),
	    {{distributed, Node}, false};
	{dist_ac_app_started, Node, Name, {error, R}} ->
	    _ = ac_error(Type, Name, {Node, R}),
	    {Appl#appl.id, false};
	{nodedown, Node} ->
	    %% A node went down, try to start the app again - there may not
	    %% be any more nodes to wait for.
	    TmpLocals =
		filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
							 Name2 =:= Name -> false;
			  (_) -> true
		       end,
		       S#state.tmp_locals),
	    NewS = S#state{tmp_locals = TmpLocals},
	    start_distributed(Appl, Name, Nodes,
			      lists:delete(Node, PermittedNodes), NewS, Type)
    end.


ac_start_it(reply, Name) ->
    ?AC ! {ac_start_application_reply, Name, start_it};
ac_start_it(req, Name) ->
    ?AC ! {ac_change_application_req, Name, start_it}.

ac_started(reply, Name, Node) ->
    ?AC ! {ac_start_application_reply, Name, {started, Node}};
ac_started(req, Name, Node) ->
    ?AC ! {ac_change_application_req, Name, {started, Node}}.

ac_error(reply, Name, Error) ->
    ?AC ! {ac_start_application_reply, Name, {error, Error}};
ac_error(req, _Name, _Error) ->
    ok.

ac_not_started(reply, Name) ->
    ?AC ! {ac_start_application_reply, Name, not_started};
ac_not_started(req, Name) ->
    ?AC ! {ac_change_application_req, Name, stop_it}.

ac_stop_it(Name) ->
    ?AC ! {ac_change_application_req, Name, stop_it}.

ac_takeover(reply, Name, Node, _RestartType) ->
    ?AC ! {ac_start_application_reply, Name, {takeover, Node}};
ac_takeover(req, Name, Node, RestartType) ->
    ?AC ! {ac_change_application_req, Name,
	   {takeover, Node, RestartType}}.

ac_failover(Name, Node, RestartType) ->
    ?AC ! {ac_change_application_req, Name,
	   {failover, Node, RestartType}}.

ac_not_running(Name) ->
    ?AC ! {ac_change_application_req, Name, not_running}.

restart_appls(Appls) ->
    foreach(fun(Appl) ->
		    AppName = Appl#appl.name,
		    send_after(Appl#appl.restart_time,
			       {internal_restart_appl, AppName})
	    end, lists:reverse(Appls)).

restart_appl(AppName, S) ->
    case keysearch(AppName, #appl.name, S#state.appls) of
	{value, Appl} when element(1, Appl#appl.id) =:= failover ->
	    case catch start_appl(AppName, S, req) of
		{ok, NewS, _} ->
		    NewS;
		{error, R} ->
		    error_msg("Error when restarting application ~p: ~p~n",
			      [AppName, R]),
		    S
	    end;
	_ ->
	    S
    end.

%% permit(ShouldBeRunning, IsRunning, ...)
permit(false, {value, #appl{id = undefined}}, _AppName, _From, S, _LockId) ->
    {reply, ok, S}; % It's not running
permit(false, {value, #appl{id = Id}}, _AppName, _From, S, _LockId)
  when element(1, Id) =:= distributed ->
    %% It is running at another node already
    {reply, ok, S};
permit(false, {value, _}, AppName, From, S, _LockId) ->
    %% It is a distributed application
    %% Check if there is any runnable node
    case dist_get_runnable_nodes(S#state.appls, AppName) of
	[] ->
	    %% There is no runnable node; stop application
	    _ = ac_stop_it(AppName),
	    SReqs = [{AppName, From} | S#state.s_reqs],
	    {noreply, S#state{s_reqs = SReqs}};
	Nodes ->
	    %% Delete all outstanding 'permit true' requests.
	    PR = req_del_permit_true(S#state.p_reqs, AppName),
	    NPReqs = [{From, AppName, false, Nodes} | PR],
	    {noreply, S#state{p_reqs = NPReqs}}
    end;
permit(true, {value, #appl{id = local}}, _AppName, _From, S, _LockId) ->
    {reply, ok, S};
permit(true, _, AppName, From, S, LockId) ->
    case catch start_appl(AppName, S, req) of
	{_ErrorTag, {not_running, App}} ->
	    %% Delete all outstanding 'permit false' requests
	    PR = req_del_permit_false(S#state.p_reqs, AppName),
	    NPReqs = [{false, AppName, true, App} | PR],
	    {reply, ok, S#state{p_reqs = NPReqs}};
	{ok, NewS, true} ->
	    %% We have ordered a start or a takeover; we must not return
	    %% until the app is running.
	    TR = NewS#state.t_reqs,
	    %% Delete the lock, so others may start the app
	    global:del_lock(LockId),
	    {noreply, NewS#state{t_reqs = [{AppName, From} | TR]}};
	{ok, _S, false} ->
	    %% Application should be started, but at another node
	    %% State remains the same
	    {reply, ok, S};
	{_ErrorTag, R} ->
	    {stop, R, {error, R}, S}
    end.

do_start_appls(StartApps, S) ->
    SortedStartApps = StartApps,
    Appls = S#state.appls,
    {ok, foldl(
      fun(AppName, NewS) ->
	      case catch start_appl(AppName, NewS, req) of
		  {error, R} ->
		      throw({{error, NewS}, R});
		  {ok, NewS2, _} ->
		      NewS2
	      end
      end, S#state{appls = Appls}, lists:reverse(SortedStartApps))}.

%%-----------------------------------------------------------------
%% Nodes = [node() | {node(), ..., node()}]
%% A list in priority order.  If it is a tuple, we may pick any of
%% them.  This decision is made by all nodes in the list, and all
%% nodes choose the same.  This is accomplished in the following
%% way:  all Nodes send to all others a msg which tells how many
%% applications each node has started.  The node with the smallest
%% number of started applications starts this one.
%%-----------------------------------------------------------------
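%% For illustration (node names are hypothetical): if the weights
%% collected for myapp are [{2, cp2@host}, {1, cp1@host}], then
%% lists:sort/1 yields [{1, cp1@host}, {2, cp2@host}] on every node,
%% so all nodes agree that cp1@host (smallest weight, i.e. fewest
%% running applications) starts the application; see find_any_node/5.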
find_start_node(Nodes, PermittedNodes, Name, S) ->
    AllNodes = intersection(flat_nodes(Nodes), PermittedNodes),
    case lists:member(node(), AllNodes) of
	true ->
	    Weight = get_cached_weight(Name, S),
	    find_start_node(Nodes, Name, S, Weight, AllNodes);
	false ->
	    case keysearch(Name, 2, S#state.remote_started) of
		{value, {Node, _Name}} ->
		    {already_started, Node};
		_ when AllNodes =/= [] ->
		    not_started;
		_ ->
		    no_permission
	    end
    end.

find_start_node([AnyNodes | Nodes], Name, S, Weight, AllNodes)
  when is_tuple(AnyNodes) ->
    case find_any_node(tuple_to_list(AnyNodes), Name, S, Weight, AllNodes) of
	false -> find_start_node(Nodes, Name, S, Weight, AllNodes);
	Res -> Res
    end;
find_start_node([Node | Nodes], Name, S, Weight, AllNodes) ->
    case lists:member(Node, AllNodes) of
	true ->
	    case keysearch(Name, #appl.name, S#state.appls) of
		{value, #appl{id = {distributed, Node}}} ->
		    {already_started, Node};
		_ ->
		    case keysearch(Name, 2, S#state.remote_started) of
			{value, {Node, _Name}} ->
			    {already_started, Node};
			_ ->
			    {ok, Node}
		    end
	    end;
	false -> find_start_node(Nodes, Name, S, Weight, AllNodes)
    end;
find_start_node([], _Name, _S, _Weight, _AllNodes) ->
    not_started.

%%-----------------------------------------------------------------
%% First of all, check if the application is already running
%% somewhere in AnyNodes; in that case we shall not move it!
%%-----------------------------------------------------------------
find_any_node(AnyNodes, Name, S, Weight, AllNodes) ->
    case check_running(Name, S, intersection(AnyNodes, AllNodes)) of
	{already_started, Node} -> {already_started, Node};
	false ->
	    %% Synchronize with all other nodes.
	    send_nodes(AllNodes, {dist_ac_weight, Name, Weight, node()}),
	    Answers = [{Weight, node()} |
		       collect_answers(AllNodes, Name, S, [])],
	    %% Make a decision (the same at every node) (smallest weight wins)
	    find_alive_node(lists:sort(Answers),
			    intersection(AnyNodes, S#state.known))
    end.

%%-----------------------------------------------------------------
%% Check if another node started the appl before we got alive.
%% If so, check if the node is one of AnyNodes.
%%-----------------------------------------------------------------
check_running(Name, #state{remote_started = RStarted,
			   appls = Appls}, AnyNodes) ->
    case keysearch(Name, 2, RStarted) of
	{value, {Node, _Name}} ->
	    case lists:member(Node, AnyNodes) of
		true -> {already_started, Node};
		false -> false
	    end;
	false ->
	    case keysearch(Name, #appl.name, Appls) of
		{value, #appl{id = {distributed, Node}}} ->
		    case lists:member(Node, AnyNodes) of
			true -> {already_started, Node};
			false -> false
		    end;
		_ ->
		    false
	    end
    end.

find_alive_node([{_, Node} | Nodes], AliveNodes) ->
    case lists:member(Node, AliveNodes) of
	true -> {ok, Node};
	false -> find_alive_node(Nodes, AliveNodes)
    end;
find_alive_node([], _AliveNodes) ->
    false.

%%-----------------------------------------------------------------
%% First, check if the node's msg is buffered (received in our
%% main loop).  Otherwise, wait for msg or nodedown.
%% We have sent the dist_ac_weight message, and will wait for it
%% to be received here (or a nodedown).  This implies that a
%% dist_ac must *always* be prepared to get these messages, and to
%% send them to us.
%%-----------------------------------------------------------------
collect_answers([Node | Nodes], Name, S, Res) when Node =/= node() ->
    case keysearch(Node, 3, S#state.tmp_locals) of
	{value, {Name, Weight, Node}} ->
	    collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
	_ ->
	    monitor_node(Node, true),
	    receive
		{dist_ac_weight, Name, Weight, Node} ->
		    monitor_node(Node, false),
		    collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
		{nodedown, Node} ->
		    monitor_node(Node, false),
		    collect_answers(Nodes, Name, S, Res)
	    end
    end;
collect_answers([_ThisNode | Nodes], Name, S, Res) ->
    collect_answers(Nodes, Name, S, Res);
collect_answers([], _Name, _S, Res) ->
    Res.

send_nodes(Nodes, Msg) ->
    FlatNodes = flat_nodes(Nodes),
    foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
	       (_ThisNode) -> ok
	    end, FlatNodes).

send_after(Time, Msg) when is_integer(Time), Time >= 0 ->
    _Pid = spawn_link(?MODULE, send_timeout, [self(), Time, Msg]),
    ok;
send_after(_, _) -> % infinity
    ok.

send_timeout(To, Time, Msg) ->
    receive
    after Time -> To ! Msg
    end.

send_msg(Msg, Nodes) ->
    foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
	       (_) -> ok
	    end, Nodes).

replaceadd(Item, List) ->
    case member(Item, List) of
	true -> List;
	false -> [Item | List]
    end.

keyreplaceadd(Key, Pos, List, New) ->
    case lists:keymember(Key, Pos, List) of
	true -> lists:keyreplace(Key, Pos, List, New);
	false -> [New | List]
    end.

keydelete_all(Key, N, [H|T]) when element(N, H) =:= Key ->
    keydelete_all(Key, N, T);
keydelete_all(Key, N, [H|T]) ->
    [H|keydelete_all(Key, N, T)];
keydelete_all(_Key, _N, []) -> [].

-ifdef(NOTUSED).
keysearchdelete(Key, Pos, List) ->
    ksd(Key, Pos, List, []).

ksd(Key, Pos, [H | T], Rest) when element(Pos, H) =:= Key ->
    {value, H, Rest ++ T};
ksd(Key, Pos, [H | T], Rest) ->
    ksd(Key, Pos, T, [H | Rest]);
ksd(_Key, _Pos, [], _Rest) ->
    false.

get_new_appl(Name, [{application, Name, App} | _]) ->
    {ok, {application, Name, App}};
get_new_appl(Name, [_ | T]) -> get_new_appl(Name, T);
get_new_appl(_Name, []) -> false.
-endif.

equal_nodes([H | T1], [H | T2]) when is_atom(H) ->
    equal_nodes(T1, T2);
equal_nodes([H1 | T1], [H2 | T2]) when is_tuple(H1), is_tuple(H2) ->
    case equal(tuple_to_list(H1), tuple_to_list(H2)) of
	true -> equal_nodes(T1, T2);
	false -> false
    end;
equal_nodes([], []) -> true;
equal_nodes(_, _) -> false.

equal([H | T], S) ->
    case lists:member(H, S) of
	true -> equal(T, lists:delete(H, S));
	false -> false
    end;
equal([], []) -> true;
equal(_, _) -> false.

flat_nodes(Nodes) when is_list(Nodes) ->
    foldl(fun(Node, Res) when is_atom(Node) -> [Node | Res];
	     (Tuple, Res) when is_tuple(Tuple) -> tuple_to_list(Tuple) ++ Res
	  end, [], Nodes);
flat_nodes(Nodes) ->
    throw({error, {badarg, Nodes}}).

get_cached_weight(Name, S) ->
    case lists:keysearch(Name, 1, S#state.tmp_weights) of
	{value, {_, W}} -> W;
	_ -> get_weight()
    end.

%% Simple weight; just count the number of applications running.
get_weight() ->
    length(application:which_applications()).

get_dist_loaded(Name, [{{Name, Node}, HisNodes, Permission} | T]) ->
    [{Node, HisNodes, Permission} | get_dist_loaded(Name, T)];
get_dist_loaded(Name, [_H | T]) ->
    get_dist_loaded(Name, T);
get_dist_loaded(_Name, []) ->
    [].

del_dist_loaded(Name, [{{Name, _Node}, _HisNodes, _Permission} | T]) ->
    del_dist_loaded(Name, T);
del_dist_loaded(Name, [H | T]) ->
    [H | del_dist_loaded(Name, T)];
del_dist_loaded(_Name, []) ->
    [].

req_start_app(State, Name) ->
    {ok, foldl(
	   fun({false, AppName, true, Name2}, S) when Name =:= Name2 ->
		   PR = keydelete(AppName, 2, S#state.p_reqs),
		   NS = S#state{p_reqs = PR},
		   case catch do_start_appls([AppName], NS) of
		       {_ErrorTag, {not_running, App}} ->
			   NRequests = [{false, AppName, true, App} | PR],
			   S#state{p_reqs = NRequests};
		       {ok, NewS} ->
			   NewS;
		       {_ErrorTag, R} ->
			   throw({error, R})
		   end;
	      (_, S) ->
		   S
	   end, State, State#state.p_reqs)}.


req_del_permit_true(Reqs, Name) ->
    filter(fun({From, Name2, true, _}) when Name2 =:= Name ->
		   gen_server:reply(From, ok),
		   false;
	      (_) ->
		   true
	   end, Reqs).

req_del_permit_false(Reqs, Name) ->
    filter(fun({From, Name2, false, _Nodes}) when Name2 =:= Name ->
		   gen_server:reply(From, ok),
		   false;
	      (_) ->
		   true
	   end, Reqs).

req_del_node(S, Node, Appls) ->
    check_waiting(S#state.p_reqs, S, Node, Appls, [], S#state.s_reqs).

del_t_reqs(AppName, TReqs, Res) ->
    lists:filter(fun({AN, From}) when AppName =:= AN ->
			 gen_server:reply(From, Res),
			 false;
		    (_) ->
			 true
		 end,
		 TReqs).


check_waiting([{From, AppName, false, Nodes} | Reqs],
	      S, Node, Appls, Res, SReqs) ->
    case lists:delete(Node, Nodes) of
	[] ->
	    _ = ac_stop_it(AppName),
	    NSReqs = [{AppName, From} | SReqs],
	    check_waiting(Reqs, S, Node, Appls, Res, NSReqs);
	NNodes ->
	    check_waiting(Reqs, S, Node, Appls,
			  [{From, AppName, false, NNodes} | Res], SReqs)
    end;
check_waiting([H | Reqs], S, Node, Appls, Res, SReqs) ->
    check_waiting(Reqs, S, Node, Appls, [H | Res], SReqs);
check_waiting([], _S, _Node, Appls, Res, SReqs) ->
    {Res, Appls, SReqs}.

intersection([], _) ->
    [];
intersection(_, []) ->
    [];
intersection(L1, L2) ->
    L1 -- (L1 -- L2).

get_default_permission(AppName) ->
    case application:get_env(kernel, permissions) of
	{ok, Permissions} ->
	    case keysearch(AppName, 1, Permissions) of
		{value, {_, true}} ->  true;
		{value, {_, false}} -> false;
		{value, {_, X}} -> exit({bad_permission, {AppName, X}});
		false -> true
	    end;
	undefined -> true
    end.

%%-----------------------------------------------------------------
%% ADT dist() - info on how an application is distributed
%% dist() = [{AppName, Time, DistNodes, [{Node, Runnable}]}]
%% Time = int() >= 0 | infinity
%% Nodes = [node() | {node()...}]
%% Runnable = true | false | undefined
%%       An appl may not be started if any Runnable is undefined;
%%       i.e. the appl must be loaded on all Nodes.
%%-----------------------------------------------------------------
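%% For example (a sketch; names are hypothetical), dist_check/1 below
%% turns a 'distributed' entry such as
%%
%%   {myapp, 5000, [cp1@host, {cp2@host, cp3@host}]}
%%
%% into (P being the default permission from the kernel 'permissions'
%% parameter):
%%
%%   #appl{name = myapp, restart_time = 5000,
%%         nodes = [cp1@host, {cp2@host, cp3@host}],
%%         run = [{node(), P}]}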
dist_check([{AppName, Nodes} | T]) ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, nodes = Nodes, run = [{node(), P}]} | dist_check(T)];
dist_check([{AppName, Time, Nodes} | T]) when is_integer(Time), Time >= 0 ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, restart_time = Time, nodes = Nodes,
	   run = [{node(), P}]} | dist_check(T)];
dist_check([{AppName, infinity, Nodes} | T]) ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, restart_time = infinity,
	   nodes = Nodes, run = [{node(), P}]} |
     dist_check(T)];
dist_check([_ | T]) ->
    dist_check(T);
dist_check([]) ->
    [].

dist_take_control(Appls) ->
    foreach(fun(#appl{name = AppName}) ->
		    application_controller:control_application(AppName)
	    end, Appls).

dist_replace(default, _Name, Appls) -> Appls;
dist_replace({AppName, Nodes}, AppName, Appls) ->
    Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
    keyreplaceadd(AppName, #appl.name, Appls,
		  #appl{name = AppName, restart_time = 0,
			nodes = Nodes, run = Run});
dist_replace({AppName, Time, Nodes}, AppName, Appls)
  when is_integer(Time), Time >= 0 ->
    Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
    keyreplaceadd(AppName, #appl.name, Appls,
		  #appl{name = AppName, restart_time = Time,
			nodes = Nodes, run = Run});
dist_replace(Bad, _Name, _Appls) ->
    throw({error, {bad_distribution_spec, Bad}}).

dist_update_run(Appls, AppName, Node, Permission) ->
    map(fun(Appl) when Appl#appl.name =:= AppName ->
		Run = Appl#appl.run,
		NRun = keyreplaceadd(Node, 1, Run, {Node, Permission}),
		Appl#appl{run = NRun};
	   (Appl) ->
		Appl
	end, Appls).



dist_change_update(Appls, []) ->
    Appls;
dist_change_update(Appls, [{AppName, NewNodes} | NewDist]) ->
    NewAppls = do_dist_change_update(Appls, AppName, 0, NewNodes),
    dist_change_update(NewAppls, NewDist);
dist_change_update(Appls, [{AppName, NewTime, NewNodes} | NewDist]) ->
    NewAppls = do_dist_change_update(Appls, AppName, NewTime, NewNodes),
    dist_change_update(NewAppls, NewDist).

do_dist_change_update(Appls, AppName, NewTime, NewNodes) ->
    map(fun(Appl) when Appl#appl.name =:= AppName ->
		Appl#appl{restart_time = NewTime, nodes = NewNodes};
	   (Appl) ->
		Appl
	end, Appls).

%% Merge his Permissions with mine.
dist_merge(MyAppls, HisAppls, HisNode) ->
    zf(fun(Appl) ->
	       #appl{name = AppName, run = Run} = Appl,
%	       #appl{name = AppName, nodes = Nodes, run = Run} = Appl,
%	       HeIsMember = lists:member(HisNode, flat_nodes(Nodes)),
	       HeIsMember = true,
	       case keysearch(AppName, #appl.name, HisAppls) of
		   {value, #appl{run = HisRun}} when HeIsMember ->
		       case keysearch(HisNode, 1, HisRun) of
			   {value, Val} -> % He has it loaded
			       NRun = keyreplaceadd(HisNode, 1, Run, Val),
			       {true, Appl#appl{run = NRun}};
			   false -> % He hasn't loaded it yet
			       Val = {HisNode, undefined},
			       {true, Appl#appl{run = [Val | Run]}}
		       end;
		   _ ->
		       true
	       end
       end, MyAppls).

dist_get_runnable_nodes(Appls, AppName) ->
    case keysearch(AppName, #appl.name, Appls) of
	{value, #appl{run = Run}} ->
	    zf(fun({Node, true}) -> {true, Node};
		  (_) -> false
	       end, Run);
	false ->
	    []
    end.

dist_is_runnable(Appls, AppName) ->
    case keysearch(AppName, #appl.name, Appls) of
	{value, #appl{run = Run}} ->
	    case keysearch(node(), 1, Run) of
		{value, {_, true}} -> true;
		_ -> false
	    end;
	false ->
	    false
    end.

is_loaded(AppName, #state{appls = Appls}) ->
    case keysearch(AppName, #appl.name, Appls) of
	{value, #appl{run = Run}} ->
	    case keysearch(node(), 1, Run) of
		{value, {_Node, undefined}} -> false;
		{value, _} -> true;
		false -> false
	    end;
	false ->
	    false
    end.

dist_get_runnable(Appls) ->
    zf(fun(#appl{name = AppName, run = Run}) ->
	       case keysearch(node(), 1, Run) of
		   {value, {_, true}} -> {true, AppName};
		   _ -> false
	       end
       end, Appls).

dist_get_all_nodes(#appl{name = AppName, nodes = Nodes, run = Run}) ->
    {Res, BadNodes} = check_nodes(Run, [], []),
    case intersection(BadNodes, erlang:nodes(connected)) of
        [] -> {ok, Nodes, Res};
        _ -> {error, {app_not_loaded, AppName, BadNodes}}
    end.

check_nodes([{Node, undefined} | T], Res, BadNodes) ->
    check_nodes(T, Res, [Node | BadNodes]);
check_nodes([{Node, true} | T], Res, BadNodes) ->
    check_nodes(T, [Node | Res], BadNodes);
check_nodes([{_Node, false} | T], Res, BadNodes) ->
    check_nodes(T, Res, BadNodes);
check_nodes([], Res, BadNodes) ->
    {Res, BadNodes}.

-ifdef(NOTUSED).
dist_find_time([#appl{name = Name, restart_time = Time} |_], Name) -> Time;
dist_find_time([_ | T], Name) -> dist_find_time(T, Name);
dist_find_time([], _Name) -> 0.
-endif.

%% Find all nodes that can run the app (even if they're not permitted
%% to right now).
dist_find_nodes([#appl{name = Name, nodes = Nodes} |_], Name) -> Nodes;
dist_find_nodes([_ | T], Name) -> dist_find_nodes(T, Name);
dist_find_nodes([], _Name) -> [].

dist_flat_nodes(Appls, Name) ->
    flat_nodes(dist_find_nodes(Appls, Name)).

dist_del_node(Appls, Node) ->
    map(fun(Appl) ->
		NRun = filter(fun({N, _Runnable}) when N =:= Node -> false;
				 (_) -> true
			      end, Appl#appl.run),
		Appl#appl{run = NRun}
	end, Appls).

valid_restart_type(permanent)   -> true;
valid_restart_type(temporary)   -> true;
valid_restart_type(transient)   -> true;
valid_restart_type(_RestartType) -> false.
dist_mismatch(AppName, Node) ->
    error_msg("Distribution mismatch for application \"~p\" on nodes ~p and ~p~n",
	      [AppName, node(), Node]),
    exit({distribution_mismatch, AppName, Node}).

%error_msg(Format) when is_list(Format) ->
%    error_msg(Format, []).

error_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
    error_logger:error_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).

%info_msg(Format) when is_list(Format) ->
%    info_msg(Format, []).

%info_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
%    error_logger:info_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).
