%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2010-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_mirror_queue_slave).

%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
%% We receive messages from GM and from publishers, and the gm
%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.

%% Public API.
-export([set_maximum_since_use/2, info/1, go/2]).

%% gen_server2 callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3, handle_pre_hibernate/1, prioritise_call/4,
         prioritise_cast/3, prioritise_info/3, format_message_queue/2]).

%% gm callbacks.
-export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).

-behaviour(gen_server2).
-behaviour(gm).

-include_lib("rabbit_common/include/rabbit.hrl").

-include("amqqueue.hrl").
-include("gm_specs.hrl").

%%----------------------------------------------------------------------------

%% Items reported by the `info` call, in this order.
-define(INFO_KEYS,
        [pid,
         name,
         master_pid,
         is_synchronised
        ]).

-define(SYNC_INTERVAL,                 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL,  5000).
-define(DEATH_TIMEOUT,                 20000). %% 20 seconds

%% Server state once the mirror has been started by a `go` request.
%% Before that the server state is the tuple {not_started, Q}.
-record(state, { q,
                 gm,
                 backing_queue,
                 backing_queue_state,
                 sync_timer_ref,
                 rate_timer_ref,

                 sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
                 msg_id_ack,    %% :: MsgId -> AckTag

                 msg_id_status,
                 known_senders,

                 %% Master depth - local depth
                 depth_delta
               }).

%%----------------------------------------------------------------------------

%% Asynchronously asks the mirror process QPid to apply Age through
%% file_handle_cache:set_maximum_since_use/1 (see the matching
%% handle_cast/2 clause).
set_maximum_since_use(QPid, Age) ->
    gen_server2:cast(QPid, {set_maximum_since_use, Age}).

%% Synchronously fetches the ?INFO_KEYS items from the mirror QPid.
info(QPid) -> gen_server2:call(QPid, info, infinity).

%% gen_server2 callback. Only records the process name and returns the
%% placeholder state {not_started, Q}; the real #state{} is built by
%% handle_go/1 once a `go` request arrives.
init(Q) when ?is_amqqueue(Q) ->
    QName = amqqueue:get_name(Q),
    ?store_proc_name(QName),
    {ok, {not_started, Q}, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
      ?DESIRED_HIBERNATE}, ?MODULE}.

%% Tells a not-yet-started mirror to start, either waiting for the
%% outcome (sync) or not (async).
go(SPid, sync)  -> gen_server2:call(SPid, go, infinity);
go(SPid, async) -> gen_server2:cast(SPid, go).

