%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_mirror_queue_master).

-export([init/3, terminate/2, delete_and_terminate/2,
         purge/1, purge_acks/1, publish/6, publish_delivered/5,
         batch_publish/4, batch_publish_delivered/4,
         discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
         len/1, is_empty/1, depth/1, drain_confirmed/1,
         dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
         needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
         msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
         zip_msgs_and_acks/4, handle_info/2]).

-export([start/2, stop/1, delete_crashed/1]).

-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).

-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).

-behaviour(rabbit_backing_queue).

-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").

-record(state, { name,
                 gm,
                 coordinator,
                 backing_queue,
                 backing_queue_state,
                 seen_status,
                 confirmed,
                 known_senders,
                 wait_timeout
               }).

-export_type([death_fun/0, depth_fun/0, stats_fun/0]).

-type death_fun() :: fun ((pid()) -> 'ok').
-type depth_fun() :: fun (() -> 'ok').
-type stats_fun() :: fun ((any()) -> 'ok').
-type master_state() :: #state { name                :: rabbit_amqqueue:name(),
                                 gm                  :: pid(),
                                 coordinator         :: pid(),
                                 backing_queue       :: atom(),
                                 backing_queue_state :: any(),
                                 seen_status         :: map(),
                                 confirmed           :: [rabbit_guid:guid()],
                                 known_senders       :: sets:set()
                               }.

%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator

%% ---------------------------------------------------------------------------
%% Backing queue
%% ---------------------------------------------------------------------------

-spec start(_, _) -> no_return().
start(_Vhost, _DurableQueues) ->
    %% This will never get called as this module will never be
    %% installed as the default BQ implementation.
    exit({not_valid_for_generic_backing_queue, ?MODULE}).

-spec stop(_) -> no_return().
stop(_Vhost) ->
    %% Same as start/2.
    exit({not_valid_for_generic_backing_queue, ?MODULE}).

-spec delete_crashed(_) -> no_return().
delete_crashed(_QName) ->
    exit({not_valid_for_generic_backing_queue, ?MODULE}).

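%% Initialise a fresh master: instantiate the configured backing queue
%% module, wrap it in mirroring state via init_with_existing_bq/3, and
%% broadcast the initial depth to the mirrors over GM.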
init(Q, Recover, AsyncCallback) ->
    {ok, BQ} = application:get_env(backing_queue_module),
    BQS = BQ:init(Q, Recover, AsyncCallback),
    State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
    ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
    State.

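%% Wrap an already-initialised backing queue in mirroring state: start
%% the coordinator, record this GM member in the queue's Mnesia record,
%% and synchronously add mirrors on the suggested nodes.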
-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) ->
          master_state().

init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    case rabbit_mirror_queue_coordinator:start_link(
           Q0, undefined, sender_death_fun(), depth_fun()) of
        {ok, CPid} ->
            GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
            Self = self(),
            Fun = fun () ->
                      [Q1] = mnesia:read({rabbit_queue, QName}),
                      true = amqqueue:is_amqqueue(Q1),
                      GMPids0 = amqqueue:get_gm_pids(Q1),
                      GMPids1 = [{GM, Self} | GMPids0],
                      Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
                      Q3 = amqqueue:set_state(Q2, live),
                      %% amqqueue migration:
                      %% The amqqueue was read from this transaction, no
                      %% need to handle migration.
                      ok = rabbit_amqqueue:store_queue(Q3)
                  end,
            ok = rabbit_misc:execute_mnesia_transaction(Fun),
            {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
            %% We need synchronous add here (i.e. do not return until the
            %% mirror is running) so that when queue declaration is finished
            %% all mirrors are up; we don't want to end up with unsynced mirrors
            %% just by declaring a new queue. But add can't be synchronous all
            %% the time as it can be called by mirrors and that's
            %% deadlock-prone.
            rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
            #state{name                = QName,
                   gm                  = GM,
                   coordinator         = CPid,
                   backing_queue       = BQ,
                   backing_queue_state = BQS,
                   seen_status         = #{},
                   confirmed           = [],
                   known_senders       = sets:new(),
                   wait_timeout        = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)};
        {error, Reason} ->
            %% The GM can shut down before the coordinator has started up
            %% (lost membership or missing group), so the coordinator's
            %% start_link returns {error, shutdown}, as
            %% rabbit_amqqueue_process is trapping exits.
            throw({coordinator_not_started, Reason})
    end.

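%% Drop mirroring entirely: unlink and abandon the coordinator, stop all
%% mirrors, and hand back the plain backing queue module and its state.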
-spec stop_mirroring(master_state()) -> {atom(), any()}.

stop_mirroring(State = #state { coordinator         = CPid,
                                backing_queue       = BQ,
                                backing_queue_state = BQS }) ->
    unlink(CPid),
    stop_all_slaves(shutdown, State),
    {BQ, BQS}.

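%% Drive a full synchronisation of the mirrors: prepare a syncer process,
%% announce {sync_start, ...} over GM, then hand control to
%% rabbit_mirror_queue_sync:master_go/8 and translate its result.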
-spec sync_mirrors(stats_fun(), stats_fun(), master_state()) ->
          {'ok', master_state()} | {stop, any(), master_state()}.

sync_mirrors(HandleInfo, EmitStats,
             State = #state { name                = QName,
                              gm                  = GM,
                              backing_queue       = BQ,
                              backing_queue_state = BQS }) ->
    Log = fun (Fmt, Params) ->
                  rabbit_mirror_queue_misc:log_info(
                    QName, "Synchronising: " ++ Fmt, Params)
          end,
    Log("~p messages to synchronise", [BQ:len(BQS)]),
    {ok, Q} = rabbit_amqqueue:lookup(QName),
    SPids = amqqueue:get_slave_pids(Q),
    SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
    Log("batch size: ~p", [SyncBatchSize]),
    Ref = make_ref(),
    Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
    gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
    S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
    case rabbit_mirror_queue_sync:master_go(
           Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
        {cancelled, BQS1}      -> Log("synchronisation cancelled", []),
                                  {ok, S(BQS1)};
        {shutdown,  R, BQS1}   -> {stop, R, S(BQS1)};
        {sync_died, R, BQS1}   -> Log("~p", [R]),
                                  {ok, S(BQS1)};
        {already_synced, BQS1} -> {ok, S(BQS1)};
        {ok, BQS1}             -> Log("complete", []),
                                  {ok, S(BQS1)}
    end.

terminate({shutdown, dropped} = Reason,
          State = #state { backing_queue       = BQ,
                           backing_queue_state = BQS }) ->
    %% Backing queue termination - this node has been explicitly
    %% dropped. Normally, non-durable queues would be tidied up on
    %% startup, but there's a possibility that we will be added back
    %% in without this node being restarted. Thus we must do the full
    %% blown delete_and_terminate now, but only locally: we do not
    %% broadcast delete_and_terminate.
    State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};

terminate(Reason,
          State = #state { name                = QName,
                           backing_queue       = BQ,
                           backing_queue_state = BQS }) ->
    %% Backing queue termination. The queue is going down but
    %% shouldn't be deleted. Most likely safe shutdown of this
    %% node.
    {ok, Q} = rabbit_amqqueue:lookup(QName),
    SSPids = amqqueue:get_sync_slave_pids(Q),
    case SSPids =:= [] andalso
        rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
        true  -> %% Remove the whole queue to avoid data loss
                 rabbit_mirror_queue_misc:log_warning(
                   QName, "Stopping all nodes on master shutdown since no "
                   "synchronised mirror (replica) is available", []),
                 stop_all_slaves(Reason, State);
        false -> %% Just let some other mirror take over.
                 ok
    end,
    State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.

delete_and_terminate(Reason, State = #state { backing_queue       = BQ,
                                              backing_queue_state = BQS }) ->
    stop_all_slaves(Reason, State),
    State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.

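%% Look up the current mirror pids for this queue and stop them all,
%% passing the configured wait timeout along.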
stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
    {ok, Q} = rabbit_amqqueue:lookup(QName),
    SPids = amqqueue:get_slave_pids(Q),
    rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).

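%% Purge: broadcast {drop, 0, Len, false} so the mirrors empty
%% themselves, then purge the local backing queue.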
purge(State = #state { gm                  = GM,
                       backing_queue       = BQ,
                       backing_queue_state = BQS }) ->
    ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
    {Count, BQS1} = BQ:purge(BQS),
    {Count, State #state { backing_queue_state = BQS1 }}.

-spec purge_acks(_) -> no_return().
purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).

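%% Publish: assert the message was not already seen while we were a
%% mirror, broadcast it (and its size) over GM, publish into the local
%% backing queue, and make sure the sending channel is monitored.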
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
        State = #state { gm                  = GM,
                         seen_status         = SS,
                         backing_queue       = BQ,
                         backing_queue_state = BQS }) ->
    false = maps:is_key(MsgId, SS), %% ASSERTION
    ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
                      rabbit_basic:msg_size(Msg)),
    BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
    ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).

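%% Batch variant of publish/6: assert none of the messages were seen
%% while we were a mirror, total up their sizes, broadcast the batch
%% over GM, then publish locally. See note [0] below about IsDelivered.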
batch_publish(Publishes, ChPid, Flow,
              State = #state { gm                  = GM,
                               seen_status         = SS,
                               backing_queue       = BQ,
                               backing_queue_state = BQS }) ->
    {Publishes1, false, MsgSizes} =
        lists:foldl(fun ({Msg = #basic_message { id = MsgId },
                          MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
                            {[{Msg, MsgProps, true} | Pubs], %% [0]
                             false = maps:is_key(MsgId, SS), %% ASSERTION
                             Sizes + rabbit_basic:msg_size(Msg)}
                    end, {[], false, 0}, Publishes),
    Publishes2 = lists:reverse(Publishes1),
    ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2},
                      MsgSizes),
    BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS),
    ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
%% [0] When the mirror process handles the publish command, it sets the
%% IsDelivered flag to true, so to avoid iterating over the messages
%% again at the mirror, we do it here.

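%% As publish/6, but for a message delivered straight to a consumer:
%% broadcast over GM, then let the local backing queue produce the ack
%% tag.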
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
                  ChPid, Flow, State = #state { gm                  = GM,
                                                seen_status         = SS,
                                                backing_queue       = BQ,
                                                backing_queue_state = BQS }) ->
    false = maps:is_key(MsgId, SS), %% ASSERTION
    ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
                      rabbit_basic:msg_size(Msg)),
    {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
    State1 = State #state { backing_queue_state = BQS1 },
    {AckTag, ensure_monitoring(ChPid, State1)}.

batch_publish_delivered(Publishes, ChPid, Flow,
                        State = #state { gm                  = GM,
                                         seen_status         = SS,
                                         backing_queue       = BQ,
                                         backing_queue_state = BQS }) ->
    {false, MsgSizes} =
        lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
                         {false, Sizes}) ->
                            {false = maps:is_key(MsgId, SS), %% ASSERTION
                             Sizes + rabbit_basic:msg_size(Msg)}
                    end, {false, 0}, Publishes),
    ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
                      MsgSizes),
    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
    State1 = State #state { backing_queue_state = BQS1 },
    {AckTags, ensure_monitoring(ChPid, State1)}.

discard(MsgId, ChPid, Flow, State = #state { gm                  = GM,
                                             backing_queue       = BQ,
                                             backing_queue_state = BQS,
                                             seen_status         = SS }) ->
    false = maps:is_key(MsgId, SS), %% ASSERTION
    ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
    ensure_monitoring(ChPid,
                      State #state { backing_queue_state =
                                         BQ:discard(MsgId, ChPid, Flow, BQS) }).

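%% dropwhile/fetchwhile run on the local backing queue; the number of
%% messages removed is derived from the length before and after, and is
%% relayed to the mirrors via drop/3 (see the helpers below).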
dropwhile(Pred, State = #state{backing_queue       = BQ,
                               backing_queue_state = BQS }) ->
    Len  = BQ:len(BQS),
    {Next, BQS1} = BQ:dropwhile(Pred, BQS),
    {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.

fetchwhile(Pred, Fun, Acc, State = #state{backing_queue       = BQ,
                                          backing_queue_state = BQS }) ->
    Len  = BQ:len(BQS),
    {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
    {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.

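%% Collect confirms from the backing queue, withholding those for
%% messages that were published to us while we were a mirror and whose
%% channel publish we have not yet seen; those confirms are released
%% later, once the channel publish arrives (see is_duplicate/2).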
drain_confirmed(State = #state { backing_queue       = BQ,
                                 backing_queue_state = BQS,
                                 seen_status         = SS,
                                 confirmed           = Confirmed }) ->
    {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
    {MsgIds1, SS1} =
        lists:foldl(
          fun (MsgId, {MsgIdsN, SSN}) ->
                  %% We will never see 'discarded' here
                  case maps:find(MsgId, SSN) of
                      error ->
                          {[MsgId | MsgIdsN], SSN};
                      {ok, published} ->
                          %% It was published when we were a mirror,
                          %% and we were promoted before we saw the
                          %% publish from the channel. We still
                          %% haven't seen the channel publish, and
                          %% consequently we need to filter out the
                          %% confirm here. We will issue the confirm
                          %% when we see the publish from the channel.
                          {MsgIdsN, maps:put(MsgId, confirmed, SSN)};
                      {ok, confirmed} ->
                          %% Well, confirms are racy by definition.
                          {[MsgId | MsgIdsN], SSN}
                  end
          end, {[], SS}, MsgIds),
    {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1,
                                          seen_status         = SS1,
                                          confirmed           = [] }}.

fetch(AckRequired, State = #state { backing_queue       = BQ,
                                    backing_queue_state = BQS }) ->
    {Result, BQS1} = BQ:fetch(AckRequired, BQS),
    State1 = State #state { backing_queue_state = BQS1 },
    {Result, case Result of
                 empty                           -> State1;
                 {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1)
             end}.

drop(AckRequired, State = #state { backing_queue       = BQ,
                                   backing_queue_state = BQS }) ->
    {Result, BQS1} = BQ:drop(AckRequired, BQS),
    State1 = State #state { backing_queue_state = BQS1 },
    {Result, case Result of
                 empty             -> State1;
                 {_MsgId, _AckTag} -> drop_one(AckRequired, State1)
             end}.

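%% Ack locally, then tell the mirrors which message ids were acked
%% (skipping the broadcast when nothing was acked).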
ack(AckTags, State = #state { gm                  = GM,
                              backing_queue       = BQ,
                              backing_queue_state = BQS }) ->
    {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
    case MsgIds of
        [] -> ok;
        _  -> ok = gm:broadcast(GM, {ack, MsgIds})
    end,
    {MsgIds, State #state { backing_queue_state = BQS1 }}.

requeue(AckTags, State = #state { gm                  = GM,
                                  backing_queue       = BQ,
                                  backing_queue_state = BQS }) ->
    {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
    ok = gm:broadcast(GM, {requeue, MsgIds}),
    {MsgIds, State #state { backing_queue_state = BQS1 }}.

ackfold(MsgFun, Acc, State = #state { backing_queue       = BQ,
                                      backing_queue_state = BQS }, AckTags) ->
    {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
    {Acc1, State #state { backing_queue_state = BQS1 }}.

fold(Fun, Acc, State = #state { backing_queue = BQ,
                                backing_queue_state = BQS }) ->
    {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
    {Result, State #state { backing_queue_state = BQS1 }}.

len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:len(BQS).

is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:is_empty(BQS).

depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:depth(BQS).

set_ram_duration_target(Target, State = #state { backing_queue       = BQ,
                                                 backing_queue_state = BQS }) ->
    State #state { backing_queue_state =
                       BQ:set_ram_duration_target(Target, BQS) }.

ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
    {Result, BQS1} = BQ:ram_duration(BQS),
    {Result, State #state { backing_queue_state = BQS1 }}.

needs_timeout(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:needs_timeout(BQS).

timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:timeout(BQS) }.

handle_pre_hibernate(State = #state { backing_queue       = BQ,
                                      backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.

handle_info(Msg, State = #state { backing_queue       = BQ,
                                  backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }.

resume(State = #state { backing_queue       = BQ,
                        backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:resume(BQS) }.

msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:msg_rates(BQS).

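%% backing_queue_status is extended with two mirroring counters on top
%% of whatever the wrapped backing queue reports; all other items are
%% passed straight through.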
info(backing_queue_status,
     State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:info(backing_queue_status, BQS) ++
        [ {mirror_seen,    maps:size(State #state.seen_status)},
          {mirror_senders, sets:size(State #state.known_senders)} ];
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
    BQ:info(Item, BQS).

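%% Invocations addressed to this module run against the master state
%% itself; anything else is forwarded to the wrapped backing queue.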
invoke(?MODULE, Fun, State) ->
    Fun(?MODULE, State);
invoke(Mod, Fun, State = #state { backing_queue       = BQ,
                                  backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.

is_duplicate(Message = #basic_message { id = MsgId },
             State = #state { seen_status         = SS,
                              backing_queue       = BQ,
                              backing_queue_state = BQS,
                              confirmed           = Confirmed }) ->
    %% Here, we need to deal with the possibility that we're about to
    %% receive a message that we've already seen when we were a mirror
    %% (we received it via gm). Thus if we do receive such a message
    %% now via the channel, there may be a confirm waiting to be issued
    %% for it.

    %% We will never see {published, ChPid, MsgSeqNo} here.
    case maps:find(MsgId, SS) of
        error ->
            %% We permit the underlying BQ to have a peek at it, but
            %% only if we ourselves are not filtering out the msg.
            {Result, BQS1} = BQ:is_duplicate(Message, BQS),
            {Result, State #state { backing_queue_state = BQS1 }};
        {ok, published} ->
            %% It already got published when we were a mirror and no
            %% confirmation is waiting. amqqueue_process will have, in
            %% its msg_id_to_channel mapping, the entry for dealing
            %% with the confirm when that comes back in (it's added
            %% immediately after calling is_duplicate). The msg is
            %% invalid. We will not see this again, nor will we be
            %% further involved in confirming this message, so erase.
            {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
        {ok, Disposition}
          when Disposition =:= confirmed
            %% It got published when we were a mirror via gm, and
            %% confirmed some time after that (maybe even after
            %% promotion), but before we received the publish from the
            %% channel, so couldn't previously know what the
            %% msg_seq_no was (and thus confirm as a mirror). So we
            %% need to confirm now. As above, amqqueue_process will
            %% have the entry for the msg_id_to_channel mapping added
            %% immediately after calling is_duplicate/2.
          orelse Disposition =:= discarded ->
            %% Message was discarded while we were a mirror. Confirm now.
            %% As above, amqqueue_process will have the entry for the
            %% msg_id_to_channel mapping.
            {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
                                          confirmed = [MsgId | Confirmed] }}
    end.

set_queue_mode(Mode, State = #state { gm                  = GM,
                                      backing_queue       = BQ,
                                      backing_queue_state = BQS }) ->
    ok = gm:broadcast(GM, {set_queue_mode, Mode}),
    BQS1 = BQ:set_queue_mode(Mode, BQS),
    State #state { backing_queue_state = BQS1 }.

zip_msgs_and_acks(Msgs, AckTags, Accumulator,
                  #state { backing_queue = BQ,
                           backing_queue_state = BQS }) ->
    BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).

%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------

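%% Build a master state from a promoted mirror: requeue everything that
%% was pending acknowledgement, assert that length and depth now agree,
%% and broadcast the resulting depth to the remaining mirrors.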
-spec promote_backing_queue_state
        (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
         map(), [pid()]) ->
            master_state().

promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
    {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
    Len   = BQ:len(BQS1),
    Depth = BQ:depth(BQS1),
    true = Len == Depth, %% ASSERTION: everything must have been requeued
    ok = gm:broadcast(GM, {depth, Depth}),
    WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
    #state { name                = QName,
             gm                  = GM,
             coordinator         = CPid,
             backing_queue       = BQ,
             backing_queue_state = BQS1,
             seen_status         = Seen,
             confirmed           = [],
             known_senders       = sets:from_list(KS),
             wait_timeout        = WaitTimeout }.

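%% Returns the closure handed to the coordinator; when a sender dies it
%% is run in the queue process (via run_backing_queue) to broadcast the
%% death over GM and forget the sender.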
-spec sender_death_fun() -> death_fun().

sender_death_fun() ->
    Self = self(),
    fun (DeadPid) ->
            rabbit_amqqueue:run_backing_queue(
              Self, ?MODULE,
              fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
                      ok = gm:broadcast(GM, {sender_death, DeadPid}),
                      KS1 = sets:del_element(DeadPid, KS),
                      State #state { known_senders = KS1 }
              end)
    end.

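%% Returns a closure, also run in the queue process, that rebroadcasts
%% the current depth to the mirrors.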
-spec depth_fun() -> depth_fun().

depth_fun() ->
    Self = self(),
    fun () ->
            rabbit_amqqueue:run_backing_queue(
              Self, ?MODULE,
              fun (?MODULE, State = #state { gm                  = GM,
                                             backing_queue       = BQ,
                                             backing_queue_state = BQS }) ->
                      ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
                      State
              end)
    end.

%% ---------------------------------------------------------------------------
%% Helpers
%% ---------------------------------------------------------------------------

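%% Tell the mirrors that fetch/2 or drop/2 removed a single message,
%% along with the new length and whether an ack is pending for it.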
drop_one(AckRequired, State = #state { gm                  = GM,
                                       backing_queue       = BQ,
                                       backing_queue_state = BQS }) ->
    ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}),
    State.

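%% Tell the mirrors how many messages dropwhile/fetchwhile removed, if
%% any, along with the new length.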
drop(PrevLen, AckRequired, State = #state { gm                  = GM,
                                            backing_queue       = BQ,
                                            backing_queue_state = BQS }) ->
    Len = BQ:len(BQS),
    case PrevLen - Len of
        0       -> State;
        Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
                   State
    end.

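%% Ask the coordinator to monitor a channel pid the first time we see it
%% publish or discard on this queue.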
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
                                          known_senders = KS }) ->
    case sets:is_element(ChPid, KS) of
        true  -> State;
        false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
                        CPid, [ChPid]),
                 State #state { known_senders = sets:add_element(ChPid, KS) }
    end.
