1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_prequeue).
9
10%% This is the initial gen_server that all queue processes start off
11%% as. It handles the decision as to whether we need to start a new
12%% mirror, a new master/unmirrored, or whether we are restarting (and
13%% if so, as what). Thus a crashing queue process can restart from here
14%% and always do the right thing.
15
16-export([start_link/3]).
17
18-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
19         code_change/3]).
20
21-behaviour(gen_server2).
22
23-include_lib("rabbit_common/include/rabbit.hrl").
24-include("amqqueue.hrl").
25
26%%----------------------------------------------------------------------------
27
%% How the caller asked this queue process to start. init/1 treats
%% `slave' as "start as a mirror" and any other mode as "start as a
%% master", provided the marker process is still alive.
-type start_mode() :: declare | recovery | slave.

-export_type([start_mode/0]).
31
32%%----------------------------------------------------------------------------
33
%% Starts the pre-queue process as a linked gen_server2.
%%
%% Q, StartMode and Marker are bundled into one tuple and passed to
%% init/1 unchanged; Marker is the pid whose liveness init/1 inspects
%% to tell a first start from a restart.
-spec start_link(amqqueue:amqqueue(), start_mode(), pid())
                      -> rabbit_types:ok_pid_or_error().

start_link(Q, StartMode, Marker) ->
    InitArg = {Q, StartMode, Marker},
    gen_server2:start_link(?MODULE, InitArg, []).
39
40%%----------------------------------------------------------------------------
41
%% gen_server2 callback. Picks the role to initialise as and delegates
%% to init/2:
%%   - marker dead                  -> restart
%%   - marker alive, mode `slave'   -> slave
%%   - marker alive, any other mode -> master
init({Q, StartMode, Marker}) ->
    Role = case is_process_alive(Marker) of
               false                         -> restart;
               true when StartMode =:= slave -> slave;
               true                          -> master
           end,
    init(Q, Role).
48
%% Role-specific initialisation.
%%
%% `master' and `slave' hand over directly to the corresponding module.
%% `restart' re-reads the queue record and decides, from the liveness of
%% the recorded master and mirror pids, whether to become a mirror [1],
%% restart as a master [2], or wait 25ms and re-evaluate [3].
init(Q, master) ->
    rabbit_amqqueue_process:init(Q);
init(Q, slave) ->
    rabbit_mirror_queue_slave:init(Q);

init(Q0, restart) when ?is_amqqueue(Q0) ->
    %% Work from the stored record, not the (possibly stale) one we
    %% were started with.
    {ok, Q1} = rabbit_amqqueue:lookup(amqqueue:get_name(Q0)),
    MasterPid = amqqueue:get_pid(Q1),
    MirrorPids = amqqueue:get_slave_pids(Q1),
    %% True when the recorded master lived on this node, or on a node
    %% that is no longer running.
    LocalOrMasterDown =
        node(MasterPid) =:= node()
        orelse not rabbit_mnesia:on_running_node(MasterPid),
    LiveMirrors = [MirrorPid || MirrorPid <- MirrorPids,
                                rabbit_mnesia:is_process_alive(MirrorPid)],
    case rabbit_mnesia:is_process_alive(MasterPid) of
        true ->
            false = LocalOrMasterDown, %% assertion
            rabbit_mirror_queue_slave:go(self(), async),
            rabbit_mirror_queue_slave:init(Q1);      %% [1]
        false when LocalOrMasterDown, LiveMirrors =:= [] ->
            crash_restart(Q1);                       %% [2]
        false ->
            timer:sleep(25),
            init(Q1, restart)                        %% [3]
    end.
70%% [1] There is a master on another node. Regardless of whether we
71%%     were originally a master or a mirror, we are now a new slave.
72%%
73%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
74%%
75%% [3] The current master is dead but either there are alive mirrors to
76%%     take over or it's all happening on a different node anyway. This is
77%%     not a stable situation. Sleep and wait for somebody else to make a
78%%     move.
79
%% Restarts a crashed queue as a master in the current process.
%%
%% Logs the restart, casts `init' to this process, stores self() as the
%% queue's pid in the record and hands over to
%% rabbit_amqqueue_process:init/1 with the updated record.
%% NOTE(review): the `init' cast is presumably consumed by
%% rabbit_amqqueue_process after the hand-over - confirm there.
crash_restart(Q0) when ?is_amqqueue(Q0) ->
    QueueName = amqqueue:get_name(Q0),
    %% ~ts rather than ~s: ~s raises badarg on character lists with code
    %% points above 255, whereas ~ts accepts any Unicode chardata and
    %% renders Latin-1 input identically.
    rabbit_log:error("Restarting crashed ~ts.", [rabbit_misc:rs(QueueName)]),
    gen_server2:cast(self(), init),
    Q1 = amqqueue:set_pid(Q0, self()),
    rabbit_amqqueue_process:init(Q1).
86
87%%----------------------------------------------------------------------------
88
%% init/1 always ends by handing over to another module, so none of the
%% remaining gen_server2 callbacks should ever run in this module.
%% Reaching one of them is a bug; each exits with reason `unreachable'.

-spec handle_call(_, _, _) -> no_return().
handle_call(_Request, _From, _State) ->
    erlang:exit(unreachable).

-spec handle_cast(_, _) -> no_return().
handle_cast(_Request, _State) ->
    erlang:exit(unreachable).

-spec handle_info(_, _) -> no_return().
handle_info(_Info, _State) ->
    erlang:exit(unreachable).

-spec terminate(_, _) -> no_return().
terminate(_Reason, _State) ->
    erlang:exit(unreachable).

-spec code_change(_, _, _) -> no_return().
code_change(_OldVsn, _State, _Extra) ->
    erlang:exit(unreachable).
101