%% Performs the actual start-up of the mirror: joins the GM group,
%% registers this process in the queue record inside an mnesia
%% transaction (see init_it/4) and initialises the backing queue.
%%
%% Returns {ok, #state{}} when the mirror was added, or {error, Reason}
%% when it must not run; in every error case the GM group is left first.
handle_go(Q0) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    %% We join the GM group before we add ourselves to the amqqueue
    %% record. As a result:
    %% 1. We can receive msgs from GM that correspond to messages we will
    %% never receive from publishers.
    %% 2. When we receive a message from publishers, we must receive a
    %% message from the GM group for it.
    %% 3. However, that instruction from the GM group can arrive either
    %% before or after the actual message. We need to be able to
    %% distinguish between GM instructions arriving early, and case (1)
    %% above.
    %%
    process_flag(trap_exit, true), %% amqqueue_process traps exits too.
    {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
                             fun rabbit_misc:execute_mnesia_transaction/1),
    MRef = erlang:monitor(process, GM),
    %% We ignore the DOWN message because we are also linked and
    %% trapping exits, we just want to not get stuck and we will exit
    %% later.
    receive
        {joined, GM}            -> erlang:demonitor(MRef, [flush]),
                                   ok;
        {'DOWN', MRef, _, _, _} -> ok
    end,
    Self = self(),
    Node = node(),
    case rabbit_misc:execute_mnesia_transaction(
           fun() -> init_it(Self, GM, Node, QName) end) of
        {new, QPid, GMPids} ->
            ok = file_handle_cache:register_callback(
                   rabbit_amqqueue, set_maximum_since_use, [Self]),
            ok = rabbit_memory_monitor:register(
                   Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
            {ok, BQ} = application:get_env(backing_queue_module),
            Q1 = amqqueue:set_pid(Q0, QPid),
            _ = BQ:delete_crashed(Q1), %% For crash recovery
            BQS = bq_init(BQ, Q1, new),
            State = #state { q                   = Q1,
                             gm                  = GM,
                             backing_queue       = BQ,
                             backing_queue_state = BQS,
                             rate_timer_ref      = undefined,
                             sync_timer_ref      = undefined,

                             sender_queues       = #{},
                             msg_id_ack          = #{},

                             msg_id_status       = #{},
                             known_senders       = pmon:new(delegate),

                             depth_delta         = undefined
                   },
            ok = gm:broadcast(GM, request_depth),
            ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
            rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
            {ok, State};
        {stale, StalePid} ->
            rabbit_mirror_queue_misc:log_warning(
              QName, "Detected stale classic mirrored queue leader: ~p", [StalePid]),
            gm:leave(GM),
            {error, {stale_master_pid, StalePid}};
        duplicate_live_master ->
            gm:leave(GM),
            {error, {duplicate_live_master, Node}};
        existing ->
            gm:leave(GM),
            {error, normal};
        master_in_recovery ->
            gm:leave(GM),
            %% The queue record vanished - we must have a master starting
            %% concurrently with us. In that case we can safely decide to do
            %% nothing here, and the master will start us in
            %% master:init_with_existing_bq/3
            {error, normal}
    end.

%% Runs inside the mnesia transaction started by handle_go/1. Reads the
%% queue record and decides whether this process may become a mirror on
%% Node:
%%   {new, QPid, GMPids}   - we were added to the record;
%%   duplicate_live_master - the live master already runs on Node;
%%   {stale, QPid}         - the record names a dead master on Node;
%%   existing              - a live mirror already runs on Node;
%%   master_in_recovery    - the queue record does not exist.
%% A dead mirror on Node is removed from the record and replaced by us.
init_it(Self, GM, Node, QName) ->
    case mnesia:read({rabbit_queue, QName}) of
        [Q] when ?is_amqqueue(Q) ->
            QPid = amqqueue:get_pid(Q),
            SPids = amqqueue:get_slave_pids(Q),
            GMPids = amqqueue:get_gm_pids(Q),
            PSPids = amqqueue:get_slave_pids_pending_shutdown(Q),
            %% Which of the queue's processes (master or mirrors) are
            %% recorded as living on our node?
            case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
                []     -> stop_pending_slaves(QName, PSPids),
                          add_slave(Q, Self, GM),
                          {new, QPid, GMPids};
                [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
                              true  -> duplicate_live_master;
                              false -> {stale, QPid}
                          end;
                [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
                              true  -> existing;
                              false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid],
                                       SPids1 = SPids -- [SPid],
                                       Q1 = amqqueue:set_slave_pids(Q, SPids1),
                                       Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
                                       add_slave(Q2, Self, GM),
                                       {new, QPid, GMPids1}
                          end
            end;
        [] ->
            master_in_recovery
    end.

%% Pending mirrors have been asked to stop by the master, but despite the node
%% being up these did not answer on the expected timeout. Stop local mirrors now.
%%
%% Only pids that are on this node and still alive are considered. A pid
%% is killed (together with its direct supervisor) only when its
%% '$ancestors' show that supervisor sitting directly under the vhost's
%% rabbit_amqqueue_sup_sup supervisor; otherwise it is left alone.
stop_pending_slaves(QName, Pids) ->
    [begin
         rabbit_mirror_queue_misc:log_warning(
           QName, "Detected a non-responsive classic queue mirror, stopping it: ~p", [Pid]),
         case erlang:process_info(Pid, dictionary) of
             undefined -> ok;
             {dictionary, Dict} ->
                 Vhost = QName#resource.virtual_host,
                 {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
                 case proplists:get_value('$ancestors', Dict) of
                     [Sup, AmqQSup | _] ->
                         exit(Sup, kill),
                         exit(Pid, kill);
                     _ ->
                         ok
                 end
         end
     end || Pid <- Pids, node(Pid) =:= node(),
            true =:= erlang:is_process_alive(Pid)].

%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
%%
%% Appends New to the record's mirror pids, prepends {GM, New} to its gm
%% pids and stores the updated record.
add_slave(Q0, New, GM) when ?is_amqqueue(Q0) ->
    SPids = amqqueue:get_slave_pids(Q0),
    GMPids = amqqueue:get_gm_pids(Q0),
    SPids1 = SPids ++ [New],
    GMPids1 = [{GM, New} | GMPids],
    Q1 = amqqueue:set_slave_pids(Q0, SPids1),
    Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
    rabbit_mirror_queue_misc:store_updated_slaves(Q2).

%% gen_server2 callback for synchronous requests:
%%   go                       - start the mirror (only valid while not started);
%%   {gm_deaths, DeadGMPids}  - sent by members_changed/3 when group
%%                              members died; may promote us to master;
%%   info                     - report the ?INFO_KEYS items.
handle_call(go, _From, {not_started, Q} = NotStarted) ->
    case handle_go(Q) of
        {ok, State}    -> {reply, ok, State};
        {error, Error} -> {stop, Error, NotStarted}
    end;

handle_call({gm_deaths, DeadGMPids}, From,
            State = #state{ gm = GM, q = Q,
                            backing_queue = BQ,
                            backing_queue_state = BQS}) when ?is_amqqueue(Q) ->
    QName = amqqueue:get_name(Q),
    MPid = amqqueue:get_pid(Q),
    Self = self(),
    case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
        {error, not_found} ->
            gen_server2:reply(From, ok),
            {stop, normal, State};
        {error, {not_synced, _SPids}} ->
            %% No explicit reply is sent here: the caller
            %% (members_changed/3) wraps the call in an exit handler
            %% that yields ok when this process stops.
            BQ:delete_and_terminate({error, not_synced}, BQS),
            {stop, normal, State#state{backing_queue_state = undefined}};
        {ok, Pid, DeadPids, ExtraNodes} ->
            rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
                                                   DeadPids),
            case Pid of
                MPid ->
                    %% master hasn't changed
                    gen_server2:reply(From, ok),
                    rabbit_mirror_queue_misc:add_mirrors(
                      QName, ExtraNodes, async),
                    noreply(State);
                Self ->
                    %% we've become master
                    QueueState = promote_me(From, State),
                    rabbit_mirror_queue_misc:add_mirrors(
                      QName, ExtraNodes, async),
                    {become, rabbit_amqqueue_process, QueueState, hibernate};
                _ ->
                    %% master has changed to not us
                    gen_server2:reply(From, ok),
                    %% see rabbitmq-server#914;
                    %% It's not always guaranteed that we won't have ExtraNodes.
                    %% If gm alters, master can change to not us with extra nodes,
                    %% in which case we attempt to add mirrors on those nodes.
                    case ExtraNodes of
                        [] -> void;
                        _  -> rabbit_mirror_queue_misc:add_mirrors(
                                QName, ExtraNodes, async)
                    end,
                    %% Since GM is by nature lazy we need to make sure
                    %% there is some traffic when a master dies, to
                    %% make sure all mirrors get informed of the
                    %% death. That is all process_death does, create
                    %% some traffic.
                    ok = gm:broadcast(GM, process_death),
                    Q1 = amqqueue:set_pid(Q, Pid),
                    State1 = State#state{q = Q1},
                    noreply(State1)
            end
    end;

handle_call(info, _From, State) ->
    reply(infos(?INFO_KEYS, State), State).

%% gen_server2 callback for asynchronous requests: start-up (go),
%% backing-queue callbacks, GM instructions forwarded by handle_msg/3,
%% deliveries from channels, sync start, and resource-limit updates.
handle_cast(go, {not_started, Q} = NotStarted) ->
    case handle_go(Q) of
        {ok, State}    -> {noreply, State};
        {error, Error} -> {stop, Error, NotStarted}
    end;

handle_cast({run_backing_queue, Mod, Fun}, State) ->
    noreply(run_backing_queue(Mod, Fun, State));

handle_cast({gm, Instruction}, State = #state{q = Q0}) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    %% Only act on the instruction while the queue record still lists us
    %% as one of its mirrors.
    case rabbit_amqqueue:lookup(QName) of
        {ok, Q1} when ?is_amqqueue(Q1) ->
            SPids = amqqueue:get_slave_pids(Q1),
            case lists:member(self(), SPids) of
                true ->
                    handle_process_result(process_instruction(Instruction, State));
                false ->
                    %% Potentially a duplicated mirror caused by a partial partition,
                    %% will stop as a new mirror could start unaware of our presence
                    {stop, shutdown, State}
            end;
        {error, not_found} ->
            %% Would not expect this to happen after fixing #953
            {stop, shutdown, State}
    end;

handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
            State) ->
    %% Asynchronous, non-"mandatory", deliver mode.
    %% We are acking messages to the channel process that sent us
    %% the message delivery. See
    %% rabbit_amqqueue_process:handle_ch_down for more info.
    %% If message is rejected by the master, the publish will be nacked
    %% even if mirrors confirm it. No need to check for length here.
    maybe_flow_ack(Sender, Flow),
    noreply(maybe_enqueue_message(Delivery, State));

handle_cast({sync_start, Ref, Syncer},
            State = #state { depth_delta         = DD,
                             backing_queue       = BQ,
                             backing_queue_state = BQS }) ->
    State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
    %% Folds the result of the sync back into the state.
    S = fun({MA, TRefN, BQSN}) ->
                State1#state{depth_delta         = undefined,
                             msg_id_ack          = maps:from_list(MA),
                             rate_timer_ref      = TRefN,
                             backing_queue_state = BQSN}
        end,
    case rabbit_mirror_queue_sync:slave(
           DD, Ref, TRef, Syncer, BQ, BQS,
           %% Passed to the sync code: refreshes the RAM duration and
           %% re-arms the update_ram_duration timer.
           fun (BQN, BQSN) ->
                   BQSN1 = update_ram_duration(BQN, BQSN),
                   TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL,
                                                  self(), update_ram_duration),
                   {TRefN, BQSN1}
           end) of
        denied              -> noreply(State1);
        {ok,           Res} -> noreply(set_delta(0, S(Res)));
        {failed,       Res} -> noreply(S(Res));
        {stop, Reason, Res} -> {stop, Reason, S(Res)}
    end;

