1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_mirror_queue_slave).
9
10%% For general documentation of HA design, see
11%% rabbit_mirror_queue_coordinator
12%%
13%% We receive messages from GM and from publishers, and the gm
14%% messages can arrive either before or after the 'actual' message.
15%% All instructions from the GM group must be processed in the order
16%% in which they're received.
17
18-export([set_maximum_since_use/2, info/1, go/2]).
19
20-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
21         code_change/3, handle_pre_hibernate/1, prioritise_call/4,
22         prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
23
24-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
25
26-behaviour(gen_server2).
27-behaviour(gm).
28
29-include_lib("rabbit_common/include/rabbit.hrl").
30
31-include("amqqueue.hrl").
32-include("gm_specs.hrl").
33
34%%----------------------------------------------------------------------------
35
36-define(INFO_KEYS,
37        [pid,
38         name,
39         master_pid,
40         is_synchronised
41        ]).
42
43-define(SYNC_INTERVAL,                 25). %% milliseconds
44-define(RAM_DURATION_UPDATE_INTERVAL,  5000).
45-define(DEATH_TIMEOUT,                 20000). %% 20 seconds
46
47-record(state, { q,
48                 gm,
49                 backing_queue,
50                 backing_queue_state,
51                 sync_timer_ref,
52                 rate_timer_ref,
53
54                 sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
55                 msg_id_ack,    %% :: MsgId -> AckTag
56
57                 msg_id_status,
58                 known_senders,
59
60                 %% Master depth - local depth
61                 depth_delta
62               }).
63
64%%----------------------------------------------------------------------------
65
66set_maximum_since_use(QPid, Age) ->
67    gen_server2:cast(QPid, {set_maximum_since_use, Age}).
68
69info(QPid) -> gen_server2:call(QPid, info, infinity).
70
71init(Q) when ?is_amqqueue(Q) ->
72    QName = amqqueue:get_name(Q),
73    ?store_proc_name(QName),
74    {ok, {not_started, Q}, hibernate,
75     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
76      ?DESIRED_HIBERNATE}, ?MODULE}.
77
78go(SPid, sync)  -> gen_server2:call(SPid, go, infinity);
79go(SPid, async) -> gen_server2:cast(SPid, go).
80
81handle_go(Q0) when ?is_amqqueue(Q0) ->
82    QName = amqqueue:get_name(Q0),
83    %% We join the GM group before we add ourselves to the amqqueue
84    %% record. As a result:
85    %% 1. We can receive msgs from GM that correspond to messages we will
86    %% never receive from publishers.
87    %% 2. When we receive a message from publishers, we must receive a
88    %% message from the GM group for it.
89    %% 3. However, that instruction from the GM group can arrive either
90    %% before or after the actual message. We need to be able to
91    %% distinguish between GM instructions arriving early, and case (1)
92    %% above.
93    %%
94    process_flag(trap_exit, true), %% amqqueue_process traps exits too.
95    {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
96                             fun rabbit_misc:execute_mnesia_transaction/1),
97    MRef = erlang:monitor(process, GM),
98    %% We ignore the DOWN message because we are also linked and
99    %% trapping exits, we just want to not get stuck and we will exit
100    %% later.
101    receive
102        {joined, GM}            -> erlang:demonitor(MRef, [flush]),
103                                   ok;
104        {'DOWN', MRef, _, _, _} -> ok
105    end,
106    Self = self(),
107    Node = node(),
108    case rabbit_misc:execute_mnesia_transaction(
109           fun() -> init_it(Self, GM, Node, QName) end) of
110        {new, QPid, GMPids} ->
111            ok = file_handle_cache:register_callback(
112                   rabbit_amqqueue, set_maximum_since_use, [Self]),
113            ok = rabbit_memory_monitor:register(
114                   Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
115            {ok, BQ} = application:get_env(backing_queue_module),
116            Q1 = amqqueue:set_pid(Q0, QPid),
117            _ = BQ:delete_crashed(Q1), %% For crash recovery
118            BQS = bq_init(BQ, Q1, new),
119            State = #state { q                   = Q1,
120                             gm                  = GM,
121                             backing_queue       = BQ,
122                             backing_queue_state = BQS,
123                             rate_timer_ref      = undefined,
124                             sync_timer_ref      = undefined,
125
126                             sender_queues       = #{},
127                             msg_id_ack          = #{},
128
129                             msg_id_status       = #{},
130                             known_senders       = pmon:new(delegate),
131
132                             depth_delta         = undefined
133                   },
134            ok = gm:broadcast(GM, request_depth),
135            ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
136            rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
137            {ok, State};
138        {stale, StalePid} ->
139            rabbit_mirror_queue_misc:log_warning(
140              QName, "Detected stale classic mirrored queue leader: ~p", [StalePid]),
141            gm:leave(GM),
142            {error, {stale_master_pid, StalePid}};
143        duplicate_live_master ->
144            gm:leave(GM),
145            {error, {duplicate_live_master, Node}};
146        existing ->
147            gm:leave(GM),
148            {error, normal};
149        master_in_recovery ->
150            gm:leave(GM),
151            %% The queue record vanished - we must have a master starting
152            %% concurrently with us. In that case we can safely decide to do
153            %% nothing here, and the master will start us in
154            %% master:init_with_existing_bq/3
155            {error, normal}
156    end.
157
158init_it(Self, GM, Node, QName) ->
159    case mnesia:read({rabbit_queue, QName}) of
160        [Q] when ?is_amqqueue(Q) ->
161            QPid = amqqueue:get_pid(Q),
162            SPids = amqqueue:get_slave_pids(Q),
163            GMPids = amqqueue:get_gm_pids(Q),
164            PSPids = amqqueue:get_slave_pids_pending_shutdown(Q),
165            case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
166                []     -> stop_pending_slaves(QName, PSPids),
167                          add_slave(Q, Self, GM),
168                          {new, QPid, GMPids};
169                [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
170                              true  -> duplicate_live_master;
171                              false -> {stale, QPid}
172                          end;
173                [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
174                              true  -> existing;
175                              false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid],
176                                       SPids1 = SPids -- [SPid],
177                                       Q1 = amqqueue:set_slave_pids(Q, SPids1),
178                                       Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
179                                       add_slave(Q2, Self, GM),
180                                       {new, QPid, GMPids1}
181                          end
182            end;
183        [] ->
184            master_in_recovery
185    end.
186
187%% Pending mirrors have been asked to stop by the master, but despite the node
188%% being up these did not answer on the expected timeout. Stop local mirrors now.
189stop_pending_slaves(QName, Pids) ->
190    [begin
191         rabbit_mirror_queue_misc:log_warning(
192           QName, "Detected a non-responsive classic queue mirror, stopping it: ~p", [Pid]),
193         case erlang:process_info(Pid, dictionary) of
194             undefined -> ok;
195             {dictionary, Dict} ->
196                 Vhost = QName#resource.virtual_host,
197                 {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
198                 case proplists:get_value('$ancestors', Dict) of
199                     [Sup, AmqQSup | _] ->
200                         exit(Sup, kill),
201                         exit(Pid, kill);
202                     _ ->
203                         ok
204                 end
205         end
206     end || Pid <- Pids, node(Pid) =:= node(),
207            true =:= erlang:is_process_alive(Pid)].
208
209%% Add to the end, so they are in descending order of age, see
210%% rabbit_mirror_queue_misc:promote_slave/1
211add_slave(Q0, New, GM) when ?is_amqqueue(Q0) ->
212    SPids = amqqueue:get_slave_pids(Q0),
213    GMPids = amqqueue:get_gm_pids(Q0),
214    SPids1 = SPids ++ [New],
215    GMPids1 = [{GM, New} | GMPids],
216    Q1 = amqqueue:set_slave_pids(Q0, SPids1),
217    Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
218    rabbit_mirror_queue_misc:store_updated_slaves(Q2).
219
220handle_call(go, _From, {not_started, Q} = NotStarted) ->
221    case handle_go(Q) of
222        {ok, State}    -> {reply, ok, State};
223        {error, Error} -> {stop, Error, NotStarted}
224    end;
225
226handle_call({gm_deaths, DeadGMPids}, From,
227            State = #state{ gm = GM, q = Q,
228                            backing_queue = BQ,
229                            backing_queue_state = BQS}) when ?is_amqqueue(Q) ->
230    QName = amqqueue:get_name(Q),
231    MPid = amqqueue:get_pid(Q),
232    Self = self(),
233    case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
234        {error, not_found} ->
235            gen_server2:reply(From, ok),
236            {stop, normal, State};
237        {error, {not_synced, _SPids}} ->
238            BQ:delete_and_terminate({error, not_synced}, BQS),
239            {stop, normal, State#state{backing_queue_state = undefined}};
240        {ok, Pid, DeadPids, ExtraNodes} ->
241            rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
242                                                   DeadPids),
243            case Pid of
244                MPid ->
245                    %% master hasn't changed
246                    gen_server2:reply(From, ok),
247                    rabbit_mirror_queue_misc:add_mirrors(
248                      QName, ExtraNodes, async),
249                    noreply(State);
250                Self ->
251                    %% we've become master
252                    QueueState = promote_me(From, State),
253                    rabbit_mirror_queue_misc:add_mirrors(
254                      QName, ExtraNodes, async),
255                    {become, rabbit_amqqueue_process, QueueState, hibernate};
256                _ ->
257                    %% master has changed to not us
258                    gen_server2:reply(From, ok),
259                    %% see rabbitmq-server#914;
260                    %% It's not always guaranteed that we won't have ExtraNodes.
261                    %% If gm alters, master can change to not us with extra nodes,
262                    %% in which case we attempt to add mirrors on those nodes.
263                    case ExtraNodes of
264                        [] -> void;
265                        _  -> rabbit_mirror_queue_misc:add_mirrors(
266                                QName, ExtraNodes, async)
267                    end,
268                    %% Since GM is by nature lazy we need to make sure
269                    %% there is some traffic when a master dies, to
270                    %% make sure all mirrors get informed of the
271                    %% death. That is all process_death does, create
272                    %% some traffic.
273                    ok = gm:broadcast(GM, process_death),
274                    Q1 = amqqueue:set_pid(Q, Pid),
275                    State1 = State#state{q = Q1},
276                    noreply(State1)
277            end
278    end;
279
280handle_call(info, _From, State) ->
281    reply(infos(?INFO_KEYS, State), State).
282
283handle_cast(go, {not_started, Q} = NotStarted) ->
284    case handle_go(Q) of
285        {ok, State}    -> {noreply, State};
286        {error, Error} -> {stop, Error, NotStarted}
287    end;
288
289handle_cast({run_backing_queue, Mod, Fun}, State) ->
290    noreply(run_backing_queue(Mod, Fun, State));
291
292handle_cast({gm, Instruction}, State = #state{q = Q0}) when ?is_amqqueue(Q0) ->
293    QName = amqqueue:get_name(Q0),
294    case rabbit_amqqueue:lookup(QName) of
295       {ok, Q1} when ?is_amqqueue(Q1) ->
296           SPids = amqqueue:get_slave_pids(Q1),
297           case lists:member(self(), SPids) of
298               true ->
299                   handle_process_result(process_instruction(Instruction, State));
300               false ->
301                   %% Potentially a duplicated mirror caused by a partial partition,
302                   %% will stop as a new mirror could start unaware of our presence
303                   {stop, shutdown, State}
304           end;
305       {error, not_found} ->
306           %% Would not expect this to happen after fixing #953
307           {stop, shutdown, State}
308    end;
309
310handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
311            State) ->
312    %% Asynchronous, non-"mandatory", deliver mode.
313    %% We are acking messages to the channel process that sent us
314    %% the message delivery. See
315    %% rabbit_amqqueue_process:handle_ch_down for more info.
316    %% If message is rejected by the master, the publish will be nacked
317    %% even if mirrors confirm it. No need to check for length here.
318    maybe_flow_ack(Sender, Flow),
319    noreply(maybe_enqueue_message(Delivery, State));
320
321handle_cast({sync_start, Ref, Syncer},
322            State = #state { depth_delta         = DD,
323                             backing_queue       = BQ,
324                             backing_queue_state = BQS }) ->
325    State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
326    S = fun({MA, TRefN, BQSN}) ->
327                State1#state{depth_delta         = undefined,
328                             msg_id_ack          = maps:from_list(MA),
329                             rate_timer_ref      = TRefN,
330                             backing_queue_state = BQSN}
331        end,
332    case rabbit_mirror_queue_sync:slave(
333           DD, Ref, TRef, Syncer, BQ, BQS,
334           fun (BQN, BQSN) ->
335                   BQSN1 = update_ram_duration(BQN, BQSN),
336                   TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL,
337                                                  self(), update_ram_duration),
338                   {TRefN, BQSN1}
339           end) of
340        denied              -> noreply(State1);
341        {ok,           Res} -> noreply(set_delta(0, S(Res)));
342        {failed,       Res} -> noreply(S(Res));
343        {stop, Reason, Res} -> {stop, Reason, S(Res)}
344    end;
345
346handle_cast({set_maximum_since_use, Age}, State) ->
347    ok = file_handle_cache:set_maximum_since_use(Age),
348    noreply(State);
349
350handle_cast({set_ram_duration_target, Duration},
351            State = #state { backing_queue       = BQ,
352                             backing_queue_state = BQS }) ->
353    BQS1 = BQ:set_ram_duration_target(Duration, BQS),
354    noreply(State #state { backing_queue_state = BQS1 });
355
356handle_cast(policy_changed, State) ->
357    %% During partial partitions, we might end up receiving messages expected by a master
358    %% Ignore them
359    noreply(State).
360
361handle_info(update_ram_duration, State = #state{backing_queue       = BQ,
362                                                backing_queue_state = BQS}) ->
363    BQS1 = update_ram_duration(BQ, BQS),
364    %% Don't call noreply/1, we don't want to set timers
365    {State1, Timeout} = next_state(State #state {
366                                     rate_timer_ref      = undefined,
367                                     backing_queue_state = BQS1 }),
368    {noreply, State1, Timeout};
369
370handle_info(sync_timeout, State) ->
371    noreply(backing_queue_timeout(
372              State #state { sync_timer_ref = undefined }));
373
374handle_info(timeout, State) ->
375    noreply(backing_queue_timeout(State));
376
377handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
378    local_sender_death(ChPid, State),
379    noreply(maybe_forget_sender(ChPid, down_from_ch, State));
380
381handle_info({'EXIT', _Pid, Reason}, State) ->
382    {stop, Reason, State};
383
384handle_info({bump_credit, Msg}, State) ->
385    credit_flow:handle_bump_msg(Msg),
386    noreply(State);
387
388handle_info(bump_reduce_memory_use, State = #state{backing_queue       = BQ,
389                                                backing_queue_state = BQS}) ->
390    BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
391    BQS2 = BQ:resume(BQS1),
392    noreply(State#state{
393        backing_queue_state = BQS2
394    });
395
396%% In the event of a short partition during sync we can detect the
397%% master's 'death', drop out of sync, and then receive sync messages
398%% which were still in flight. Ignore them.
399handle_info({sync_msg, _Ref, _Msg, _Props, _Unacked}, State) ->
400    noreply(State);
401
402handle_info({sync_complete, _Ref}, State) ->
403    noreply(State);
404
405handle_info(Msg, State) ->
406    {stop, {unexpected_info, Msg}, State}.
407
408terminate(_Reason, {not_started, _Q}) ->
409    ok;
410terminate(_Reason, #state { backing_queue_state = undefined }) ->
411    %% We've received a delete_and_terminate from gm, thus nothing to
412    %% do here.
413    ok;
414terminate({shutdown, dropped} = R, State = #state{backing_queue       = BQ,
415                                                  backing_queue_state = BQS}) ->
416    %% See rabbit_mirror_queue_master:terminate/2
417    terminate_common(State),
418    BQ:delete_and_terminate(R, BQS);
419terminate(shutdown, State) ->
420    terminate_shutdown(shutdown, State);
421terminate({shutdown, _} = R, State) ->
422    terminate_shutdown(R, State);
423terminate(Reason, State = #state{backing_queue       = BQ,
424                                 backing_queue_state = BQS}) ->
425    terminate_common(State),
426    BQ:delete_and_terminate(Reason, BQS).
427
428%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
429%% being deleted: it's just the node going down. Even though we're a
430%% mirror, we have no idea whether or not we'll be the only copy coming
431%% back up. Thus we must assume we will be, and preserve anything we
432%% have on disk.
433terminate_shutdown(Reason, State = #state{backing_queue       = BQ,
434                                          backing_queue_state = BQS}) ->
435    terminate_common(State),
436    BQ:terminate(Reason, BQS).
437
438terminate_common(State) ->
439    ok = rabbit_memory_monitor:deregister(self()),
440    stop_rate_timer(stop_sync_timer(State)).
441
442code_change(_OldVsn, State, _Extra) ->
443    {ok, State}.
444
445handle_pre_hibernate({not_started, _Q} = State) ->
446    {hibernate, State};
447
448handle_pre_hibernate(State = #state { backing_queue       = BQ,
449                                      backing_queue_state = BQS }) ->
450    {RamDuration, BQS1} = BQ:ram_duration(BQS),
451    DesiredDuration =
452        rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
453    BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
454    BQS3 = BQ:handle_pre_hibernate(BQS2),
455    {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
456
457prioritise_call(Msg, _From, _Len, _State) ->
458    case Msg of
459        info                                 -> 9;
460        {gm_deaths, _Dead}                   -> 5;
461        _                                    -> 0
462    end.
463
464prioritise_cast(Msg, _Len, _State) ->
465    case Msg of
466        {set_ram_duration_target, _Duration} -> 8;
467        {set_maximum_since_use, _Age}        -> 8;
468        {run_backing_queue, _Mod, _Fun}      -> 6;
469        {gm, _Msg}                           -> 5;
470        _                                    -> 0
471    end.
472
473prioritise_info(Msg, _Len, _State) ->
474    case Msg of
475        update_ram_duration                  -> 8;
476        sync_timeout                         -> 6;
477        _                                    -> 0
478    end.
479
480format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
481
482%% ---------------------------------------------------------------------------
483%% GM
484%% ---------------------------------------------------------------------------
485
486-spec joined(args(), members()) -> callback_result().
487
488joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
489
490-spec members_changed(args(), members(),members()) -> callback_result().
491
492members_changed([_SPid], _Births, []) ->
493    ok;
494members_changed([ SPid], _Births, Deaths) ->
495    case rabbit_misc:with_exit_handler(
496           rabbit_misc:const(ok),
497           fun() ->
498                   gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
499           end) of
500        ok              -> ok;
501        {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
502    end.
503
504-spec handle_msg(args(), pid(), any()) -> callback_result().
505
506handle_msg([_SPid], _From, hibernate_heartbeat) ->
507    %% See rabbit_mirror_queue_coordinator:handle_pre_hibernate/1
508    ok;
509handle_msg([_SPid], _From, request_depth) ->
510    %% This is only of value to the master
511    ok;
512handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
513    %% This is only of value to the master
514    ok;
515handle_msg([_SPid], _From, process_death) ->
516    %% We must not take any notice of the master death here since it
517    %% comes without ordering guarantees - there could still be
518    %% messages from the master we have yet to receive. When we get
519    %% members_changed, then there will be no more messages.
520    ok;
521handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
522    ok = gen_server2:cast(CPid, {gm, Msg}),
523    {stop, {shutdown, ring_shutdown}};
524handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
525    case lists:member(SPid, SPids) of
526        true  -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
527        false -> ok
528    end;
529handle_msg([SPid], _From, Msg) ->
530    ok = gen_server2:cast(SPid, {gm, Msg}).
531
532-spec handle_terminate(args(), term()) -> any().
533
534handle_terminate([_SPid], _Reason) ->
535    ok.
536
537%% ---------------------------------------------------------------------------
538%% Others
539%% ---------------------------------------------------------------------------
540
541infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
542
543i(pid, _State) ->
544    self();
545i(name, #state{q = Q}) when ?is_amqqueue(Q) ->
546    amqqueue:get_name(Q);
547i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) ->
548    amqqueue:get_pid(Q);
549i(is_synchronised, #state{depth_delta = DD}) ->
550    DD =:= 0;
551i(_, _) ->
552    ''.
553
554bq_init(BQ, Q, Recover) ->
555    Self = self(),
556    BQ:init(Q, Recover,
557            fun (Mod, Fun) ->
558                    rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
559            end).
560
561run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
562    %% Yes, this might look a little crazy, but see comments in
563    %% confirm_sender_death/1
564    Fun(?MODULE, State);
565run_backing_queue(Mod, Fun, State = #state { backing_queue       = BQ,
566                                             backing_queue_state = BQS }) ->
567    State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
568
569%% This feature was used by `rabbit_amqqueue_process` and
570%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is
571%% unused in 3.8.x and thus deprecated. We keep it to support in-place
572%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op
573%% starting with that version.
574send_mandatory(#delivery{mandatory  = false}) ->
575    ok;
576send_mandatory(#delivery{mandatory  = true,
577                         sender     = SenderPid,
578                         msg_seq_no = MsgSeqNo}) ->
579    gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
580
581send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
582    MS;
583send_or_record_confirm(published, #delivery { sender     = ChPid,
584                                              confirm    = true,
585                                              msg_seq_no = MsgSeqNo,
586                                              message    = #basic_message {
587                                                id            = MsgId,
588                                                is_persistent = true } },
589                       MS, #state{q = Q}) when ?amqqueue_is_durable(Q) ->
590    maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
591send_or_record_confirm(_Status, #delivery { sender     = ChPid,
592                                            confirm    = true,
593                                            msg_seq_no = MsgSeqNo },
594                       MS, #state{q = Q} = _State) ->
595    ok = rabbit_classic_queue:confirm_to_sender(ChPid,
596                                                amqqueue:get_name(Q), [MsgSeqNo]),
597    MS.
598
599confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
600    QName = amqqueue:get_name(Q),
601    {CMs, MS1} =
602        lists:foldl(
603          fun (MsgId, {CMsN, MSN} = Acc) ->
604                  %% We will never see 'discarded' here
605                  case maps:find(MsgId, MSN) of
606                      error ->
607                          %% If it needed confirming, it'll have
608                          %% already been done.
609                          Acc;
610                      {ok, published} ->
611                          %% Still not seen it from the channel, just
612                          %% record that it's been confirmed.
613                          {CMsN, maps:put(MsgId, confirmed, MSN)};
614                      {ok, {published, ChPid, MsgSeqNo}} ->
615                          %% Seen from both GM and Channel. Can now
616                          %% confirm.
617                          {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
618                           maps:remove(MsgId, MSN)};
619                      {ok, confirmed} ->
620                          %% It's already been confirmed. This is
621                          %% probably it's been both sync'd to disk
622                          %% and then delivered and ack'd before we've
623                          %% seen the publish from the
624                          %% channel. Nothing to do here.
625                          Acc
626                  end
627          end, {gb_trees:empty(), MS}, MsgIds),
628    Fun = fun (Pid, MsgSeqNos) ->
629                  rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos)
630          end,
631    rabbit_misc:gb_trees_foreach(Fun, CMs),
632    State #state { msg_id_status = MS1 }.
633
634handle_process_result({ok,   State}) -> noreply(State);
635handle_process_result({stop, State}) -> {stop, normal, State}.
636
637-spec promote_me({pid(), term()}, #state{}) -> no_return().
638
639promote_me(From, #state { q                   = Q0,
640                          gm                  = GM,
641                          backing_queue       = BQ,
642                          backing_queue_state = BQS,
643                          rate_timer_ref      = RateTRef,
644                          sender_queues       = SQ,
645                          msg_id_ack          = MA,
646                          msg_id_status       = MS,
647                          known_senders       = KS}) when ?is_amqqueue(Q0) ->
648    QName = amqqueue:get_name(Q0),
649    rabbit_mirror_queue_misc:log_info(QName, "Promoting mirror ~s to leader",
650                                      [rabbit_misc:pid_to_string(self())]),
651    Q1 = amqqueue:set_pid(Q0, self()),
652    DeathFun = rabbit_mirror_queue_master:sender_death_fun(),
653    DepthFun = rabbit_mirror_queue_master:depth_fun(),
654    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q1, GM, DeathFun, DepthFun),
655    true = unlink(GM),
656    gen_server2:reply(From, {promote, CPid}),
657
658    %% Everything that we're monitoring, we need to ensure our new
659    %% coordinator is monitoring.
660    MPids = pmon:monitored(KS),
661    ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
662
663    %% We find all the messages that we've received from channels but
664    %% not from gm, and pass them to the
665    %% queue_process:init_with_backing_queue_state to be enqueued.
666    %%
667    %% We also have to requeue messages which are pending acks: the
668    %% consumers from the master queue have been lost and so these
669    %% messages need requeuing. They might also be pending
670    %% confirmation, and indeed they might also be pending arrival of
671    %% the publication from the channel itself, if we received both
672    %% the publication and the fetch via gm first! Requeuing doesn't
673    %% affect confirmations: if the message was previously pending a
674    %% confirmation then it still will be, under the same msg_id. So
675    %% as a master, we need to be prepared to filter out the
676    %% publication of said messages from the channel (is_duplicate
677    %% (thus such requeued messages must remain in the msg_id_status
678    %% (MS) which becomes seen_status (SS) in the master)).
679    %%
680    %% Then there are messages we already have in the queue, which are
681    %% not currently pending acknowledgement:
682    %% 1. Messages we've only received via gm:
683    %%    Filter out subsequent publication from channel through
684    %%    validate_message. Might have to issue confirms then or
685    %%    later, thus queue_process state will have to know that
686    %%    there's a pending confirm.
687    %% 2. Messages received via both gm and channel:
688    %%    Queue will have to deal with issuing confirms if necessary.
689    %%
690    %% MS contains the following three entry types:
691    %%
692    %% a) published:
693    %%   published via gm only; pending arrival of publication from
694    %%   channel, maybe pending confirm.
695    %%
696    %% b) {published, ChPid, MsgSeqNo}:
697    %%   published via gm and channel; pending confirm.
698    %%
699    %% c) confirmed:
700    %%   published via gm only, and confirmed; pending publication
701    %%   from channel.
702    %%
703    %% d) discarded:
704    %%   seen via gm only as discarded. Pending publication from
705    %%   channel
706    %%
707    %% The forms a, c and d only, need to go to the master state
708    %% seen_status (SS).
709    %%
710    %% The form b only, needs to go through to the queue_process
711    %% state to form the msg_id_to_channel mapping (MTC).
712    %%
713    %% No messages that are enqueued from SQ at this point will have
714    %% entries in MS.
715    %%
716    %% Messages that are extracted from MA may have entries in MS, and
717    %% those messages are then requeued. However, as discussed above,
718    %% this does not affect MS, nor which bits go through to SS in
719    %% Master, or MTC in queue_process.
720
721    St = [published, confirmed, discarded],
722    SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
723    AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
724
725    MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
726                    QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
727
728    MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
729                            maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0);
730                        (_Msgid, _Status, MTC0) ->
731                            MTC0
732                    end, #{}, MS),
733    Deliveries = [promote_delivery(Delivery) ||
734                   {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
735                   Delivery <- queue:to_list(PubQ)],
736    AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
737    KS1 = lists:foldl(fun (ChPid0, KS0) ->
738                              pmon:demonitor(ChPid0, KS0)
739                      end, KS, AwaitGmDown),
740    rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName),
741    rabbit_amqqueue_process:init_with_backing_queue_state(
742      Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
743      MTC).
744
745%% We reset mandatory to false here because we will have sent the
746%% mandatory_received already as soon as we got the message. We also
747%% need to send an ack for these messages since the channel is waiting
748%% for one for the via-GM case and we will not now receive one.
749promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
750    maybe_flow_ack(Sender, Flow),
751    Delivery#delivery{mandatory = false}.
752
753noreply(State) ->
754    {NewState, Timeout} = next_state(State),
755    {noreply, ensure_rate_timer(NewState), Timeout}.
756
757reply(Reply, State) ->
758    {NewState, Timeout} = next_state(State),
759    {reply, Reply, ensure_rate_timer(NewState), Timeout}.
760
761next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
762    {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
763    State1 = confirm_messages(MsgIds,
764                              State #state { backing_queue_state = BQS1 }),
765    case BQ:needs_timeout(BQS1) of
766        false -> {stop_sync_timer(State1),   hibernate     };
767        idle  -> {stop_sync_timer(State1),   ?SYNC_INTERVAL};
768        timed -> {ensure_sync_timer(State1), 0             }
769    end.
770
771backing_queue_timeout(State = #state { backing_queue       = BQ,
772                                       backing_queue_state = BQS }) ->
773    State#state{backing_queue_state = BQ:timeout(BQS)}.
774
775ensure_sync_timer(State) ->
776    rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
777                             ?SYNC_INTERVAL, sync_timeout).
778
779stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref).
780
781ensure_rate_timer(State) ->
782    rabbit_misc:ensure_timer(State, #state.rate_timer_ref,
783                             ?RAM_DURATION_UPDATE_INTERVAL,
784                             update_ram_duration).
785
786stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
787
788ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
789    State #state { known_senders = pmon:monitor(ChPid, KS) }.
790
791local_sender_death(ChPid, #state { known_senders = KS }) ->
792    %% The channel will be monitored iff we have received a delivery
793    %% from it but not heard about its death from the master. So if it
794    %% is monitored we need to point the death out to the master (see
795    %% essay).
796    ok = case pmon:is_monitored(ChPid, KS) of
797             false -> ok;
798             true  -> confirm_sender_death(ChPid)
799         end.
800
801confirm_sender_death(Pid) ->
802    %% We have to deal with the possibility that we'll be promoted to
803    %% master before this thing gets run. Consequently we set the
804    %% module to rabbit_mirror_queue_master so that if we do become a
805    %% rabbit_amqqueue_process before then, sane things will happen.
806    Fun =
807        fun (?MODULE, State = #state { known_senders = KS,
808                                       gm            = GM }) ->
809                %% We're running still as a mirror
810                %%
811                %% See comment in local_sender_death/2; we might have
812                %% received a sender_death in the meanwhile so check
813                %% again.
814                ok = case pmon:is_monitored(Pid, KS) of
815                         false -> ok;
816                         true  -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
817                                  confirm_sender_death(Pid)
818                     end,
819                State;
820            (rabbit_mirror_queue_master, State) ->
821                %% We've become a master. State is now opaque to
822                %% us. When we became master, if Pid was still known
823                %% to us then we'd have set up monitoring of it then,
824                %% so this is now a noop.
825                State
826        end,
827    %% Note that we do not remove our knowledge of this ChPid until we
828    %% get the sender_death from GM as well as a DOWN notification.
829    {ok, _TRef} = timer:apply_after(
830                    ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
831                    [self(), rabbit_mirror_queue_master, Fun]),
832    ok.
833
834forget_sender(_, running)                        -> false;
835forget_sender(down_from_gm, down_from_gm)        -> false; %% [1]
836forget_sender(down_from_ch, down_from_ch)        -> false;
837forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
838
839%% [1] If another mirror goes through confirm_sender_death/1 before we
840%% do we can get two GM sender_death messages in a row for the same
841%% channel - don't treat that as anything special.
842
843%% Record and process lifetime events from channels. Forget all about a channel
844%% only when down notifications are received from both the channel and from gm.
845maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
846                                                     msg_id_status = MS,
847                                                     known_senders = KS }) ->
848    case maps:find(ChPid, SQ) of
849        error ->
850            State;
851        {ok, {MQ, PendCh, ChStateRecord}} ->
852            case forget_sender(ChState, ChStateRecord) of
853                true ->
854                    credit_flow:peer_down(ChPid),
855                    State #state { sender_queues = maps:remove(ChPid, SQ),
856                                   msg_id_status = lists:foldl(
857                                                     fun maps:remove/2,
858                                                     MS, sets:to_list(PendCh)),
859                                   known_senders = pmon:demonitor(ChPid, KS) };
860                false ->
861                    SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
862                    State #state { sender_queues = SQ1 }
863            end
864    end.
865
866maybe_enqueue_message(
867  Delivery = #delivery { message = #basic_message { id = MsgId },
868                         sender  = ChPid },
869  State = #state { sender_queues = SQ, msg_id_status = MS }) ->
870    send_mandatory(Delivery), %% must do this before confirms
871    State1 = ensure_monitoring(ChPid, State),
872    %% We will never see {published, ChPid, MsgSeqNo} here.
873    case maps:find(MsgId, MS) of
874        error ->
875            {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
876            MQ1 = queue:in(Delivery, MQ),
877            SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
878            State1 #state { sender_queues = SQ1 };
879        {ok, Status} ->
880            MS1 = send_or_record_confirm(
881                    Status, Delivery, maps:remove(MsgId, MS), State1),
882            SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
883            State1 #state { msg_id_status = MS1,
884                            sender_queues = SQ1 }
885    end.
886
887get_sender_queue(ChPid, SQ) ->
888    case maps:find(ChPid, SQ) of
889        error     -> {queue:new(), sets:new(), running};
890        {ok, Val} -> Val
891    end.
892
893remove_from_pending_ch(MsgId, ChPid, SQ) ->
894    case maps:find(ChPid, SQ) of
895        error ->
896            SQ;
897        {ok, {MQ, PendingCh, ChState}} ->
898            maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
899                       SQ)
900    end.
901
902publish_or_discard(Status, ChPid, MsgId,
903                   State = #state { sender_queues = SQ, msg_id_status = MS }) ->
904    %% We really are going to do the publish/discard right now, even
905    %% though we may not have seen it directly from the channel. But
906    %% we cannot issue confirms until the latter has happened. So we
907    %% need to keep track of the MsgId and its confirmation status in
908    %% the meantime.
909    State1 = ensure_monitoring(ChPid, State),
910    {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
911    {MQ1, PendingCh1, MS1} =
912        case queue:out(MQ) of
913            {empty, _MQ2} ->
914                {MQ, sets:add_element(MsgId, PendingCh),
915                 maps:put(MsgId, Status, MS)};
916            {{value, Delivery = #delivery {
917                       message = #basic_message { id = MsgId } }}, MQ2} ->
918                {MQ2, PendingCh,
919                 %% We received the msg from the channel first. Thus
920                 %% we need to deal with confirms here.
921                 send_or_record_confirm(Status, Delivery, MS, State1)};
922            {{value, #delivery {}}, _MQ2} ->
923                %% The instruction was sent to us before we were
924                %% within the slave_pids within the #amqqueue{}
925                %% record. We'll never receive the message directly
926                %% from the channel. And the channel will not be
927                %% expecting any confirms from us.
928                {MQ, PendingCh, MS}
929        end,
930    SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
931    State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
932
933
934process_instruction({publish, ChPid, Flow, MsgProps,
935                     Msg = #basic_message { id = MsgId }}, State) ->
936    maybe_flow_ack(ChPid, Flow),
937    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
938        publish_or_discard(published, ChPid, MsgId, State),
939    BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
940    {ok, State1 #state { backing_queue_state = BQS1 }};
941process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
942    maybe_flow_ack(ChPid, Flow),
943    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
944        lists:foldl(fun ({#basic_message { id = MsgId },
945                          _MsgProps, _IsDelivered}, St) ->
946                            publish_or_discard(published, ChPid, MsgId, St)
947                    end, State, Publishes),
948    BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
949    {ok, State1 #state { backing_queue_state = BQS1 }};
950process_instruction({publish_delivered, ChPid, Flow, MsgProps,
951                     Msg = #basic_message { id = MsgId }}, State) ->
952    maybe_flow_ack(ChPid, Flow),
953    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
954        publish_or_discard(published, ChPid, MsgId, State),
955    true = BQ:is_empty(BQS),
956    {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
957    {ok, maybe_store_ack(true, MsgId, AckTag,
958                         State1 #state { backing_queue_state = BQS1 })};
959process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
960    maybe_flow_ack(ChPid, Flow),
961    {MsgIds,
962     State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
963        lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
964                         {MsgIds, St}) ->
965                            {[MsgId | MsgIds],
966                             publish_or_discard(published, ChPid, MsgId, St)}
967                    end, {[], State}, Publishes),
968    true = BQ:is_empty(BQS),
969    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
970    MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
971    State2 = lists:foldl(
972               fun ({MsgId, AckTag}, St) ->
973                       maybe_store_ack(true, MsgId, AckTag, St)
974               end, State1 #state { backing_queue_state = BQS1 },
975               MsgIdsAndAcks),
976    {ok, State2};
977process_instruction({discard, ChPid, Flow, MsgId}, State) ->
978    maybe_flow_ack(ChPid, Flow),
979    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
980        publish_or_discard(discarded, ChPid, MsgId, State),
981    BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
982    {ok, State1 #state { backing_queue_state = BQS1 }};
983process_instruction({drop, Length, Dropped, AckRequired},
984                    State = #state { backing_queue       = BQ,
985                                     backing_queue_state = BQS }) ->
986    QLen = BQ:len(BQS),
987    ToDrop = case QLen - Length of
988                 N when N > 0 -> N;
989                 _            -> 0
990             end,
991    State1 = lists:foldl(
992               fun (const, StateN = #state{backing_queue_state = BQSN}) ->
993                       {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
994                       maybe_store_ack(
995                         AckRequired, MsgId, AckTag,
996                         StateN #state { backing_queue_state = BQSN1 })
997               end, State, lists:duplicate(ToDrop, const)),
998    {ok, case AckRequired of
999             true  -> State1;
1000             false -> update_delta(ToDrop - Dropped, State1)
1001         end};
1002process_instruction({ack, MsgIds},
1003                    State = #state { backing_queue       = BQ,
1004                                     backing_queue_state = BQS,
1005                                     msg_id_ack          = MA }) ->
1006    {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
1007    {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
1008    [] = MsgIds1 -- MsgIds, %% ASSERTION
1009    {ok, update_delta(length(MsgIds1) - length(MsgIds),
1010                      State #state { msg_id_ack          = MA1,
1011                                     backing_queue_state = BQS1 })};
1012process_instruction({requeue, MsgIds},
1013                    State = #state { backing_queue       = BQ,
1014                                     backing_queue_state = BQS,
1015                                     msg_id_ack          = MA }) ->
1016    {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
1017    {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
1018    {ok, State #state { msg_id_ack          = MA1,
1019                        backing_queue_state = BQS1 }};
1020process_instruction({sender_death, ChPid},
1021                    State = #state { known_senders = KS }) ->
1022    %% The channel will be monitored iff we have received a message
1023    %% from it. In this case we just want to avoid doing work if we
1024    %% never got any messages.
1025    {ok, case pmon:is_monitored(ChPid, KS) of
1026             false -> State;
1027             true  -> maybe_forget_sender(ChPid, down_from_gm, State)
1028         end};
1029process_instruction({depth, Depth},
1030                    State = #state { backing_queue       = BQ,
1031                                     backing_queue_state = BQS }) ->
1032    {ok, set_delta(Depth - BQ:depth(BQS), State)};
1033
1034process_instruction({delete_and_terminate, Reason},
1035                    State = #state { backing_queue       = BQ,
1036                                     backing_queue_state = BQS }) ->
1037    BQ:delete_and_terminate(Reason, BQS),
1038    {stop, State #state { backing_queue_state = undefined }};
1039process_instruction({set_queue_mode, Mode},
1040                    State = #state { backing_queue       = BQ,
1041                                     backing_queue_state = BQS }) ->
1042    BQS1 = BQ:set_queue_mode(Mode, BQS),
1043    {ok, State #state { backing_queue_state = BQS1 }}.
1044
1045maybe_flow_ack(Sender, flow)    -> credit_flow:ack(Sender);
1046maybe_flow_ack(_Sender, noflow) -> ok.
1047
1048msg_ids_to_acktags(MsgIds, MA) ->
1049    {AckTags, MA1} =
1050        lists:foldl(
1051          fun (MsgId, {Acc, MAN}) ->
1052                  case maps:find(MsgId, MA) of
1053                      error        -> {Acc, MAN};
1054                      {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
1055                  end
1056          end, {[], MA}, MsgIds),
1057    {lists:reverse(AckTags), MA1}.
1058
1059maybe_store_ack(false, _MsgId, _AckTag, State) ->
1060    State;
1061maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
1062    State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
1063
1064set_delta(0,        State = #state { depth_delta = undefined }) ->
1065    ok = record_synchronised(State#state.q),
1066    State #state { depth_delta = 0 };
1067set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
1068    true = NewDelta > 0, %% assertion
1069    State #state { depth_delta = NewDelta };
1070set_delta(NewDelta, State = #state { depth_delta = Delta     }) ->
1071    update_delta(NewDelta - Delta, State).
1072
1073update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
1074    State;
1075update_delta( DeltaChange, State = #state { depth_delta = 0         }) ->
1076    0 = DeltaChange, %% assertion: we cannot become unsync'ed
1077    State;
1078update_delta( DeltaChange, State = #state { depth_delta = Delta     }) ->
1079    true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
1080    set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
1081
1082update_ram_duration(BQ, BQS) ->
1083    {RamDuration, BQS1} = BQ:ram_duration(BQS),
1084    DesiredDuration =
1085        rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
1086    BQ:set_ram_duration_target(DesiredDuration, BQS1).
1087
1088record_synchronised(Q0) when ?is_amqqueue(Q0) ->
1089    QName = amqqueue:get_name(Q0),
1090    Self = self(),
1091    F = fun () ->
1092            case mnesia:read({rabbit_queue, QName}) of
1093                [] ->
1094                    ok;
1095                [Q1] when ?is_amqqueue(Q1) ->
1096                    SSPids = amqqueue:get_sync_slave_pids(Q1),
1097                    SSPids1 = [Self | SSPids],
1098                    Q2 = amqqueue:set_sync_slave_pids(Q1, SSPids1),
1099                    rabbit_mirror_queue_misc:store_updated_slaves(Q2),
1100                    {ok, Q2}
1101            end
1102        end,
1103    case rabbit_misc:execute_mnesia_transaction(F) of
1104        ok -> ok;
1105        {ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2)
1106    end.
1107