%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(dist_ac).

-behaviour(gen_server).

%% External exports
-export([start_link/0,
         load_application/2,
         takeover_application/2,
         permit_application/2,
         permit_only_loaded_application/2]).

-export([get_known_nodes/0]).

%% Internal exports
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2,
         code_change/3, send_timeout/3]).
-export([info/0]).

-import(lists, [zf/2, filter/2, map/2, foreach/2, foldl/3, mapfoldl/3,
                keysearch/3, keydelete/3, keyreplace/4, member/2]).

-define(AC, application_controller).
-define(DIST_AC, ?MODULE).
-define(LOCK_ID, ?MODULE).

%% This is the protocol version for the dist_ac protocol (between nodes)
-define(vsn, 1).

%%%-----------------------------------------------------------------
%%% This module implements the default Distributed Applications
%%% Controller.  It is possible to write other controllers, when
%%% the functionality in this module is not sufficient.
%%% The process cooperates with the application_controller.
%%%-----------------------------------------------------------------

%%-----------------------------------------------------------------
%% Naming conventions:
%%   Appl = #appl
%%   AppName = atom()
%%-----------------------------------------------------------------
-record(state, {appls = [], tmp_locals = [], remote_started = [],
                known = [], started = [], tmp_weights = [],
                dist_loaded = [], t_reqs = [], s_reqs = [], p_reqs = []}).
%%-----------------------------------------------------------------
%% appls          = [#appl()] - these are the applications we control
%% tmp_locals     = [{AppName, Weight, node()}] - tmp, info part of
%%                  application startup for some distrib appls,
%%                  not yet handled.
%% remote_started = [{Node, AppName}] - info on apps started before
%%                  we were started
%% known          = [Node] - These are the nodes known to us
%% started        = [AppName] - An ordered list of started applications
%%                  (reversed start order)
%% tmp_weights    = [{AppName, MyWeight}] - tmp, if we're forced to
%%                  send a dist_ac_weight message before we're prepared to,
%%                  we remember the weight we sent here, so we can use
%%                  it in the dist_ac_weight msgs later.
%% dist_loaded    = [{{Name, Node}, HisNodes, Permission}] - info on
%%                  applications loaded on other nodes (and our own node)
%% t_reqs         = [{AppName, From}] - processes waiting for takeover
%%                  to complete.
%% s_reqs         = [{AppName, From}] - processes waiting for stop
%%                  to complete.
%% p_reqs         = [{From, AppName, Bool, [Node]}] - outstanding permit.
%%                  Nodes is a list of nodes we're still waiting for.
%%-----------------------------------------------------------------

-record(appl, {name, id, restart_time = 0, nodes = [], run = []}).

%%-----------------------------------------------------------------
%% id = local | undefined | {distributed, node()} | waiting | run_waiting |
%%      {failover, Node} | {takeover, Node}
%%      local               : local application
%%      undefined           : not yet started
%%      {distributed, Node} : running on another node, we're standby
%%      {failover, Node}    : failover from Node
%%      {takeover, Node}    : takeover from Node
%%      waiting             : other node went down, we're waiting for a timeout
%%                            to take it over.  From = pid() | undefined
%%      run_waiting         : we have decided to start the app; wait for the
%%                            AC result
%%-----------------------------------------------------------------

start_link() ->
    case gen_server:start_link({local, ?DIST_AC}, ?MODULE, [], []) of
        {ok, Pid} ->
            gen_server:cast(?DIST_AC, init_sync),
            {ok, Pid};
        Else ->
            Else
    end.


%%-----------------------------------------------------------------
%% Func: load_application(AppName, DistNodes)
%% Args: AppName = atom()
%%       DistNodes = default | {AppName, Time, [node() | {node()...}]}
%% Purpose: Notifies the dist_ac about distributed nodes for an
%%          application.  DistNodes overrides the kernel 'distributed'
%%          parameter.
%% Returns: ok | {error, Reason}
%%-----------------------------------------------------------------
load_application(AppName, DistNodes) ->
    gen_server:call(?DIST_AC, {load_application, AppName, DistNodes}, infinity).

takeover_application(AppName, RestartType) ->
    case valid_restart_type(RestartType) of
        true ->
            wait_for_sync_dacs(),
            Nodes = get_nodes(AppName),
            global:trans(
              {?LOCK_ID, self()},
              fun() ->
                      gen_server:call(
                        ?DIST_AC,
                        {takeover_application, AppName, RestartType},
                        infinity)
              end,
              Nodes);
        false ->
            {error, {invalid_restart_type, RestartType}}
    end.

%%-----------------------------------------------------------------
%% This function controls which applications are permitted to run.  If
%% an application X is running when this function is called as
%% permit_application(X, false), it is moved to another node where it
%% is permitted to run (distributed applications only).  If there is
%% no such node, the application is stopped.  (I.e. local applications
%% are always stopped, and distributed applications with no other node
%% alive are stopped as well.)  If permit_application(X, true) is
%% called later, X is restarted.
%% For example, suppose applications app1 and app2 are started and
%% running.
%% If we evaluate
%%   permit_application(app2, false)
%% app2 is stopped and only app1 is running.
%% If we now evaluate
%%   permit_application(app2, true),
%%   permit_application(app3, true)
%% app2 is restarted, but not app3, since it hasn't been started by a
%% call to start_application.
%%-----------------------------------------------------------------
permit_application(AppName, Bool) ->
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    global:trans(
      LockId,
      fun() ->
              gen_server:call(?DIST_AC,
                              {permit_application, AppName, Bool, LockId, started},
                              infinity)
      end).

permit_only_loaded_application(AppName, Bool) ->
    wait_for_sync_dacs(),
    LockId = {?LOCK_ID, self()},
    global:trans(
      LockId,
      fun() ->
              gen_server:call(?DIST_AC,
                              {permit_application, AppName, Bool, LockId, only_loaded},
                              infinity)
      end).
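
%%-----------------------------------------------------------------
%% Example (illustration only, hypothetical application names): these
%% functions are normally reached via application:permit/2 and
%% application:takeover/2 rather than called directly.  A minimal
%% sketch, assuming 'app2' is configured as a distributed application
%% that includes the current node:
%%
%%   %% Forbid app2 on this node; it is moved to another permitted
%%   %% node, or stopped if there is none.
%%   dist_ac:permit_application(app2, false),
%%
%%   %% Allow it again; it is restarted here only if it has been
%%   %% started with application:start/1 earlier.
%%   dist_ac:permit_application(app2, true),
%%
%%   %% Move the running distributed application app2 to this node.
%%   dist_ac:takeover_application(app2, permanent)
%%-----------------------------------------------------------------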

get_nodes(AppName) ->
    gen_server:call(?DIST_AC, {get_nodes, AppName}, infinity).

get_known_nodes() ->
    gen_server:call(?DIST_AC, get_known_nodes).

%%%-----------------------------------------------------------------
%%% call-back functions from gen_server
%%%-----------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    {ok, #state{}}.

sync_dacs(Appls) ->
    Res = global:trans({?LOCK_ID, sync_dacs},
                       fun() ->
                               Nodes = introduce_me(nodes(), Appls),
                               wait_dacs(Nodes, [node()], Appls, [])
                       end),
    ets:insert(ac_tab, {sync_dacs, ok}),
    Res.

introduce_me(Nodes, Appls) ->
    Msg = {dist_ac_new_node, ?vsn, node(), Appls, []},
    filter(fun(Node) ->
                   %% This handles nodes without DACs
                   case rpc:call(Node, erlang, whereis, [?DIST_AC]) of
                       Pid when is_pid(Pid) ->
                           Pid ! Msg,
                           true;
                       _ ->
                           false
                   end
           end, Nodes).

wait_dacs([Node | Nodes], KnownNodes, Appls, RStarted) ->
    monitor_node(Node, true),
    receive
        %% HisAppls =/= [] is the case when our node connects to a running system
        %%
        %% It is always the responsibility of newer versions to understand
        %% older versions of the protocol.  As we don't have any older
        %% versions (that are supposed to work with this version), we
        %% don't handle version mismatch here.
        {dist_ac_new_node, _Vsn, Node, HisAppls, HisStarted} ->
            monitor_node(Node, false),
            NRStarted = RStarted ++ HisStarted,
            NAppls = dist_merge(Appls, HisAppls, Node),
            wait_dacs(Nodes, [Node | KnownNodes], NAppls, NRStarted);
        {nodedown, Node} ->
            monitor_node(Node, false),
            wait_dacs(Nodes, KnownNodes, Appls, RStarted)
    end;
wait_dacs([], KnownNodes, Appls, RStarted) ->
    {KnownNodes, Appls, RStarted}.


info() ->
    gen_server:call(?DIST_AC, info).


%%-----------------------------------------------------------------
%% All functions that can affect which applications are running
%% execute within a global lock, to ensure that they are not
%% executing at the same time as sync_dacs.  However, to avoid a
%% deadlock situation where e.g. permit_application gets the lock
%% before sync_dacs, this function is used to ensure that the local
%% sync_dacs always gets the lock first of all.  The lock is still
%% used to avoid interfering with sync_dacs on other nodes.
%%-----------------------------------------------------------------
wait_for_sync_dacs() ->
    case catch ets:lookup(ac_tab, sync_dacs) of
        [{sync_dacs, ok}] -> ok;
        _ ->
            receive after 100 -> ok end,
            wait_for_sync_dacs()
    end.

handle_cast(init_sync, _S) ->
    %% When the dist_ac is started, it receives this msg, and gets into
    %% the receive loop.  'go' is sent from the kernel_config proc when
    %% all nodes that should be pinged have been pinged.  The reason for
    %% this is that dist_ac syncs with the other nodes at start-up.  That
    %% is, it does _not_ handle partitioned nets!  The other nodes try to
    %% call the local name dist_ac, which means that this name must be
    %% registered before the distribution.  But it can't sync until after
    %% the distribution is started.  Therefore, this 'go'-thing.
    receive
        {go, KernelConfig} ->
            Appls = case application:get_env(kernel, distributed) of
                        {ok, D} -> dist_check(D);
                        undefined -> []
                    end,

            dist_take_control(Appls),
            %% kernel_config waits for dist_ac to take control over its
            %% applications.  By this we can be sure that the kernel
            %% application hasn't completed its start before dist_ac has
            %% taken control over its applications. (OTP-3509)
            KernelConfig ! dist_ac_took_control,

            %% we're really just interested in nodedowns.
            ok = net_kernel:monitor_nodes(true),

            {Known, NAppls, RStarted} = sync_dacs(Appls),

            {noreply,
             #state{appls = NAppls, known = Known, remote_started = RStarted}}
    end.


handle_call(info, _From, S) ->
    {reply, S, S};



handle_call({load_application, AppName, DistNodes}, _From, S) ->
    Appls = S#state.appls,
    case catch dist_replace(DistNodes, AppName, Appls) of
        {error, Error} ->
            {reply, {error, Error}, S};
        {'EXIT', R} ->
            {stop, R, {error, R}, S};
        NAppls ->
            NewS = case dist_find_nodes(NAppls, AppName) of
                       [] -> % No distrib nodes; we ignore it
                           S;
                       _Nodes ->
                           ensure_take_control(AppName, Appls),
                           {ok, S2} = load(AppName, S#state{appls = NAppls}),
                           S2
                   end,
            {reply, ok, NewS}
    end;

handle_call({takeover_application, AppName, RestartType}, From, S) ->
    Appls = S#state.appls,
    case keysearch(AppName, #appl.name, Appls) of
        {value, Appl} when element(1, Appl#appl.id) =:= distributed ->
            {distributed, Node} = Appl#appl.id,
            _ = ac_takeover(req, AppName, Node, RestartType),
            NAppl = Appl#appl{id = takeover},
            NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
            TR = S#state.t_reqs,
            {noreply, S#state{appls = NAppls,
                              t_reqs = [{AppName, From} | TR]}};
        {value, #appl{id = local}} ->
            {reply, {error, {already_running_locally, AppName}}, S};
        _ ->
            {reply, {error, {not_running_distributed, AppName}}, S}
    end;

handle_call({permit_application, AppName, Bool, LockId, StartInfo}, From, S) ->
    case lists:keymember(AppName, #appl.name, S#state.appls) of
        false ->
            %% This one covers the case with permit for non-distributed
            %% applications.  This shouldn't be handled like this, and not
            %% here, but we have to be backwards-compatible.
            case application_controller:get_loaded(AppName) of
                {true, _} when not Bool ->
                    _ = ac_stop_it(AppName),
                    {reply, ok, S};
                {true, _} when Bool ->
                    _ = ac_start_it(req, AppName),
                    {reply, ok, S};
                false ->
                    {reply, {error, {not_loaded, AppName}}, S}
            end;
        true ->
            NAppls = dist_update_run(S#state.appls, AppName, node(), Bool),
            NewS = S#state{appls = NAppls},
            %% Check if the application is running
            IsRunning = keysearch(AppName, #appl.name, NAppls),
            IsMyApp = case IsRunning of
                          {value, #appl{id = local}} -> true;
                          _ -> false
                      end,
            %% Tell everyone about the new permission
            Nodes = dist_flat_nodes(NAppls, AppName),
            Msg = {dist_ac_new_permission, node(), AppName, Bool, IsMyApp},
            send_msg(Msg, Nodes),
            case StartInfo of
                only_loaded ->
                    {reply, ok, NewS};
                started ->
                    permit(Bool, IsRunning, AppName, From, NewS, LockId)
            end
    end;

%%-----------------------------------------------------------------
%% The 'distributed' parameter has changed.  Update the parameters,
%% but the applications are not actually moved to other nodes,
%% even if they should be.
378%%----------------------------------------------------------------- 379handle_call({distribution_changed, NewDistribution}, _From, S) -> 380 Appls = S#state.appls, 381 NewAppls = dist_change_update(Appls, NewDistribution), 382 NewS = S#state{appls = NewAppls}, 383 {reply, ok, NewS}; 384 385 386handle_call({get_nodes, AppName}, _From, S) -> 387 Alive = intersection(dist_flat_nodes(S#state.appls, AppName), 388 S#state.known), 389 {reply, Alive, S}; 390 391handle_call(get_known_nodes, _From, S) -> 392 {reply, S#state.known, S}. 393 394 395handle_info({ac_load_application_req, AppName}, S) -> 396 {ok, NewS} = load(AppName, S), 397 ?AC ! {ac_load_application_reply, AppName, ok}, 398 {noreply, NewS}; 399 400handle_info({ac_application_unloaded, AppName}, S) -> 401 {ok, NewS} = unload(AppName, S), 402 {noreply, NewS}; 403 404handle_info({ac_start_application_req, AppName}, S) -> 405 %% We must decide if we or another node should start the application 406 Lock = {?LOCK_ID, self()}, 407 case global:set_lock(Lock, [node()], 0) of 408 true -> 409 S2 = case catch start_appl(AppName, S, reply) of 410 {ok, NewS, _} -> 411 NewS; 412 {error, R} -> 413 ?AC ! {ac_start_application_reply, AppName, {error,R}}, 414 S 415 end, 416 global:del_lock(Lock), 417 {noreply, S2}; 418 false -> 419 send_after(100, {ac_start_application_req, AppName}), 420 {noreply, S} 421 end; 422 423handle_info({ac_application_run, AppName, Res}, S) -> 424 %% We ordered a start, and here's the result. Tell all other nodes. 425 Appls = S#state.appls, 426 Nodes = S#state.known, 427 %% Send this to _all_ known nodes, as any node could sync 428 %% on this app (not only nodes that can run it). 429 send_msg({dist_ac_app_started, node(), AppName, Res}, Nodes), 430 NId = case Res of 431 ok -> local; 432 {error, _R} -> undefined 433 end, 434 {value, Appl} = keysearch(AppName, #appl.name, Appls), 435 %% Check if we have somebody waiting for the takeover result 436 NTReqs = del_t_reqs(AppName, S#state.t_reqs, Res), 437 NAppl = Appl#appl{id = NId}, 438 NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), 439 {noreply, S#state{appls = NAppls, t_reqs = NTReqs}}; 440 441 442handle_info({ac_application_not_run, AppName}, S) -> 443 %% We ordered a stop, and now it has stopped 444 {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls), 445 %% Check if we have somebody waiting for the takeover result; 446 %% if somebody called stop just before takeover was handled, 447 NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}), 448 %% Check if we have somebody waiting for stop to return 449 SReqs = filter(fun({Name, From2}) when Name =:= AppName -> 450 gen_server:reply(From2, ok), 451 false; 452 (_) -> 453 true 454 end, S#state.s_reqs), 455 RS = case Appl#appl.id of 456 local -> 457 send_msg({dist_ac_app_stopped, AppName}, S#state.known), 458 S#state.remote_started; 459 {distributed, Node} -> 460 [{Node, AppName} | S#state.remote_started]; 461 _ -> 462 S#state.remote_started 463 end, 464 NAppl = Appl#appl{id = undefined}, 465 NAppls = keyreplace(AppName, #appl.name, Appls, NAppl), 466 {noreply, S#state{appls = NAppls, t_reqs = NTReqs, s_reqs = SReqs, 467 remote_started = RS}}; 468 469handle_info({ac_application_stopped, AppName}, S) -> 470 %% Somebody called application:stop - reset state as it was before 471 %% the application was started. 
    {value, Appl} = keysearch(AppName, #appl.name, Appls = S#state.appls),
    %% Check if we have somebody waiting for the takeover result;
    %% if somebody called stop just before takeover was handled,
    NTReqs = del_t_reqs(AppName, S#state.t_reqs, {error, stopped}),
    %% Check if we have somebody waiting for stop to return
    SReqs = filter(fun({Name, From2}) when Name =:= AppName ->
                           gen_server:reply(From2, ok),
                           false;
                      (_) ->
                           true
                   end, S#state.s_reqs),
    RS = case Appl#appl.id of
             local ->
                 send_msg({dist_ac_app_stopped, AppName}, S#state.known),
                 S#state.remote_started;
             {distributed, Node} ->
                 [{Node, AppName} | S#state.remote_started];
             _ ->
                 S#state.remote_started
         end,
    NAppl = Appl#appl{id = undefined},
    NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
    Started = lists:delete(AppName, S#state.started),
    {noreply, S#state{appls = NAppls, started = Started,
                      t_reqs = NTReqs, s_reqs = SReqs,
                      remote_started = RS}};


%%-----------------------------------------------------------------
%% A new node has come up.
%% Send it info about our started distributed applications.
%%-----------------------------------------------------------------
handle_info({dist_ac_new_node, _Vsn, Node, HisAppls, []}, S) ->
    Appls = S#state.appls,
    MyStarted = zf(fun(Appl) when Appl#appl.id =:= local ->
                           {true, {node(), Appl#appl.name}};
                      (_) ->
                           false
                   end, Appls),
    {?DIST_AC, Node} ! {dist_ac_new_node, ?vsn, node(), Appls, MyStarted},
    NAppls = dist_merge(Appls, HisAppls, Node),
    {noreply, S#state{appls = NAppls, known = [Node | S#state.known]}};

handle_info({dist_ac_app_started, Node, Name, Res}, S) ->
    case {keysearch(Name, #appl.name, S#state.appls),
          lists:member(Name, S#state.started)} of
        {{value, Appl}, true} ->
            Appls = S#state.appls,
            NId = case Appl#appl.id of
                      _ when element(1, Res) =:= error ->
                          %% Start of appl on some node failed.
                          %% Set Id to undefined.  That node will have
                          %% to take some actions, e.g. reboot
                          undefined;
                      {distributed, _} ->
                          %% Another node took over from some node.
                          %% Update the appl list.
                          {distributed, Node};
                      local ->
                          %% Another node took over from me; stop my
                          %% application and update the running list.
                          {distributed, Node};
                      _ ->
                          %% Another node started the appl.  Update the
                          %% appl list.
                          {distributed, Node}
                  end,
            _ = ac_started(req, Name, Node),
            NAppl = Appl#appl{id = NId},
            NAppls = keyreplace(Name, #appl.name, Appls, NAppl),
            TmpWeights = keydelete_all(Name, 1, S#state.tmp_weights),
            NewS = S#state{appls = NAppls, tmp_weights = TmpWeights},
            NPermitReq = req_del_permit_false(NewS#state.p_reqs, Name),
            case catch req_start_app(NewS#state{p_reqs = NPermitReq}, Name) of
                {error, R} ->
                    {stop, R};
                {ok, NewS2} ->
                    {noreply, NewS2}
            end;
        {_, _} ->
            %% The app has not been started at this node yet; remember this
            %% in remote_started.
            NRStarted = [{Node, Name} | S#state.remote_started],
            {noreply, S#state{remote_started = NRStarted}}
    end;

handle_info({dist_ac_app_stopped, AppName}, S) ->
    Appls = S#state.appls,
    case keysearch(AppName, #appl.name, Appls) of
        false ->
            RStarted = keydelete(AppName, 2, S#state.remote_started),
            {noreply, S#state{remote_started = RStarted}};
        {value, Appl} ->
            NAppl = Appl#appl{id = undefined},
            NAppls = keyreplace(AppName, #appl.name, Appls, NAppl),
            RStarted = keydelete(AppName, 2, S#state.remote_started),
            {noreply, S#state{appls = NAppls, remote_started = RStarted}}
    end;

handle_info({dist_ac_weight, Name, Weight, Node}, S) ->
    %% This means another node starts up, and will eventually take over
    %% this appl.  We have a situation like: {Name, [{Node}, node()]}
    %% Node sends us this msg, and we must respond.  It doesn't really
    %% matter what we send him; but it must be a dist_ac_weight msg.
    %% Another situation is {Name, [RNode, {node()}, Node]}.
    %%
    %% Yet another situation is that the node where Name was running crashed,
    %% and Node has got the nodedown message, but we haven't.  In this case,
    %% we must send a correct weight to Node, i.e. the same weight that
    %% we'll send to him later, when we get the nodedown message.
    case keysearch(Name, #appl.name, S#state.appls) of
        {value, Appl} ->
            Id = Appl#appl.id,
            case Id of
                run_waiting ->
                    {?DIST_AC, Node} ! {dist_ac_weight, Name, 0, node()},
                    {noreply, S};
                undefined ->
                    {noreply,
                     S#state{tmp_locals = [{Name, Weight, Node} |
                                           S#state.tmp_locals]}};
                {takeover, _} ->
                    {noreply,
                     S#state{tmp_locals = [{Name, Weight, Node} |
                                           S#state.tmp_locals]}};
                {failover, _} ->
                    {noreply,
                     S#state{tmp_locals = [{Name, Weight, Node} |
                                           S#state.tmp_locals]}};
                _ ->
                    MyWeight = get_cached_weight(Name, S),
                    {?DIST_AC, Node} ! {dist_ac_weight, Name, MyWeight, node()},
                    NTWs = keyreplaceadd(Name, 1, S#state.tmp_weights,
                                         {Name, MyWeight}),
                    {noreply, S#state{tmp_weights = NTWs}}
            end;
        _ ->
            {noreply,
             S#state{tmp_locals = [{Name, Weight, Node} | S#state.tmp_locals]}}
    end;

%%-----------------------------------------------------------------
%% A node died.  Check if we should take over some applications.
613%%----------------------------------------------------------------- 614handle_info({nodedown, Node}, S) -> 615 AppNames = dist_get_runnable(S#state.appls), 616 HisAppls = filter(fun(#appl{name = Name, id = {distributed, N}}) 617 when Node =:= N -> lists:member(Name, AppNames); 618 (_) -> false 619 end, 620 S#state.appls), 621 Appls2 = zf(fun(Appl) when Appl#appl.id =:= {distributed, Node} -> 622 case lists:member(Appl#appl.name, AppNames) of 623 true -> 624 {true, Appl#appl{id = {failover, Node}}}; 625 false -> 626 _ = ac_not_running(Appl#appl.name), 627 {true, Appl#appl{id = undefined}} 628 end; 629 (_) -> 630 true 631 end, 632 S#state.appls), 633 RStarted = filter(fun({Node2, _Name}) when Node2 =:= Node -> false; 634 (_) -> true 635 end, 636 S#state.remote_started), 637 Appls3 = dist_del_node(Appls2, Node), 638 {NPermitReq, Appls4, SReqs} = req_del_node(S, Node, Appls3), 639 NKnown = lists:delete(Node, S#state.known), 640 NewS = S#state{appls = Appls4, p_reqs = NPermitReq, known = NKnown, 641 s_reqs = SReqs, 642 remote_started = RStarted}, 643 restart_appls(HisAppls), 644 {noreply, NewS}; 645 646handle_info({dist_ac_app_loaded, Node, Name, HisNodes, Permission, HeKnowsMe}, 647 S) -> 648 Nodes = dist_find_nodes(Appls = S#state.appls, Name), 649 case is_loaded(Name, S) of 650 true -> 651 case equal_nodes(Nodes, HisNodes) of 652 true -> 653 NAppls = dist_update_run(Appls, Name, Node, Permission), 654 if 655 not HeKnowsMe -> 656 %% We've got it loaded, but he doesn't know - 657 %% he's a new node connecting to us. 658 Msg = {dist_ac_app_loaded, node(), Name, 659 Nodes, dist_is_runnable(Appls, Name), true}, 660 {?DIST_AC, Node} ! Msg, 661 ok; 662 true -> 663 ok 664 end, 665 {noreply, S#state{appls = NAppls}}; 666 false -> 667 dist_mismatch(Name, Node) 668 end; 669 false -> 670 Load =[{{Name, Node}, HisNodes, Permission} | S#state.dist_loaded], 671 {noreply, S#state{dist_loaded = Load}} 672 end; 673 674handle_info({dist_ac_app_unloaded, Node, Name}, S) -> 675 Appls = dist_update_run(S#state.appls, Name, Node, undefined), 676 Load = keydelete({Name, Node}, 1, S#state.dist_loaded), 677 {noreply, S#state{appls = Appls, dist_loaded = Load}}; 678 679 680handle_info({dist_ac_new_permission, Node, AppName, false, IsHisApp}, S) -> 681 Appls = dist_update_run(S#state.appls, AppName, Node, false), 682 NewS = S#state{appls =Appls}, 683 case dist_is_runnable(Appls, AppName) of 684 true when IsHisApp -> 685 case catch start_appl(AppName, NewS, req) of 686 {ok, NewS2, _} -> 687 {noreply, NewS2}; 688 {error, _R} -> % if app was permanent, AC will shutdown the node 689 {noreply, NewS} 690 end; 691 _ -> 692 {noreply, NewS} 693 end; 694handle_info({dist_ac_new_permission, Node, AppName, true, _IsHisApp}, S) -> 695 Appls = dist_update_run(S#state.appls, AppName, Node, true), 696 {noreply, S#state{appls = Appls}}; 697 698handle_info({internal_restart_appl, Name}, S) -> 699 case restart_appl(Name, S) of 700 {error, R} -> 701 {stop, {error, R}, S}; 702 NewS -> 703 {noreply, NewS} 704 end; 705 706handle_info(_, S) -> 707 {noreply, S}. 708 709terminate(_Reason, _S) -> 710 ok. 711 712code_change(_OldVsn, State, _Extra) -> 713 {ok, State}. 

%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
load(AppName, S) ->
    Appls0 = S#state.appls,
    %% Get the dist specification for the app on other nodes
    DistLoaded = get_dist_loaded(AppName, Load1 = S#state.dist_loaded),
    %% Get the local dist specification
    Nodes = dist_find_nodes(Appls0, AppName),
    FNodes = flat_nodes(Nodes),
    %% Update the dist spec with our local permission
    Permission = get_default_permission(AppName),
    Appls1 = dist_update_run(Appls0, AppName, node(), Permission),
    %% Compare the local spec with the other nodes' specs.
    %% If equal, update our spec with his current permission.
    {LoadedNodes, Appls2} =
        mapfoldl(
          fun({Node, HisNodes, HisPermission}, Appls) ->
                  case equal_nodes(Nodes, HisNodes) of
                      true ->
                          {Node, dist_update_run(Appls, AppName,
                                                 Node, HisPermission)};
                      _ ->
                          dist_mismatch(AppName, Node)
                  end
          end, Appls1, DistLoaded),
    Load2 = del_dist_loaded(AppName, Load1),
    %% Tell all Nodes about the newly loaded appl, and its permission.
    foreach(fun(Node) when Node =/= node() ->
                    Msg = {dist_ac_app_loaded, node(), AppName,
                           Nodes, Permission, member(Node, LoadedNodes)},
                    {?DIST_AC, Node} ! Msg;
               (_) -> ok
            end, FNodes),
    {ok, S#state{appls = Appls2, dist_loaded = Load2}}.

ensure_take_control(AppName, Appls) ->
    %% Check if this is a new application that we don't control yet
    case lists:keymember(AppName, #appl.name, Appls) of
        true ->  % we have control
            ok;
        false -> % take control!
            %% Note: this works because this is executed within a
            %% synchronous call, i.e. we get the control *before*
            %% application:load returns.  (Otherwise application:start
            %% could be called before we got the chance to take control.)
            %% The only reason we have to bother about this is that
            %% we have to be backwards compatible in the sense that all
            %% apps don't have to be specified in the 'distributed'
            %% parameter, but may be implicitly 'distributed' by a call to
            %% application:load.
            application_controller:control_application(AppName)
    end.

unload(AppName, S) ->
    Appls = S#state.appls,
    Nodes = dist_flat_nodes(Appls, AppName),
    %% Tell all ACs in DistNodes about the unloaded appl
    Msg = {dist_ac_app_unloaded, node(), AppName},
    send_msg(Msg, Nodes),
    {value, Appl} = keysearch(AppName, #appl.name, Appls),
    NAppl = Appl#appl{id = undefined, run = []},
    {ok, S#state{appls = keyreplace(AppName, #appl.name, Appls, NAppl)}}.

start_appl(AppName, S, Type) ->
    %% Get nodes, and check if the app is loaded on all involved nodes.
    %% If it is loaded everywhere, we know that we have the same picture
    %% of the nodes; otherwise the load wouldn't have succeeded.
    Appl = case keysearch(AppName, #appl.name, Appls = S#state.appls) of
               {value, A} -> A;
               _ -> throw({error, {unknown_application, AppName}})
           end,
    case Appl#appl.id of
        local ->
            %% UW 990913: we've already started the app;
            %% this could happen if ac_start_application_req was resent.
            {ok, S, false};
        _ ->
            {Id, IsWaiting} = case dist_get_all_nodes(Appl) of
                                  {ok, DistNodes, PermittedNodes} ->
                                      start_distributed(Appl, AppName, DistNodes,
                                                        PermittedNodes, S, Type);
                                  Error -> throw(Error)
                              end,
            NAppl = Appl#appl{id = Id},
            NAppls = keyreplaceadd(AppName, #appl.name, Appls, NAppl),
            {ok, NewS} = req_start_app(S#state{appls = NAppls}, AppName),
            TmpLocals = keydelete_all(AppName, 1, NewS#state.tmp_locals),
            TmpWeights = keydelete_all(AppName, 1, NewS#state.tmp_weights),
            RStarted = keydelete(AppName, 2, S#state.remote_started),
            Started = replaceadd(AppName, NewS#state.started),
            {ok,
             NewS#state{started = Started, tmp_locals = TmpLocals,
                        tmp_weights = TmpWeights, remote_started = RStarted},
             IsWaiting}
    end.


start_distributed(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    case find_start_node(Nodes, PermittedNodes, Name, S) of
        {ok, Node} when Node =:= node() ->
            _ = case Appl#appl.id of
                    {failover, FoNode} when Type =:= req ->
                        ac_failover(Name, FoNode, undefined);
                    {distributed, Node2} when Type =:= req ->
                        ac_takeover(req, Name, Node2, undefined);
                    _ when Type =:= reply ->
                        case lists:keysearch(Name, 2, S#state.remote_started) of
                            {value, {Node3, _}} ->
                                ac_takeover(reply, Name, Node3, undefined);
                            _ ->
                                ac_start_it(Type, Name)
                        end;
                    _ ->
                        ac_start_it(Type, Name)
                end,
            {run_waiting, true};
        {already_started, Node} ->
            _ = ac_started(Type, Name, Node),
            {{distributed, Node}, false};
        {ok, Node} ->
            case keysearch(Name, #appl.name, S#state.appls) of
                {value, #appl{id = {distributed, Node}}} ->
                    _ = ac_started(Type, Name, Node),
                    {{distributed, Node}, false};
                _ ->
                    wait_dist_start(Node, Appl, Name, Nodes,
                                    PermittedNodes, S, Type)
            end;
        not_started ->
            wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type);
        no_permission ->
            _ = ac_not_started(Type, Name),
            {undefined, false}
    end.

wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type) ->
    monitor_node(Node, true),
    receive
        {dist_ac_app_started, Node, Name, ok} ->
            _ = ac_started(Type, Name, Node),
            monitor_node(Node, false),
            {{distributed, Node}, false};
        {dist_ac_app_started, Node, Name, {error, R}} ->
            _ = ac_error(Type, Name, {Node, R}),
            monitor_node(Node, false),
            {Appl#appl.id, false};
        {dist_ac_weight, Name, _Weight, Node} ->
            %% This is the situation: {Name, [RNode, {Node}, node()]}
            %% and permit(false) is called on RNode, and we sent the
            %% weight first.  Node handled it in handle_info, and
            %% now we must send him a weight msg.  We can use any weight;
            %% he wins anyway.
            monitor_node(Node, false),
            {?DIST_AC, Node} !
                {dist_ac_weight, Name, get_cached_weight(Name, S), node()},
            wait_dist_start(Node, Appl, Name, Nodes, PermittedNodes, S, Type);
        {nodedown, Node} ->
            monitor_node(Node, false),
            TmpLocals =
                filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
                                                         Name2 =:= Name -> false;
                          (_) -> true
                       end,
                       S#state.tmp_locals),
            NewS = S#state{tmp_locals = TmpLocals},
            start_distributed(Appl, Name, Nodes,
                              lists:delete(Node, PermittedNodes), NewS, Type)
    end.

wait_dist_start2(Appl, Name, Nodes, PermittedNodes, S, Type) ->
    receive
        {dist_ac_app_started, Node, Name, ok} ->
            _ = ac_started(Type, Name, Node),
            {{distributed, Node}, false};
        {dist_ac_app_started, Node, Name, {error, R}} ->
            _ = ac_error(Type, Name, {Node, R}),
            {Appl#appl.id, false};
        {nodedown, Node} ->
            %% A node went down, try to start the app again - there may not
            %% be any more nodes to wait for.
            TmpLocals =
                filter(fun({Name2, _Weight, Node2}) when Node2 =:= Node,
                                                         Name2 =:= Name -> false;
                          (_) -> true
                       end,
                       S#state.tmp_locals),
            NewS = S#state{tmp_locals = TmpLocals},
            start_distributed(Appl, Name, Nodes,
                              lists:delete(Node, PermittedNodes), NewS, Type)
    end.


ac_start_it(reply, Name) ->
    ?AC ! {ac_start_application_reply, Name, start_it};
ac_start_it(req, Name) ->
    ?AC ! {ac_change_application_req, Name, start_it}.

ac_started(reply, Name, Node) ->
    ?AC ! {ac_start_application_reply, Name, {started, Node}};
ac_started(req, Name, Node) ->
    ?AC ! {ac_change_application_req, Name, {started, Node}}.

ac_error(reply, Name, Error) ->
    ?AC ! {ac_start_application_reply, Name, {error, Error}};
ac_error(req, _Name, _Error) ->
    ok.

ac_not_started(reply, Name) ->
    ?AC ! {ac_start_application_reply, Name, not_started};
ac_not_started(req, Name) ->
    ?AC ! {ac_change_application_req, Name, stop_it}.

ac_stop_it(Name) ->
    ?AC ! {ac_change_application_req, Name, stop_it}.

ac_takeover(reply, Name, Node, _RestartType) ->
    ?AC ! {ac_start_application_reply, Name, {takeover, Node}};
ac_takeover(req, Name, Node, RestartType) ->
    ?AC ! {ac_change_application_req, Name,
           {takeover, Node, RestartType}}.

ac_failover(Name, Node, RestartType) ->
    ?AC ! {ac_change_application_req, Name,
           {failover, Node, RestartType}}.

ac_not_running(Name) ->
    ?AC ! {ac_change_application_req, Name, not_running}.

restart_appls(Appls) ->
    foreach(fun(Appl) ->
                    AppName = Appl#appl.name,
                    send_after(Appl#appl.restart_time,
                               {internal_restart_appl, AppName})
            end, lists:reverse(Appls)).

restart_appl(AppName, S) ->
    case keysearch(AppName, #appl.name, S#state.appls) of
        {value, Appl} when element(1, Appl#appl.id) =:= failover ->
            case catch start_appl(AppName, S, req) of
                {ok, NewS, _} ->
                    NewS;
                {error, R} ->
                    error_msg("Error when restarting application ~p: ~p~n",
                              [AppName, R]),
                    S
            end;
        _ ->
            S
    end.

%% permit(ShouldBeRunning, IsRunning, ...)
permit(false, {value, #appl{id = undefined}}, _AppName, _From, S, _LockId) ->
    {reply, ok, S}; % It's not running
permit(false, {value, #appl{id = Id}}, _AppName, _From, S, _LockId)
  when element(1, Id) =:= distributed ->
    %% It is running at another node already
    {reply, ok, S};
permit(false, {value, _}, AppName, From, S, _LockId) ->
    %% It is a distributed application
    %% Check if there is any runnable node
    case dist_get_runnable_nodes(S#state.appls, AppName) of
        [] ->
            %% There is no runnable node; stop the application
            _ = ac_stop_it(AppName),
            SReqs = [{AppName, From} | S#state.s_reqs],
            {noreply, S#state{s_reqs = SReqs}};
        Nodes ->
            %% Delete all outstanding 'permit true' requests.
            PR = req_del_permit_true(S#state.p_reqs, AppName),
            NPReqs = [{From, AppName, false, Nodes} | PR],
            {noreply, S#state{p_reqs = NPReqs}}
    end;
permit(true, {value, #appl{id = local}}, _AppName, _From, S, _LockId) ->
    {reply, ok, S};
permit(true, _, AppName, From, S, LockId) ->
    case catch start_appl(AppName, S, req) of
        {_ErrorTag, {not_running, App}} ->
            %% Delete all outstanding 'permit false' requests
            PR = req_del_permit_false(S#state.p_reqs, AppName),
            NPReqs = [{false, AppName, true, App} | PR],
            {reply, ok, S#state{p_reqs = NPReqs}};
        {ok, NewS, true} ->
            %% We have ordered a start or a takeover; we must not return
            %% until the app is running.
            TR = NewS#state.t_reqs,
            %% Delete the lock, so others may start the app
            global:del_lock(LockId),
            {noreply, NewS#state{t_reqs = [{AppName, From} | TR]}};
        {ok, _S, false} ->
            %% The application should be started, but at another node;
            %% the state remains the same
            {reply, ok, S};
        {_ErrorTag, R} ->
            {stop, R, {error, R}, S}
    end.

do_start_appls(StartApps, S) ->
    SortedStartApps = StartApps,
    Appls = S#state.appls,
    {ok, foldl(
           fun(AppName, NewS) ->
                   case catch start_appl(AppName, NewS, req) of
                       {error, R} ->
                           throw({{error, NewS}, R});
                       {ok, NewS2, _} ->
                           NewS2
                   end
           end, S#state{appls = Appls}, lists:reverse(SortedStartApps))}.

%%-----------------------------------------------------------------
%% Nodes = [node() | {node(), ..., node()}]
%% A list in priority order.  If it is a tuple, we may pick any of
%% them.  This decision is made by all nodes in the list, and all
%% nodes choose the same.  This is accomplished in the following
%% way: all Nodes send to all others a msg which tells how many
%% applications each node has started.  The one with the least
%% number of appls starts this one.
%%-----------------------------------------------------------------
find_start_node(Nodes, PermittedNodes, Name, S) ->
    AllNodes = intersection(flat_nodes(Nodes), PermittedNodes),
    case lists:member(node(), AllNodes) of
        true ->
            Weight = get_cached_weight(Name, S),
            find_start_node(Nodes, Name, S, Weight, AllNodes);
        false ->
            case keysearch(Name, 2, S#state.remote_started) of
                {value, {Node, _Name}} ->
                    {already_started, Node};
                _ when AllNodes =/= [] ->
                    not_started;
                _ ->
                    no_permission
            end
    end.

find_start_node([AnyNodes | Nodes], Name, S, Weight, AllNodes)
  when is_tuple(AnyNodes) ->
    case find_any_node(tuple_to_list(AnyNodes), Name, S, Weight, AllNodes) of
        false -> find_start_node(Nodes, Name, S, Weight, AllNodes);
        Res -> Res
    end;
find_start_node([Node | Nodes], Name, S, Weight, AllNodes) ->
    case lists:member(Node, AllNodes) of
        true ->
            case keysearch(Name, #appl.name, S#state.appls) of
                {value, #appl{id = {distributed, Node}}} ->
                    {already_started, Node};
                _ ->
                    case keysearch(Name, 2, S#state.remote_started) of
                        {value, {Node, _Name}} ->
                            {already_started, Node};
                        _ ->
                            {ok, Node}
                    end
            end;
        false -> find_start_node(Nodes, Name, S, Weight, AllNodes)
    end;
find_start_node([], _Name, _S, _Weight, _AllNodes) ->
    not_started.
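
%%-----------------------------------------------------------------
%% Example (illustration only, hypothetical node names): for a node
%% spec in priority order such as
%%
%%   Nodes = [cp1@host, {cp2@host, cp3@host}]
%%
%% find_start_node/5 above tries cp1@host first; the tuple
%% {cp2@host, cp3@host} means "any one of these", and find_any_node/5
%% below lets all involved nodes agree on one of them by exchanging
%% dist_ac_weight messages - the node with the smallest weight, i.e.
%% the fewest running applications (see get_weight/0), wins.
%%-----------------------------------------------------------------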

%%-----------------------------------------------------------------
%% First of all, check if the application is already running
%% somewhere in AnyNodes; in that case we shall not move it!
%%-----------------------------------------------------------------
find_any_node(AnyNodes, Name, S, Weight, AllNodes) ->
    case check_running(Name, S, intersection(AnyNodes, AllNodes)) of
        {already_started, Node} -> {already_started, Node};
        false ->
            %% Synchronize with all other nodes.
            send_nodes(AllNodes, {dist_ac_weight, Name, Weight, node()}),
            Answers = [{Weight, node()} |
                       collect_answers(AllNodes, Name, S, [])],
            %% Make a decision (the same at every node) (smallest weight wins)
            find_alive_node(lists:sort(Answers),
                            intersection(AnyNodes, S#state.known))
    end.

%%-----------------------------------------------------------------
%% Check if another node started the appl before we got alive.
%% If so, check if the node is one of AnyNodes.
%%-----------------------------------------------------------------
check_running(Name, #state{remote_started = RStarted,
                           appls = Appls}, AnyNodes) ->
    case keysearch(Name, 2, RStarted) of
        {value, {Node, _Name}} ->
            case lists:member(Node, AnyNodes) of
                true -> {already_started, Node};
                false -> false
            end;
        false ->
            case keysearch(Name, #appl.name, Appls) of
                {value, #appl{id = {distributed, Node}}} ->
                    case lists:member(Node, AnyNodes) of
                        true -> {already_started, Node};
                        false -> false
                    end;
                _ ->
                    false
            end
    end.

find_alive_node([{_, Node} | Nodes], AliveNodes) ->
    case lists:member(Node, AliveNodes) of
        true -> {ok, Node};
        false -> find_alive_node(Nodes, AliveNodes)
    end;
find_alive_node([], _AliveNodes) ->
    false.

%%-----------------------------------------------------------------
%% First, check if the node's msg is buffered (received in our
%% main loop).  Otherwise, wait for the msg or a nodedown.
%% We have sent the dist_ac_weight message, and will wait for it
%% to be received here (or a nodedown).  This implies that a
%% dist_ac must *always* be prepared to get this message, and to
%% send it to us.
%%-----------------------------------------------------------------
collect_answers([Node | Nodes], Name, S, Res) when Node =/= node() ->
    case keysearch(Node, 3, S#state.tmp_locals) of
        {value, {Name, Weight, Node}} ->
            collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
        _ ->
            monitor_node(Node, true),
            receive
                {dist_ac_weight, Name, Weight, Node} ->
                    monitor_node(Node, false),
                    collect_answers(Nodes, Name, S, [{Weight, Node} | Res]);
                {nodedown, Node} ->
                    monitor_node(Node, false),
                    collect_answers(Nodes, Name, S, Res)
            end
    end;
collect_answers([_ThisNode | Nodes], Name, S, Res) ->
    collect_answers(Nodes, Name, S, Res);
collect_answers([], _Name, _S, Res) ->
    Res.

send_nodes(Nodes, Msg) ->
    FlatNodes = flat_nodes(Nodes),
    foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
               (_ThisNode) -> ok
            end, FlatNodes).

send_after(Time, Msg) when is_integer(Time), Time >= 0 ->
    _Pid = spawn_link(?MODULE, send_timeout, [self(), Time, Msg]),
    ok;
send_after(_, _) -> % infinity
    ok.

send_timeout(To, Time, Msg) ->
    receive
    after Time -> To ! Msg
    end.
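
%%-----------------------------------------------------------------
%% Example (illustration only, hypothetical application name):
%% send_after/2 above is a hand-rolled delayed send, used by
%% restart_appls/1 with the application's restart_time:
%%
%%   %% Deliver {internal_restart_appl, myapp} to this process
%%   %% after 5000 ms; a non-integer time (infinity) means the
%%   %% message is never sent.
%%   send_after(5000, {internal_restart_appl, myapp})
%%-----------------------------------------------------------------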

send_msg(Msg, Nodes) ->
    foreach(fun(Node) when Node =/= node() -> {?DIST_AC, Node} ! Msg;
               (_) -> ok
            end, Nodes).

replaceadd(Item, List) ->
    case member(Item, List) of
        true -> List;
        false -> [Item | List]
    end.

keyreplaceadd(Key, Pos, List, New) ->
    case lists:keymember(Key, Pos, List) of
        true -> lists:keyreplace(Key, Pos, List, New);
        false -> [New | List]
    end.

keydelete_all(Key, N, [H|T]) when element(N, H) =:= Key ->
    keydelete_all(Key, N, T);
keydelete_all(Key, N, [H|T]) ->
    [H | keydelete_all(Key, N, T)];
keydelete_all(_Key, _N, []) -> [].

-ifdef(NOTUSED).
keysearchdelete(Key, Pos, List) ->
    ksd(Key, Pos, List, []).

ksd(Key, Pos, [H | T], Rest) when element(Pos, H) =:= Key ->
    {value, H, Rest ++ T};
ksd(Key, Pos, [H | T], Rest) ->
    ksd(Key, Pos, T, [H | Rest]);
ksd(_Key, _Pos, [], _Rest) ->
    false.

get_new_appl(Name, [{application, Name, App} | _]) ->
    {ok, {application, Name, App}};
get_new_appl(Name, [_ | T]) -> get_new_appl(Name, T);
get_new_appl(Name, []) -> false.
-endif.

equal_nodes([H | T1], [H | T2]) when is_atom(H) ->
    equal_nodes(T1, T2);
equal_nodes([H1 | T1], [H2 | T2]) when is_tuple(H1), is_tuple(H2) ->
    case equal(tuple_to_list(H1), tuple_to_list(H2)) of
        true -> equal_nodes(T1, T2);
        false -> false
    end;
equal_nodes([], []) -> true;
equal_nodes(_, _) -> false.

equal([H | T], S) ->
    case lists:member(H, S) of
        true -> equal(T, lists:delete(H, S));
        false -> false
    end;
equal([], []) -> true;
equal(_, _) -> false.

flat_nodes(Nodes) when is_list(Nodes) ->
    foldl(fun(Node, Res) when is_atom(Node) -> [Node | Res];
             (Tuple, Res) when is_tuple(Tuple) -> tuple_to_list(Tuple) ++ Res
          end, [], Nodes);
flat_nodes(Nodes) ->
    throw({error, {badarg, Nodes}}).

get_cached_weight(Name, S) ->
    case lists:keysearch(Name, 1, S#state.tmp_weights) of
        {value, {_, W}} -> W;
        _ -> get_weight()
    end.

%% Simple weight; just count the number of applications running.
get_weight() ->
    length(application:which_applications()).

get_dist_loaded(Name, [{{Name, Node}, HisNodes, Permission} | T]) ->
    [{Node, HisNodes, Permission} | get_dist_loaded(Name, T)];
get_dist_loaded(Name, [_H | T]) ->
    get_dist_loaded(Name, T);
get_dist_loaded(_Name, []) ->
    [].

del_dist_loaded(Name, [{{Name, _Node}, _HisNodes, _Permission} | T]) ->
    del_dist_loaded(Name, T);
del_dist_loaded(Name, [H | T]) ->
    [H | del_dist_loaded(Name, T)];
del_dist_loaded(_Name, []) ->
    [].

req_start_app(State, Name) ->
    {ok, foldl(
           fun({false, AppName, true, Name2}, S) when Name =:= Name2 ->
                   PR = keydelete(AppName, 2, S#state.p_reqs),
                   NS = S#state{p_reqs = PR},
                   case catch do_start_appls([AppName], NS) of
                       {_ErrorTag, {not_running, App}} ->
                           NRequests = [{false, AppName, true, App} | PR],
                           S#state{p_reqs = NRequests};
                       {ok, NewS} ->
                           NewS;
                       {_ErrorTag, R} ->
                           throw({error, R})
                   end;
              (_, S) ->
                   S
           end, State, State#state.p_reqs)}.


req_del_permit_true(Reqs, Name) ->
    filter(fun({From, Name2, true, _}) when Name2 =:= Name ->
                   gen_server:reply(From, ok),
                   false;
              (_) ->
                   true
           end, Reqs).

req_del_permit_false(Reqs, Name) ->
    filter(fun({From, Name2, false, _Nodes}) when Name2 =:= Name ->
                   gen_server:reply(From, ok),
                   false;
              (_) ->
                   true
           end, Reqs).

req_del_node(S, Node, Appls) ->
    check_waiting(S#state.p_reqs, S, Node, Appls, [], S#state.s_reqs).

del_t_reqs(AppName, TReqs, Res) ->
    lists:filter(fun({AN, From}) when AppName =:= AN ->
                         gen_server:reply(From, Res),
                         false;
                    (_) ->
                         true
                 end,
                 TReqs).


check_waiting([{From, AppName, false, Nodes} | Reqs],
              S, Node, Appls, Res, SReqs) ->
    case lists:delete(Node, Nodes) of
        [] ->
            _ = ac_stop_it(AppName),
            NSReqs = [{AppName, From} | SReqs],
            check_waiting(Reqs, S, Node, Appls, Res, NSReqs);
        NNodes ->
            check_waiting(Reqs, S, Node, Appls,
                          [{From, AppName, false, NNodes} | Res], SReqs)
    end;
check_waiting([H | Reqs], S, Node, Appls, Res, SReqs) ->
    check_waiting(Reqs, S, Node, Appls, [H | Res], SReqs);
check_waiting([], _S, _Node, Appls, Res, SReqs) ->
    {Res, Appls, SReqs}.

intersection([], _) ->
    [];
intersection(_, []) ->
    [];
intersection(L1, L2) ->
    L1 -- (L1 -- L2).

get_default_permission(AppName) ->
    case application:get_env(kernel, permissions) of
        {ok, Permissions} ->
            case keysearch(AppName, 1, Permissions) of
                {value, {_, true}} -> true;
                {value, {_, false}} -> false;
                {value, {_, X}} -> exit({bad_permission, {AppName, X}});
                false -> true
            end;
        undefined -> true
    end.

%%-----------------------------------------------------------------
%% ADT dist() - info on how an application is distributed
%% dist() = [{AppName, Time, DistNodes, [{Node, Runnable}]}]
%%   Time = int() >= 0 | infinity
%%   Nodes = [node() | {node()...}]
%%   Runnable = true | false | undefined
%% An appl may not be started if any Runnable is undefined;
%% i.e. the appl must be loaded on all Nodes.
%%-----------------------------------------------------------------
dist_check([{AppName, Nodes} | T]) ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, nodes = Nodes, run = [{node(), P}]} | dist_check(T)];
dist_check([{AppName, Time, Nodes} | T]) when is_integer(Time), Time >= 0 ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, restart_time = Time, nodes = Nodes,
           run = [{node(), P}]} | dist_check(T)];
dist_check([{AppName, infinity, Nodes} | T]) ->
    P = get_default_permission(AppName),
    [#appl{name = AppName, restart_time = infinity,
           nodes = Nodes, run = [{node(), P}]} |
     dist_check(T)];
dist_check([_ | T]) ->
    dist_check(T);
dist_check([]) ->
    [].

dist_take_control(Appls) ->
    foreach(fun(#appl{name = AppName}) ->
                    application_controller:control_application(AppName)
            end, Appls).
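
%%-----------------------------------------------------------------
%% Example (illustration only, hypothetical application and node
%% names): the kernel 'distributed' parameter that dist_check/1
%% accepts has entries of the forms handled above, e.g. in sys.config:
%%
%%   {kernel,
%%    [{distributed,
%%      [{app1, [cp1@host, {cp2@host, cp3@host}]},   % restart_time = 0
%%       {app2, 5000, [cp1@host, cp2@host]},         % failover after 5000 ms
%%       {app3, infinity, [cp1@host]}]}]}            % restart timer never fires
%%
%% Malformed entries are silently skipped by the last dist_check/1
%% clause.
%%-----------------------------------------------------------------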

dist_replace(default, _Name, Appls) -> Appls;
dist_replace({AppName, Nodes}, AppName, Appls) ->
    Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
    keyreplaceadd(AppName, #appl.name, Appls,
                  #appl{name = AppName, restart_time = 0,
                        nodes = Nodes, run = Run});
dist_replace({AppName, Time, Nodes}, AppName, Appls)
  when is_integer(Time), Time >= 0 ->
    Run = [{Node, undefined} || Node <- flat_nodes(Nodes)],
    keyreplaceadd(AppName, #appl.name, Appls,
                  #appl{name = AppName, restart_time = Time,
                        nodes = Nodes, run = Run});
dist_replace(Bad, _Name, _Appls) ->
    throw({error, {bad_distribution_spec, Bad}}).

dist_update_run(Appls, AppName, Node, Permission) ->
    map(fun(Appl) when Appl#appl.name =:= AppName ->
                Run = Appl#appl.run,
                NRun = keyreplaceadd(Node, 1, Run, {Node, Permission}),
                Appl#appl{run = NRun};
           (Appl) ->
                Appl
        end, Appls).



dist_change_update(Appls, []) ->
    Appls;
dist_change_update(Appls, [{AppName, NewNodes} | NewDist]) ->
    NewAppls = do_dist_change_update(Appls, AppName, 0, NewNodes),
    dist_change_update(NewAppls, NewDist);
dist_change_update(Appls, [{AppName, NewTime, NewNodes} | NewDist]) ->
    NewAppls = do_dist_change_update(Appls, AppName, NewTime, NewNodes),
    dist_change_update(NewAppls, NewDist).

do_dist_change_update(Appls, AppName, NewTime, NewNodes) ->
    map(fun(Appl) when Appl#appl.name =:= AppName ->
                Appl#appl{restart_time = NewTime, nodes = NewNodes};
           (Appl) ->
                Appl
        end, Appls).

%% Merge his Permissions with mine.
dist_merge(MyAppls, HisAppls, HisNode) ->
    zf(fun(Appl) ->
               #appl{name = AppName, run = Run} = Appl,
%              #appl{name = AppName, nodes = Nodes, run = Run} = Appl,
%              HeIsMember = lists:member(HisNode, flat_nodes(Nodes)),
               HeIsMember = true,
               case keysearch(AppName, #appl.name, HisAppls) of
                   {value, #appl{run = HisRun}} when HeIsMember ->
                       case keysearch(HisNode, 1, HisRun) of
                           {value, Val} -> % He has it loaded
                               NRun = keyreplaceadd(HisNode, 1, Run, Val),
                               {true, Appl#appl{run = NRun}};
                           false -> % He hasn't loaded it yet
                               Val = {HisNode, undefined},
                               {true, Appl#appl{run = [Val | Run]}}
                       end;
                   _ ->
                       true
               end
       end, MyAppls).

dist_get_runnable_nodes(Appls, AppName) ->
    case keysearch(AppName, #appl.name, Appls) of
        {value, #appl{run = Run}} ->
            zf(fun({Node, true}) -> {true, Node};
                  (_) -> false
               end, Run);
        false ->
            []
    end.

dist_is_runnable(Appls, AppName) ->
    case keysearch(AppName, #appl.name, Appls) of
        {value, #appl{run = Run}} ->
            case keysearch(node(), 1, Run) of
                {value, {_, true}} -> true;
                _ -> false
            end;
        false ->
            false
    end.

is_loaded(AppName, #state{appls = Appls}) ->
    case keysearch(AppName, #appl.name, Appls) of
        {value, #appl{run = Run}} ->
            case keysearch(node(), 1, Run) of
                {value, {_Node, undefined}} -> false;
                {value, _} -> true;
                false -> false
            end;
        false ->
            false
    end.

dist_get_runnable(Appls) ->
    zf(fun(#appl{name = AppName, run = Run}) ->
               case keysearch(node(), 1, Run) of
                   {value, {_, true}} -> {true, AppName};
                   _ -> false
               end
       end, Appls).
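
%%-----------------------------------------------------------------
%% Example (illustration only, hypothetical node names): the 'run'
%% field of #appl{} holds one {Node, Permission} pair per node in the
%% dist spec, e.g.
%%
%%   #appl{name = app1,
%%         run = [{cp1@host, true},        % loaded, permitted
%%                {cp2@host, false},       % loaded, not permitted
%%                {cp3@host, undefined}]}  % not loaded there yet
%%
%% dist_is_runnable/2 checks the entry for node(), and
%% dist_get_all_nodes/1 below refuses to start the application while
%% any still-connected node has permission 'undefined'.
%%-----------------------------------------------------------------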

dist_get_all_nodes(#appl{name = AppName, nodes = Nodes, run = Run}) ->
    {Res, BadNodes} = check_nodes(Run, [], []),
    case intersection(BadNodes, erlang:nodes(connected)) of
        [] -> {ok, Nodes, Res};
        _ -> {error, {app_not_loaded, AppName, BadNodes}}
    end.

check_nodes([{Node, undefined} | T], Res, BadNodes) ->
    check_nodes(T, Res, [Node | BadNodes]);
check_nodes([{Node, true} | T], Res, BadNodes) ->
    check_nodes(T, [Node | Res], BadNodes);
check_nodes([{_Node, false} | T], Res, BadNodes) ->
    check_nodes(T, Res, BadNodes);
check_nodes([], Res, BadNodes) ->
    {Res, BadNodes}.

-ifdef(NOTUSED).
dist_find_time([#appl{name = Name, restart_time = Time} | _], Name) -> Time;
dist_find_time([_ | T], Name) -> dist_find_time(T, Name);
dist_find_time([], Name) -> 0.
-endif.

%% Find all nodes that can run the app (even if they're not permitted
%% to right now).
dist_find_nodes([#appl{name = Name, nodes = Nodes} | _], Name) -> Nodes;
dist_find_nodes([_ | T], Name) -> dist_find_nodes(T, Name);
dist_find_nodes([], _Name) -> [].

dist_flat_nodes(Appls, Name) ->
    flat_nodes(dist_find_nodes(Appls, Name)).

dist_del_node(Appls, Node) ->
    map(fun(Appl) ->
                NRun = filter(fun({N, _Runnable}) when N =:= Node -> false;
                                 (_) -> true
                              end, Appl#appl.run),
                Appl#appl{run = NRun}
        end, Appls).

valid_restart_type(permanent) -> true;
valid_restart_type(temporary) -> true;
valid_restart_type(transient) -> true;
valid_restart_type(_RestartType) -> false.

dist_mismatch(AppName, Node) ->
    error_msg("Distribution mismatch for application \"~p\" on nodes ~p and ~p~n",
              [AppName, node(), Node]),
    exit({distribution_mismatch, AppName, Node}).

%error_msg(Format) when is_list(Format) ->
%    error_msg(Format, []).

error_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
    error_logger:error_msg("dist_ac on node ~p:~n" ++ Format, [node() | ArgList]).

%info_msg(Format) when is_list(Format) ->
%    info_msg(Format, []).

%info_msg(Format, ArgList) when is_list(Format), is_list(ArgList) ->
%    error_logger:info_msg("dist_ac on node ~p:~n" ++ Format, [node()|ArgList]).