handle_cast({set_maximum_since_use, Age}, State) ->
    ok = file_handle_cache:set_maximum_since_use(Age),
    noreply(State);

handle_cast({set_ram_duration_target, Duration},
            State = #state { backing_queue       = BQ,
                             backing_queue_state = BQS }) ->
    BQS1 = BQ:set_ram_duration_target(Duration, BQS),
    noreply(State #state { backing_queue_state = BQS1 });

handle_cast(policy_changed, State) ->
    %% During partial partitions, we might end up receiving messages expected by a master
    %% Ignore them
    noreply(State).

%% gen_server2 callback for plain messages: timers, channel 'DOWN'
%% notifications, exits of linked processes, credit-flow bumps and
%% late sync messages. Any other message stops the process.
handle_info(update_ram_duration, State = #state{backing_queue       = BQ,
                                                backing_queue_state = BQS}) ->
    BQS1 = update_ram_duration(BQ, BQS),
    %% Don't call noreply/1, we don't want to set timers
    {State1, Timeout} = next_state(State #state {
                                     rate_timer_ref      = undefined,
                                     backing_queue_state = BQS1 }),
    {noreply, State1, Timeout};

handle_info(sync_timeout, State) ->
    noreply(backing_queue_timeout(
              State #state { sync_timer_ref = undefined }));

handle_info(timeout, State) ->
    noreply(backing_queue_timeout(State));

handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
    local_sender_death(ChPid, State),
    noreply(maybe_forget_sender(ChPid, down_from_ch, State));

handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State};

handle_info({bump_credit, Msg}, State) ->
    credit_flow:handle_bump_msg(Msg),
    noreply(State);

handle_info(bump_reduce_memory_use, State = #state{backing_queue       = BQ,
                                                   backing_queue_state = BQS}) ->
    BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
    BQS2 = BQ:resume(BQS1),
    noreply(State#state{
        backing_queue_state = BQS2
    });

%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
%% which were still in flight. Ignore them.
handle_info({sync_msg, _Ref, _Msg, _Props, _Unacked}, State) ->
    noreply(State);

handle_info({sync_complete, _Ref}, State) ->
    noreply(State);

handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.

%% gen_server2 callback. The clause order matters: the not-started and
%% "backing queue already gone" states need no cleanup; {shutdown, dropped}
%% and any non-shutdown reason delete the backing queue contents; plain
%% shutdown reasons preserve them (see terminate_shutdown/2).
terminate(_Reason, {not_started, _Q}) ->
    ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
    %% We've received a delete_and_terminate from gm, thus nothing to
    %% do here.
    ok;
terminate({shutdown, dropped} = R, State = #state{backing_queue       = BQ,
                                                  backing_queue_state = BQS}) ->
    %% See rabbit_mirror_queue_master:terminate/2
    terminate_common(State),
    BQ:delete_and_terminate(R, BQS);
terminate(shutdown, State) ->
    terminate_shutdown(shutdown, State);
terminate({shutdown, _} = R, State) ->
    terminate_shutdown(R, State);
terminate(Reason, State = #state{backing_queue       = BQ,
                                 backing_queue_state = BQS}) ->
    terminate_common(State),
    BQ:delete_and_terminate(Reason, BQS).

%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
%% being deleted: it's just the node going down. Even though we're a
%% mirror, we have no idea whether or not we'll be the only copy coming
%% back up. Thus we must assume we will be, and preserve anything we
%% have on disk.
terminate_shutdown(Reason, State = #state{backing_queue       = BQ,
                                          backing_queue_state = BQS}) ->
    terminate_common(State),
    BQ:terminate(Reason, BQS).

%% Cleanup shared by every terminate path that had a live backing queue:
%% deregister from the memory monitor and stop both timers.
terminate_common(State) ->
    ok = rabbit_memory_monitor:deregister(self()),
    stop_rate_timer(stop_sync_timer(State)).

