1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_mirror_queue_master). 9 10-export([init/3, terminate/2, delete_and_terminate/2, 11 purge/1, purge_acks/1, publish/6, publish_delivered/5, 12 batch_publish/4, batch_publish_delivered/4, 13 discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, 14 len/1, is_empty/1, depth/1, drain_confirmed/1, 15 dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, 16 needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, 17 msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, 18 zip_msgs_and_acks/4, handle_info/2]). 19 20-export([start/2, stop/1, delete_crashed/1]). 21 22-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). 23 24-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). 25 26-behaviour(rabbit_backing_queue). 27 28-include_lib("rabbit_common/include/rabbit.hrl"). 29-include("amqqueue.hrl"). 30 31-record(state, { name, 32 gm, 33 coordinator, 34 backing_queue, 35 backing_queue_state, 36 seen_status, 37 confirmed, 38 known_senders, 39 wait_timeout 40 }). 41 42-export_type([death_fun/0, depth_fun/0, stats_fun/0]). 43 44-type death_fun() :: fun ((pid()) -> 'ok'). 45-type depth_fun() :: fun (() -> 'ok'). 46-type stats_fun() :: fun ((any()) -> 'ok'). 47-type master_state() :: #state { name :: rabbit_amqqueue:name(), 48 gm :: pid(), 49 coordinator :: pid(), 50 backing_queue :: atom(), 51 backing_queue_state :: any(), 52 seen_status :: map(), 53 confirmed :: [rabbit_guid:guid()], 54 known_senders :: sets:set() 55 }. 56 57%% For general documentation of HA design, see 58%% rabbit_mirror_queue_coordinator 59 60%% --------------------------------------------------------------------------- 61%% Backing queue 62%% --------------------------------------------------------------------------- 63 64-spec start(_, _) -> no_return(). 65start(_Vhost, _DurableQueues) -> 66 %% This will never get called as this module will never be 67 %% installed as the default BQ implementation. 68 exit({not_valid_for_generic_backing_queue, ?MODULE}). 69 70-spec stop(_) -> no_return(). 71stop(_Vhost) -> 72 %% Same as start/1. 73 exit({not_valid_for_generic_backing_queue, ?MODULE}). 74 75-spec delete_crashed(_) -> no_return(). 76delete_crashed(_QName) -> 77 exit({not_valid_for_generic_backing_queue, ?MODULE}). 78 79init(Q, Recover, AsyncCallback) -> 80 {ok, BQ} = application:get_env(backing_queue_module), 81 BQS = BQ:init(Q, Recover, AsyncCallback), 82 State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS), 83 ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), 84 State. 85 86-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) -> 87 master_state(). 88 89init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) -> 90 QName = amqqueue:get_name(Q0), 91 case rabbit_mirror_queue_coordinator:start_link( 92 Q0, undefined, sender_death_fun(), depth_fun()) of 93 {ok, CPid} -> 94 GM = rabbit_mirror_queue_coordinator:get_gm(CPid), 95 Self = self(), 96 Fun = fun () -> 97 [Q1] = mnesia:read({rabbit_queue, QName}), 98 true = amqqueue:is_amqqueue(Q1), 99 GMPids0 = amqqueue:get_gm_pids(Q1), 100 GMPids1 = [{GM, Self} | GMPids0], 101 Q2 = amqqueue:set_gm_pids(Q1, GMPids1), 102 Q3 = amqqueue:set_state(Q2, live), 103 %% amqqueue migration: 104 %% The amqqueue was read from this transaction, no 105 %% need to handle migration. 106 ok = rabbit_amqqueue:store_queue(Q3) 107 end, 108 ok = rabbit_misc:execute_mnesia_transaction(Fun), 109 {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), 110 %% We need synchronous add here (i.e. do not return until the 111 %% mirror is running) so that when queue declaration is finished 112 %% all mirrors are up; we don't want to end up with unsynced mirrors 113 %% just by declaring a new queue. But add can't be synchronous all 114 %% the time as it can be called by mirrors and that's 115 %% deadlock-prone. 116 rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), 117 #state{name = QName, 118 gm = GM, 119 coordinator = CPid, 120 backing_queue = BQ, 121 backing_queue_state = BQS, 122 seen_status = #{}, 123 confirmed = [], 124 known_senders = sets:new(), 125 wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)}; 126 {error, Reason} -> 127 %% The GM can shutdown before the coordinator has started up 128 %% (lost membership or missing group), thus the start_link of 129 %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process 130 % is trapping exists 131 throw({coordinator_not_started, Reason}) 132 end. 133 134-spec stop_mirroring(master_state()) -> {atom(), any()}. 135 136stop_mirroring(State = #state { coordinator = CPid, 137 backing_queue = BQ, 138 backing_queue_state = BQS }) -> 139 unlink(CPid), 140 stop_all_slaves(shutdown, State), 141 {BQ, BQS}. 142 143-spec sync_mirrors(stats_fun(), stats_fun(), master_state()) -> 144 {'ok', master_state()} | {stop, any(), master_state()}. 145 146sync_mirrors(HandleInfo, EmitStats, 147 State = #state { name = QName, 148 gm = GM, 149 backing_queue = BQ, 150 backing_queue_state = BQS }) -> 151 Log = fun (Fmt, Params) -> 152 rabbit_mirror_queue_misc:log_info( 153 QName, "Synchronising: " ++ Fmt ++ "", Params) 154 end, 155 Log("~p messages to synchronise", [BQ:len(BQS)]), 156 {ok, Q} = rabbit_amqqueue:lookup(QName), 157 SPids = amqqueue:get_slave_pids(Q), 158 SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q), 159 Log("batch size: ~p", [SyncBatchSize]), 160 Ref = make_ref(), 161 Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), 162 gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), 163 S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, 164 case rabbit_mirror_queue_sync:master_go( 165 Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of 166 {cancelled, BQS1} -> Log(" synchronisation cancelled ", []), 167 {ok, S(BQS1)}; 168 {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; 169 {sync_died, R, BQS1} -> Log("~p", [R]), 170 {ok, S(BQS1)}; 171 {already_synced, BQS1} -> {ok, S(BQS1)}; 172 {ok, BQS1} -> Log("complete", []), 173 {ok, S(BQS1)} 174 end. 175 176terminate({shutdown, dropped} = Reason, 177 State = #state { backing_queue = BQ, 178 backing_queue_state = BQS }) -> 179 %% Backing queue termination - this node has been explicitly 180 %% dropped. Normally, non-durable queues would be tidied up on 181 %% startup, but there's a possibility that we will be added back 182 %% in without this node being restarted. Thus we must do the full 183 %% blown delete_and_terminate now, but only locally: we do not 184 %% broadcast delete_and_terminate. 185 State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; 186 187terminate(Reason, 188 State = #state { name = QName, 189 backing_queue = BQ, 190 backing_queue_state = BQS }) -> 191 %% Backing queue termination. The queue is going down but 192 %% shouldn't be deleted. Most likely safe shutdown of this 193 %% node. 194 {ok, Q} = rabbit_amqqueue:lookup(QName), 195 SSPids = amqqueue:get_sync_slave_pids(Q), 196 case SSPids =:= [] andalso 197 rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of 198 true -> %% Remove the whole queue to avoid data loss 199 rabbit_mirror_queue_misc:log_warning( 200 QName, "Stopping all nodes on master shutdown since no " 201 "synchronised mirror (replica) is available", []), 202 stop_all_slaves(Reason, State); 203 false -> %% Just let some other mirror take over. 204 ok 205 end, 206 State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. 207 208delete_and_terminate(Reason, State = #state { backing_queue = BQ, 209 backing_queue_state = BQS }) -> 210 stop_all_slaves(Reason, State), 211 State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. 212 213stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> 214 {ok, Q} = rabbit_amqqueue:lookup(QName), 215 SPids = amqqueue:get_slave_pids(Q), 216 rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT). 217 218purge(State = #state { gm = GM, 219 backing_queue = BQ, 220 backing_queue_state = BQS }) -> 221 ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}), 222 {Count, BQS1} = BQ:purge(BQS), 223 {Count, State #state { backing_queue_state = BQS1 }}. 224 225-spec purge_acks(_) -> no_return(). 226purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). 227 228publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, 229 State = #state { gm = GM, 230 seen_status = SS, 231 backing_queue = BQ, 232 backing_queue_state = BQS }) -> 233 false = maps:is_key(MsgId, SS), %% ASSERTION 234 ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, 235 rabbit_basic:msg_size(Msg)), 236 BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), 237 ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). 238 239batch_publish(Publishes, ChPid, Flow, 240 State = #state { gm = GM, 241 seen_status = SS, 242 backing_queue = BQ, 243 backing_queue_state = BQS }) -> 244 {Publishes1, false, MsgSizes} = 245 lists:foldl(fun ({Msg = #basic_message { id = MsgId }, 246 MsgProps, _IsDelivered}, {Pubs, false, Sizes}) -> 247 {[{Msg, MsgProps, true} | Pubs], %% [0] 248 false = maps:is_key(MsgId, SS), %% ASSERTION 249 Sizes + rabbit_basic:msg_size(Msg)} 250 end, {[], false, 0}, Publishes), 251 Publishes2 = lists:reverse(Publishes1), 252 ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2}, 253 MsgSizes), 254 BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS), 255 ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). 256%% [0] When the mirror process handles the publish command, it sets the 257%% IsDelivered flag to true, so to avoid iterating over the messages 258%% again at the mirror, we do it here. 259 260publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, 261 ChPid, Flow, State = #state { gm = GM, 262 seen_status = SS, 263 backing_queue = BQ, 264 backing_queue_state = BQS }) -> 265 false = maps:is_key(MsgId, SS), %% ASSERTION 266 ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, 267 rabbit_basic:msg_size(Msg)), 268 {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), 269 State1 = State #state { backing_queue_state = BQS1 }, 270 {AckTag, ensure_monitoring(ChPid, State1)}. 271 272batch_publish_delivered(Publishes, ChPid, Flow, 273 State = #state { gm = GM, 274 seen_status = SS, 275 backing_queue = BQ, 276 backing_queue_state = BQS }) -> 277 {false, MsgSizes} = 278 lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, 279 {false, Sizes}) -> 280 {false = maps:is_key(MsgId, SS), %% ASSERTION 281 Sizes + rabbit_basic:msg_size(Msg)} 282 end, {false, 0}, Publishes), 283 ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes}, 284 MsgSizes), 285 {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS), 286 State1 = State #state { backing_queue_state = BQS1 }, 287 {AckTags, ensure_monitoring(ChPid, State1)}. 288 289discard(MsgId, ChPid, Flow, State = #state { gm = GM, 290 backing_queue = BQ, 291 backing_queue_state = BQS, 292 seen_status = SS }) -> 293 false = maps:is_key(MsgId, SS), %% ASSERTION 294 ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), 295 ensure_monitoring(ChPid, 296 State #state { backing_queue_state = 297 BQ:discard(MsgId, ChPid, Flow, BQS) }). 298 299dropwhile(Pred, State = #state{backing_queue = BQ, 300 backing_queue_state = BQS }) -> 301 Len = BQ:len(BQS), 302 {Next, BQS1} = BQ:dropwhile(Pred, BQS), 303 {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}. 304 305fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ, 306 backing_queue_state = BQS }) -> 307 Len = BQ:len(BQS), 308 {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS), 309 {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}. 310 311drain_confirmed(State = #state { backing_queue = BQ, 312 backing_queue_state = BQS, 313 seen_status = SS, 314 confirmed = Confirmed }) -> 315 {MsgIds, BQS1} = BQ:drain_confirmed(BQS), 316 {MsgIds1, SS1} = 317 lists:foldl( 318 fun (MsgId, {MsgIdsN, SSN}) -> 319 %% We will never see 'discarded' here 320 case maps:find(MsgId, SSN) of 321 error -> 322 {[MsgId | MsgIdsN], SSN}; 323 {ok, published} -> 324 %% It was published when we were a mirror, 325 %% and we were promoted before we saw the 326 %% publish from the channel. We still 327 %% haven't seen the channel publish, and 328 %% consequently we need to filter out the 329 %% confirm here. We will issue the confirm 330 %% when we see the publish from the channel. 331 {MsgIdsN, maps:put(MsgId, confirmed, SSN)}; 332 {ok, confirmed} -> 333 %% Well, confirms are racy by definition. 334 {[MsgId | MsgIdsN], SSN} 335 end 336 end, {[], SS}, MsgIds), 337 {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, 338 seen_status = SS1, 339 confirmed = [] }}. 340 341fetch(AckRequired, State = #state { backing_queue = BQ, 342 backing_queue_state = BQS }) -> 343 {Result, BQS1} = BQ:fetch(AckRequired, BQS), 344 State1 = State #state { backing_queue_state = BQS1 }, 345 {Result, case Result of 346 empty -> State1; 347 {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1) 348 end}. 349 350drop(AckRequired, State = #state { backing_queue = BQ, 351 backing_queue_state = BQS }) -> 352 {Result, BQS1} = BQ:drop(AckRequired, BQS), 353 State1 = State #state { backing_queue_state = BQS1 }, 354 {Result, case Result of 355 empty -> State1; 356 {_MsgId, _AckTag} -> drop_one(AckRequired, State1) 357 end}. 358 359ack(AckTags, State = #state { gm = GM, 360 backing_queue = BQ, 361 backing_queue_state = BQS }) -> 362 {MsgIds, BQS1} = BQ:ack(AckTags, BQS), 363 case MsgIds of 364 [] -> ok; 365 _ -> ok = gm:broadcast(GM, {ack, MsgIds}) 366 end, 367 {MsgIds, State #state { backing_queue_state = BQS1 }}. 368 369requeue(AckTags, State = #state { gm = GM, 370 backing_queue = BQ, 371 backing_queue_state = BQS }) -> 372 {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), 373 ok = gm:broadcast(GM, {requeue, MsgIds}), 374 {MsgIds, State #state { backing_queue_state = BQS1 }}. 375 376ackfold(MsgFun, Acc, State = #state { backing_queue = BQ, 377 backing_queue_state = BQS }, AckTags) -> 378 {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags), 379 {Acc1, State #state { backing_queue_state = BQS1 }}. 380 381fold(Fun, Acc, State = #state { backing_queue = BQ, 382 backing_queue_state = BQS }) -> 383 {Result, BQS1} = BQ:fold(Fun, Acc, BQS), 384 {Result, State #state { backing_queue_state = BQS1 }}. 385 386len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> 387 BQ:len(BQS). 388 389is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> 390 BQ:is_empty(BQS). 391 392depth(#state { backing_queue = BQ, backing_queue_state = BQS }) -> 393 BQ:depth(BQS). 394 395set_ram_duration_target(Target, State = #state { backing_queue = BQ, 396 backing_queue_state = BQS }) -> 397 State #state { backing_queue_state = 398 BQ:set_ram_duration_target(Target, BQS) }. 399 400ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> 401 {Result, BQS1} = BQ:ram_duration(BQS), 402 {Result, State #state { backing_queue_state = BQS1 }}. 403 404needs_timeout(#state { backing_queue = BQ, backing_queue_state = BQS }) -> 405 BQ:needs_timeout(BQS). 406 407timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> 408 State #state { backing_queue_state = BQ:timeout(BQS) }. 409 410handle_pre_hibernate(State = #state { backing_queue = BQ, 411 backing_queue_state = BQS }) -> 412 State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. 413 414handle_info(Msg, State = #state { backing_queue = BQ, 415 backing_queue_state = BQS }) -> 416 State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }. 417 418resume(State = #state { backing_queue = BQ, 419 backing_queue_state = BQS }) -> 420 State #state { backing_queue_state = BQ:resume(BQS) }. 421 422msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> 423 BQ:msg_rates(BQS). 424 425info(backing_queue_status, 426 State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> 427 BQ:info(backing_queue_status, BQS) ++ 428 [ {mirror_seen, maps:size(State #state.seen_status)}, 429 {mirror_senders, sets:size(State #state.known_senders)} ]; 430info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> 431 BQ:info(Item, BQS). 432 433invoke(?MODULE, Fun, State) -> 434 Fun(?MODULE, State); 435invoke(Mod, Fun, State = #state { backing_queue = BQ, 436 backing_queue_state = BQS }) -> 437 State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. 438 439is_duplicate(Message = #basic_message { id = MsgId }, 440 State = #state { seen_status = SS, 441 backing_queue = BQ, 442 backing_queue_state = BQS, 443 confirmed = Confirmed }) -> 444 %% Here, we need to deal with the possibility that we're about to 445 %% receive a message that we've already seen when we were a mirror 446 %% (we received it via gm). Thus if we do receive such message now 447 %% via the channel, there may be a confirm waiting to issue for 448 %% it. 449 450 %% We will never see {published, ChPid, MsgSeqNo} here. 451 case maps:find(MsgId, SS) of 452 error -> 453 %% We permit the underlying BQ to have a peek at it, but 454 %% only if we ourselves are not filtering out the msg. 455 {Result, BQS1} = BQ:is_duplicate(Message, BQS), 456 {Result, State #state { backing_queue_state = BQS1 }}; 457 {ok, published} -> 458 %% It already got published when we were a mirror and no 459 %% confirmation is waiting. amqqueue_process will have, in 460 %% its msg_id_to_channel mapping, the entry for dealing 461 %% with the confirm when that comes back in (it's added 462 %% immediately after calling is_duplicate). The msg is 463 %% invalid. We will not see this again, nor will we be 464 %% further involved in confirming this message, so erase. 465 {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }}; 466 {ok, Disposition} 467 when Disposition =:= confirmed 468 %% It got published when we were a mirror via gm, and 469 %% confirmed some time after that (maybe even after 470 %% promotion), but before we received the publish from the 471 %% channel, so couldn't previously know what the 472 %% msg_seq_no was (and thus confirm as a mirror). So we 473 %% need to confirm now. As above, amqqueue_process will 474 %% have the entry for the msg_id_to_channel mapping added 475 %% immediately after calling is_duplicate/2. 476 orelse Disposition =:= discarded -> 477 %% Message was discarded while we were a mirror. Confirm now. 478 %% As above, amqqueue_process will have the entry for the 479 %% msg_id_to_channel mapping. 480 {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS), 481 confirmed = [MsgId | Confirmed] }} 482 end. 483 484set_queue_mode(Mode, State = #state { gm = GM, 485 backing_queue = BQ, 486 backing_queue_state = BQS }) -> 487 ok = gm:broadcast(GM, {set_queue_mode, Mode}), 488 BQS1 = BQ:set_queue_mode(Mode, BQS), 489 State #state { backing_queue_state = BQS1 }. 490 491zip_msgs_and_acks(Msgs, AckTags, Accumulator, 492 #state { backing_queue = BQ, 493 backing_queue_state = BQS }) -> 494 BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS). 495 496%% --------------------------------------------------------------------------- 497%% Other exported functions 498%% --------------------------------------------------------------------------- 499 500-spec promote_backing_queue_state 501 (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], 502 map(), [pid()]) -> 503 master_state(). 504 505promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> 506 {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), 507 Len = BQ:len(BQS1), 508 Depth = BQ:depth(BQS1), 509 true = Len == Depth, %% ASSERTION: everything must have been requeued 510 ok = gm:broadcast(GM, {depth, Depth}), 511 WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), 512 #state { name = QName, 513 gm = GM, 514 coordinator = CPid, 515 backing_queue = BQ, 516 backing_queue_state = BQS1, 517 seen_status = Seen, 518 confirmed = [], 519 known_senders = sets:from_list(KS), 520 wait_timeout = WaitTimeout }. 521 522-spec sender_death_fun() -> death_fun(). 523 524sender_death_fun() -> 525 Self = self(), 526 fun (DeadPid) -> 527 rabbit_amqqueue:run_backing_queue( 528 Self, ?MODULE, 529 fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> 530 ok = gm:broadcast(GM, {sender_death, DeadPid}), 531 KS1 = sets:del_element(DeadPid, KS), 532 State #state { known_senders = KS1 } 533 end) 534 end. 535 536-spec depth_fun() -> depth_fun(). 537 538depth_fun() -> 539 Self = self(), 540 fun () -> 541 rabbit_amqqueue:run_backing_queue( 542 Self, ?MODULE, 543 fun (?MODULE, State = #state { gm = GM, 544 backing_queue = BQ, 545 backing_queue_state = BQS }) -> 546 ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), 547 State 548 end) 549 end. 550 551%% --------------------------------------------------------------------------- 552%% Helpers 553%% --------------------------------------------------------------------------- 554 555drop_one(AckRequired, State = #state { gm = GM, 556 backing_queue = BQ, 557 backing_queue_state = BQS }) -> 558 ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}), 559 State. 560 561drop(PrevLen, AckRequired, State = #state { gm = GM, 562 backing_queue = BQ, 563 backing_queue_state = BQS }) -> 564 Len = BQ:len(BQS), 565 case PrevLen - Len of 566 0 -> State; 567 Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}), 568 State 569 end. 570 571ensure_monitoring(ChPid, State = #state { coordinator = CPid, 572 known_senders = KS }) -> 573 case sets:is_element(ChPid, KS) of 574 true -> State; 575 false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring( 576 CPid, [ChPid]), 577 State #state { known_senders = sets:add_element(ChPid, KS) } 578 end. 579