%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_node_monitor).

%% Transitional step until we can require Erlang/OTP 21 and
%% use the now recommended try/catch syntax for obtaining the stack trace.
-compile(nowarn_deprecated_function).

-behaviour(gen_server).

-export([start_link/0]).
-export([running_nodes_filename/0,
         cluster_status_filename/0, coordination_filename/0,
         quorum_filename/0, default_quorum_filename/0,
         prepare_cluster_status_files/0,
         write_cluster_status/1, read_cluster_status/0,
         update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, status/1, subscribe/1]).
-export([pause_partition_guard/0]).
-export([global_sync/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Utils
-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
         alive_nodes/1, alive_rabbit_nodes/1]).

-define(SERVER, ?MODULE).
-define(NODE_REPLY_TIMEOUT, 5000).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).

-record(state, {monitors, partitions, subscribers, down_ping_timer,
                keepalive_timer, autoheal, guid, node_guids}).

%%----------------------------------------------------------------------------
%% Start
%%----------------------------------------------------------------------------

-spec start_link() -> rabbit_types:ok_pid_or_error().

start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------

%% The cluster file information is kept in two files. The "cluster
%% status file" contains all the clustered nodes and the disc nodes.
%% The "running nodes file" contains the currently running nodes or
%% the running nodes at shutdown when the node is down.
%%
%% We strive to keep the files up to date and we rely on this
%% assumption in various situations. Obviously when mnesia is offline
%% the information we have will be outdated, but it cannot be
%% otherwise.

-spec running_nodes_filename() -> string().

running_nodes_filename() ->
    filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").

-spec cluster_status_filename() -> string().

cluster_status_filename() ->
    filename:join(rabbit_mnesia:dir(), "cluster_nodes.config").

coordination_filename() ->
    filename:join(rabbit_mnesia:dir(), "coordination").

quorum_filename() ->
    ra_env:data_dir().

default_quorum_filename() ->
    filename:join(rabbit_mnesia:dir(), "quorum").
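%% For orientation: both files are Erlang term files written by
%% write_cluster_status/1 below, so for a hypothetical two-node cluster
%% (node names purely illustrative) their contents would look roughly
%% like this:
%%
%%   %% cluster_nodes.config -- {AllNodes, DiscNodes}
%%   {['rabbit@host1','rabbit@host2'],['rabbit@host1']}.
%%
%%   %% nodes_running_at_shutdown -- RunningNodes
%%   ['rabbit@host1','rabbit@host2'].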
-spec prepare_cluster_status_files() -> 'ok' | no_return().

prepare_cluster_status_files() ->
    rabbit_mnesia:ensure_mnesia_dir(),
    RunningNodes1 = case try_read_file(running_nodes_filename()) of
                        {ok, [Nodes]} when is_list(Nodes) -> Nodes;
                        {ok, Other}     -> corrupt_cluster_status_files(Other);
                        {error, enoent} -> []
                    end,
    ThisNode = [node()],
    %% The running nodes file might contain a set or a list, in case
    %% of the legacy file
    RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
    {AllNodes1, DiscNodes} =
        case try_read_file(cluster_status_filename()) of
            {ok, [{AllNodes, DiscNodes0}]} ->
                {AllNodes, DiscNodes0};
            {ok, [AllNodes0]} when is_list(AllNodes0) ->
                {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
            {ok, Files} ->
                corrupt_cluster_status_files(Files);
            {error, enoent} ->
                LegacyNodes = legacy_cluster_nodes([]),
                {LegacyNodes, LegacyNodes}
        end,
    AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
    ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).

-spec corrupt_cluster_status_files(any()) -> no_return().

corrupt_cluster_status_files(F) ->
    throw({error, corrupt_cluster_status_files, F}).

-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.

write_cluster_status({All, Disc, Running}) ->
    ClusterStatusFN = cluster_status_filename(),
    Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
              ok ->
                  RunningNodesFN = running_nodes_filename(),
                  {RunningNodesFN,
                   rabbit_file:write_term_file(RunningNodesFN, [Running])};
              E1 = {error, _} ->
                  {ClusterStatusFN, E1}
          end,
    case Res of
        {_, ok}           -> ok;
        {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
    end.

-spec read_cluster_status() -> rabbit_mnesia:cluster_status().

read_cluster_status() ->
    case {try_read_file(cluster_status_filename()),
          try_read_file(running_nodes_filename())} of
        {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
            {All, Disc, Running};
        {Stat, Run} ->
            throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
    end.

-spec update_cluster_status() -> 'ok'.

update_cluster_status() ->
    {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
    write_cluster_status(Status).

-spec reset_cluster_status() -> 'ok'.

reset_cluster_status() ->
    write_cluster_status({[node()], [node()], [node()]}).

%%----------------------------------------------------------------------------
%% Cluster notifications
%%----------------------------------------------------------------------------

-spec notify_node_up() -> 'ok'.

notify_node_up() ->
    gen_server:cast(?SERVER, notify_node_up).

-spec notify_joined_cluster() -> 'ok'.

notify_joined_cluster() ->
    NewMember = node(),
    Nodes = rabbit_nodes:all_running() -- [NewMember],
    gen_server:abcast(Nodes, ?SERVER,
                      {joined_cluster, node(), rabbit_mnesia:node_type()}),
    ok.

-spec notify_left_cluster(node()) -> 'ok'.

notify_left_cluster(Node) ->
    Nodes = rabbit_nodes:all_running(),
    gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
    ok.

%%----------------------------------------------------------------------------
%% Server calls
%%----------------------------------------------------------------------------

-spec partitions() -> [node()].

partitions() ->
    gen_server:call(?SERVER, partitions, infinity).
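%% A hedged usage sketch for the partition queries above and below
%% (hypothetical shell session; node names are illustrative):
%%
%%   1> rabbit_node_monitor:partitions().
%%   ['rabbit@host2']
%%   2> rabbit_node_monitor:partitions(['rabbit@host1','rabbit@host3']).
%%   [{'rabbit@host1',['rabbit@host2']},{'rabbit@host3',[]}]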
-spec partitions([node()]) -> [{node(), [node()]}].

partitions(Nodes) ->
    {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
    Replies.

-spec status([node()]) -> {[{node(), [node()]}], [node()]}.

status(Nodes) ->
    gen_server:multi_call(Nodes, ?SERVER, status, infinity).

-spec subscribe(pid()) -> 'ok'.

subscribe(Pid) ->
    gen_server:cast(?SERVER, {subscribe, Pid}).

%%----------------------------------------------------------------------------
%% pause_minority/pause_if_all_down safety
%%----------------------------------------------------------------------------

%% If we are in a minority and pause_minority mode then a) we are
%% going to shut down imminently and b) we should not confirm anything
%% until then, since anything we confirm is likely to be lost.
%%
%% The same principles apply to a node which isn't part of the preferred
%% partition when we are in pause_if_all_down mode.
%%
%% We could confirm something by having an HA queue see the pausing
%% state (and fail over into it) before the node monitor stops us, or
%% by using unmirrored queues and just having them vanish (and
%% confirming messages as thrown away).
%%
%% So we have channels call in here before issuing confirms, to do a
%% lightweight check that we have not entered a pausing state.

-spec pause_partition_guard() -> 'ok' | 'pausing'.

pause_partition_guard() ->
    case get(pause_partition_guard) of
        not_pause_mode ->
            ok;
        undefined ->
            {ok, M} = application:get_env(rabbit, cluster_partition_handling),
            case M of
                pause_minority ->
                    pause_minority_guard([], ok);
                {pause_if_all_down, PreferredNodes, _} ->
                    pause_if_all_down_guard(PreferredNodes, [], ok);
                _ ->
                    put(pause_partition_guard, not_pause_mode),
                    ok
            end;
        {minority_mode, Nodes, LastState} ->
            pause_minority_guard(Nodes, LastState);
        {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
            pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
    end.

pause_minority_guard(LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case majority() of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {minority_mode, nodes(), NewState}),
                     NewState
    end.

pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case in_preferred_partition(PreferredNodes) of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {pause_if_all_down_mode, PreferredNodes, nodes(),
                          NewState}),
                     NewState
    end.

%%----------------------------------------------------------------------------
%% "global" hang workaround.
%%----------------------------------------------------------------------------

%% This code works around a possible inconsistency in the "global"
%% state, causing global:sync/0 to never return.
%%
%% 1. A process is spawned.
%% 2. If after 15", global:sync() didn't return, the "global"
%%    state is parsed.
%% 3. If it detects that a sync is blocked for more than 10",
%%    the process sends fake nodedown/nodeup events to the two
%%    nodes involved (one local, one remote).
%% 4. Both "global" instances restart their synchronisation.
%% 5. global:sync() finally returns.
%%
%% FIXME: Remove this workaround, once we have got rid of the change to
%% "dist_auto_connect" and fixed the bugs uncovered.

global_sync() ->
    Pid = spawn(fun workaround_global_hang/0),
    ok = global:sync(),
    Pid ! global_sync_done,
    ok.

workaround_global_hang() ->
    receive
        global_sync_done ->
            ok
    after 10000 ->
            find_blocked_global_peers()
    end.

find_blocked_global_peers() ->
    Snapshot1 = snapshot_global_dict(),
    timer:sleep(10000),
    Snapshot2 = snapshot_global_dict(),
    find_blocked_global_peers1(Snapshot2, Snapshot1).

snapshot_global_dict() ->
    {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
    [E || {{sync_tag_his, _}, _} = E <- Dict].

find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
                           OlderSnapshot) ->
    case lists:member(Item, OlderSnapshot) of
        true  -> unblock_global_peer(Peer);
        false -> ok
    end,
    find_blocked_global_peers1(Rest, OlderSnapshot);
find_blocked_global_peers1([], _) ->
    ok.

unblock_global_peer(PeerNode) ->
    ThisNode = node(),
    PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
    logger:info(
      "Global hang workaround: global state on ~s seems broken~n"
      " * Peer global state:  ~p~n"
      " * Local global state: ~p~n"
      "Faking nodedown/nodeup between ~s and ~s",
      [PeerNode, PeerState, sys:get_status(global_name_server),
       PeerNode, ThisNode]),
    {global_name_server, ThisNode} ! {nodedown, PeerNode},
    {global_name_server, PeerNode} ! {nodedown, ThisNode},
    {global_name_server, ThisNode} ! {nodeup, PeerNode},
    {global_name_server, PeerNode} ! {nodeup, ThisNode},
    ok.

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

init([]) ->
    %% We trap exits so that the supervisor will not just kill us. We
    %% want to be sure that we are not going to be killed while
    %% writing out the cluster status files - bad things can then
    %% happen.
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true, [nodedown_reason]),
    {ok, _} = mnesia:subscribe(system),
    %% If the node has been restarted, Mnesia can trigger system notifications
    %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
    %% the inconsistent database event never arriving, we begin monitoring all running
    %% nodes as early as possible. The rest of the monitoring ops will only be triggered
    %% when notifications arrive.
    Nodes = possibly_partitioned_nodes(),
    startup_log(Nodes),
    Monitors = lists:foldl(fun(Node, Monitors0) ->
                               pmon:monitor({rabbit, Node}, Monitors0)
                           end, pmon:new(), Nodes),
    {ok, ensure_keepalive_timer(#state{monitors    = Monitors,
                                       subscribers = pmon:new(),
                                       partitions  = [],
                                       guid        = erlang:system_info(creation),
                                       node_guids  = maps:new(),
                                       autoheal    = rabbit_autoheal:init()})}.

handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
    {reply, Partitions, State};

handle_call(status, _From, State = #state{partitions = Partitions}) ->
    {reply, [{partitions, Partitions},
             {nodes,      [node() | nodes()]}], State};

handle_call(_Request, _From, State) ->
    {noreply, State}.
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
    Nodes = rabbit_nodes:all_running() -- [node()],
    gen_server:abcast(Nodes, ?SERVER,
                      {node_up, node(), rabbit_mnesia:node_type(), GUID}),
    %% register other active rabbits with this rabbit
    DiskNodes = rabbit_mnesia:cluster_nodes(disc),
    [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
                                               true  -> disc;
                                               false -> ram
                                           end}) || N <- Nodes],
    {noreply, State};

%%----------------------------------------------------------------------------
%% Partial partition detection
%%
%% Every node generates a GUID each time it starts, and announces that
%% GUID in 'node_up', with 'announce_guid' sent by return so the new
%% node knows the GUIDs of the others. These GUIDs are sent in all the
%% partial partition related messages to ensure that we ignore partial
%% partition messages from before we restarted (to avoid getting stuck
%% in a loop).
%%
%% When one node gets nodedown from another, it then sends
%% 'check_partial_partition' to all the nodes it still thinks are
%% alive. If any of those (intermediate) nodes still see the "down"
%% node as up, they inform it that this has happened. The original
%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
%% disconnect from the intermediate node to "upgrade" to a full
%% partition.
%%
%% In pause_minority mode it will instead immediately pause until all
%% nodes come back. This is because the contract for pause_minority is
%% that nodes should never sit in a partitioned state - if it just
%% disconnected, it would become a minority, pause, realise it's not
%% in a minority any more, and come back, still partitioned (albeit no
%% longer partially).
%%
%% UPDATE: The GUID is actually not a GUID anymore - it is the value
%% returned by erlang:system_info(creation). This prevents false positives
%% in a situation when a node is restarted (Erlang VM is up) but the rabbit
%% app is not yet up. The GUID was only generated and announced upon rabbit
%% startup; creation is available immediately. Therefore we can tell that
%% the node was restarted, before it announces the new value.
%% ----------------------------------------------------------------------------

handle_cast({node_up, Node, NodeType, GUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    cast(Node, {announce_guid, node(), MyGUID}),
    GUIDs1 = maps:put(Node, GUID, GUIDs),
    handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});

handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
    {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}};

handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    case lists:member(Node, rabbit_nodes:all_running()) andalso
        maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
        true  -> spawn_link( %%[1]
                   fun () ->
                       case rpc:call(Node, erlang, system_info, [creation]) of
                           {badrpc, _} -> ok;
                           NodeGUID ->
                               rabbit_log:warning("Received a 'DOWN' message"
                                                  " from ~p but still can"
                                                  " communicate with it ",
                                                  [Node]),
                               cast(Rep, {partial_partition,
                                          Node, node(), RepGUID});
                           _ ->
                               rabbit_log:warning("Node ~p was restarted", [Node]),
                               ok
                       end
                   end);
        false -> ok
    end,
    {noreply, State};
%% [1] We checked that we haven't heard the node go down - but we
%% really should make sure we can actually communicate with
%% it. Otherwise there's a race where we falsely detect a partial
%% partition.
%%
%% Now of course the rpc:call/4 may take a long time to return if
%% connectivity with the node is actually interrupted - but that's OK,
%% we only really want to do something in a timely manner if
%% connectivity is OK. However, of course as always we must not block
%% the node monitor, so we do the check in a separate process.

handle_cast({check_partial_partition, _Node, _Reporter,
             _NodeGUID, _GUID, _ReporterGUID}, State) ->
    {noreply, State};

handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
            State = #state{guid = MyGUID}) ->
    FmtBase = "Partial partition detected:~n"
        " * We saw DOWN from ~s~n"
        " * We can still see ~s which can see ~s~n",
    ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            rabbit_log:error(
              FmtBase ++ " * pause_minority mode enabled~n"
              "We will therefore pause until the *entire* cluster recovers",
              ArgsBase),
            await_cluster_recovery(fun all_nodes_up/0),
            {noreply, State};
        {ok, {pause_if_all_down, PreferredNodes, _}} ->
            case in_preferred_partition(PreferredNodes) of
                true  -> rabbit_log:error(
                           FmtBase ++ "We will therefore intentionally "
                           "disconnect from ~s", ArgsBase ++ [Proxy]),
                         upgrade_to_full_partition(Proxy);
                false -> rabbit_log:info(
                           FmtBase ++ "We are about to pause, no need "
                           "for further actions", ArgsBase)
            end,
            {noreply, State};
        {ok, _} ->
            rabbit_log:error(
              FmtBase ++ "We will therefore intentionally disconnect from ~s",
              ArgsBase ++ [Proxy]),
            upgrade_to_full_partition(Proxy),
            {noreply, State}
    end;

handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
    {noreply, State};

%% Sometimes it appears the Erlang VM does not give us nodedown
%% messages reliably when another node disconnects from us. Therefore
%% we are told just before the disconnection so we can reciprocate.
handle_cast({partial_partition_disconnect, Other}, State) ->
    rabbit_log:error("Partial partition disconnect from ~s", [Other]),
    disconnect(Other),
    {noreply, State};

%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
            State = #state{monitors = Monitors}) ->
    rabbit_log:info("rabbit on node ~p up", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          add_node(Node, RunningNodes)}),
    ok = handle_live_rabbit(Node),
    Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                    true ->
                        Monitors;
                    false ->
                        pmon:monitor({rabbit, Node}, Monitors)
                end,
    {noreply, maybe_autoheal(State#state{monitors = Monitors1})};

handle_cast({joined_cluster, Node, NodeType}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          RunningNodes}),
    rabbit_log:debug("Node '~p' has joined the cluster", [Node]),
    rabbit_event:notify(node_added, [{node, Node}]),
    {noreply, State};

handle_cast({left_cluster, Node}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
                          del_node(Node, RunningNodes)}),
    {noreply, State};

handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};

handle_cast(keepalive, State) ->
    {noreply, State};

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
            State = #state{monitors = Monitors, subscribers = Subscribers}) ->
    rabbit_log:info("rabbit on node ~p down", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
    [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
    {noreply, handle_dead_rabbit(
                Node,
                State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};

handle_info({'DOWN', _MRef, process, Pid, _Reason},
            State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};

handle_info({nodedown, Node, Info}, State = #state{guid       = MyGUID,
                                                   node_guids = GUIDs}) ->
    rabbit_log:info("node ~p down: ~p",
                    [Node, proplists:get_value(nodedown_reason, Info)]),
    Check = fun (N, CheckGUID, DownGUID) ->
                    cast(N, {check_partial_partition,
                             Node, node(), DownGUID, CheckGUID, MyGUID})
            end,
    case maps:find(Node, GUIDs) of
        {ok, DownGUID} -> Alive = rabbit_nodes:all_running()
                              -- [node(), Node],
                          [case maps:find(N, GUIDs) of
                               {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
                               error           -> ok
                           end || N <- Alive];
        error          -> ok
    end,
    {noreply, handle_dead_node(Node, State)};

handle_info({nodeup, Node, _Info}, State) ->
    rabbit_log:info("node ~p up", [Node]),
    {noreply, State};

handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors}) ->
    %% We will not get a node_up from this node - yet we should treat it as
    %% up (mostly).
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = lists:usort([Node | Partitions]),
    {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};

handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
                                                partitions = Partitions}) ->
    AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
    {noreply, State#state{autoheal = AState1}};

handle_info(ping_down_nodes, State) ->
    %% We ping nodes when some are down to ensure that we find out
    %% about healed partitions quickly. We ping all nodes rather than
    %% just the ones we know are down for simplicity; it's not expensive
    %% to ping the nodes that are up, after all.
    State1 = State#state{down_ping_timer = undefined},
    Self = self(),
    %% We ping in a separate process since in a partition it might
    %% take some noticeable length of time and we don't want to block
    %% the node monitor for that long.
    spawn_link(fun () ->
                       ping_all(),
                       case all_nodes_up() of
                           true  -> ok;
                           false -> Self ! ping_down_nodes_again
                       end
               end),
    {noreply, State1};

handle_info(ping_down_nodes_again, State) ->
    {noreply, ensure_ping_timer(State)};

handle_info(ping_up_nodes, State) ->
    %% In this case we need to ensure that we ping "quickly" -
    %% i.e. only nodes that we know to be up.
    [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
    {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};

handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) ->
    AState = rabbit_autoheal:process_down(Info, AState0),
    {noreply, State#state{autoheal = AState}};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    rabbit_misc:stop_timer(State, #state.down_ping_timer),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
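%% handle_dead_node/2 below branches on the 'cluster_partition_handling'
%% setting of the rabbit application. As a hedged illustration (file layout
%% and node name are hypothetical), the pause_if_all_down variant matched
%% there could be configured in advanced.config as:
%%
%%   [{rabbit, [{cluster_partition_handling,
%%               {pause_if_all_down, ['rabbit@host1'], autoheal}}]}].
%%
%% The other values recognised below are the atoms 'ignore', 'autoheal'
%% and 'pause_minority'.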
%%----------------------------------------------------------------------------
%% Functions that call the module specific hooks when nodes go up/down
%%----------------------------------------------------------------------------

handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
    %% In general in rabbit_node_monitor we care about whether the
    %% rabbit application is up rather than the node; we do this so
    %% that we can respond in the same way to "rabbitmqctl stop_app"
    %% and "rabbitmqctl stop" as much as possible.
    %%
    %% However, for pause_minority and pause_if_all_down modes we can't do
    %% this, since we depend on looking at whether other nodes are up
    %% to decide whether to come back up ourselves - if we decide that
    %% based on the rabbit application we would go down and never come
    %% back.
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            case majority([Node]) of
                true  -> ok;
                false -> await_cluster_recovery(fun majority/0)
            end,
            State;
        {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
            case in_preferred_partition(PreferredNodes, [Node]) of
                true  -> ok;
                false -> await_cluster_recovery(
                           fun in_preferred_partition/0)
            end,
            case HowToRecover of
                autoheal -> State#state{autoheal =
                              rabbit_autoheal:node_down(Node, Autoheal)};
                _        -> State
            end;
        {ok, ignore} ->
            State;
        {ok, autoheal} ->
            State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
        {ok, Term} ->
            rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
                               "assuming 'ignore'", [Term]),
            State
    end.

await_cluster_recovery(Condition) ->
    rabbit_log:warning("Cluster minority/secondary status detected - "
                       "awaiting recovery", []),
    run_outside_applications(fun () ->
                                     rabbit:stop(),
                                     wait_for_cluster_recovery(Condition)
                             end, false),
    ok.

run_outside_applications(Fun, WaitForExistingProcess) ->
    spawn_link(fun () ->
                       %% Ignore exit messages from the monitor - the link is needed
                       %% to ensure the monitor detects abnormal exits from this process
                       %% and can reset the 'restarting' status on the autoheal, avoiding
                       %% a deadlock. The monitor is restarted when rabbit does, so messages
                       %% in the other direction should be ignored.
                       process_flag(trap_exit, true),
                       %% If our group leader is inside an application we are about
                       %% to stop, application:stop/1 does not return.
                       group_leader(whereis(init), self()),
                       register_outside_app_process(Fun, WaitForExistingProcess)
               end).

register_outside_app_process(Fun, WaitForExistingProcess) ->
    %% Ensure only one such process at a time, the exit(badarg) is
    %% harmless if one is already running.
    %%
    %% If WaitForExistingProcess is false, the given fun is simply not
    %% executed at all and the process exits.
    %%
    %% If WaitForExistingProcess is true, we wait for the end of the
    %% currently running process before executing the given function.
    try register(rabbit_outside_app_process, self()) of
        true ->
            do_run_outside_app_fun(Fun)
    catch
        error:badarg when WaitForExistingProcess ->
            MRef = erlang:monitor(process, rabbit_outside_app_process),
            receive
                {'DOWN', MRef, _, _, _} ->
                    %% The existing process exited, let's try to
                    %% register again.
                    register_outside_app_process(Fun, WaitForExistingProcess)
            end;
        error:badarg ->
            ok
    end.
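%% A hedged usage sketch for run_outside_applications/2 above (the fun body
%% and the restart sequence shown are purely illustrative):
%%
%%   rabbit_node_monitor:run_outside_applications(
%%     fun () -> rabbit:stop(), rabbit:start() end,
%%     true).
%%
%% Passing 'true' waits for any already-registered
%% rabbit_outside_app_process to finish before running the fun; 'false'
%% silently skips the fun if one is already running.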
do_run_outside_app_fun(Fun) ->
    try
        Fun()
    catch _:E:Stacktrace ->
            rabbit_log:error(
              "rabbit_outside_app_process:~n~p~n~p",
              [E, Stacktrace])
    end.

wait_for_cluster_recovery(Condition) ->
    ping_all(),
    case Condition() of
        true  -> rabbit:start();
        false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
                 wait_for_cluster_recovery(Condition)
    end.

handle_dead_rabbit(Node, State = #state{partitions = Partitions,
                                        autoheal   = Autoheal}) ->
    %% TODO: This may turn out to be a performance hog when there are
    %% lots of nodes. We really only need to execute some of these
    %% statements on *one* node, rather than all of them.
    ok = rabbit_networking:on_node_down(Node),
    ok = rabbit_amqqueue:on_node_down(Node),
    ok = rabbit_alarm:on_node_down(Node),
    ok = rabbit_mnesia:on_node_down(Node),
    %% If we have been partitioned, and we are now in the only remaining
    %% partition, we no longer care about partitions - forget them. Note
    %% that we do not attempt to deal with individual (other) partitions
    %% going away. It's only safe to forget anything about partitions when
    %% there are no partitions.
    Down = Partitions -- alive_rabbit_nodes(),
    NoLongerPartitioned = rabbit_nodes:all_running(),
    Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
                      [] -> [];
                      _  -> Partitions
                  end,
    ensure_ping_timer(
      State#state{partitions = Partitions1,
                  autoheal   = rabbit_autoheal:rabbit_down(Node, Autoheal)}).

ensure_ping_timer(State) ->
    rabbit_misc:ensure_timer(
      State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
      ping_down_nodes).

ensure_keepalive_timer(State) ->
    {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
    rabbit_misc:ensure_timer(
      State, #state.keepalive_timer, Interval, ping_up_nodes).

handle_live_rabbit(Node) ->
    ok = rabbit_amqqueue:on_node_up(Node),
    ok = rabbit_alarm:on_node_up(Node),
    ok = rabbit_mnesia:on_node_up(Node).

maybe_autoheal(State = #state{partitions = []}) ->
    State;

maybe_autoheal(State = #state{autoheal = AState}) ->
    case all_nodes_up() of
        true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
        false -> State
    end.

%%--------------------------------------------------------------------
%% Internal utils
%%--------------------------------------------------------------------

try_read_file(FileName) ->
    case rabbit_file:read_term_file(FileName) of
        {ok, Term}      -> {ok, Term};
        {error, enoent} -> {error, enoent};
        {error, E}      -> throw({error, {cannot_read_file, FileName, E}})
    end.

legacy_cluster_nodes(Nodes) ->
    %% We get all the info that we can, including the nodes from
    %% mnesia, which will be there if the node is a disc node (empty
    %% list otherwise)
    lists:usort(Nodes ++ mnesia:system_info(db_nodes)).

legacy_disc_nodes(AllNodes) ->
    case AllNodes == [] orelse lists:member(node(), AllNodes) of
        true  -> [node()];
        false -> []
    end.

add_node(Node, Nodes) -> lists:usort([Node | Nodes]).

del_node(Node, Nodes) -> Nodes -- [Node].

cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).

upgrade_to_full_partition(Proxy) ->
    cast(Proxy, {partial_partition_disconnect, node()}),
    disconnect(Proxy).

%% When we call this, it's because we want to force Mnesia to detect a
%% partition. But if we just disconnect_node/1 then Mnesia won't
%% detect a very short partition. So we want to force a slightly
%% longer disconnect. Unfortunately we don't have a way to blacklist
%% individual nodes; the best we can do is turn off auto-connect
%% altogether.
disconnect(Node) ->
    application:set_env(kernel, dist_auto_connect, never),
    erlang:disconnect_node(Node),
    timer:sleep(1000),
    application:unset_env(kernel, dist_auto_connect),
    ok.

%%--------------------------------------------------------------------

%% mnesia:system_info(db_nodes) (and hence
%% rabbit_nodes:all_running()) does not return all nodes
%% when partitioned, just those that we are sharing Mnesia state
%% with. So we have a small set of replacement functions
%% here. "rabbit" in a function's name implies we test if the rabbit
%% application is up, not just the node.

%% As we use these functions to decide what to do in pause_minority or
%% pause_if_all_down states, they *must* be fast, even in the case where
%% TCP connections are timing out. So that means we should be careful
%% about whether we connect to nodes which are currently disconnected.

majority() ->
    majority([]).

majority(NodesDown) ->
    Nodes = rabbit_nodes:all(),
    AliveNodes = alive_nodes(Nodes) -- NodesDown,
    length(AliveNodes) / length(Nodes) > 0.5.

in_preferred_partition() ->
    {ok, {pause_if_all_down, PreferredNodes, _}} =
        application:get_env(rabbit, cluster_partition_handling),
    in_preferred_partition(PreferredNodes).

in_preferred_partition(PreferredNodes) ->
    in_preferred_partition(PreferredNodes, []).

in_preferred_partition(PreferredNodes, NodesDown) ->
    Nodes = rabbit_nodes:all(),
    RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
    AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
    RealPreferredNodes =:= [] orelse AliveNodes =/= [].

all_nodes_up() ->
    Nodes = rabbit_nodes:all(),
    length(alive_nodes(Nodes)) =:= length(Nodes).

-spec all_rabbit_nodes_up() -> boolean().

all_rabbit_nodes_up() ->
    Nodes = rabbit_nodes:all(),
    length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).

alive_nodes() -> alive_nodes(rabbit_nodes:all()).

-spec alive_nodes([node()]) -> [node()].

alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].

alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_nodes:all()).

-spec alive_rabbit_nodes([node()]) -> [node()].

alive_rabbit_nodes(Nodes) ->
    [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].

%% This one is allowed to connect!

-spec ping_all() -> 'ok'.

ping_all() ->
    [net_adm:ping(N) || N <- rabbit_nodes:all()],
    ok.

possibly_partitioned_nodes() ->
    alive_rabbit_nodes() -- rabbit_nodes:all_running().

startup_log([]) ->
    rabbit_log:info("Starting rabbit_node_monitor", []);
startup_log(Nodes) ->
    rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p",
                    [Nodes]).