%% gen_server2 callback; the state needs no conversion.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% gen_server2 callback invoked before hibernating. For a started
%% mirror it reports the current RAM duration to the memory monitor,
%% applies the returned target to the backing queue, lets the backing
%% queue prepare for hibernation and stops the rate timer.
handle_pre_hibernate({not_started, _Q} = State) ->
    {hibernate, State};

handle_pre_hibernate(State = #state { backing_queue       = BQ,
                                      backing_queue_state = BQS }) ->
    {RamDuration, BQS1} = BQ:ram_duration(BQS),
    DesiredDuration =
        rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
    BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
    BQS3 = BQ:handle_pre_hibernate(BQS2),
    {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.

%% gen_server2 callback: priority of incoming calls (unlisted ones get 0).
prioritise_call(Msg, _From, _Len, _State) ->
    case Msg of
        info                                 -> 9;
        {gm_deaths, _Dead}                   -> 5;
        _                                    -> 0
    end.

%% gen_server2 callback: priority of incoming casts (unlisted ones get 0).
prioritise_cast(Msg, _Len, _State) ->
    case Msg of
        {set_ram_duration_target, _Duration} -> 8;
        {set_maximum_since_use, _Age}        -> 8;
        {run_backing_queue, _Mod, _Fun}      -> 6;
        {gm, _Msg}                           -> 5;
        _                                    -> 0
    end.

%% gen_server2 callback: priority of plain messages (unlisted ones get 0).
prioritise_info(Msg, _Len, _State) ->
    case Msg of
        update_ram_duration                  -> 8;
        sync_timeout                         -> 6;
        _                                    -> 0
    end.

%% gen_server2 callback; delegates to rabbit_misc.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
%%
%% These callbacks are given the argument list [SPid] that handle_go/1
%% passed to gm:start_link/4, where SPid is the mirror process.

-spec joined(args(), members()) -> callback_result().

%% Notifies the mirror, which is blocked in handle_go/1 waiting for
%% exactly this message, that the group has been joined.
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.

-spec members_changed(args(), members(),members()) -> callback_result().

%% Reports deaths of group members to the mirror. If the mirror exits
%% during the call the exit handler turns that into ok. A {promote, CPid}
%% reply means the mirror became master, so this GM process switches its
%% callback module to the coordinator.
members_changed([_SPid], _Births, []) ->
    ok;
members_changed([ SPid], _Births, Deaths) ->
    case rabbit_misc:with_exit_handler(
           rabbit_misc:const(ok),
           fun() ->
                   gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
           end) of
        ok              -> ok;
        {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
    end.

-spec handle_msg(args(), pid(), any()) -> callback_result().

%% Handles a message broadcast on the GM group. Messages that only
%% matter to the master are dropped; everything else is forwarded to the
%% mirror process as a cast.
handle_msg([_SPid], _From, hibernate_heartbeat) ->
    %% See rabbit_mirror_queue_coordinator:handle_pre_hibernate/1
    ok;
handle_msg([_SPid], _From, request_depth) ->
    %% This is only of value to the master
    ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
    %% This is only of value to the master
    ok;
handle_msg([_SPid], _From, process_death) ->
    %% We must not take any notice of the master death here since it
    %% comes without ordering guarantees - there could still be
    %% messages from the master we have yet to receive. When we get
    %% members_changed, then there will be no more messages.
    ok;
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
    ok = gen_server2:cast(CPid, {gm, Msg}),
    {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
    %% Only take part in the sync if we are one of the addressed mirrors.
    case lists:member(SPid, SPids) of
        true  -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
        false -> ok
    end;
handle_msg([SPid], _From, Msg) ->
    ok = gen_server2:cast(SPid, {gm, Msg}).

-spec handle_terminate(args(), term()) -> any().

%% Nothing to clean up when the GM process terminates.
handle_terminate([_SPid], _Reason) ->
    ok.

%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------

%% Builds the {Item, Value} list for the requested info items.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].

%% Value of a single info item; unknown items yield the empty atom ''.
i(pid, _State) ->
    self();
i(name, #state{q = Q}) when ?is_amqqueue(Q) ->
    amqqueue:get_name(Q);
i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) ->
    amqqueue:get_pid(Q);
i(is_synchronised, #state{depth_delta = DD}) ->
    %% Synchronised means no depth difference to the master.
    DD =:= 0;
i(_, _) ->
    ''.

%% Initialises the backing queue, giving it a callback that routes
%% asynchronous backing-queue work back to this process.
bq_init(BQ, Q, Recover) ->
    Self = self(),
    BQ:init(Q, Recover,
            fun (Mod, Fun) ->
                    rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
            end).

%% Runs a fun submitted through run_backing_queue. Funs tagged with
%% rabbit_mirror_queue_master are called with this module name and the
%% whole mirror state; all others are handed to the backing queue.
run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
    %% Yes, this might look a little crazy, but see comments in
    %% confirm_sender_death/1
    Fun(?MODULE, State);
run_backing_queue(Mod, Fun, State = #state { backing_queue       = BQ,
                                             backing_queue_state = BQS }) ->
    State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.

%% This feature was used by `rabbit_amqqueue_process` and
%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is
%% unused in 3.8.x and thus deprecated. We keep it to support in-place
%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op
%% starting with that version.
%% Tells the sender that a mandatory delivery was received; does nothing
%% for non-mandatory deliveries.
send_mandatory(#delivery{mandatory = false}) ->
    ok;
send_mandatory(#delivery{mandatory  = true,
                         sender     = SenderPid,
                         msg_seq_no = MsgSeqNo}) ->
    gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).

%% Decides what to do about the publisher confirm of a delivery that has
%% now been seen from both GM and the channel. Returns the (possibly
%% updated) msg_id_status map MS:
%%   - no confirm requested: MS unchanged;
%%   - persistent message, status `published`, durable queue: remember
%%     {published, ChPid, MsgSeqNo} so confirm_messages/2 can confirm it
%%     later;
%%   - anything else: confirm to the sender right away.
send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
    MS;
send_or_record_confirm(published, #delivery { sender     = ChPid,
                                              confirm    = true,
                                              msg_seq_no = MsgSeqNo,
                                              message    = #basic_message {
                                                id            = MsgId,
                                                is_persistent = true } },
                       MS, #state{q = Q}) when ?amqqueue_is_durable(Q) ->
    maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender     = ChPid,
                                            confirm    = true,
                                            msg_seq_no = MsgSeqNo },
                       MS, #state{q = Q} = _State) ->
    ok = rabbit_classic_queue:confirm_to_sender(ChPid,
                                                amqqueue:get_name(Q), [MsgSeqNo]),
    MS.

