%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_node_monitor).

%% Transitional step until we can require Erlang/OTP 21 and
%% use the now recommended try/catch syntax for obtaining the stack trace.
-compile(nowarn_deprecated_function).

-behaviour(gen_server).

-export([start_link/0]).
-export([running_nodes_filename/0,
         cluster_status_filename/0, coordination_filename/0,
         quorum_filename/0, default_quorum_filename/0,
         prepare_cluster_status_files/0,
         write_cluster_status/1, read_cluster_status/0,
         update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, status/1, subscribe/1]).
-export([pause_partition_guard/0]).
-export([global_sync/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Utils
-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
         alive_nodes/1, alive_rabbit_nodes/1]).

-define(SERVER, ?MODULE).
-define(NODE_REPLY_TIMEOUT, 5000).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).

-record(state, {monitors, partitions, subscribers, down_ping_timer,
                keepalive_timer, autoheal, guid, node_guids}).

%%----------------------------------------------------------------------------
%% Start
%%----------------------------------------------------------------------------

-spec start_link() -> rabbit_types:ok_pid_or_error().

start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%----------------------------------------------------------------------------
%% Cluster file operations
%%----------------------------------------------------------------------------

%% The cluster file information is kept in two files.  The "cluster
%% status file" contains all the clustered nodes and the disc nodes.
%% The "running nodes file" contains the currently running nodes or
%% the running nodes at shutdown when the node is down.
%%
%% We strive to keep the files up to date and we rely on this
%% assumption in various situations. Obviously when mnesia is offline
%% the information we have will be outdated, but it cannot be
%% otherwise.
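%%
%% For illustration only: both files are written with
%% rabbit_file:write_term_file/2 (see write_cluster_status/1 below), so they
%% hold plain Erlang terms. Assuming a hypothetical two-node cluster of
%% rabbit@a (disc) and rabbit@b (ram), they might look like:
%%
%%   cluster_nodes.config:       {[rabbit@a,rabbit@b],[rabbit@a]}.
%%   nodes_running_at_shutdown:  [rabbit@a,rabbit@b].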

-spec running_nodes_filename() -> string().

running_nodes_filename() ->
    filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").

-spec cluster_status_filename() -> string().

cluster_status_filename() ->
    filename:join(rabbit_mnesia:dir(), "cluster_nodes.config").

coordination_filename() ->
    filename:join(rabbit_mnesia:dir(), "coordination").

quorum_filename() ->
    ra_env:data_dir().

default_quorum_filename() ->
    filename:join(rabbit_mnesia:dir(), "quorum").

-spec prepare_cluster_status_files() -> 'ok' | no_return().

prepare_cluster_status_files() ->
    rabbit_mnesia:ensure_mnesia_dir(),
    RunningNodes1 = case try_read_file(running_nodes_filename()) of
                        {ok, [Nodes]} when is_list(Nodes) -> Nodes;
                        {ok, Other}                       -> corrupt_cluster_status_files(Other);
                        {error, enoent}                   -> []
                    end,
    ThisNode = [node()],
    %% The running nodes file might contain a set or a list, in the
    %% case of a legacy file
    RunningNodes2 = lists:usort(ThisNode ++ RunningNodes1),
    {AllNodes1, DiscNodes} =
        case try_read_file(cluster_status_filename()) of
            {ok, [{AllNodes, DiscNodes0}]} ->
                {AllNodes, DiscNodes0};
            {ok, [AllNodes0]} when is_list(AllNodes0) ->
                {legacy_cluster_nodes(AllNodes0), legacy_disc_nodes(AllNodes0)};
            {ok, Files} ->
                corrupt_cluster_status_files(Files);
            {error, enoent} ->
                LegacyNodes = legacy_cluster_nodes([]),
                {LegacyNodes, LegacyNodes}
        end,
    AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
    ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).

-spec corrupt_cluster_status_files(any()) -> no_return().

corrupt_cluster_status_files(F) ->
    throw({error, corrupt_cluster_status_files, F}).

-spec write_cluster_status(rabbit_mnesia:cluster_status()) -> 'ok'.

write_cluster_status({All, Disc, Running}) ->
    ClusterStatusFN = cluster_status_filename(),
    Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
              ok ->
                  RunningNodesFN = running_nodes_filename(),
                  {RunningNodesFN,
                   rabbit_file:write_term_file(RunningNodesFN, [Running])};
              E1 = {error, _} ->
                  {ClusterStatusFN, E1}
          end,
    case Res of
        {_, ok}           -> ok;
        {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
    end.

-spec read_cluster_status() -> rabbit_mnesia:cluster_status().

read_cluster_status() ->
    case {try_read_file(cluster_status_filename()),
          try_read_file(running_nodes_filename())} of
        {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
            {All, Disc, Running};
        {Stat, Run} ->
            throw({error, {corrupt_or_missing_cluster_files, Stat, Run}})
    end.

-spec update_cluster_status() -> 'ok'.

update_cluster_status() ->
    {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
    write_cluster_status(Status).

-spec reset_cluster_status() -> 'ok'.

reset_cluster_status() ->
    write_cluster_status({[node()], [node()], [node()]}).

%%----------------------------------------------------------------------------
%% Cluster notifications
%%----------------------------------------------------------------------------

-spec notify_node_up() -> 'ok'.

notify_node_up() ->
    gen_server:cast(?SERVER, notify_node_up).

-spec notify_joined_cluster() -> 'ok'.

notify_joined_cluster() ->
    NewMember = node(),
    Nodes = rabbit_nodes:all_running() -- [NewMember],
    gen_server:abcast(Nodes, ?SERVER,
                      {joined_cluster, node(), rabbit_mnesia:node_type()}),

    ok.

-spec notify_left_cluster(node()) -> 'ok'.

notify_left_cluster(Node) ->
    Nodes = rabbit_nodes:all_running(),
    gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
    ok.

%%----------------------------------------------------------------------------
%% Server calls
%%----------------------------------------------------------------------------

-spec partitions() -> [node()].

partitions() ->
    gen_server:call(?SERVER, partitions, infinity).

-spec partitions([node()]) -> [{node(), [node()]}].

partitions(Nodes) ->
    {Replies, _} = gen_server:multi_call(Nodes, ?SERVER, partitions, ?NODE_REPLY_TIMEOUT),
    Replies.

-spec status([node()]) -> {[{node(), [node()]}], [node()]}.

status(Nodes) ->
    gen_server:multi_call(Nodes, ?SERVER, status, infinity).

-spec subscribe(pid()) -> 'ok'.

subscribe(Pid) ->
    gen_server:cast(?SERVER, {subscribe, Pid}).

%%----------------------------------------------------------------------------
%% pause_minority/pause_if_all_down safety
%%----------------------------------------------------------------------------

%% If we are in a minority and pause_minority mode then a) we are
%% going to shut down imminently and b) we should not confirm anything
%% until then, since anything we confirm is likely to be lost.
%%
%% The same principles apply to a node which isn't part of the preferred
%% partition when we are in pause_if_all_down mode.
%%
%% We could confirm something by having an HA queue see the pausing
%% state (and fail over into it) before the node monitor stops us, or
%% by using unmirrored queues and just having them vanish (and
%% confirming messages as thrown away).
%%
%% So we have channels call in here before issuing confirms, to do a
%% lightweight check that we have not entered a pausing state.
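%%
%% Purely as an illustration (not actual channel code), a caller might use
%% the return value like this, where issue_confirms/0 is a hypothetical
%% helper:
%%
%%   case rabbit_node_monitor:pause_partition_guard() of
%%       ok      -> issue_confirms();
%%       pausing -> ok   %% drop the confirms; this node is about to stop
%%   end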

-spec pause_partition_guard() -> 'ok' | 'pausing'.

pause_partition_guard() ->
    case get(pause_partition_guard) of
        not_pause_mode ->
            ok;
        undefined ->
            {ok, M} = application:get_env(rabbit, cluster_partition_handling),
            case M of
                pause_minority ->
                    pause_minority_guard([], ok);
                {pause_if_all_down, PreferredNodes, _} ->
                    pause_if_all_down_guard(PreferredNodes, [], ok);
                _ ->
                    put(pause_partition_guard, not_pause_mode),
                    ok
            end;
        {minority_mode, Nodes, LastState} ->
            pause_minority_guard(Nodes, LastState);
        {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
            pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
    end.

pause_minority_guard(LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case majority() of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {minority_mode, nodes(), NewState}),
                     NewState
    end.

pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
    case nodes() of
        LastNodes -> LastState;
        _         -> NewState = case in_preferred_partition(PreferredNodes) of
                                    false -> pausing;
                                    true  -> ok
                                end,
                     put(pause_partition_guard,
                         {pause_if_all_down_mode, PreferredNodes, nodes(),
                          NewState}),
                     NewState
    end.

%%----------------------------------------------------------------------------
%% "global" hang workaround.
%%----------------------------------------------------------------------------

%% This code works around a possible inconsistency in the "global"
%% state, causing global:sync/0 to never return.
%%
%%     1. A process is spawned.
%%     2. If global:sync() hasn't returned after 10 seconds, the
%%        "global" state is parsed.
%%     3. If it detects that a sync has been blocked for more than
%%        10 seconds, the process sends fake nodedown/nodeup events
%%        to the two nodes involved (one local, one remote).
%%     4. Both "global" instances restart their synchronisation.
%%     5. global:sync() finally returns.
%%
%% FIXME: Remove this workaround once we get rid of the change to
%% "dist_auto_connect" and fix the bugs it uncovered.

global_sync() ->
    Pid = spawn(fun workaround_global_hang/0),
    ok = global:sync(),
    Pid ! global_sync_done,
    ok.

workaround_global_hang() ->
    receive
        global_sync_done ->
            ok
    after 10000 ->
            find_blocked_global_peers()
    end.

find_blocked_global_peers() ->
    Snapshot1 = snapshot_global_dict(),
    timer:sleep(10000),
    Snapshot2 = snapshot_global_dict(),
    find_blocked_global_peers1(Snapshot2, Snapshot1).

snapshot_global_dict() ->
    {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
    [E || {{sync_tag_his, _}, _} = E <- Dict].

find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
  OlderSnapshot) ->
    case lists:member(Item, OlderSnapshot) of
        true  -> unblock_global_peer(Peer);
        false -> ok
    end,
    find_blocked_global_peers1(Rest, OlderSnapshot);
find_blocked_global_peers1([], _) ->
    ok.

unblock_global_peer(PeerNode) ->
    ThisNode = node(),
    PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
    logger:info(
      "Global hang workaround: global state on ~s seems broken~n"
      " * Peer global state:  ~p~n"
      " * Local global state: ~p~n"
      "Faking nodedown/nodeup between ~s and ~s",
      [PeerNode, PeerState, sys:get_status(global_name_server),
       PeerNode, ThisNode]),
    {global_name_server, ThisNode} ! {nodedown, PeerNode},
    {global_name_server, PeerNode} ! {nodedown, ThisNode},
    {global_name_server, ThisNode} ! {nodeup, PeerNode},
    {global_name_server, PeerNode} ! {nodeup, ThisNode},
    ok.

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

init([]) ->
    %% We trap exits so that the supervisor will not just kill us. We
    %% want to be sure that we are not going to be killed while
    %% writing out the cluster status files - bad things can then
    %% happen.
    process_flag(trap_exit, true),
    net_kernel:monitor_nodes(true, [nodedown_reason]),
    {ok, _} = mnesia:subscribe(system),
    %% If the node has been restarted, Mnesia can trigger a system notification
    %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
    %% the inconsistent database event never arriving, we begin monitoring all running
    %% nodes as early as possible. The rest of the monitoring ops will only be triggered
    %% when notifications arrive.
    Nodes = possibly_partitioned_nodes(),
    startup_log(Nodes),
    Monitors = lists:foldl(fun(Node, Monitors0) ->
                                   pmon:monitor({rabbit, Node}, Monitors0)
                           end, pmon:new(), Nodes),
    {ok, ensure_keepalive_timer(#state{monitors    = Monitors,
                                       subscribers = pmon:new(),
                                       partitions  = [],
                                       guid        = erlang:system_info(creation),
                                       node_guids  = maps:new(),
                                       autoheal    = rabbit_autoheal:init()})}.

handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
    {reply, Partitions, State};

handle_call(status, _From, State = #state{partitions = Partitions}) ->
    {reply, [{partitions, Partitions},
             {nodes,      [node() | nodes()]}], State};

handle_call(_Request, _From, State) ->
    {noreply, State}.

handle_cast(notify_node_up, State = #state{guid = GUID}) ->
    Nodes = rabbit_nodes:all_running() -- [node()],
    gen_server:abcast(Nodes, ?SERVER,
                      {node_up, node(), rabbit_mnesia:node_type(), GUID}),
    %% register other active rabbits with this rabbit
    DiskNodes = rabbit_mnesia:cluster_nodes(disc),
    [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
                                               true  -> disc;
                                               false -> ram
                                           end}) || N <- Nodes],
    {noreply, State};

%%----------------------------------------------------------------------------
%% Partial partition detection
%%
%% Every node generates a GUID each time it starts, and announces that
%% GUID in 'node_up', with 'announce_guid' sent by return so the new
%% node knows the GUIDs of the others. These GUIDs are sent in all the
%% partial partition related messages to ensure that we ignore partial
%% partition messages from before we restarted (to avoid getting stuck
%% in a loop).
%%
%% When one node gets nodedown from another, it then sends
%% 'check_partial_partition' to all the nodes it still thinks are
%% alive. If any of those (intermediate) nodes still see the "down"
%% node as up, they inform it that this has happened. The original
%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
%% disconnect from the intermediate node to "upgrade" to a full
%% partition.
%%
%% In pause_minority mode it will instead immediately pause until all
%% nodes come back. This is because the contract for pause_minority is
%% that nodes should never sit in a partitioned state - if it just
%% disconnected, it would become a minority, pause, realise it's not
%% in a minority any more, and come back, still partitioned (albeit no
%% longer partially).
%%
%% UPDATE: The GUID is actually not a GUID anymore - it is the value
%% returned by erlang:system_info(creation). This prevents false positives
%% in the situation where a node has been restarted (the Erlang VM is up) but
%% the rabbit app is not yet up. The GUID was only generated and announced
%% upon rabbit startup; the creation value is available immediately, so we
%% can tell that the node was restarted before it announces the new value.
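%%
%% For illustration, the message flow implemented below is roughly as
%% follows, where A detects the partial partition, B is the "down" node,
%% C is an intermediate node that can still see B, and GUID_x is each
%% node's creation value:
%%
%%   B -> *  : {node_up, B, NodeType, GUID_B}          (B's rabbit starts)
%%   * -> B  : {announce_guid, Node, GUID_Node}
%%   ... A receives {nodedown, B} ...
%%   A -> C  : {check_partial_partition, B, A, GUID_B, GUID_C, GUID_A}
%%   C -> A  : {partial_partition, B, C, GUID_A}       (only if C still sees B)
%%
%% A then pauses or disconnects from C, depending on the configured
%% cluster_partition_handling mode.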
%% ----------------------------------------------------------------------------

handle_cast({node_up, Node, NodeType, GUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    cast(Node, {announce_guid, node(), MyGUID}),
    GUIDs1 = maps:put(Node, GUID, GUIDs),
    handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});

handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
    {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}};

handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
            State = #state{guid       = MyGUID,
                           node_guids = GUIDs}) ->
    case lists:member(Node, rabbit_nodes:all_running()) andalso
        maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
        true  -> spawn_link( %%[1]
                   fun () ->
                           case rpc:call(Node, erlang, system_info, [creation]) of
                               {badrpc, _} -> ok;
                               NodeGUID ->
                                   rabbit_log:warning("Received a 'DOWN' message"
                                                      " from ~p but still can"
                                                      " communicate with it",
                                                      [Node]),
                                   cast(Rep, {partial_partition,
                                              Node, node(), RepGUID});
                               _ ->
                                   rabbit_log:warning("Node ~p was restarted", [Node]),
                                   ok
                           end
                   end);
        false -> ok
    end,
    {noreply, State};
%% [1] We checked that we haven't heard the node go down - but we
%% really should make sure we can actually communicate with
%% it. Otherwise there's a race where we falsely detect a partial
%% partition.
%%
%% Now of course the rpc:call/4 may take a long time to return if
%% connectivity with the node is actually interrupted - but that's OK,
%% we only really want to do something in a timely manner if
%% connectivity is OK. However, of course as always we must not block
%% the node monitor, so we do the check in a separate process.

handle_cast({check_partial_partition, _Node, _Reporter,
             _NodeGUID, _GUID, _ReporterGUID}, State) ->
    {noreply, State};

handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
            State = #state{guid = MyGUID}) ->
    FmtBase = "Partial partition detected:~n"
        " * We saw DOWN from ~s~n"
        " * We can still see ~s which can see ~s~n",
    ArgsBase = [NotReallyDown, Proxy, NotReallyDown],
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            rabbit_log:error(
              FmtBase ++ " * pause_minority mode enabled~n"
              "We will therefore pause until the *entire* cluster recovers",
              ArgsBase),
            await_cluster_recovery(fun all_nodes_up/0),
            {noreply, State};
        {ok, {pause_if_all_down, PreferredNodes, _}} ->
            case in_preferred_partition(PreferredNodes) of
                true  -> rabbit_log:error(
                           FmtBase ++ "We will therefore intentionally "
                           "disconnect from ~s", ArgsBase ++ [Proxy]),
                         upgrade_to_full_partition(Proxy);
                false -> rabbit_log:info(
                           FmtBase ++ "We are about to pause, no need "
                           "for further actions", ArgsBase)
            end,
            {noreply, State};
        {ok, _} ->
            rabbit_log:error(
              FmtBase ++ "We will therefore intentionally disconnect from ~s",
              ArgsBase ++ [Proxy]),
            upgrade_to_full_partition(Proxy),
            {noreply, State}
    end;

handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
    {noreply, State};

%% Sometimes it appears the Erlang VM does not give us nodedown
%% messages reliably when another node disconnects from us. Therefore
%% we are told just before the disconnection so we can reciprocate.
handle_cast({partial_partition_disconnect, Other}, State) ->
    rabbit_log:error("Partial partition disconnect from ~s", [Other]),
    disconnect(Other),
    {noreply, State};

%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
            State = #state{monitors = Monitors}) ->
    rabbit_log:info("rabbit on node ~p up", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          add_node(Node, RunningNodes)}),
    ok = handle_live_rabbit(Node),
    Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                    true ->
                        Monitors;
                    false ->
                        pmon:monitor({rabbit, Node}, Monitors)
                end,
    {noreply, maybe_autoheal(State#state{monitors = Monitors1})};

handle_cast({joined_cluster, Node, NodeType}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({add_node(Node, AllNodes),
                          case NodeType of
                              disc -> add_node(Node, DiscNodes);
                              ram  -> DiscNodes
                          end,
                          RunningNodes}),
    rabbit_log:debug("Node '~p' has joined the cluster", [Node]),
    rabbit_event:notify(node_added, [{node, Node}]),
    {noreply, State};

handle_cast({left_cluster, Node}, State) ->
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
                          del_node(Node, RunningNodes)}),
    {noreply, State};

handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};

handle_cast(keepalive, State) ->
    {noreply, State};

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
            State = #state{monitors = Monitors, subscribers = Subscribers}) ->
    rabbit_log:info("rabbit on node ~p down", [Node]),
    {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
    write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
    [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
    {noreply, handle_dead_rabbit(
                Node,
                State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};

handle_info({'DOWN', _MRef, process, Pid, _Reason},
            State = #state{subscribers = Subscribers}) ->
    {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};

handle_info({nodedown, Node, Info}, State = #state{guid       = MyGUID,
                                                   node_guids = GUIDs}) ->
    rabbit_log:info("node ~p down: ~p",
                    [Node, proplists:get_value(nodedown_reason, Info)]),
    Check = fun (N, CheckGUID, DownGUID) ->
                    cast(N, {check_partial_partition,
                             Node, node(), DownGUID, CheckGUID, MyGUID})
            end,
    case maps:find(Node, GUIDs) of
        {ok, DownGUID} -> Alive = rabbit_nodes:all_running()
                              -- [node(), Node],
                          [case maps:find(N, GUIDs) of
                               {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
                               error           -> ok
                           end || N <- Alive];
        error          -> ok
    end,
    {noreply, handle_dead_node(Node, State)};

handle_info({nodeup, Node, _Info}, State) ->
    rabbit_log:info("node ~p up", [Node]),
    {noreply, State};

handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors}) ->
    %% We will not get a node_up from this node - yet we should treat it as
    %% up (mostly).
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = lists:usort([Node | Partitions]),
    {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};

handle_info({autoheal_msg, Msg}, State = #state{autoheal   = AState,
                                                partitions = Partitions}) ->
    AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
    {noreply, State#state{autoheal = AState1}};

handle_info(ping_down_nodes, State) ->
    %% We ping nodes when some are down to ensure that we find out
    %% about healed partitions quickly. We ping all nodes rather than
    %% just the ones we know are down for simplicity; it's not expensive
    %% to ping the nodes that are up, after all.
    State1 = State#state{down_ping_timer = undefined},
    Self = self(),
    %% We ping in a separate process since in a partition it might
    %% take some noticeable length of time and we don't want to block
    %% the node monitor for that long.
    spawn_link(fun () ->
                       ping_all(),
                       case all_nodes_up() of
                           true  -> ok;
                           false -> Self ! ping_down_nodes_again
                       end
               end),
    {noreply, State1};

handle_info(ping_down_nodes_again, State) ->
    {noreply, ensure_ping_timer(State)};

handle_info(ping_up_nodes, State) ->
    %% In this case we need to ensure that we ping "quickly" -
    %% i.e. only nodes that we know to be up.
    [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
    {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};

handle_info({'EXIT', _, _} = Info, State = #state{autoheal = AState0}) ->
    AState = rabbit_autoheal:process_down(Info, AState0),
    {noreply, State#state{autoheal = AState}};

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    rabbit_misc:stop_timer(State, #state.down_ping_timer),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%----------------------------------------------------------------------------
%% Functions that call the module specific hooks when nodes go up/down
%%----------------------------------------------------------------------------

handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
    %% In general in rabbit_node_monitor we care about whether the
    %% rabbit application is up rather than the node; we do this so
    %% that we can respond in the same way to "rabbitmqctl stop_app"
    %% and "rabbitmqctl stop" as much as possible.
    %%
    %% However, for pause_minority and pause_if_all_down modes we can't do
    %% this, since we depend on looking at whether other nodes are up
    %% to decide whether to come back up ourselves - if we decide that
    %% based on the rabbit application we would go down and never come
    %% back.
    case application:get_env(rabbit, cluster_partition_handling) of
        {ok, pause_minority} ->
            case majority([Node]) of
                true  -> ok;
                false -> await_cluster_recovery(fun majority/0)
            end,
            State;
        {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
            case in_preferred_partition(PreferredNodes, [Node]) of
                true  -> ok;
                false -> await_cluster_recovery(
                           fun in_preferred_partition/0)
            end,
            case HowToRecover of
                autoheal -> State#state{autoheal =
                              rabbit_autoheal:node_down(Node, Autoheal)};
                _        -> State
            end;
        {ok, ignore} ->
            State;
        {ok, autoheal} ->
            State#state{autoheal = rabbit_autoheal:node_down(Node, Autoheal)};
        {ok, Term} ->
            rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
                               "assuming 'ignore'", [Term]),
            State
    end.

await_cluster_recovery(Condition) ->
    rabbit_log:warning("Cluster minority/secondary status detected - "
                       "awaiting recovery", []),
    run_outside_applications(fun () ->
                                     rabbit:stop(),
                                     wait_for_cluster_recovery(Condition)
                             end, false),
    ok.

run_outside_applications(Fun, WaitForExistingProcess) ->
    spawn_link(fun () ->
                       %% Ignore exit messages from the monitor - the link is needed
                       %% to ensure the monitor detects abnormal exits from this process
                       %% and can reset the 'restarting' status on the autoheal, avoiding
                       %% a deadlock. The monitor is restarted when rabbit is, so messages
                       %% in the other direction should be ignored.
                       process_flag(trap_exit, true),
                       %% If our group leader is inside an application we are about
                       %% to stop, application:stop/1 does not return.
                       group_leader(whereis(init), self()),
                       register_outside_app_process(Fun, WaitForExistingProcess)
               end).

register_outside_app_process(Fun, WaitForExistingProcess) ->
    %% Ensure only one such process at a time, the exit(badarg) is
    %% harmless if one is already running.
    %%
    %% If WaitForExistingProcess is false, the given fun is simply not
    %% executed at all and the process exits.
    %%
    %% If WaitForExistingProcess is true, we wait for the end of the
    %% currently running process before executing the given function.
    try register(rabbit_outside_app_process, self()) of
        true ->
            do_run_outside_app_fun(Fun)
    catch
        error:badarg when WaitForExistingProcess ->
            MRef = erlang:monitor(process, rabbit_outside_app_process),
            receive
                {'DOWN', MRef, _, _, _} ->
                    %% The existing process exited, let's try to
                    %% register again.
                    register_outside_app_process(Fun, WaitForExistingProcess)
            end;
        error:badarg ->
            ok
    end.

do_run_outside_app_fun(Fun) ->
    try
        Fun()
    catch _:E:Stacktrace ->
            rabbit_log:error(
              "rabbit_outside_app_process:~n~p~n~p",
              [E, Stacktrace])
    end.

wait_for_cluster_recovery(Condition) ->
    ping_all(),
    case Condition() of
        true  -> rabbit:start();
        false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
                 wait_for_cluster_recovery(Condition)
    end.

handle_dead_rabbit(Node, State = #state{partitions = Partitions,
                                        autoheal   = Autoheal}) ->
    %% TODO: This may turn out to be a performance hog when there are
    %% lots of nodes.  We really only need to execute some of these
    %% statements on *one* node, rather than all of them.
    ok = rabbit_networking:on_node_down(Node),
    ok = rabbit_amqqueue:on_node_down(Node),
    ok = rabbit_alarm:on_node_down(Node),
    ok = rabbit_mnesia:on_node_down(Node),
    %% If we have been partitioned, and we are now in the only remaining
    %% partition, we no longer care about partitions - forget them. Note
    %% that we do not attempt to deal with individual (other) partitions
    %% going away. It's only safe to forget anything about partitions when
    %% there are no partitions.
    Down = Partitions -- alive_rabbit_nodes(),
    NoLongerPartitioned = rabbit_nodes:all_running(),
    Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
                      [] -> [];
                      _  -> Partitions
                  end,
    ensure_ping_timer(
      State#state{partitions = Partitions1,
                  autoheal   = rabbit_autoheal:rabbit_down(Node, Autoheal)}).

ensure_ping_timer(State) ->
    rabbit_misc:ensure_timer(
      State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
      ping_down_nodes).

ensure_keepalive_timer(State) ->
    {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
    rabbit_misc:ensure_timer(
      State, #state.keepalive_timer, Interval, ping_up_nodes).

handle_live_rabbit(Node) ->
    ok = rabbit_amqqueue:on_node_up(Node),
    ok = rabbit_alarm:on_node_up(Node),
    ok = rabbit_mnesia:on_node_up(Node).

maybe_autoheal(State = #state{partitions = []}) ->
    State;

maybe_autoheal(State = #state{autoheal = AState}) ->
    case all_nodes_up() of
        true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
        false -> State
    end.

%%--------------------------------------------------------------------
%% Internal utils
%%--------------------------------------------------------------------

try_read_file(FileName) ->
    case rabbit_file:read_term_file(FileName) of
        {ok, Term}      -> {ok, Term};
        {error, enoent} -> {error, enoent};
        {error, E}      -> throw({error, {cannot_read_file, FileName, E}})
    end.

legacy_cluster_nodes(Nodes) ->
    %% We get all the info that we can, including the nodes from
    %% mnesia, which will be there if the node is a disc node (empty
    %% list otherwise)
    lists:usort(Nodes ++ mnesia:system_info(db_nodes)).

legacy_disc_nodes(AllNodes) ->
    case AllNodes == [] orelse lists:member(node(), AllNodes) of
        true  -> [node()];
        false -> []
    end.

add_node(Node, Nodes) -> lists:usort([Node | Nodes]).

del_node(Node, Nodes) -> Nodes -- [Node].

cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).

upgrade_to_full_partition(Proxy) ->
    cast(Proxy, {partial_partition_disconnect, node()}),
    disconnect(Proxy).

%% When we call this, it's because we want to force Mnesia to detect a
%% partition. But if we just disconnect_node/1 then Mnesia won't
%% detect a very short partition. So we want to force a slightly
%% longer disconnect. Unfortunately we don't have a way to blacklist
%% individual nodes; the best we can do is turn off auto-connect
%% altogether.
disconnect(Node) ->
    application:set_env(kernel, dist_auto_connect, never),
    erlang:disconnect_node(Node),
    timer:sleep(1000),
    application:unset_env(kernel, dist_auto_connect),
    ok.

%%--------------------------------------------------------------------

%% mnesia:system_info(db_nodes) (and hence
%% rabbit_nodes:all_running()) does not return all nodes
%% when partitioned, just those that we are sharing Mnesia state
%% with. So we have a small set of replacement functions
%% here. "rabbit" in a function's name implies we test if the rabbit
%% application is up, not just the node.

%% As we use these functions to decide what to do in pause_minority or
%% pause_if_all_down states, they *must* be fast, even in the case where
%% TCP connections are timing out. So that means we should be careful
%% about whether we connect to nodes which are currently disconnected.

majority() ->
    majority([]).

majority(NodesDown) ->
    Nodes = rabbit_nodes:all(),
    AliveNodes = alive_nodes(Nodes) -- NodesDown,
    length(AliveNodes) / length(Nodes) > 0.5.
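
%% For example (illustrative node names): in a cluster of
%% [rabbit@a, rabbit@b, rabbit@c] with only rabbit@c unreachable,
%% 2/3 > 0.5 so majority/0 returns true; with two nodes unreachable,
%% 1/3 < 0.5 and it returns false. Note that exactly half is not a
%% majority: in a two-node cluster with one node down, 1/2 > 0.5 is false.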

in_preferred_partition() ->
    {ok, {pause_if_all_down, PreferredNodes, _}} =
        application:get_env(rabbit, cluster_partition_handling),
    in_preferred_partition(PreferredNodes).

in_preferred_partition(PreferredNodes) ->
    in_preferred_partition(PreferredNodes, []).

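%% Note on in_preferred_partition/2 below: if none of the configured
%% preferred nodes are actual members of the cluster (RealPreferredNodes
%% =:= []), the check passes, i.e. we treat this node as being in the
%% preferred partition - presumably so that a misconfigured preferred-node
%% list does not pause every node forever.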
in_preferred_partition(PreferredNodes, NodesDown) ->
    Nodes = rabbit_nodes:all(),
    RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
    AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
    RealPreferredNodes =:= [] orelse AliveNodes =/= [].

all_nodes_up() ->
    Nodes = rabbit_nodes:all(),
    length(alive_nodes(Nodes)) =:= length(Nodes).

-spec all_rabbit_nodes_up() -> boolean().

all_rabbit_nodes_up() ->
    Nodes = rabbit_nodes:all(),
    length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).

alive_nodes() -> alive_nodes(rabbit_nodes:all()).

-spec alive_nodes([node()]) -> [node()].

alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].

alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_nodes:all()).

-spec alive_rabbit_nodes([node()]) -> [node()].

alive_rabbit_nodes(Nodes) ->
    [N || N <- alive_nodes(Nodes), rabbit:is_running(N)].

%% This one is allowed to connect!

-spec ping_all() -> 'ok'.

ping_all() ->
    [net_adm:ping(N) || N <- rabbit_nodes:all()],
    ok.

possibly_partitioned_nodes() ->
    alive_rabbit_nodes() -- rabbit_nodes:all_running().

startup_log([]) ->
    rabbit_log:info("Starting rabbit_node_monitor", []);
startup_log(Nodes) ->
    rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p",
                    [Nodes]).