1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_prequeue). 9 10%% This is the initial gen_server that all queue processes start off 11%% as. It handles the decision as to whether we need to start a new 12%% mirror, a new master/unmirrored, or whether we are restarting (and 13%% if so, as what). Thus a crashing queue process can restart from here 14%% and always do the right thing. 15 16-export([start_link/3]). 17 18-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 19 code_change/3]). 20 21-behaviour(gen_server2). 22 23-include_lib("rabbit_common/include/rabbit.hrl"). 24-include("amqqueue.hrl"). 25 26%%---------------------------------------------------------------------------- 27 28-export_type([start_mode/0]). 29 30-type start_mode() :: 'declare' | 'recovery' | 'slave'. 31 32%%---------------------------------------------------------------------------- 33 34-spec start_link(amqqueue:amqqueue(), start_mode(), pid()) 35 -> rabbit_types:ok_pid_or_error(). 36 37start_link(Q, StartMode, Marker) -> 38 gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). 39 40%%---------------------------------------------------------------------------- 41 42init({Q, StartMode, Marker}) -> 43 init(Q, case {is_process_alive(Marker), StartMode} of 44 {true, slave} -> slave; 45 {true, _} -> master; 46 {false, _} -> restart 47 end). 48 49init(Q, master) -> rabbit_amqqueue_process:init(Q); 50init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); 51 52init(Q0, restart) when ?is_amqqueue(Q0) -> 53 QueueName = amqqueue:get_name(Q0), 54 {ok, Q1} = rabbit_amqqueue:lookup(QueueName), 55 QPid = amqqueue:get_pid(Q1), 56 SPids = amqqueue:get_slave_pids(Q1), 57 LocalOrMasterDown = node(QPid) =:= node() 58 orelse not rabbit_mnesia:on_running_node(QPid), 59 Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], 60 case rabbit_mnesia:is_process_alive(QPid) of 61 true -> false = LocalOrMasterDown, %% assertion 62 rabbit_mirror_queue_slave:go(self(), async), 63 rabbit_mirror_queue_slave:init(Q1); %% [1] 64 false -> case LocalOrMasterDown andalso Slaves =:= [] of 65 true -> crash_restart(Q1); %% [2] 66 false -> timer:sleep(25), 67 init(Q1, restart) %% [3] 68 end 69 end. 70%% [1] There is a master on another node. Regardless of whether we 71%% were originally a master or a mirror, we are now a new slave. 72%% 73%% [2] Nothing is alive. We are the last best hope. Try to restart as a master. 74%% 75%% [3] The current master is dead but either there are alive mirrors to 76%% take over or it's all happening on a different node anyway. This is 77%% not a stable situation. Sleep and wait for somebody else to make a 78%% move. 79 80crash_restart(Q0) when ?is_amqqueue(Q0) -> 81 QueueName = amqqueue:get_name(Q0), 82 rabbit_log:error("Restarting crashed ~s.", [rabbit_misc:rs(QueueName)]), 83 gen_server2:cast(self(), init), 84 Q1 = amqqueue:set_pid(Q0, self()), 85 rabbit_amqqueue_process:init(Q1). 86 87%%---------------------------------------------------------------------------- 88 89%% This gen_server2 always hands over to some other module at the end 90%% of init/1. 91-spec handle_call(_, _, _) -> no_return(). 92handle_call(_Msg, _From, _State) -> exit(unreachable). 93-spec handle_cast(_, _) -> no_return(). 94handle_cast(_Msg, _State) -> exit(unreachable). 95-spec handle_info(_, _) -> no_return(). 96handle_info(_Msg, _State) -> exit(unreachable). 97-spec terminate(_, _) -> no_return(). 98terminate(_Reason, _State) -> exit(unreachable). 99-spec code_change(_, _, _) -> no_return(). 100code_change(_OldVsn, _State, _Extra) -> exit(unreachable). 101