%% Processes message ids the backing queue reports as confirmed. Ids
%% already seen from the channel are confirmed to their sender (grouped
%% per channel); ids seen only via GM are marked `confirmed` for later.
confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
    QName = amqqueue:get_name(Q),
    {CMs, MS1} =
        lists:foldl(
          fun (MsgId, {CMsN, MSN} = Acc) ->
                  %% We will never see 'discarded' here
                  case maps:find(MsgId, MSN) of
                      error ->
                          %% If it needed confirming, it'll have
                          %% already been done.
                          Acc;
                      {ok, published} ->
                          %% Still not seen it from the channel, just
                          %% record that it's been confirmed.
                          {CMsN, maps:put(MsgId, confirmed, MSN)};
                      {ok, {published, ChPid, MsgSeqNo}} ->
                          %% Seen from both GM and Channel. Can now
                          %% confirm.
                          {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
                           maps:remove(MsgId, MSN)};
                      {ok, confirmed} ->
                          %% It's already been confirmed. This is
                          %% probably because it's been both sync'd to
                          %% disk and then delivered and ack'd before
                          %% we've seen the publish from the
                          %% channel. Nothing to do here.
                          Acc
                  end
          end, {gb_trees:empty(), MS}, MsgIds),
    Fun = fun (Pid, MsgSeqNos) ->
                  rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos)
          end,
    rabbit_misc:gb_trees_foreach(Fun, CMs),
    State #state { msg_id_status = MS1 }.

%% Turns the result of process_instruction/2 into a gen_server2 return.
handle_process_result({ok,   State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.

%% The result is the rabbit_amqqueue_process state that handle_call/3
%% hands to gen_server2 in its {become, rabbit_amqqueue_process, ...}
%% return, so this function does return.
-spec promote_me({pid(), term()}, #state{}) -> term().

%% Turns this mirror into the queue master: starts a coordinator, replies
%% {promote, CPid} to the GM process waiting in members_changed/3, and
%% converts the mirror state into master / queue-process state.
promote_me(From, #state { q                   = Q0,
                          gm                  = GM,
                          backing_queue       = BQ,
                          backing_queue_state = BQS,
                          rate_timer_ref      = RateTRef,
                          sender_queues       = SQ,
                          msg_id_ack          = MA,
                          msg_id_status       = MS,
                          known_senders       = KS}) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    rabbit_mirror_queue_misc:log_info(QName, "Promoting mirror ~s to leader",
                                      [rabbit_misc:pid_to_string(self())]),
    Q1 = amqqueue:set_pid(Q0, self()),
    DeathFun = rabbit_mirror_queue_master:sender_death_fun(),
    DepthFun = rabbit_mirror_queue_master:depth_fun(),
    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q1, GM, DeathFun, DepthFun),
    true = unlink(GM),
    gen_server2:reply(From, {promote, CPid}),

    %% Everything that we're monitoring, we need to ensure our new
    %% coordinator is monitoring.
    MPids = pmon:monitored(KS),
    ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),

    %% We find all the messages that we've received from channels but
    %% not from gm, and pass them to the
    %% queue_process:init_with_backing_queue_state to be enqueued.
    %%
    %% We also have to requeue messages which are pending acks: the
    %% consumers from the master queue have been lost and so these
    %% messages need requeuing. They might also be pending
    %% confirmation, and indeed they might also be pending arrival of
    %% the publication from the channel itself, if we received both
    %% the publication and the fetch via gm first! Requeuing doesn't
    %% affect confirmations: if the message was previously pending a
    %% confirmation then it still will be, under the same msg_id. So
    %% as a master, we need to be prepared to filter out the
    %% publication of said messages from the channel (is_duplicate
    %% (thus such requeued messages must remain in the msg_id_status
    %% (MS) which becomes seen_status (SS) in the master)).
    %%
    %% Then there are messages we already have in the queue, which are
    %% not currently pending acknowledgement:
    %% 1. Messages we've only received via gm:
    %%    Filter out subsequent publication from channel through
    %%    validate_message. Might have to issue confirms then or
    %%    later, thus queue_process state will have to know that
    %%    there's a pending confirm.
    %% 2. Messages received via both gm and channel:
    %%    Queue will have to deal with issuing confirms if necessary.
    %%
    %% MS contains the following four entry types:
    %%
    %% a) published:
    %%   published via gm only; pending arrival of publication from
    %%   channel, maybe pending confirm.
    %%
    %% b) {published, ChPid, MsgSeqNo}:
    %%   published via gm and channel; pending confirm.
    %%
    %% c) confirmed:
    %%   published via gm only, and confirmed; pending publication
    %%   from channel.
    %%
    %% d) discarded:
    %%   seen via gm only as discarded. Pending publication from
    %%   channel
    %%
    %% The forms a, c and d only, need to go to the master state
    %% seen_status (SS).
    %%
    %% The form b only, needs to go through to the queue_process
    %% state to form the msg_id_to_channel mapping (MTC).
    %%
    %% No messages that are enqueued from SQ at this point will have
    %% entries in MS.
    %%
    %% Messages that are extracted from MA may have entries in MS, and
    %% those messages are then requeued. However, as discussed above,
    %% this does not affect MS, nor which bits go through to SS in
    %% Master, or MTC in queue_process.

    St = [published, confirmed, discarded],
    SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
    AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],

    MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
                    QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),

    MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
                            maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0);
                        (_Msgid, _Status, MTC0) ->
                            MTC0
                    end, #{}, MS),
    Deliveries = [promote_delivery(Delivery) ||
                   {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
                   Delivery <- queue:to_list(PubQ)],
    %% Channels whose 'DOWN' we have already seen are dropped from the
    %% monitor set passed on to the queue process.
    AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
    KS1 = lists:foldl(fun (ChPid0, KS0) ->
                              pmon:demonitor(ChPid0, KS0)
                      end, KS, AwaitGmDown),
    rabbit_misc:store_proc_name(rabbit_amqqueue_process, QName),
    rabbit_amqqueue_process:init_with_backing_queue_state(
      Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
      MTC).

%% We reset mandatory to false here because we will have sent the
%% mandatory_received already as soon as we got the message. We also
%% need to send an ack for these messages since the channel is waiting
%% for one for the via-GM case and we will not now receive one.
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
    maybe_flow_ack(Sender, Flow),
    Delivery#delivery{mandatory = false}.

%% Standard {noreply, ...} return: runs next_state/1 and makes sure the
%% rate timer is armed.
noreply(State) ->
    {NewState, Timeout} = next_state(State),
    {noreply, ensure_rate_timer(NewState), Timeout}.

%% Standard {reply, ...} return; same bookkeeping as noreply/1.
reply(Reply, State) ->
    {NewState, Timeout} = next_state(State),
    {reply, Reply, ensure_rate_timer(NewState), Timeout}.

%% Drains the message ids the backing queue has confirmed, issues the
%% corresponding confirms, and picks the next gen_server2 timeout from
%% BQ:needs_timeout/1. Returns {State, Timeout}.
next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
    {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
    State1 = confirm_messages(MsgIds,
                              State #state { backing_queue_state = BQS1 }),
    case BQ:needs_timeout(BQS1) of
        false -> {stop_sync_timer(State1),   hibernate     };
        idle  -> {stop_sync_timer(State1),   ?SYNC_INTERVAL};
        timed -> {ensure_sync_timer(State1), 0             }
    end.

%% Gives the backing queue its timeout callback.
backing_queue_timeout(State = #state { backing_queue       = BQ,
                                       backing_queue_state = BQS }) ->
    State#state{backing_queue_state = BQ:timeout(BQS)}.

%% The four helpers below manage the two timer references held in the
%% state (sync_timer_ref and rate_timer_ref) through rabbit_misc.
ensure_sync_timer(State) ->
    rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
                             ?SYNC_INTERVAL, sync_timeout).

stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref).

ensure_rate_timer(State) ->
    rabbit_misc:ensure_timer(State, #state.rate_timer_ref,
                             ?RAM_DURATION_UPDATE_INTERVAL,
                             update_ram_duration).

stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).

%% Adds ChPid to the set of monitored senders.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
    State #state { known_senders = pmon:monitor(ChPid, KS) }.

local_sender_death(ChPid, #state { known_senders = KS }) ->
    %% The channel will be monitored iff we have received a delivery
    %% from it but not heard about its death from the master. So if it
    %% is monitored we need to point the death out to the master (see
    %% essay).
    ok = case pmon:is_monitored(ChPid, KS) of
             false -> ok;
             true  -> confirm_sender_death(ChPid)
         end.

%% Schedules, after ?DEATH_TIMEOUT, a check of whether Pid is still
%% monitored by us; if so the group is asked to ensure it is monitored
%% and the check is scheduled again. Always returns ok.
confirm_sender_death(Pid) ->
    %% We have to deal with the possibility that we'll be promoted to
    %% master before this thing gets run. Consequently we set the
    %% module to rabbit_mirror_queue_master so that if we do become a
    %% rabbit_amqqueue_process before then, sane things will happen.
    Fun =
        fun (?MODULE, State = #state { known_senders = KS,
                                       gm            = GM }) ->
                %% We're running still as a mirror
                %%
                %% See comment in local_sender_death/2; we might have
                %% received a sender_death in the meanwhile so check
                %% again.
                ok = case pmon:is_monitored(Pid, KS) of
                         false -> ok;
                         true  -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
                                  confirm_sender_death(Pid)
                     end,
                State;
            (rabbit_mirror_queue_master, State) ->
                %% We've become a master. State is now opaque to
                %% us. When we became master, if Pid was still known
                %% to us then we'd have set up monitoring of it then,
                %% so this is now a noop.
                State
        end,
    %% Note that we do not remove our knowledge of this ChPid until we
    %% get the sender_death from GM as well as a DOWN notification.
    {ok, _TRef} = timer:apply_after(
                    ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
                    [self(), rabbit_mirror_queue_master, Fun]),
    ok.

%% forget_sender(NewEvent, RecordedChState) -> boolean().
%% True only when the new down event differs from an already recorded
%% down event, i.e. both the channel and gm have now reported the death.
forget_sender(_, running)                        -> false;
forget_sender(down_from_gm, down_from_gm)        -> false; %% [1]
forget_sender(down_from_ch, down_from_ch)        -> false;
forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.

%% [1] If another mirror goes through confirm_sender_death/1 before we
%% do we can get two GM sender_death messages in a row for the same
%% channel - don't treat that as anything special.

%% Record and process lifetime events from channels. Forget all about a channel
%% only when down notifications are received from both the channel and from gm.
%% ChState is the event being reported (e.g. down_from_ch, as passed by
%% handle_info/2). Unknown channels leave the state untouched. When the
%% channel is forgotten, its sender queue, its pending message ids in
%% msg_id_status and its monitor are all dropped; otherwise only the
%% recorded channel state is updated.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
                                                     msg_id_status = MS,
                                                     known_senders = KS }) ->
    case maps:find(ChPid, SQ) of
        error ->
            State;
        {ok, {MQ, PendCh, ChStateRecord}} ->
            case forget_sender(ChState, ChStateRecord) of
                true ->
                    credit_flow:peer_down(ChPid),
                    State #state { sender_queues = maps:remove(ChPid, SQ),
                                   msg_id_status = lists:foldl(
                                                     fun maps:remove/2,
                                                     MS, sets:to_list(PendCh)),
                                   known_senders = pmon:demonitor(ChPid, KS) };
                false ->
                    SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
                    State #state { sender_queues = SQ1 }
            end
    end.

%% Handles a delivery received directly from a channel. If GM has not
%% told us about the message yet, the delivery is queued per sender;
%% otherwise the recorded status is consumed and the confirm is sent or
%% recorded.
maybe_enqueue_message(
  Delivery = #delivery { message = #basic_message { id = MsgId },
                         sender  = ChPid },
  State = #state { sender_queues = SQ, msg_id_status = MS }) ->
    send_mandatory(Delivery), %% must do this before confirms
    State1 = ensure_monitoring(ChPid, State),
    %% We will never see {published, ChPid, MsgSeqNo} here.
    case maps:find(MsgId, MS) of
        error ->
            {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
            MQ1 = queue:in(Delivery, MQ),
            SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
            State1 #state { sender_queues = SQ1 };
        {ok, Status} ->
            MS1 = send_or_record_confirm(
                    Status, Delivery, maps:remove(MsgId, MS), State1),
            SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
            State1 #state { msg_id_status = MS1,
                            sender_queues = SQ1 }
    end.

%% Looks up the {MsgQueue, PendingMsgIdSet, ChState} entry for ChPid,
%% defaulting to an empty entry in the `running` state.
get_sender_queue(ChPid, SQ) ->
    case maps:find(ChPid, SQ) of
        error     -> {queue:new(), sets:new(), running};
        {ok, Val} -> Val
    end.

%% Removes MsgId from ChPid's pending set; SQ is returned unchanged when
%% the channel has no entry.
remove_from_pending_ch(MsgId, ChPid, SQ) ->
    case maps:find(ChPid, SQ) of
        error ->
            SQ;
        {ok, {MQ, PendingCh, ChState}} ->
            maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
                     SQ)
    end.
%% publish_or_discard(Status, ChPid, MsgId, State) -> State1.
%%
%% Book-keeping for a publish or discard of MsgId originating from
%% ChPid. Status is stored or passed on as given; the callers in this
%% module use 'published' and 'discarded'.
publish_or_discard(Status, ChPid, MsgId,
                   State = #state { sender_queues = SQ, msg_id_status = MS }) ->
    %% We really are going to do the publish/discard right now, even
    %% though we may not have seen it directly from the channel. But
    %% we cannot issue confirms until the latter has happened. So we
    %% need to keep track of the MsgId and its confirmation status in
    %% the meantime.
    State1 = ensure_monitoring(ChPid, State),
    {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
    {SenderEntry, MS1} =
        case queue:out(MQ) of
            {{value, Delivery = #delivery {
                       message = #basic_message { id = MsgId } }}, Rest} ->
                %% We received the msg from the channel first. Thus
                %% we need to deal with confirms here.
                {{Rest, PendingCh, ChState},
                 send_or_record_confirm(Status, Delivery, MS, State1)};
            {{value, #delivery {}}, _Rest} ->
                %% The instruction was sent to us before we were
                %% within the slave_pids within the #amqqueue{}
                %% record. We'll never receive the message directly
                %% from the channel. And the channel will not be
                %% expecting any confirms from us.
                {{MQ, PendingCh, ChState}, MS};
            {empty, _Rest} ->
                %% Nothing from the channel yet: remember the id as
                %% pending together with its status.
                {{MQ, sets:add_element(MsgId, PendingCh), ChState},
                 MS#{MsgId => Status}}
        end,
    State1 #state { sender_queues = SQ#{ChPid => SenderEntry},
                    msg_id_status = MS1 }.
%% process_instruction(Instruction, State) -> {ok, State1} | {stop, State1}.
%%
%% Applies a single replicated instruction to the local state and the
%% backing queue. Every clause returns {ok, State1} except
%% delete_and_terminate, which returns {stop, State1} with the backing
%% queue state cleared.
process_instruction({publish, ChPid, Flow, MsgProps,
                     Msg = #basic_message { id = MsgId }}, State) ->
    maybe_flow_ack(ChPid, Flow),
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
        publish_or_discard(published, ChPid, MsgId, State),
    BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
    {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
    maybe_flow_ack(ChPid, Flow),
    %% Do the per-message book-keeping first, then hand the whole
    %% batch to the backing queue in one call.
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
        lists:foldl(fun ({#basic_message { id = MsgId },
                          _MsgProps, _IsDelivered}, St) ->
                            publish_or_discard(published, ChPid, MsgId, St)
                    end, State, Publishes),
    BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
    {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, Flow, MsgProps,
                     Msg = #basic_message { id = MsgId }}, State) ->
    maybe_flow_ack(ChPid, Flow),
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
        publish_or_discard(published, ChPid, MsgId, State),
    true = BQ:is_empty(BQS), %% assertion
    {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
    {ok, maybe_store_ack(true, MsgId, AckTag,
                         State1 #state { backing_queue_state = BQS1 })};
process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
    maybe_flow_ack(ChPid, Flow),
    %% MsgIds is accumulated in reverse order of Publishes.
    {MsgIds,
     State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
        lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
                         {MsgIds, St}) ->
                            {[MsgId | MsgIds],
                             publish_or_discard(published, ChPid, MsgId, St)}
                    end, {[], State}, Publishes),
    true = BQ:is_empty(BQS), %% assertion
    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
    %% Pair every message id (back in publish order) with its ack tag.
    MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
    State2 = lists:foldl(
               fun ({MsgId, AckTag}, St) ->
                       maybe_store_ack(true, MsgId, AckTag, St)
               end, State1 #state { backing_queue_state = BQS1 },
               MsgIdsAndAcks),
    {ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
    maybe_flow_ack(ChPid, Flow),
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
        publish_or_discard(discarded, ChPid, MsgId, State),
    BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
    {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS }) ->
    QLen = BQ:len(BQS),
    %% Drop only as many messages as we hold beyond Length.
    ToDrop = case QLen - Length of
                 N when N > 0 -> N;
                 _            -> 0
             end,
    State1 = lists:foldl(
               fun (const, StateN = #state{backing_queue_state = BQSN}) ->
                       {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
                       maybe_store_ack(
                         AckRequired, MsgId, AckTag,
                         StateN #state { backing_queue_state = BQSN1 })
               end, State, lists:duplicate(ToDrop, const)),
    %% Without acks, the difference between what we dropped and the
    %% Dropped count in the instruction is applied to the depth delta.
    {ok, case AckRequired of
             true  -> State1;
             false -> update_delta(ToDrop - Dropped, State1)
         end};
process_instruction({ack, MsgIds},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS,
                                     msg_id_ack          = MA }) ->
    {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
    {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
    [] = MsgIds1 -- MsgIds, %% ASSERTION
    {ok, update_delta(length(MsgIds1) - length(MsgIds),
                      State #state { msg_id_ack          = MA1,
                                     backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS,
                                     msg_id_ack          = MA }) ->
    {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
    {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
    {ok, State #state { msg_id_ack          = MA1,
                        backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
                    State = #state { known_senders = KS }) ->
    %% The channel will be monitored iff we have received a message
    %% from it. In this case we just want to avoid doing work if we
    %% never got any messages.
    {ok, case pmon:is_monitored(ChPid, KS) of
             false -> State;
             true  -> maybe_forget_sender(ChPid, down_from_gm, State)
         end};
process_instruction({depth, Depth},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS }) ->
    {ok, set_delta(Depth - BQ:depth(BQS), State)};

process_instruction({delete_and_terminate, Reason},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS }) ->
    BQ:delete_and_terminate(Reason, BQS),
    {stop, State #state { backing_queue_state = undefined }};
process_instruction({set_queue_mode, Mode},
                    State = #state { backing_queue       = BQ,
                                     backing_queue_state = BQS }) ->
    BQS1 = BQ:set_queue_mode(Mode, BQS),
    {ok, State #state { backing_queue_state = BQS1 }}.

%% Acknowledge credit to Sender when the instruction was sent under
%% flow control; a no-op for 'noflow'.
maybe_flow_ack(Sender, flow)    -> credit_flow:ack(Sender);
maybe_flow_ack(_Sender, noflow) -> ok.

%% msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1}.
%%
%% Translates message ids into the ack tags recorded for them in MA and
%% removes the translated ids from the map. Ids with no recorded ack
%% tag are skipped. AckTags is in the same order as the matching ids
%% appear in MsgIds.
%%
%% The lookup is made in the accumulated map, not the original one, so
%% a message id repeated in MsgIds yields its ack tag only once.
msg_ids_to_acktags(MsgIds, MA) ->
    {AckTags, MA1} =
        lists:foldl(
          fun (MsgId, {Acc, MAN}) ->
                  case maps:take(MsgId, MAN) of
                      error          -> {Acc, MAN};
                      {AckTag, MAN1} -> {[AckTag | Acc], MAN1}
                  end
          end, {[], MA}, MsgIds),
    {lists:reverse(AckTags), MA1}.

%% Remember the ack tag of MsgId when acks are required; otherwise
%% leave the state unchanged.
maybe_store_ack(false, _MsgId, _AckTag, State) ->
    State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
    State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.

%% set_delta(NewDelta, State) -> State1.
%%
%% Sets the depth delta (master depth - local depth) to NewDelta.
%% While no delta is recorded yet (undefined): a delta of 0 records us
%% as synchronised, any other value must be positive and is stored.
%% Once a delta is recorded the change is routed through
%% update_delta/2 so that its assertions apply.
set_delta(0,        State = #state { depth_delta = undefined }) ->
    ok = record_synchronised(State#state.q),
    State #state { depth_delta = 0 };
set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
    true = NewDelta > 0, %% assertion
    State #state { depth_delta = NewDelta };
set_delta(NewDelta, State = #state { depth_delta = Delta     }) ->
    update_delta(NewDelta - Delta, State).
%% update_delta(DeltaChange, State) -> State1.
%%
%% Adjusts the depth delta (master depth - local depth) by DeltaChange.
%%  * no delta recorded yet (undefined): nothing to adjust;
%%  * delta already 0: the change must be 0 as well;
%%  * otherwise: the change must not be positive, and the new value is
%%    installed through set_delta/2 after resetting the field to
%%    undefined.
update_delta(_Change, State = #state { depth_delta = undefined }) ->
    State;
update_delta(Change, State = #state { depth_delta = 0 }) ->
    0 = Change, %% assertion: we cannot become unsync'ed
    State;
update_delta(Change, State = #state { depth_delta = Current }) ->
    true = Change =< 0, %% assertion: we cannot become 'less' sync'ed
    Unset = State #state { depth_delta = undefined },
    set_delta(Current + Change, Unset).

%% update_ram_duration(BQ, BQS) -> BQS1.
%%
%% Asks the backing queue for its ram duration, reports it to
%% rabbit_memory_monitor on behalf of this process, and applies the
%% duration target the monitor returns.
update_ram_duration(BQ, BQS) ->
    {Duration, BQS1} = BQ:ram_duration(BQS),
    Target = rabbit_memory_monitor:report_ram_duration(self(), Duration),
    BQ:set_ram_duration_target(Target, BQS1).

%% record_synchronised(Q) -> result of maybe_drop_master_after_sync/1 | ok.
%%
%% Adds this process to the queue's sync_slave_pids inside an mnesia
%% transaction. Returns ok without changing anything when the queue
%% record no longer exists; otherwise hands the updated record to
%% rabbit_mirror_queue_misc:maybe_drop_master_after_sync/1.
record_synchronised(Q0) when ?is_amqqueue(Q0) ->
    QName = amqqueue:get_name(Q0),
    %% Captured here so the fun refers to this process regardless of
    %% where it ends up being evaluated.
    Self = self(),
    AddSelf =
        fun () ->
                case mnesia:read({rabbit_queue, QName}) of
                    [Q1] when ?is_amqqueue(Q1) ->
                        Synced = [Self | amqqueue:get_sync_slave_pids(Q1)],
                        Q2 = amqqueue:set_sync_slave_pids(Q1, Synced),
                        rabbit_mirror_queue_misc:store_updated_slaves(Q2),
                        {ok, Q2};
                    [] ->
                        ok
                end
        end,
    case rabbit_misc:execute_mnesia_transaction(AddSelf) of
        {ok, Updated} ->
            rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Updated);
        ok ->
            ok
    end.