1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_variable_queue). 9 10-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, 11 purge/1, purge_acks/1, 12 publish/6, publish_delivered/5, 13 batch_publish/4, batch_publish_delivered/4, 14 discard/4, drain_confirmed/1, 15 dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, 16 ackfold/4, fold/3, len/1, is_empty/1, depth/1, 17 set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, 18 handle_pre_hibernate/1, resume/1, msg_rates/1, 19 info/2, invoke/3, is_duplicate/2, set_queue_mode/2, 20 zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]). 21 22-export([start/2, stop/1]). 23 24%% exported for testing only 25-export([start_msg_store/3, stop_msg_store/1, init/6]). 26 27-export([move_messages_to_vhost_store/0]). 28 29-export([migrate_queue/3, migrate_message/3, get_per_vhost_store_client/2, 30 get_global_store_client/1, log_upgrade_verbose/1, 31 log_upgrade_verbose/2]). 32 33-include_lib("stdlib/include/qlc.hrl"). 34 35-define(QUEUE_MIGRATION_BATCH_SIZE, 100). 36-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). 37 38%%---------------------------------------------------------------------------- 39%% Messages, and their position in the queue, can be in memory or on 40%% disk, or both. Persistent messages will have both message and 41%% position pushed to disk as soon as they arrive; transient messages 42%% can be written to disk (and thus both types can be evicted from 43%% memory) under memory pressure. The question of whether a message is 44%% in RAM and whether it is persistent are orthogonal. 45%% 46%% Messages are persisted using the queue index and the message 47%% store. 
Normally the queue index holds the position of the message 48%% *within this queue* along with a couple of small bits of metadata, 49%% while the message store holds the message itself (including headers 50%% and other properties). 51%% 52%% However, as an optimisation, small messages can be embedded 53%% directly in the queue index and bypass the message store 54%% altogether. 55%% 56%% Definitions: 57%% 58%% alpha: this is a message where both the message itself, and its 59%% position within the queue are held in RAM 60%% 61%% beta: this is a message where the message itself is only held on 62%% disk (if persisted to the message store) but its position 63%% within the queue is held in RAM. 64%% 65%% gamma: this is a message where the message itself is only held on 66%% disk, but its position is both in RAM and on disk. 67%% 68%% delta: this is a collection of messages, represented by a single 69%% term, where the messages and their position are only held on 70%% disk. 71%% 72%% Note that for persistent messages, the message and its position 73%% within the queue are always held on disk, *in addition* to being in 74%% one of the above classifications. 75%% 76%% Also note that within this code, the term gamma seldom 77%% appears. It's frequently the case that gammas are defined by betas 78%% who have had their queue position recorded on disk. 79%% 80%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though 81%% many of these steps are frequently skipped. q1 and q4 only hold 82%% alphas, q2 and q3 hold both betas and gammas. When a message 83%% arrives, its classification is determined. It is then added to the 84%% rightmost appropriate queue. 85%% 86%% If a new message is determined to be a beta or gamma, q1 is 87%% empty. If a new message is determined to be a delta, q1 and q2 are 88%% empty (and actually q4 too). 89%% 90%% When removing messages from a queue, if q4 is empty then q3 is read 91%% directly. 
If q3 becomes empty then the next segment's worth of 92%% messages from delta are read into q3, reducing the size of 93%% delta. If the queue is non empty, either q4 or q3 contain 94%% entries. It is never permitted for delta to hold all the messages 95%% in the queue. 96%% 97%% The duration indicated to us by the memory_monitor is used to 98%% calculate, given our current ingress and egress rates, how many 99%% messages we should hold in RAM (i.e. as alphas). We track the 100%% ingress and egress rates for both messages and pending acks and 101%% rates for both are considered when calculating the number of 102%% messages to hold in RAM. When we need to push alphas to betas or 103%% betas to gammas, we favour writing out messages that are further 104%% from the head of the queue. This minimises writes to disk, as the 105%% messages closer to the tail of the queue stay in the queue for 106%% longer, thus do not need to be replaced as quickly by sending other 107%% messages to disk. 108%% 109%% Whilst messages are pushed to disk and forgotten from RAM as soon 110%% as requested by a new setting of the queue RAM duration, the 111%% inverse is not true: we only load messages back into RAM as 112%% demanded as the queue is read from. Thus only publishes to the 113%% queue will take up available spare capacity. 114%% 115%% When we report our duration to the memory monitor, we calculate 116%% average ingress and egress rates over the last two samples, and 117%% then calculate our duration based on the sum of the ingress and 118%% egress rates. More than two samples could be used, but it's a 119%% balance between responding quickly enough to changes in 120%% producers/consumers versus ignoring temporary blips. The problem 121%% with temporary blips is that with just a few queues, they can have 122%% substantial impact on the calculation of the average duration and 123%% hence cause unnecessary I/O. 
Another alternative is to increase the 124%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5 125%% seconds. However, that then runs the risk of being too slow to 126%% inform the memory monitor of changes. Thus a 5 second interval, 127%% plus a rolling average over the last two samples seems to work 128%% well in practice. 129%% 130%% The sum of the ingress and egress rates is used because the egress 131%% rate alone is not sufficient. Adding in the ingress rate means that 132%% queues which are being flooded by messages are given more memory, 133%% resulting in them being able to process the messages faster (by 134%% doing less I/O, or at least deferring it) and thus helping keep 135%% their mailboxes empty and thus the queue as a whole is more 136%% responsive. If such a queue also has fast but previously idle 137%% consumers, the consumer can then start to be driven as fast as it 138%% can go, whereas if only egress rate was being used, the incoming 139%% messages may have to be written to disk and then read back in, 140%% resulting in the hard disk being a bottleneck in driving the 141%% consumers. Generally, we want to give Rabbit every chance of 142%% getting rid of messages as fast as possible and remaining 143%% responsive, and using only the egress rate impacts that goal. 144%% 145%% Once the queue has more alphas than the target_ram_count, the 146%% surplus must be converted to betas, if not gammas, if not rolled 147%% into delta. The conditions under which these transitions occur 148%% reflect the conflicting goals of minimising RAM cost per msg, and 149%% minimising CPU cost per msg. Once the msg has become a beta, its 150%% payload is no longer in RAM, thus a read from the msg_store must 151%% occur before the msg can be delivered, but the RAM cost of a beta 152%% is the same as a gamma, so converting a beta to gamma will not free 153%% up any further RAM. To reduce the RAM cost further, the gamma must 154%% be rolled into delta. 
Whilst recovering a beta or a gamma to an 155%% alpha requires only one disk read (from the msg_store), recovering 156%% a msg from within delta will require two reads (queue_index and 157%% then msg_store). But delta has a near-0 per-msg RAM cost. So the 158%% conflict is between using delta more, which will free up more 159%% memory, but require additional CPU and disk ops, versus using delta 160%% less and gammas and betas more, which will cost more memory, but 161%% require fewer disk ops and less CPU overhead. 162%% 163%% In the case of a persistent msg published to a durable queue, the 164%% msg is immediately written to the msg_store and queue_index. If 165%% then additionally converted from an alpha, it'll immediately go to 166%% a gamma (as it's already in queue_index), and cannot exist as a 167%% beta. Thus a durable queue with a mixture of persistent and 168%% transient msgs in it which has more messages than permitted by the 169%% target_ram_count may contain an interspersed mixture of betas and 170%% gammas in q2 and q3. 171%% 172%% There is then a ratio that controls how many betas and gammas there 173%% can be. This is based on the target_ram_count and thus expresses 174%% the fact that as the number of permitted alphas in the queue falls, 175%% so should the number of betas and gammas fall (i.e. delta 176%% grows). If q2 and q3 contain more than the permitted number of 177%% betas and gammas, then the surplus are forcibly converted to gammas 178%% (as necessary) and then rolled into delta. The ratio is that 179%% delta/(betas+gammas+delta) equals 180%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as 181%% the target_ram_count shrinks to 0, so must betas and gammas. 182%% 183%% The conversion of betas to deltas is done if there are at least 184%% ?IO_BATCH_SIZE betas in q2 & q3. 
This value should not be too small, 185%% otherwise the frequent operations on the queues of q2 and q3 will not be 186%% effectively amortised (switching the direction of queue access defeats 187%% amortisation). Note that there is a natural upper bound due to credit_flow 188%% limits on the alpha to beta conversion. 189%% 190%% The conversion from alphas to betas is chunked due to the 191%% credit_flow limits of the msg_store. This further smooths the 192%% effects of changes to the target_ram_count and ensures the queue 193%% remains responsive even when there is a large amount of IO work to 194%% do. The 'resume' callback is utilised to ensure that conversions 195%% are done as promptly as possible whilst ensuring the queue remains 196%% responsive. 197%% 198%% In the queue we keep track of both messages that are pending 199%% delivery and messages that are pending acks. In the event of a 200%% queue purge, we only need to load qi segments if the queue has 201%% elements in deltas (i.e. it came under significant memory 202%% pressure). In the event of a queue deletion, in addition to the 203%% preceding, by keeping track of pending acks in RAM, we do not need 204%% to search through qi segments looking for messages that are yet to 205%% be acknowledged. 206%% 207%% Pending acks are recorded in memory by storing the message itself. 208%% If the message has been sent to disk, we do not store the message 209%% content. During memory reduction, pending acks containing message 210%% content have that content removed and the corresponding messages 211%% are pushed out to disk. 212%% 213%% Messages from pending acks are returned to q4, q3 and delta during 214%% requeue, based on the limits of seq_id contained in each. Requeued 215%% messages retain their original seq_id, maintaining order 216%% when requeued. 217%% 218%% The order in which alphas are pushed to betas and pending acks 219%% are pushed to disk is determined dynamically. 
%% We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress).
%%
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
%% msg_store.)
%%
%% In order to try to achieve as fast a start-up as possible, if a
%% clean shutdown occurs, we try to save out state to disk to reduce
%% work on startup. In the msg_store this takes the form of the
%% index_module's state, plus the file_summary ets table, and client
%% refs. In the VQ, this takes the form of the count of persistent
%% messages in the queue and references into the msg_stores. The
%% queue_index adds to these terms the details of its segments and
%% stores the terms in the queue directory.
%%
%% Two message stores are used. One is created for persistent messages
%% to durable queues that must survive restarts, and the other is used
%% for all other messages that just happen to need to be written to
%% disk. On start up we can therefore nuke the transient message
%% store, and be sure that the messages in the persistent store are
%% all that we need.
%%
%% The references to the msg_stores are there so that the msg_store
%% knows to only trust its saved state if all of the queues it was
%% previously talking to come up cleanly. Likewise, the queues
%% themselves (esp queue_index) skip work in init if all the queues
%% and msg_store were shut down cleanly. This gives both good speed
%% improvements and also robustness so that if anything possibly went
%% wrong in shutdown (or there was subsequent manual tampering), all
%% messages and queues that can be recovered are recovered, safely.
%%
%% To delete transient messages lazily, the variable_queue, on
%% startup, stores the next_seq_id reported by the queue_index as the
%% transient_threshold.
%% From that point on, whenever it's reading a
%% message off disk via the queue_index, if the seq_id is below this
%% threshold and the message is transient then it drops the message
%% (the message itself won't exist on disk because it would have been
%% stored in the transient msg_store which would have had its saved
%% state nuked on startup). This avoids the expensive operation of
%% scanning the entire queue on startup in order to delete transient
%% messages that were only pushed to disk to save memory.
%%
%%----------------------------------------------------------------------------

-behaviour(rabbit_backing_queue).

%% State of one variable queue. As described in the module header,
%% q1 and q4 only hold alphas, q2 and q3 hold both betas and gammas,
%% and delta is a single #delta{} term standing for messages whose
%% payload and position are only held on disk.
-record(vqstate,
        { q1,
          q2,
          delta,
          q3,
          q4,
          next_seq_id,
          ram_pending_ack,    %% msgs using store, still in RAM
          disk_pending_ack,   %% msgs in store, paged out
          qi_pending_ack,     %% msgs using qi, *can't* be paged out
          index_state,
          %% {PersistentClient, TransientClient}; set to 'undefined'
          %% by terminate/2 and delete_and_terminate/2
          msg_store_clients,
          durable,
          transient_threshold,
          qi_embed_msgs_below,

          len,                %% w/o unacked
          bytes,              %% w/o unacked
          unacked_bytes,
          persistent_count,   %% w   unacked
          persistent_bytes,   %% w   unacked
          %% NOTE(review): presumably the bytes of transient msgs
          %% currently held in delta -- confirm against its users
          delta_transient_bytes,

          target_ram_count,
          ram_msg_count,      %% w/o unacked
          ram_msg_count_prev,
          ram_ack_count_prev,
          ram_bytes,          %% w   unacked
          %% in/out/ack_in/ack_out counters are zeroed by
          %% update_rates/1 each time the #rates{} are recalculated
          out_counter,
          in_counter,
          rates,
          msgs_on_disk,
          msg_indices_on_disk,
          unconfirmed,
          confirmed,
          ack_out_counter,
          ack_in_counter,
          %% Unlike the other counters these two do not feed into
          %% #rates{} and get reset
          disk_read_count,
          disk_write_count,

          io_batch_size,

          %% default queue or lazy queue
          mode,
          %% number of reduce_memory_usage executions, once it
          %% reaches a threshold the queue will manually trigger a runtime GC
          %% see: maybe_execute_gc/1
          memory_reduction_run_count,
          %% Queue data is grouped by VHost. We need to store it
          %% to work with queue index.
          virtual_host,
          waiting_bump = false
        }).

%% Moving averages (per second) of publishes, deliveries, acks added
%% and acks removed, plus the monotonic timestamp of the last update
%% (see update_rates/1).
-record(rates, { in, out, ack_in, ack_out, timestamp }).

%% Per-message bookkeeping entry held in q1..q4 and in the pending
%% ack trees.
-record(msg_status,
        { seq_id,
          msg_id,
          msg,
          is_persistent,
          is_delivered,
          msg_in_store,
          index_on_disk,
          persist_to,
          msg_props
        }).

%% Summary of the messages that live only on disk.
-record(delta,
        { start_seq_id, %% start_seq_id is inclusive
          count,
          transient,
          end_seq_id    %% end_seq_id is exclusive
        }).

-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE,  msg_store_transient).

%% Module implementing the in-memory queues q1..q4.
-define(QUEUE, lqueue).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include("amqqueue.hrl").

%%----------------------------------------------------------------------------

-rabbit_upgrade({multiple_routing_keys, local, []}).
-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).

-type seq_id()  :: non_neg_integer().

-type rates() :: #rates { in        :: float(),
                          out       :: float(),
                          ack_in    :: float(),
                          ack_out   :: float(),
                          timestamp :: rabbit_types:timestamp()}.

-type delta() :: #delta { start_seq_id :: non_neg_integer(),
                          count        :: non_neg_integer(),
                          end_seq_id   :: non_neg_integer() }.

%% The compiler (rightfully) complains that ack() and state() are
%% unused. For this reason we duplicate a -spec from
%% rabbit_backing_queue with the only intent being to remove
%% warnings. The problem here is that we can't parameterise the BQ
%% behaviour by these two types as we would like to. We still leave
%% these here for documentation purposes.
-type ack()   :: seq_id().
%% Typed view of #vqstate{}. Fields of the record that are not listed
%% here (delta_transient_bytes, virtual_host, waiting_bump) are left
%% untyped.
-type state() :: #vqstate {
             q1                    :: ?QUEUE:?QUEUE(),
             q2                    :: ?QUEUE:?QUEUE(),
             delta                 :: delta(),
             q3                    :: ?QUEUE:?QUEUE(),
             q4                    :: ?QUEUE:?QUEUE(),
             next_seq_id           :: seq_id(),
             ram_pending_ack       :: gb_trees:tree(),
             disk_pending_ack      :: gb_trees:tree(),
             qi_pending_ack        :: gb_trees:tree(),
             index_state           :: any(),
             msg_store_clients     :: 'undefined' | {{any(), binary()},
                                                    {any(), binary()}},
             durable               :: boolean(),
             transient_threshold   :: non_neg_integer(),
             qi_embed_msgs_below   :: non_neg_integer(),

             len                   :: non_neg_integer(),
             bytes                 :: non_neg_integer(),
             unacked_bytes         :: non_neg_integer(),

             persistent_count      :: non_neg_integer(),
             persistent_bytes      :: non_neg_integer(),

             target_ram_count      :: non_neg_integer() | 'infinity',
             ram_msg_count         :: non_neg_integer(),
             ram_msg_count_prev    :: non_neg_integer(),
             ram_ack_count_prev    :: non_neg_integer(),
             ram_bytes             :: non_neg_integer(),
             out_counter           :: non_neg_integer(),
             in_counter            :: non_neg_integer(),
             rates                 :: rates(),
             msgs_on_disk          :: gb_sets:set(),
             msg_indices_on_disk   :: gb_sets:set(),
             unconfirmed           :: gb_sets:set(),
             confirmed             :: gb_sets:set(),
             ack_out_counter       :: non_neg_integer(),
             ack_in_counter        :: non_neg_integer(),
             disk_read_count       :: non_neg_integer(),
             disk_write_count      :: non_neg_integer(),

             io_batch_size         :: pos_integer(),
             mode                  :: 'default' | 'lazy',
             memory_reduction_run_count :: non_neg_integer()}.

%% An empty delta: no messages, no seq_id bounds.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
                              count        = 0,
                              transient    = 0,
                              end_seq_id   = undefined }).
%% Pattern for an empty delta whose start and end seq_id are the same
%% term Z.
-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
                                         count        = 0,
                                         transient    = 0,
                                         end_seq_id   = Z }).

%% Float on purpose: dividing a microsecond count by it yields seconds
%% as a float (see update_rate/4).
-define(MICROS_PER_SECOND, 1000000.0).

%% We're sampling every 5s for RAM duration; a half life that is of
%% the same order of magnitude is probably about right.
-define(RATE_AVG_HALF_LIFE, 5.0).
%% We will recalculate the #rates{} every time we get asked for our
%% RAM duration, or every N messages published, whichever is
%% sooner. We do this since the priority calculations in
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).

%% Threshold used to decide when an explicit garbage collection is
%% run in relation to the `reduce_memory_use` calls.
%% see: rabbitmq-server-973 and rabbitmq-server-964
-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
%% Resolves the threshold for Mode once and caches it in the process
%% dictionary under explicit_gc_run_operation_threshold.
-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
        case get(explicit_gc_run_operation_threshold) of
            undefined ->
                Val = explicit_gc_run_operation_threshold_for_mode(Mode),
                put(explicit_gc_run_operation_threshold, Val),
                Val;
            Val -> Val
        end).

%% Reads the explicit GC threshold for the given queue mode from the
%% `rabbit` application environment, falling back to
%% ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD when the key is not set.
explicit_gc_run_operation_threshold_for_mode(Mode) ->
    rabbit_misc:get_env(rabbit,
                        explicit_gc_threshold_env_key(Mode),
                        ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD).

%% Environment key holding the threshold: lazy queues have their own
%% key, every other mode shares the default one.
explicit_gc_threshold_env_key(lazy) ->
    lazy_queue_explicit_gc_run_operation_threshold;
explicit_gc_threshold_env_key(_Other) ->
    queue_explicit_gc_run_operation_threshold.

%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------

%% Starts the queue index and both message stores for VHost.
%% The persistent store is handed the persistent_ref of every set of
%% recovery terms that has one; terms equal to non_clean_shutdown
%% contribute nothing. Returns {ok, AllTerms}.
start(VHost, DurableQueues) ->
    {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
    ClientRefs = lists:filtermap(fun persistent_client_ref/1, AllTerms),
    start_msg_store(VHost, ClientRefs, StartFunState),
    {ok, AllTerms}.

%% filtermap callback: keeps the persistent_ref of recovery terms
%% that carry one.
persistent_client_ref(non_clean_shutdown) ->
    false;
persistent_client_ref(Terms) ->
    case proplists:get_value(persistent_ref, Terms) of
        undefined -> false;
        Ref       -> {true, Ref}
    end.

%% Stops the message stores and then the queue index of VHost; both
%% steps must return ok.
stop(VHost) ->
    ok = stop_msg_store(VHost),
    ok = rabbit_queue_index:stop(VHost).
%% Starts the transient and then the persistent message store of
%% VHost. The transient store is always started without client refs
%% and with ?EMPTY_START_FUN_STATE; the persistent store receives the
%% caller's Refs and StartFunState. Returns ok (a failed start exits,
%% see do_start_msg_store/4).
start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
    rabbit_log:info("Starting message stores for vhost '~s'", [VHost]),
    TransientRefs = undefined,
    TransientStartFunState = ?EMPTY_START_FUN_STATE,
    do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE,
                       TransientRefs, TransientStartFunState),
    do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE,
                       Refs, StartFunState),
    ok.

%% Starts a single message store of the given Type and logs the
%% outcome. Any {error, _} result is logged and turned into an exit
%% with that same error term.
do_start_msg_store(VHost, Type, Refs, StartFunState) ->
    StartResult = rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState),
    case StartResult of
        {ok, _} ->
            rabbit_log:info("Started message store of type ~s for vhost '~s'",
                            [abbreviated_type(Type), VHost]);
        {error, {no_such_vhost, VHost}} = VHostGone ->
            rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!",
                             [Type, VHost]),
            exit(VHostGone);
        {error, Reason} ->
            rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p",
                             [Type, VHost, Reason]),
            exit({error, Reason})
    end.

%% Short label for a message store type, used in log output.
abbreviated_type(?TRANSIENT_MSG_STORE)  -> transient;
abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.

%% Stops the transient and then the persistent message store of
%% VHost. The results of the individual stops are not checked.
stop_msg_store(VHost) ->
    rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
    rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
    ok.

%% Backing queue init: wraps Callback into the three "written to
%% disk" notification funs and delegates to init/6.
init(Queue, Recover, Callback) ->
    MsgOnDiskFun =
        fun (MsgIds, ActionTaken) ->
                msgs_written_to_disk(Callback, MsgIds, ActionTaken)
        end,
    MsgIdxOnDiskFun =
        fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
    MsgAndIdxOnDiskFun =
        fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end,
    init(Queue, Recover, Callback,
         MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun).
%% init/6, `new` clause: a brand new queue. The queue index is
%% initialised empty, a persistent store client is only created for
%% durable queues, and the state is built with zero recovered
%% messages/bytes and no recovery terms.
init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
    QueueName = amqqueue:get_name(Q),
    IsDurable = amqqueue:is_durable(Q),
    IndexState = rabbit_queue_index:init(QueueName,
                                         MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
    VHost = QueueName#resource.virtual_host,
    %% The persistent client is initialised before the transient one,
    %% as in the argument order of the init/8 call.
    PersistentClient =
        case IsDurable of
            true  -> msg_store_client_init(?PERSISTENT_MSG_STORE,
                                           MsgOnDiskFun, AsyncCallback, VHost);
            false -> undefined
        end,
    TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
                                            AsyncCallback, VHost),
    init(IsDurable, IndexState, 0, 0, [],
         PersistentClient, TransientClient, VHost);

%% init/6, recovery clause: Terms are the recovery terms saved at
%% shutdown (or non_clean_shutdown).
%% We can be recovering a transient queue if it crashed
init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
    QueueName = amqqueue:get_name(Q),
    IsDurable = amqqueue:is_durable(Q),
    {PRef, RecoveryTerms} = process_recovery_terms(Terms),
    VHost = QueueName#resource.virtual_host,
    %% ContainsCheckFun is handed to the queue index recovery. For a
    %% durable queue it answers for a binary msg id by asking the
    %% persistent store, and for a #basic_message{} by its
    %% is_persistent flag; for a non-durable queue it is always false.
    {PersistentClient, ContainsCheckFun} =
        case IsDurable of
            true ->
                Client = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
                                               MsgOnDiskFun, AsyncCallback,
                                               VHost),
                CheckFun =
                    fun (MsgId) when is_binary(MsgId) ->
                            rabbit_msg_store:contains(MsgId, Client);
                        (#basic_message{is_persistent = Persistent}) ->
                            Persistent
                    end,
                {Client, CheckFun};
            false ->
                {undefined, fun(_MsgId) -> false end}
        end,
    TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
                                            undefined, AsyncCallback,
                                            VHost),
    StoreRecovered =
        rabbit_vhost_msg_store:successfully_recovered_state(
          VHost, ?PERSISTENT_MSG_STORE),
    {DeltaCount, DeltaBytes, IndexState} =
        rabbit_queue_index:recover(QueueName, RecoveryTerms, StoreRecovered,
                                   ContainsCheckFun,
                                   MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
    init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
         PersistentClient, TransientClient, VHost).
%% Splits recovery terms into {PersistentRef, TermsToUse}.
%% After a non-clean shutdown, or when the terms carry no
%% persistent_ref, a fresh guid is generated as the ref; in the
%% latter case the terms are also replaced by [].
process_recovery_terms(non_clean_shutdown) ->
    {rabbit_guid:gen(), non_clean_shutdown};
process_recovery_terms(Terms) ->
    case proplists:get_value(persistent_ref, Terms) of
        undefined -> {rabbit_guid:gen(), []};
        PRef      -> {PRef, Terms}
    end.

%% Shuts the queue down while keeping its persistent state.
%% The persistent store client (if any) is terminated and its ref is
%% saved; the transient client is deleted and terminated; the
%% persistent ref, count and bytes are handed to the queue index as
%% its terminate terms.
terminate(_Reason, State) ->
    State1 = purge_pending_ack(true, State),
    #vqstate { virtual_host      = VHost,
               persistent_count  = PCount,
               persistent_bytes  = PBytes,
               index_state       = IndexState,
               msg_store_clients = {MSCStateP, MSCStateT} } = State1,
    PRef = case MSCStateP of
               undefined -> undefined;
               _         -> ok = maybe_client_terminate(MSCStateP),
                            rabbit_msg_store:client_ref(MSCStateP)
           end,
    ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
    Terms = [{persistent_ref,   PRef},
             {persistent_count, PCount},
             {persistent_bytes, PBytes}],
    IndexState1 = rabbit_queue_index:terminate(VHost, Terms, IndexState),
    a(State1 #vqstate { index_state       = IndexState1,
                        msg_store_clients = undefined }).

%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
    %% Normally when we purge messages we interact with the qi by
    %% issuing delivers and acks for every purged message. In this case
    %% we don't need to do that, so we just delete the qi.
    Purged = purge_and_index_reset(State),
    Final = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
        purge_pending_ack_delete_and_terminate(Purged),
    %% Only durable queues have a persistent store client.
    case MSCStateP of
        undefined -> ok;
        _         -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
    end,
    rabbit_msg_store:client_delete_and_terminate(MSCStateT),
    a(Final #vqstate { msg_store_clients = undefined }).

%% Erases the queue index of a crashed queue.
delete_crashed(Q) when ?is_amqqueue(Q) ->
    ok = rabbit_queue_index:erase(amqqueue:get_name(Q)).
%% Purges the queue and returns {Len, NewState}, Len being the
%% length before the purge. The index-reset path is only taken when
%% there are neither pending acks nor unconfirmed messages.
purge(State = #vqstate { len = Len }) ->
    NoPendingAcks = is_pending_ack_empty(State),
    NoUnconfirmed = is_unconfirmed_empty(State),
    Purged = case NoPendingAcks and NoUnconfirmed of
                 true  -> purge_and_index_reset(State);
                 false -> purge_when_pending_acks(State)
             end,
    {Len, Purged}.

%% Drops all pending acks via purge_pending_ack(false, State).
purge_acks(State) ->
    a(purge_pending_ack(false, State)).

%% Publishes one message, then refreshes the rates and reduces
%% memory use where the maybe_* helpers decide it is due.
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
    Published = publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                         fun maybe_write_to_disk/4,
                         State),
    RatesChecked = maybe_update_rates(Published),
    a(maybe_reduce_memory_use(RatesChecked)).

%% Publishes a list of messages by folding batch_publish1/2 over it,
%% then applies ui/1 once for the whole batch before the rate and
%% memory checks.
batch_publish(Publishes, ChPid, Flow, State) ->
    FoldStart = {ChPid, Flow, State},
    %% ChPid and Flow are already bound: the fold must hand them back
    %% unchanged.
    {ChPid, Flow, Published} =
        lists:foldl(fun batch_publish1/2, FoldStart, Publishes),
    Flushed = ui(Published),
    a(maybe_reduce_memory_use(maybe_update_rates(Flushed))).

%% Publishes a message that has already been delivered; returns
%% {SeqId, NewState}.
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
    {SeqId, Published} =
        publish_delivered1(Msg, MsgProps, ChPid, Flow,
                           fun maybe_write_to_disk/4,
                           State),
    RatesChecked = maybe_update_rates(Published),
    {SeqId, a(maybe_reduce_memory_use(RatesChecked))}.

%% Batch variant of publish_delivered/5; returns the seq ids in
%% publish order together with the new state.
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
    FoldStart = {ChPid, Flow, [], State},
    {ChPid, Flow, RevSeqIds, Published} =
        lists:foldl(fun batch_publish_delivered1/2, FoldStart, Publishes),
    Flushed = ui(Published),
    Final = a(maybe_reduce_memory_use(maybe_update_rates(Flushed))),
    {lists:reverse(RevSeqIds), Final}.

%% Discarding a message is a no-op for this backing queue.
discard(_MsgId, _ChPid, _Flow, State) -> State.

%% Returns the confirmed set as a list and empties it in the state.
drain_confirmed(State = #vqstate { confirmed = Confirmed }) ->
    case gb_sets:is_empty(Confirmed) of
        true ->
            %% common case
            {[], State};
        false ->
            Drained = State #vqstate { confirmed = gb_sets:new() },
            {gb_sets:to_list(Confirmed), Drained}
    end.

%% Drops messages via remove_by_predicate/2; returns what that call
%% returns as message properties plus the new state.
dropwhile(Pred, State) ->
    {MsgProps, Dropped} = remove_by_predicate(Pred, State),
    {MsgProps, a(Dropped)}.

%% Fetches messages via fetch_by_predicate/4, threading Acc through
%% Fun; returns {MsgProps, NewAcc, NewState}.
fetchwhile(Pred, Fun, Acc, State) ->
    {MsgProps, AccOut, Fetched} = fetch_by_predicate(Pred, Fun, Acc, State),
    {MsgProps, AccOut, a(Fetched)}.
%% Takes the message at the head of the queue.
%% Returns {empty, State} for an empty queue, otherwise
%% {{Msg, IsDelivered, AckTag}, State}.
fetch(AckRequired, State) ->
    case queue_out(State) of
        {empty, State1} ->
            {empty, a(State1)};
        {{value, MsgStatus}, State1} ->
            %% it is possible that the message wasn't read from disk
            %% at this point, so read it in.
            {Msg, State2} = read_msg(MsgStatus, State1),
            {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
            {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
    end.

%% Like fetch/2 but never reads the message body: returns
%% {{MsgId, AckTag}, State} or {empty, State}.
drop(AckRequired, State) ->
    case queue_out(State) of
        {empty, State1} ->
            {empty, a(State1)};
        {{value, MsgStatus}, State1} ->
            {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
            {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
    end.

%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.

%% Acknowledges the given ack tags (seq ids). Returns the ids of the
%% messages that were actually acked together with the new state.
%% Tags with no matching pending ack are skipped and do not count
%% towards ack_out_counter.
ack([], State) ->
    {[], State};
%% optimisation: this head is essentially a partial evaluation of the
%% general case below, for the single-ack case.
ack([SeqId], State) ->
    case remove_pending_ack(true, SeqId, State) of
        {none, _} ->
            {[], State};
        {#msg_status { msg_id        = MsgId,
                       is_persistent = IsPersistent,
                       msg_in_store  = MsgInStore,
                       index_on_disk = IndexOnDisk },
         State1 = #vqstate { index_state       = IndexState,
                             msg_store_clients = MSCState,
                             ack_out_counter   = AckOutCount }} ->
            IndexState1 = case IndexOnDisk of
                              true  -> rabbit_queue_index:ack([SeqId], IndexState);
                              false -> IndexState
                          end,
            case MsgInStore of
                true  -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
                false -> ok
            end,
            {[MsgId],
             a(State1 #vqstate { index_state      = IndexState1,
                                 ack_out_counter  = AckOutCount + 1 })}
    end;
ack(AckTags, State) ->
    %% AckedCount counts only the tags for which a pending ack was
    %% found, so that ack_out_counter grows exactly as it does in the
    %% single-ack clause above (which adds nothing for `none`).
    {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, AckedCount,
     State1 = #vqstate { index_state       = IndexState,
                         msg_store_clients = MSCState,
                         ack_out_counter   = AckOutCount }} =
        lists:foldl(
          fun (SeqId, {Acc, Count, State2}) ->
                  case remove_pending_ack(true, SeqId, State2) of
                      {none, _} ->
                          {Acc, Count, State2};
                      {MsgStatus, State3} ->
                          {accumulate_ack(MsgStatus, Acc), Count + 1, State3}
                  end
          end, {accumulate_ack_init(), 0, State}, AckTags),
    IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    {lists:reverse(AllMsgIds),
     a(State1 #vqstate { index_state      = IndexState1,
                         ack_out_counter  = AckOutCount + AckedCount })}.

%% Returns pending-ack messages to the queue, keeping their seq ids.
%% In default mode the sorted ack tags are merged into q4 (up to
%% beta_limit(Q3)), then into q3 (up to delta_limit(Delta)), and what
%% is left goes into delta. In lazy mode q4 is skipped. len and
%% in_counter both grow by the number of requeued messages.
%% Returns {RequeuedMsgIds, NewState}.
requeue(AckTags, #vqstate { mode       = default,
                            delta      = Delta,
                            q3         = Q3,
                            q4         = Q4,
                            in_counter = InCounter,
                            len        = Len } = State) ->
    {SeqIds,  Q4a, MsgIds,  State1} = queue_merge(lists:sort(AckTags), Q4, [],
                                                  beta_limit(Q3),
                                                  fun publish_alpha/2, State),
    {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
                                                  delta_limit(Delta),
                                                  fun publish_beta/2, State1),
    {Delta1, MsgIds2, State3}       = delta_merge(SeqIds1, Delta, MsgIds1,
                                                  State2),
    MsgCount = length(MsgIds2),
    {MsgIds2, a(maybe_reduce_memory_use(
                  maybe_update_rates(ui(
                    State3 #vqstate { delta      = Delta1,
                                      q3         = Q3a,
                                      q4         = Q4a,
                                      in_counter = InCounter + MsgCount,
                                      len        = Len + MsgCount }))))};
requeue(AckTags, #vqstate { mode       = lazy,
                            delta      = Delta,
                            q3         = Q3,
                            in_counter = InCounter,
                            len        = Len } = State) ->
    {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
                                                delta_limit(Delta),
                                                fun publish_beta/2, State),
    {Delta1, MsgIds1, State2}     = delta_merge(SeqIds, Delta, MsgIds,
                                                State1),
    MsgCount = length(MsgIds1),
    {MsgIds1, a(maybe_reduce_memory_use(
                  maybe_update_rates(ui(
                    State2 #vqstate { delta      = Delta1,
                                      q3         = Q3a,
                                      in_counter = InCounter + MsgCount,
                                      len        = Len + MsgCount }))))}.
783 784ackfold(MsgFun, Acc, State, AckTags) -> 785 {AccN, StateN} = 786 lists:foldl(fun(SeqId, {Acc0, State0}) -> 787 MsgStatus = lookup_pending_ack(SeqId, State0), 788 {Msg, State1} = read_msg(MsgStatus, State0), 789 {MsgFun(Msg, SeqId, Acc0), State1} 790 end, {Acc, State}, AckTags), 791 {AccN, a(StateN)}. 792 793fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> 794 {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, 795 [msg_iterator(State), 796 disk_ack_iterator(State), 797 ram_ack_iterator(State), 798 qi_ack_iterator(State)]), 799 ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). 800 801len(#vqstate { len = Len }) -> Len. 802 803is_empty(State) -> 0 == len(State). 804 805depth(State) -> 806 len(State) + count_pending_acks(State). 807 808set_ram_duration_target( 809 DurationTarget, State = #vqstate { 810 rates = #rates { in = AvgIngressRate, 811 out = AvgEgressRate, 812 ack_in = AvgAckIngressRate, 813 ack_out = AvgAckEgressRate }, 814 target_ram_count = TargetRamCount }) -> 815 Rate = 816 AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, 817 TargetRamCount1 = 818 case DurationTarget of 819 infinity -> infinity; 820 _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec 821 end, 822 State1 = State #vqstate { target_ram_count = TargetRamCount1 }, 823 a(case TargetRamCount1 == infinity orelse 824 (TargetRamCount =/= infinity andalso 825 TargetRamCount1 >= TargetRamCount) of 826 true -> State1; 827 false -> reduce_memory_use(State1) 828 end). 829 830maybe_update_rates(State = #vqstate{ in_counter = InCount, 831 out_counter = OutCount }) 832 when InCount + OutCount > ?MSGS_PER_RATE_CALC -> 833 update_rates(State); 834maybe_update_rates(State) -> 835 State. 

%% Folds the four event counters into their moving-average rates,
%% stamps the rates with the current monotonic time and zeroes the
%% counters.
update_rates(State = #vqstate{ in_counter      = Published,
                               out_counter     = Delivered,
                               ack_in_counter  = AcksIn,
                               ack_out_counter = AcksOut,
                               rates = #rates{ in        = InAvg,
                                               out       = OutAvg,
                                               ack_in    = AckInAvg,
                                               ack_out   = AckOutAvg,
                                               timestamp = Then }}) ->
    Now = erlang:monotonic_time(),
    Refresh = fun (Count, Avg) -> update_rate(Now, Then, Count, Avg) end,
    NewRates = #rates { in        = Refresh(Published, InAvg),
                        out       = Refresh(Delivered, OutAvg),
                        ack_in    = Refresh(AcksIn,    AckInAvg),
                        ack_out   = Refresh(AcksOut,   AckOutAvg),
                        timestamp = Now },
    State#vqstate{ in_counter      = 0,
                   out_counter     = 0,
                   ack_in_counter  = 0,
                   ack_out_counter = 0,
                   rates           = NewRates }.

%% Blends Count events seen between TS and Now (native time units)
%% into the moving average Rate. When no time has elapsed the previous
%% rate is returned unchanged, which avoids dividing by zero.
update_rate(Now, TS, Count, Rate) ->
    ElapsedMicros = erlang:convert_time_unit(Now - TS, native, micro_seconds),
    case ElapsedMicros / ?MICROS_PER_SECOND of
        Seconds when Seconds == 0 ->
            Rate;
        Seconds ->
            rabbit_misc:moving_average(Seconds, ?RATE_AVG_HALF_LIFE,
                                       Count / Seconds, Rate)
    end.

%% Refreshes the rates and estimates how many seconds' worth of
%% messages and acks are held in RAM. The estimate is infinity while
%% all four rates are below 0.01 per second.
ram_duration(State) ->
    State1 = update_rates(State),
    #vqstate { rates = #rates { in      = AvgIngressRate,
                                out     = AvgEgressRate,
                                ack_in  = AvgAckIngressRate,
                                ack_out = AvgAckEgressRate },
               ram_msg_count      = RamMsgCount,
               ram_msg_count_prev = RamMsgCountPrev,
               ram_pending_ack    = RPA,
               qi_pending_ack     = QPA,
               ram_ack_count_prev = RamAckCountPrev } = State1,

    RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
    Rates = [AvgEgressRate, AvgIngressRate,
             AvgAckEgressRate, AvgAckIngressRate],

    Duration = %% msgs+acks / (msgs+acks/sec) == sec
        case lists:any(fun (R) -> R >= 0.01 end, Rates) of
            false -> infinity;
            true  -> (RamMsgCountPrev + RamMsgCount +
                          RamAckCount + RamAckCountPrev) /
                         (4 * (AvgEgressRate + AvgIngressRate +
                                   AvgAckEgressRate + AvgAckIngressRate))
        end,

    {Duration, State1}.

%% Maps the queue index's sync requirement onto the timeout kind
%% reported to the caller: confirms -> timed, other -> idle,
%% false -> false.
needs_timeout(#vqstate { index_state = QI }) ->
    case rabbit_queue_index:needs_sync(QI) of
        false    -> false;
        other    -> idle;
        confirms -> timed
    end.

%% Syncs the queue index.
timeout(State = #vqstate { index_state = QI }) ->
    QI1 = rabbit_queue_index:sync(QI),
    State #vqstate { index_state = QI1 }.

%% Flushes the queue index before the owning process hibernates.
handle_pre_hibernate(State = #vqstate { index_state = QI }) ->
    QI1 = rabbit_queue_index:flush(QI),
    State #vqstate { index_state = QI1 }.

%% Clears the waiting_bump flag when a bump_reduce_memory_use message
%% arrives; a bump that nobody was waiting for leaves the state as is.
handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
    State#vqstate{ waiting_bump = false };
handle_info(bump_reduce_memory_use, State) ->
    State.

%% Runs another round of memory reduction.
resume(State) ->
    Reduced = reduce_memory_use(State),
    a(Reduced).

%% {AvgIngressRate, AvgEgressRate} for ready messages.
msg_rates(#vqstate { rates = #rates { in  = In,
                                      out = Out } }) ->
    {In, Out}.

%% Returns the value of a single info item. Items not listed here
%% yield the empty atom ''.
info(messages_ready_ram, #vqstate{ram_msg_count = N}) ->
    N;
info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RamAcks,
                                           qi_pending_ack  = IndexAcks}) ->
    gb_trees:size(RamAcks) + gb_trees:size(IndexAcks);
info(messages_ram, State) ->
    Ready   = info(messages_ready_ram, State),
    Unacked = info(messages_unacknowledged_ram, State),
    Ready + Unacked;
info(messages_persistent, #vqstate{persistent_count = N}) ->
    N;
info(messages_paged_out, #vqstate{delta = #delta{transient = N}}) ->
    N;
info(message_bytes, #vqstate{bytes         = ReadyBytes,
                             unacked_bytes = UnackedBytes}) ->
    ReadyBytes + UnackedBytes;
info(message_bytes_ready, #vqstate{bytes = ReadyBytes}) ->
    ReadyBytes;
info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UnackedBytes}) ->
    UnackedBytes;
info(message_bytes_ram, #vqstate{ram_bytes = B}) ->
    B;
info(message_bytes_persistent, #vqstate{persistent_bytes = B}) ->
    B;
info(message_bytes_paged_out, #vqstate{delta_transient_bytes = B}) ->
    B;
info(head_message_timestamp, #vqstate{q3              = Q3,
                                      q4              = Q4,
                                      ram_pending_ack = RamAcks,
                                      qi_pending_ack  = IndexAcks}) ->
    head_message_timestamp(Q3, Q4, RamAcks, IndexAcks);
info(disk_reads, #vqstate{disk_read_count = N}) ->
    N;
info(disk_writes, #vqstate{disk_write_count = N}) ->
    N;
info(backing_queue_status,
     #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                mode             = Mode,
                len              = Len,
                target_ram_count = TargetRamCount,
                next_seq_id      = NextSeqId,
                rates = #rates { in      = InRate,
                                 out     = OutRate,
                                 ack_in  = AckInRate,
                                 ack_out = AckOutRate }}) ->
    [ {mode                , Mode},
      {q1                  , ?QUEUE:len(Q1)},
      {q2                  , ?QUEUE:len(Q2)},
      {delta               , Delta},
      {q3                  , ?QUEUE:len(Q3)},
      {q4                  , ?QUEUE:len(Q4)},
      {len                 , Len},
      {target_ram_count    , TargetRamCount},
      {next_seq_id         , NextSeqId},
      {avg_ingress_rate    , InRate},
      {avg_egress_rate     , OutRate},
      {avg_ack_ingress_rate, AckInRate},
      {avg_ack_egress_rate , AckOutRate} ];
info(_, _) ->
    ''.

%% Applies Fun(?MODULE, State) when the call is addressed to this
%% module; calls addressed to any other module are ignored.
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke(      _,   _, State) -> State.

%% This module never reports a message as a duplicate.
is_duplicate(_Msg, State) -> {false, State}.

%% Switches the queue between default and lazy mode. Asking for the
%% mode the queue is already in, or for an unknown mode, leaves the
%% state untouched.
set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
    State;
set_queue_mode(lazy, State = #vqstate { target_ram_count = OriginalTarget }) ->
    %% To become a lazy queue we need to page everything to disk first.
    Paged = convert_to_lazy(State),
    %% restore the original target_ram_count
    a(Paged #vqstate { mode = lazy, target_ram_count = OriginalTarget });
set_queue_mode(default, State) ->
    %% becoming a default queue means loading messages from disk like
    %% when a queue is recovered.
    Default = State #vqstate { mode = default },
    a(maybe_deltas_to_betas(Default));
set_queue_mode(_, State) ->
    State.

%% Pairs each message id with its ack tag and pushes the pairs onto
%% Accumulator, so the last pair ends up at the head of the result.
zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
    Pairs = lists:map(fun ({{#basic_message{ id = Id }, _Props}, AckTag}) ->
                              {Id, AckTag}
                      end, lists:zip(Msgs, AckTags)),
    lists:reverse(Pairs, Accumulator).

%% Pages messages out until every ready message is either in delta or
%% in q3, i.e. until the state has the shape a lazy queue requires.
convert_to_lazy(State) ->
    State1 = set_ram_duration_target(0, State),
    #vqstate { delta = Delta, q3 = Q3, len = Len } = State1,
    PagedOrIndexed = Delta#delta.count + ?QUEUE:len(Q3),
    if
        PagedOrIndexed == Len ->
            State1;
        true ->
            %% When pushing messages to disk, we might have been
            %% blocked by the msg_store, so we need to see if we have
            %% to wait for more credit, and then keep paging messages.
            %%
            %% The amqqueue_process could have taken care of this, but
            %% between the time it receives the bump_credit msg and
            %% calls BQ:resume to keep paging messages to disk, some
            %% other request may arrive to the BQ which at this moment
            %% is not in a proper state for a lazy BQ (unless all
            %% messages have been paged to disk already).
            wait_for_msg_store_credit(),
            convert_to_lazy(resume(State1))
    end.

%% Blocks until a bump_credit message has been handled, but only when
%% credit_flow reports this process as blocked.
wait_for_msg_store_credit() ->
    case credit_flow:blocked() of
        false ->
            ok;
        true ->
            receive
                {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg)
            end
    end.

%% Get the Timestamp property of the first msg, if present. This is
%% the one with the oldest timestamp among the heads of the pending
%% acks and unread queues.  We can't check disk_pending_acks as these
%% are paged out - we assume some will soon be paged in rather than
%% forcing it to happen.  Pending ack msgs are included as they are
%% regarded as unprocessed until acked, this also prevents the result
%% apparently oscillating during repeated rejects.  Q3 is only checked
%% when Q4 is empty as any Q4 msg will be earlier.
head_message_timestamp(Q3, Q4, RPA, QPA) ->
    %% Only heads whose message body is currently in memory can
    %% contribute a timestamp.
    HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
                   HeadMsgStatus <-
                       [ get_qs_head([Q4, Q3]),
                         get_pa_head(RPA),
                         get_pa_head(QPA) ],
                   HeadMsgStatus /= undefined,
                   HeadMsgStatus#msg_status.msg /= undefined ],

    Timestamps =
        [Timestamp || HeadMsg <- HeadMsgs,
                      Timestamp <- [rabbit_basic:extract_timestamp(
                                      HeadMsg#basic_message.content)],
                      Timestamp /= undefined
        ],

    case Timestamps of
        [] -> '';
        _  -> lists:min(Timestamps)
    end.

%% Returns the head of the first non-empty queue in Qs, or undefined
%% when all of them are empty.
%%
%% This is a plain recursion rather than a `catch` around a throwing
%% fold: an old-style catch would also swallow genuine errors and hand
%% the resulting {'EXIT', _} tuple back as if it were a message status.
get_qs_head([]) ->
    undefined;
get_qs_head([Q | Qs]) ->
    case get_q_head(Q) of
        undefined -> get_qs_head(Qs);
        Val       -> Val
    end.

%% Head of a ?QUEUE, or undefined when it is empty.
get_q_head(Q) ->
    get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).

%% Entry with the smallest key of a pending-ack gb_tree, or undefined
%% when the tree is empty.
get_pa_head(PA) ->
    get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).

%% Generic head accessor. GetVal must return a 2-tuple whose second
%% element is the message status.
get_collection_head(Col, IsEmpty, GetVal) ->
    case IsEmpty(Col) of
        false ->
            {_, MsgStatus} = GetVal(Col),
            MsgStatus;
        true  -> undefined
    end.

%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------

%% Asserts the structural invariants of the queue state and returns the
%% state unchanged. A violated invariant crashes with badmatch.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode             = default,
                     len              = Len,
                     bytes            = Bytes,
                     unacked_bytes    = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count    = RamMsgCount,
                     ram_bytes        = RamBytes}) ->
    E1 = ?QUEUE:is_empty(Q1),
    E2 = ?QUEUE:is_empty(Q2),
    ED = Delta#delta.count == 0,
    E3 = ?QUEUE:is_empty(Q3),
    E4 = ?QUEUE:is_empty(Q4),
    LZ = Len == 0,

    %% if q1 has messages then q3 cannot be empty. See publish/6.
    true = E1 or not E3,
    %% if q2 has messages then we have messages in delta (paged to
    %% disk). See push_alphas_to_betas/2.
    true = E2 or not ED,
    %% if delta has messages then q3 cannot be empty. This is enforced
    %% by paging, where min([segment_entry_count(), len(q3)]) messages
    %% are always kept on RAM.
    true = ED or not E3,
    %% if the queue length is 0, then q3 and q4 must be empty.
    true = LZ == (E3 and E4),

    true = Len             >= 0,
    true = Bytes           >= 0,
    true = UnackedBytes    >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount     >= 0,
    true = RamMsgCount     =< Len,
    true = RamBytes        >= 0,
    true = RamBytes        =< Bytes + UnackedBytes,

    State;
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode             = lazy,
                     len              = Len,
                     bytes            = Bytes,
                     unacked_bytes    = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count    = RamMsgCount,
                     ram_bytes        = RamBytes}) ->
    E1 = ?QUEUE:is_empty(Q1),
    E2 = ?QUEUE:is_empty(Q2),
    ED = Delta#delta.count == 0,
    E3 = ?QUEUE:is_empty(Q3),
    E4 = ?QUEUE:is_empty(Q4),
    LZ = Len == 0,
    L3 = ?QUEUE:len(Q3),

    %% q1 must always be empty, since q1 only gets messages during
    %% publish, but for lazy queues messages go straight to delta.
    true = E1,

    %% q2 only gets messages from q1 when push_alphas_to_betas is
    %% called for a non empty delta, which won't be the case for a
    %% lazy queue. This means q2 must always be empty.
    true = E2,

    %% q4 must always be empty: a lazy queue serves messages from q3
    %% and puts messages back into q3 or delta, never into q4 (see
    %% queue_out/1 and in_r/2).
    true = E4,

    %% if the queue is empty, then delta is empty and q3 is empty.
    true = LZ == (ED and E3),

    %% There should be no messages in q1, q2, and q4
    true = Delta#delta.count + L3 == Len,

    true = Len             >= 0,
    true = Bytes           >= 0,
    true = UnackedBytes    >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount     >= 0,
    true = RamMsgCount     =< Len,
    true = RamBytes        >= 0,
    true = RamBytes        =< Bytes + UnackedBytes,

    State.

%% Asserts that a delta is well formed (Start + Count =< End) and
%% returns it; anything else crashes with function_clause.
d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
  when Start + Count =< End ->
    Delta.

%% Asserts the message status invariants and returns the status:
%% a persistent message must have its index entry on disk, and every
%% message must be in RAM, in the message store, or both.
m(MsgStatus = #msg_status { is_persistent = IsPersistent,
                            msg_in_store  = MsgInStore,
                            index_on_disk = IndexOnDisk }) ->
    true = (not IsPersistent) or IndexOnDisk,
    true = msg_in_ram(MsgStatus) or MsgInStore,
    MsgStatus.

%% 1 for true, 0 for false.
one_if(true ) -> 1;
one_if(false) -> 0.

%% Prepends E to L only when the flag is true.
cons_if(true,   E, L) -> [E | L];
cons_if(false, _E, L) -> L.

%% Adds Val to Set only when the flag is true.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true,   Val, Set) -> gb_sets:add(Val, Set).

%% Builds the status record for a message that so far exists only in
%% memory: nothing has been written to the message store or the queue
%% index yet. persist_to is decided by determine_persist_to/3.
msg_status(IsPersistent, IsDelivered, SeqId,
           Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
    #msg_status{seq_id        = SeqId,
                msg_id        = MsgId,
                msg           = Msg,
                is_persistent = IsPersistent,
                is_delivered  = IsDelivered,
                msg_in_store  = false,
                index_on_disk = false,
                persist_to    = determine_persist_to(Msg, MsgProps, IndexMaxSize),
                msg_props     = MsgProps}.

%% Turns a queue index entry into a #msg_status{}.
%%
%% An entry that carries the whole #basic_message{} keeps the message
%% in memory and is marked as persisted to the queue index; an entry
%% that carries only a message id is marked as living in the message
%% store, with no body in memory.
beta_msg_status({Msg = #basic_message{id = MsgId},
                 SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    Base = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    Base#msg_status{msg_id       = MsgId,
                    msg          = Msg,
                    persist_to   = queue_index,
                    msg_in_store = false};
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    Base = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    Base#msg_status{msg_id       = MsgId,
                    msg          = undefined,
                    persist_to   = msg_store,
                    msg_in_store = true}.

%% Fields shared by both kinds of index entry: no body in memory yet,
%% and the index entry is known to be on disk.
beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
    #msg_status{seq_id        = SeqId,
                msg           = undefined,
                is_persistent = IsPersistent,
                is_delivered  = IsDelivered,
                index_on_disk = true,
                msg_props     = MsgProps}.

%% Drops the in-memory body of a message that is persisted to the
%% message store. A message persisted to the queue index keeps its
%% body.
trim_msg_status(MsgStatus) ->
    case persist_to(MsgStatus) of
        queue_index -> MsgStatus;
        msg_store   -> MsgStatus#msg_status{msg = undefined}
    end.

%% Runs Fun on the persistent (true) or transient (false) message
%% store client and returns {Result, UpdatedClientPair}.
with_msg_store_state({PersistentC, TransientC}, true, Fun) ->
    {Result, PersistentC1} = Fun(PersistentC),
    {Result, {PersistentC1, TransientC}};
with_msg_store_state({PersistentC, TransientC}, false, Fun) ->
    {Result, TransientC1} = Fun(TransientC),
    {Result, {PersistentC, TransientC1}}.

%% Like with_msg_store_state/3 for a Fun that returns only a result.
%% The match against the already-bound MSCState asserts that the
%% client pair comes back unchanged.
with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
    KeepClient = fun (Client) -> {Fun(Client), Client} end,
    {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, KeepClient),
    Res.

%% Initialises a message store client with a freshly generated ref.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
    Ref = rabbit_guid:gen(),
    msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost).

%% Initialises a message store client for VHost. The last argument
%% passed to client_init/5 is a zero-arity fun that hands Callback the
%% close-file-descriptors fun for this store.
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
    IsPersistentStore = MsgStore =:= ?PERSISTENT_MSG_STORE,
    CloseFDsFun = msg_store_close_fds_fun(IsPersistentStore),
    OnCloseFDs = fun () -> Callback(?MODULE, CloseFDsFun) end,
    rabbit_vhost_msg_store:client_init(VHost, MsgStore,
                                       Ref, MsgOnDiskFun, OnCloseFDs).

%% Writes Msg under MsgId to the persistent or transient store,
%% using the flow-controlled write.
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
    Write = fun (Client) ->
                    rabbit_msg_store:write_flow(MsgId, Msg, Client)
            end,
    with_immutable_msg_store_state(MSCState, IsPersistent, Write).

%% Reads MsgId from the chosen store; returns
%% {ReadResult, UpdatedClientPair}.
msg_store_read(MSCState, IsPersistent, MsgId) ->
    Read = fun (Client) -> rabbit_msg_store:read(MsgId, Client) end,
    with_msg_store_state(MSCState, IsPersistent, Read).

%% Removes MsgIds from the chosen store.
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
    Remove = fun (Client) -> rabbit_msg_store:remove(MsgIds, Client) end,
    with_immutable_msg_store_state(MSCState, IsPersistent, Remove).

%% Calls rabbit_msg_store:close_all_indicated/1 on the chosen store
%% client.
msg_store_close_fds(MSCState, IsPersistent) ->
    Close = fun (Client) -> rabbit_msg_store:close_all_indicated(Client) end,
    with_msg_store_state(MSCState, IsPersistent, Close).

%% Returns a fun with the shape invoke/3 expects, which closes the
%% indicated file descriptors of the chosen store client.
msg_store_close_fds_fun(IsPersistent) ->
    fun (?MODULE, State = #vqstate { msg_store_clients = Clients }) ->
            {ok, Clients1} = msg_store_close_fds(Clients, IsPersistent),
            State #vqstate { msg_store_clients = Clients1 }
    end.

%% Records SeqId as delivered in the queue index when the flag is
%% true; otherwise the index state is returned unchanged.
maybe_write_delivered(true, SeqId, IndexState) ->
    rabbit_queue_index:deliver([SeqId], IndexState);
maybe_write_delivered(false, _SeqId, IndexState) ->
    IndexState.

%% Converts entries read from the queue index into a ?QUEUE of betas.
%%
%% Transient entries below TransientThreshold are not loaded: they are
%% collected as acks (and as delivers when not yet delivered) and
%% handed to DelsAndAcksFun. Entries that are currently pending acks
%% are skipped. Returns
%% {Betas, RamReadyCount, RamBytes, DelsAndAcksFunResult,
%%  TransientCount, TransientBytes}.
betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
    Step =
        fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = Entry,
             {Betas, Dels, AckIds, RamN, RamB, TransN, TransB} = Acc) ->
                Stale = SeqId < TransientThreshold andalso not IsPersistent,
                case Stale of
                    true ->
                        {Betas,
                         cons_if(not IsDelivered, SeqId, Dels),
                         [SeqId | AckIds],
                         RamN, RamB, TransN, TransB};
                    false ->
                        MsgStatus = m(beta_msg_status(Entry)),
                        HaveMsg = msg_in_ram(MsgStatus),
                        Size = msg_size(MsgStatus),
                        case is_msg_in_pending_acks(SeqId, State) of
                            true ->
                                Acc; %% [0]
                            false ->
                                InRam = one_if(HaveMsg),
                                Transient = one_if(not IsPersistent),
                                {?QUEUE:in_r(MsgStatus, Betas),
                                 Dels, AckIds,
                                 RamN + InRam,
                                 RamB + InRam * Size,
                                 TransN + Transient,
                                 TransB + Transient * Size}
                        end
                end
        end,
    {Filtered, Delivers, Acks,
     RamReadyCount, RamBytes, TransientCount, TransientBytes} =
        lists:foldr(Step, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
    {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
     TransientCount, TransientBytes}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.

%% True when SeqId is a key of any of the three pending-ack trees.
is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack  = RPA,
                                         disk_pending_ack = DPA,
                                         qi_pending_ack   = QPA }) ->
    lists:any(fun (Tree) -> gb_trees:is_defined(SeqId, Tree) end,
              [RPA, DPA, QPA]).

%% Grows a delta so that it accounts for one more message at SeqId.
%% A blank delta becomes the single-message range [SeqId, SeqId + 1);
%% otherwise the range is widened downwards or upwards as needed. The
%% transient counter is bumped for non-persistent messages.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) ->
    d(#delta { start_seq_id = SeqId,
               count        = 1,
               end_seq_id   = SeqId + 1,
               transient    = one_if(not IsPersistent) });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
                             count        = Count,
                             transient    = Transient } = Delta,
             IsPersistent)
  when SeqId < StartSeqId ->
    d(Delta #delta { start_seq_id = SeqId,
                     count        = Count + 1,
                     transient    = Transient + one_if(not IsPersistent) });
expand_delta(SeqId, #delta { count      = Count,
                             end_seq_id = EndSeqId,
                             transient  = Transient } = Delta,
             IsPersistent)
  when SeqId >= EndSeqId ->
    d(Delta #delta { count      = Count + 1,
                     end_seq_id = SeqId + 1,
                     transient  = Transient + one_if(not IsPersistent) });
expand_delta(_SeqId, #delta { count     = Count,
                              transient = Transient } = Delta,
             IsPersistent) ->
    d(Delta #delta { count     = Count + 1,
                     transient = Transient + one_if(not IsPersistent) }).

%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------

%% Builds the initial queue state. Every message known at this point
%% is accounted for in delta; the queue always starts in default mode
%% and finishes by calling maybe_deltas_to_betas/1.
%%
%% Unless Terms is non_clean_shutdown, the persistent_count and
%% persistent_bytes entries of Terms take precedence over DeltaCount
%% and DeltaBytes.
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
     PersistentClient, TransientClient, VHost) ->
    {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),

    {Count, Bytes} =
        case Terms of
            non_clean_shutdown ->
                {DeltaCount, DeltaBytes};
            _ ->
                {proplists:get_value(persistent_count, Terms, DeltaCount),
                 proplists:get_value(persistent_bytes, Terms, DeltaBytes)}
        end,
    Delta = if
                Count == 0 andalso DeltaCount /= undefined ->
                    ?BLANK_DELTA;
                true ->
                    d(#delta { start_seq_id = LowSeqId,
                               count        = Count,
                               transient    = 0,
                               end_seq_id   = NextSeqId })
            end,
    Now = erlang:monotonic_time(),
    IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
                                      ?IO_BATCH_SIZE),
    {ok, IndexMaxSize} = application:get_env(
                           rabbit, queue_index_embed_msgs_below),

    EmptyQ    = ?QUEUE:new(),
    EmptyTree = gb_trees:empty(),
    EmptySet  = gb_sets:new(),
    State = #vqstate {
      %% message queues
      q1                  = EmptyQ,
      q2                  = EmptyQ,
      delta               = Delta,
      q3                  = EmptyQ,
      q4                  = EmptyQ,
      next_seq_id         = NextSeqId,
      %% pending acks
      ram_pending_ack     = EmptyTree,
      disk_pending_ack    = EmptyTree,
      qi_pending_ack      = EmptyTree,
      %% storage
      index_state         = IndexState1,
      msg_store_clients   = {PersistentClient, TransientClient},
      durable             = IsDurable,
      transient_threshold = NextSeqId,
      qi_embed_msgs_below = IndexMaxSize,
      io_batch_size       = IoBatchSize,
      %% message and byte counts
      len                   = Count,
      persistent_count      = Count,
      bytes                 = Bytes,
      persistent_bytes      = Bytes,
      delta_transient_bytes = 0,
      %% memory accounting
      target_ram_count    = infinity,
      ram_msg_count       = 0,
      ram_msg_count_prev  = 0,
      ram_ack_count_prev  = 0,
      ram_bytes           = 0,
      unacked_bytes       = 0,
      %% rates and their counters
      out_counter         = 0,
      in_counter          = 0,
      ack_out_counter     = 0,
      ack_in_counter      = 0,
      rates               = blank_rates(Now),
      %% confirms
      msgs_on_disk        = EmptySet,
      msg_indices_on_disk = EmptySet,
      unconfirmed         = EmptySet,
      confirmed           = EmptySet,
      %% disk statistics
      disk_read_count     = 0,
      disk_write_count    = 0,

      mode                       = default,
      memory_reduction_run_count = 0,
      virtual_host               = VHost},
    a(maybe_deltas_to_betas(State)).

%% All-zero rates stamped with Now.
blank_rates(Now) ->
    #rates { in        = 0.0,
             out       = 0.0,
             ack_in    = 0.0,
             ack_out   = 0.0,
             timestamp = Now}.

%% Puts a message back at the head of the queue.
%%
%% Default mode: a message without an in-memory body goes to the front
%% of q3 when q4 is empty; otherwise it is read in first and placed at
%% the front of q4. A message with a body always goes to q4.
%% Lazy mode: the message goes to the front of q3, unless q3 is empty,
%% in which case it is written to disk and accounted for in delta.
in_r(MsgStatus = #msg_status { msg = undefined },
     State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
    case ?QUEUE:is_empty(Q4) of
        true ->
            State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
        false ->
            {Msg, State1} = read_msg(MsgStatus, State),
            #vqstate { q4 = Q4a } = State1,
            Loaded = MsgStatus#msg_status{msg = Msg},
            State2 = State1 #vqstate { q4 = ?QUEUE:in_r(Loaded, Q4a) },
            stats(ready0, {MsgStatus, Loaded}, 0, State2)
    end;
in_r(MsgStatus,
     State = #vqstate { mode = default, q4 = Q4 }) ->
    State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
%% lazy queues
in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent },
     State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
    case ?QUEUE:is_empty(Q3) of
        false ->
            State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
        true ->
            {_Written, State1} =
                maybe_write_to_disk(true, true, MsgStatus, State),
            State2 = stats(ready0, {MsgStatus, none}, 1, State1),
            State2 #vqstate { delta = expand_delta(SeqId, Delta, IsPersistent) }
    end.

%% Takes the next message status off the queue. Default mode serves
%% q4 first and falls back to q3; lazy mode only ever serves q3.
%% Returns {empty, State} or {{value, MsgStatus}, State}.
queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
    case ?QUEUE:out(Q4) of
        {{value, MsgStatus}, Q4a} ->
            {{value, MsgStatus}, State #vqstate { q4 = Q4a }};
        {empty, _Q4} ->
            case fetch_from_q3(State) of
                {empty, _State1} = Empty ->
                    Empty;
                {loaded, {MsgStatus, State1}} ->
                    {{value, MsgStatus}, State1}
            end
    end;
%% lazy queues
queue_out(State = #vqstate { mode = lazy }) ->
    case fetch_from_q3(State) of
        {empty, _State1} = Empty ->
            Empty;
        {loaded, {MsgStatus, State1}} ->
            {{value, MsgStatus}, State1}
    end.

%% Returns the message body for a status, going through read_msg/3
%% only when the body is not already in memory.
read_msg(#msg_status{msg           = undefined,
                     msg_id        = MsgId,
                     is_persistent = IsPersistent}, State) ->
    read_msg(MsgId, IsPersistent, State);
read_msg(#msg_status{msg = Body}, State) ->
    {Body, State}.

%% Reads MsgId from the persistent or transient message store and
%% bumps disk_read_count. Crashes with badmatch unless the store
%% answers {ok, #basic_message{}}.
read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = Clients,
                                               disk_read_count   = Reads}) ->
    {ReadResult, Clients1} = msg_store_read(Clients, IsPersistent, MsgId),
    {ok, Msg = #basic_message {}} = ReadResult,
    {Msg, State #vqstate {msg_store_clients = Clients1,
                          disk_read_count   = Reads + 1}}.

%% Front end for stats0/4: expands the shorthand Signs and Statuses
%% arguments into the tuples stats0/4 works with.
stats(Signs, Statuses, DeltaPaged, State) ->
    SignTriple   = expand_signs(Signs),
    StatusTriple = expand_statuses(Statuses),
    stats0(SignTriple, StatusTriple, DeltaPaged, State).

%% {DeltaReady, DeltaUnacked, ReadyMsgPaged}
expand_signs(ready0)   -> {0, 0, true};
expand_signs(lazy_pub) -> {1, 0, true};
expand_signs({A, B})   -> {A, B, false}.

%% {InRamBefore, InRamAfter, MsgStatus}; clause order matters because
%% the last clause matches any pair.
expand_statuses({none, A})    -> {false,         msg_in_ram(A), A};
expand_statuses({B,    none}) -> {msg_in_ram(B), false,         B};
expand_statuses({lazy, A})    -> {false,         false,         A};
expand_statuses({B,    A})    -> {msg_in_ram(B), msg_in_ram(A), B}.

%% In stats0/4 at least, we are religious: the variable name
%% contains "Ready" or "Unacked" iff that is what it counts. If
%% neither is present it counts both.
stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
       {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged,
       State = #vqstate{len                   = ReadyCount,
                        bytes                 = ReadyBytes,
                        ram_msg_count         = RamReadyCount,
                        persistent_count      = PersistentCount,
                        unacked_bytes         = UnackedBytes,
                        ram_bytes             = RamBytes,
                        delta_transient_bytes = DeltaBytes,
                        persistent_bytes      = PersistentBytes}) ->
    Size = msg_size(MsgStatus),
    DeltaTotal = DeltaReady + DeltaUnacked,
    %% +1 when the message enters RAM, -1 when it leaves, 0 otherwise
    DeltaRam = one_if(InRamAfter) - one_if(InRamBefore),
    DeltaRamReady =
        case DeltaReady of
            1                    -> one_if(InRamAfter);
            -1                   -> -one_if(InRamBefore);
            0 when ReadyMsgPaged -> DeltaRam;
            0                    -> 0
        end,
    IsPersistent = MsgStatus#msg_status.is_persistent,
    DeltaPersistent = DeltaTotal * one_if(IsPersistent),
    DeltaTransientPaged = DeltaPaged * one_if(not IsPersistent) * Size,
    State#vqstate{len                   = ReadyCount      + DeltaReady,
                  ram_msg_count         = RamReadyCount   + DeltaRamReady,
                  persistent_count      = PersistentCount + DeltaPersistent,
                  bytes                 = ReadyBytes      + DeltaReady * Size,
                  unacked_bytes         = UnackedBytes    + DeltaUnacked * Size,
                  ram_bytes             = RamBytes        + DeltaRam * Size,
                  persistent_bytes      = PersistentBytes + DeltaPersistent * Size,
                  delta_transient_bytes = DeltaBytes      + DeltaTransientPaged}.

%% Size recorded in the message properties.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.

%% True when the message body is held in the status record.
msg_in_ram(#msg_status{msg = Body}) -> Body =/= undefined.

%% Takes one message out of the ready set and bumps out_counter.
%% Returns {AckTag, State}.
%%
%% first param: AckRequired
%%
%% With AckRequired = true the message becomes a pending ack and its
%% SeqId is returned as the ack tag.
remove(true, MsgStatus = #msg_status {
               seq_id        = SeqId,
               is_delivered  = IsDelivered,
               index_on_disk = IndexOnDisk },
       State = #vqstate {out_counter = OutCount,
                         index_state = IndexState}) ->
    %% Mark it delivered if necessary
    NeedsDeliver = IndexOnDisk andalso not IsDelivered,
    IndexState1 = maybe_write_delivered(NeedsDeliver, SeqId, IndexState),

    Delivered = MsgStatus #msg_status { is_delivered = true },
    State1 = record_pending_ack(Delivered, State),

    State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1),

    {SeqId, maybe_update_rates(
              State2 #vqstate {out_counter = OutCount + 1,
                               index_state = IndexState1})};

%% With AckRequired = false the message is dropped for good and the
%% ack tag is undefined.
%%
%% This function body has the same behaviour as remove_queue_entries/3
%% but instead of removing messages based on a ?QUEUE, this removes
%% just one message, the one referenced by the MsgStatus provided.
remove(false, MsgStatus = #msg_status {
                seq_id        = SeqId,
                msg_id        = MsgId,
                is_persistent = IsPersistent,
                is_delivered  = IsDelivered,
                msg_in_store  = MsgInStore,
                index_on_disk = IndexOnDisk },
       State = #vqstate {out_counter       = OutCount,
                         index_state       = IndexState,
                         msg_store_clients = MSCState}) ->
    %% Mark it delivered if necessary
    NeedsDeliver = IndexOnDisk andalso not IsDelivered,
    IndexState1 = maybe_write_delivered(NeedsDeliver, SeqId, IndexState),

    %% Remove from msg_store and queue index, if necessary
    case MsgInStore of
        false -> ok;
        true  -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
    end,

    IndexState2 =
        case IndexOnDisk of
            false -> IndexState1;
            true  -> rabbit_queue_index:ack([SeqId], IndexState1)
        end,

    State1 = stats({-1, 0}, {MsgStatus, none}, 0, State),

    {undefined, maybe_update_rates(
                  State1 #vqstate {out_counter = OutCount + 1,
                                   index_state = IndexState2})}.

%% This function exists as a way to improve dropwhile/2
%% performance. The idea of having this function is to optimise calls
%% to rabbit_queue_index by batching delivers and acks, instead of
%% sending them one by one.
%%
%% Instead of removing every message as they are popped from the
%% queue, it first accumulates them and then removes them by calling
%% remove_queue_entries/3, since the behaviour of
%% remove_queue_entries/3 when used with
%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
%% calling remove(false, MsgStatus, State).
%%
%% remove/3 also updates the out_counter in every call, but here we do
%% it just once at the end.
remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
    {MsgProps, Collected, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),
    DelsAndAcksFun = process_delivers_and_acks_fun(deliver_and_ack),
    State2 = remove_queue_entries(Collected, DelsAndAcksFun, State1),
    %% remove/3 calls maybe_update_rates/1 for every message. Since
    %% out_counter is updated only once here, it is called once too.
    State3 = State2 #vqstate {
               out_counter = OutCount + ?QUEUE:len(Collected)},
    {MsgProps, maybe_update_rates(State3)}.

%% This function exists as a way to improve fetchwhile/4
%% performance. The idea of having this function is to optimise calls
%% to rabbit_queue_index by batching delivers, instead of sending them
%% one by one.
%%
%% Fun is the function passed to fetchwhile/4 that's
%% applied to every fetched message and used to build the fetchwhile/4
%% result accumulator FetchAcc.
fetch_by_predicate(Pred, Fun, FetchAcc,
                   State = #vqstate {
                              index_state = IndexState,
                              out_counter = OutCount}) ->
    {MsgProps, Collected, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),

    {Delivers, FetchAcc1, State2} =
        process_queue_entries(Collected, Fun, FetchAcc, State1),

    %% one batched deliver for everything that was fetched
    IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),

    State3 = State2 #vqstate {
               index_state = IndexState1,
               out_counter = OutCount + ?QUEUE:len(Collected)},
    {MsgProps, FetchAcc1, maybe_update_rates(State3)}.

%% We try to do here the same as what remove(true, MsgStatus, State)
%% does but processing several messages at the same time. The idea is
%% to optimize rabbit_queue_index:deliver/2 calls by sending a list of
%% SeqIds instead of one by one, thus process_queue_entries1 will
%% accumulate the required deliveries, will record_pending_ack for
%% each message, and will update stats, like remove/3 does.
%%
%% For the meaning of Fun and FetchAcc arguments see
%% fetch_by_predicate/4 above.
process_queue_entries(Q, Fun, FetchAcc, State) ->
    Step = fun (MsgStatus, Acc) ->
                   process_queue_entries1(MsgStatus, Fun, Acc)
           end,
    ?QUEUE:foldl(Step, {[], FetchAcc, State}, Q).

%% Handles one collected entry: reads the message in, records it as a
%% pending ack marked as delivered, applies Fun, and remembers the
%% SeqId when the index still has to be told about the delivery.
process_queue_entries1(
  #msg_status { seq_id        = SeqId,
                is_delivered  = IsDelivered,
                index_on_disk = IndexOnDisk} = MsgStatus,
  Fun,
  {Delivers, FetchAcc, State}) ->
    {Msg, State1} = read_msg(MsgStatus, State),
    Delivered = MsgStatus #msg_status { is_delivered = true },
    State2 = record_pending_ack(Delivered, State1),
    NeedsDeliver = IndexOnDisk andalso not IsDelivered,
    {cons_if(NeedsDeliver, SeqId, Delivers),
     Fun(Msg, SeqId, FetchAcc),
     stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}.
%% Pops messages off the queue for as long as Pred(MsgProps) returns
%% true, appending every popped #msg_status{} to the accumulator
%% queue.
%%
%% Returns {undefined, QAcc, State} when queue_out/1 reports the queue
%% as empty, or {MsgProps, QAcc, State} for the first message rejected
%% by Pred; that message is handed back to the queue with in_r/2.
collect_by_predicate(Pred, Collected, State0) ->
    case queue_out(State0) of
        {{value, #msg_status{msg_props = Props} = Head}, State1} ->
            case Pred(Props) of
                false ->
                    {Props, Collected, in_r(Head, State1)};
                true ->
                    collect_by_predicate(Pred, ?QUEUE:in(Head, Collected),
                                         State1)
            end;
        {empty, State1} ->
            {undefined, Collected, State1}
    end.

%%----------------------------------------------------------------------------
%% Helpers for Public API purge/1 function
%%----------------------------------------------------------------------------

%% The difference between purge_when_pending_acks/1
%% vs. purge_and_index_reset/1 is that the first one issues a deliver
%% and an ack to the queue index for every message that's being
%% removed, while the latter just resets the queue index state.
purge_when_pending_acks(State) ->
    a(purge1(process_delivers_and_acks_fun(deliver_and_ack), State)).

purge_and_index_reset(State) ->
    Purged = purge1(process_delivers_and_acks_fun(none), State),
    a(reset_qi_state(Purged)).

%% This function removes messages from each of {q1, q2, q3, q4}.
%%
%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
%% are specially handled by purge_betas_and_deltas/2.
%%
%% purge_betas_and_deltas/2 loads messages from the queue index,
%% filling up q3 and in some cases moving messages from q2 to q3 while
%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
%% messages loaded into q3 are removed by calling
%% remove_queue_entries/3 until there are no more messages to be read
%% from the queue index. Messages are read in batches from the queue
%% index.
%% Empties q4, then q3 (and whatever purge_betas_and_deltas/2 keeps
%% loading into it), then q1. AfterFun is the deliver/ack fun built by
%% process_delivers_and_acks_fun/1 and is applied after every batch
%% of removed entries.
purge1(AfterFun, State = #vqstate{q4 = Q4}) ->
    NoQ4 = remove_queue_entries(Q4, AfterFun, State),

    NoBetas = #vqstate{q1 = Q1} =
        purge_betas_and_deltas(AfterFun, NoQ4#vqstate{q4 = ?QUEUE:new()}),

    NoQ1 = remove_queue_entries(Q1, AfterFun, NoBetas),

    a(NoQ1#vqstate{q1 = ?QUEUE:new()}).

%% Replaces the queue index state with the result of
%% rabbit_queue_index:reset_state/1.
reset_qi_state(State = #vqstate{index_state = IndexState}) ->
    IndexState1 = rabbit_queue_index:reset_state(IndexState),
    State#vqstate{index_state = IndexState1}.

%% True when none of the three pending ack trees holds an entry.
is_pending_ack_empty(State) ->
    count_pending_acks(State) =:= 0.

%% True when the set of unconfirmed message ids is empty.
is_unconfirmed_empty(#vqstate{unconfirmed = UC}) ->
    gb_sets:is_empty(UC).

%% Total number of entries across the ram, disk and queue-index
%% pending ack trees.
count_pending_acks(#vqstate{ram_pending_ack  = RPA,
                            disk_pending_ack = DPA,
                            qi_pending_ack   = QPA}) ->
    lists:sum([gb_trees:size(Tree) || Tree <- [RPA, DPA, QPA]]).

%% Repeatedly removes everything in q3 and refills it through
%% maybe_deltas_to_betas/2 until q3 comes back empty. In lazy mode q3
%% is refilled once up front, before the first emptiness check.
purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate{mode = Mode}) ->
    Loaded = #vqstate{q3 = Q3} =
        case Mode of
            lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
            _    -> State
        end,

    case ?QUEUE:is_empty(Q3) of
        true ->
            Loaded;
        false ->
            Removed = remove_queue_entries(Q3, DelsAndAcksFun, Loaded),
            Refilled = maybe_deltas_to_betas(
                         DelsAndAcksFun, Removed#vqstate{q3 = ?QUEUE:new()}),
            purge_betas_and_deltas(DelsAndAcksFun, Refilled)
    end.

%% Folds remove_queue_entries1/2 over Q, removes the collected message
%% ids from the message store(s), and finally lets DelsAndAcksFun deal
%% with the accumulated deliver and ack sequence ids.
remove_queue_entries(Q, DelsAndAcksFun,
                     State = #vqstate{msg_store_clients = MSCState}) ->
    Seed = {maps:new(), [], [], State},
    {MsgIdsByStore, Delivers, Acks, State1} =
        ?QUEUE:foldl(fun remove_queue_entries1/2, Seed, Q),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    DelsAndAcksFun(Delivers, Acks, State1).
%% Fold step for remove_queue_entries/3. Accumulates:
%%  - the message id, keyed by IsPersistent, when the message is in
%%    the message store;
%%  - the sequence id to deliver, when the index entry is on disk and
%%    the message was not delivered yet;
%%  - the sequence id to ack, when the index entry is on disk;
%% and updates the stats for one message leaving the queue.
remove_queue_entries1(
  #msg_status{msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
              msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
              is_persistent = IsPersistent} = MsgStatus,
  {MsgIdsByStore, Delivers, Acks, State}) ->
    MsgIdsByStore1 =
        case MsgInStore of
            true  -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
            false -> MsgIdsByStore
        end,
    Delivers1 = cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
    Acks1 = cons_if(IndexOnDisk, SeqId, Acks),
    {MsgIdsByStore1, Delivers1, Acks1,
     stats({-1, 0}, {MsgStatus, none}, 0, State)}.

%% Builds the fun applied to (Delivers, Acks, State) after entries
%% have been removed. For deliver_and_ack the sequence ids are first
%% delivered and then acked in the queue index; for any other argument
%% the fun returns the state untouched.
process_delivers_and_acks_fun(deliver_and_ack) ->
    fun (Delivers, Acks, State = #vqstate{index_state = IndexState}) ->
            Delivered = rabbit_queue_index:deliver(Delivers, IndexState),
            State#vqstate{
              index_state = rabbit_queue_index:ack(Acks, Delivered)}
    end;
process_delivers_and_acks_fun(_) ->
    fun (_, _, State) -> State end.

%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------

%% Publishes one message. PersistFun is called as
%% PersistFun(ForceMsg, ForceIndex, MsgStatus, State).
%%
%% default mode: nothing is forced to disk; the message goes to q1
%% when q3 is non-empty and to q4 otherwise.
%% lazy mode: both message and index are forced to disk and the
%% message is accounted for in delta.
publish1(Msg = #basic_message{is_persistent = IsPersistent, id = MsgId},
         MsgProps = #message_properties{needs_confirming = NeedsConfirming},
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate{q1 = Q1, q3 = Q3, q4 = Q4,
                          mode                = default,
                          qi_embed_msgs_below = IndexMaxSize,
                          next_seq_id         = SeqId,
                          in_counter          = InCount,
                          durable             = IsDurable,
                          unconfirmed         = UC}) ->
    %% A message is only treated as persistent on a durable queue.
    IsPersistent1 = IsDurable andalso IsPersistent,
    Status0 = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps,
                         IndexMaxSize),
    {Status1, State1} = PersistFun(false, false, Status0, State),
    Queued =
        case ?QUEUE:is_empty(Q3) of
            true  -> State1#vqstate{q4 = ?QUEUE:in(m(Status1), Q4)};
            false -> State1#vqstate{q1 = ?QUEUE:in(m(Status1), Q1)}
        end,
    InCount1 = InCount + 1,
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats({1, 0}, {none, Status1}, 0,
          Queued#vqstate{next_seq_id = SeqId + 1,
                         in_counter  = InCount1,
                         unconfirmed = UC1});
publish1(Msg = #basic_message{is_persistent = IsPersistent, id = MsgId},
         MsgProps = #message_properties{needs_confirming = NeedsConfirming},
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate{mode                = lazy,
                          qi_embed_msgs_below = IndexMaxSize,
                          next_seq_id         = SeqId,
                          in_counter          = InCount,
                          durable             = IsDurable,
                          unconfirmed         = UC,
                          delta               = Delta}) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    Status0 = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps,
                         IndexMaxSize),
    {Status1, State1} = PersistFun(true, true, Status0, State),
    %% NOTE(review): expand_delta/3 is given the message's own
    %% IsPersistent flag here, not IsPersistent1 (which also requires
    %% a durable queue) -- confirm this is intended.
    Delta1 = expand_delta(SeqId, Delta, IsPersistent),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats(lazy_pub, {lazy, m(Status1)}, 1,
          State1#vqstate{delta       = Delta1,
                         next_seq_id = SeqId + 1,
                         in_counter  = InCount + 1,
                         unconfirmed = UC1}).

%% Fold step for batch publishing: publishes one
%% {Msg, MsgProps, IsDelivered} triple using
%% maybe_prepare_write_to_disk/4 as the persist fun.
batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
    State1 = publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                      fun maybe_prepare_write_to_disk/4, State),
    {ChPid, Flow, State1}.
%% Publishes a message that is considered delivered straight away: it
%% never enters q1..q4 but is recorded as a pending ack. Returns
%% {SeqId, State} where SeqId is the sequence id given to the message.
%%
%% PersistFun is called as PersistFun(ForceMsg, ForceIndex, MsgStatus,
%% State); nothing is forced in default mode, both are forced in lazy
%% mode.
publish_delivered1(Msg = #basic_message{is_persistent = IsPersistent,
                                        id = MsgId},
                   MsgProps = #message_properties{
                                 needs_confirming = NeedsConfirming},
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate{mode                = default,
                                    qi_embed_msgs_below = IndexMaxSize,
                                    next_seq_id         = SeqId,
                                    out_counter         = OutCount,
                                    in_counter          = InCount,
                                    durable             = IsDurable,
                                    unconfirmed         = UC}) ->
    Status0 = msg_status(IsDurable andalso IsPersistent, true, SeqId,
                         Msg, MsgProps, IndexMaxSize),
    {Status1, State1} = PersistFun(false, false, Status0, State),
    Acked = record_pending_ack(m(Status1), State1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    Counted = Acked#vqstate{next_seq_id = SeqId + 1,
                            out_counter = OutCount + 1,
                            in_counter  = InCount + 1,
                            unconfirmed = UC1},
    {SeqId, stats({0, 1}, {none, Status1}, 0, Counted)};
publish_delivered1(Msg = #basic_message{is_persistent = IsPersistent,
                                        id = MsgId},
                   MsgProps = #message_properties{
                                 needs_confirming = NeedsConfirming},
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate{mode                = lazy,
                                    qi_embed_msgs_below = IndexMaxSize,
                                    next_seq_id         = SeqId,
                                    out_counter         = OutCount,
                                    in_counter          = InCount,
                                    durable             = IsDurable,
                                    unconfirmed         = UC}) ->
    Status0 = msg_status(IsDurable andalso IsPersistent, true, SeqId,
                         Msg, MsgProps, IndexMaxSize),
    {Status1, State1} = PersistFun(true, true, Status0, State),
    Acked = record_pending_ack(m(Status1), State1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    Counted = Acked#vqstate{next_seq_id = SeqId + 1,
                            out_counter = OutCount + 1,
                            in_counter  = InCount + 1,
                            unconfirmed = UC1},
    {SeqId, stats({0, 1}, {none, Status1}, 0, Counted)}.
%% Fold step for batch publish-delivered: publishes one
%% {Msg, MsgProps} pair using maybe_prepare_write_to_disk/4 as the
%% persist fun and prepends the assigned sequence id to SeqIds.
batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
    {SeqId, State1} =
        publish_delivered1(Msg, MsgProps, ChPid, Flow,
                           fun maybe_prepare_write_to_disk/4,
                           State),
    {ChPid, Flow, [SeqId | SeqIds], State1}.

%% Writes the message to the message store when it is not there yet
%% and either Force is true or the message is persistent. Messages
%% whose persist_to/1 is queue_index are left alone here. Returns
%% {MsgStatus, State}.
maybe_write_msg_to_disk(_Force,
                        MsgStatus = #msg_status{msg_in_store = true},
                        State) ->
    %% Already in the store: nothing to do.
    {MsgStatus, State};
maybe_write_msg_to_disk(Force,
                        MsgStatus = #msg_status{
                                       msg = Msg, msg_id = MsgId,
                                       is_persistent = IsPersistent},
                        State = #vqstate{msg_store_clients = MSCState,
                                         disk_write_count  = Count})
  when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
        queue_index ->
            {MsgStatus, State};
        msg_store ->
            ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                 prepare_to_store(Msg)),
            {MsgStatus#msg_status{msg_in_store = true},
             State#vqstate{disk_write_count = Count + 1}}
    end;
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.

%% Due to certain optimisations made inside
%% rabbit_queue_index:pre_publish/7 we need to have two separate
%% functions for index persistence. This one is only used when paging
%% during memory pressure. We didn't want to modify
%% maybe_write_index_to_disk/3 because that function is used in other
%% places.
%% Index persistence through rabbit_queue_index:pre_publish/7. The
%% entry is written when it is not on disk yet and either Force is
%% true or the message is persistent. When persist_to/1 says
%% queue_index the prepared message itself is handed to the index
%% (and counted as a disk write); otherwise only the message id is.
%% Returns {MsgStatus, State}.
maybe_batch_write_index_to_disk(_Force,
                                MsgStatus = #msg_status{
                                               index_on_disk = true},
                                State) ->
    {MsgStatus, State};
maybe_batch_write_index_to_disk(Force,
                                MsgStatus = #msg_status{
                                               msg           = Msg,
                                               msg_id        = MsgId,
                                               seq_id        = SeqId,
                                               is_persistent = IsPersistent,
                                               is_delivered  = IsDelivered,
                                               msg_props     = MsgProps},
                                State = #vqstate{
                                           target_ram_count = TargetRamCount,
                                           disk_write_count = DiskWriteCount,
                                           index_state      = IndexState})
  when Force orelse IsPersistent ->
    {Payload, DiskWriteCount1} =
        case persist_to(MsgStatus) of
            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1};
            msg_store   -> {MsgId, DiskWriteCount}
        end,
    IndexState1 = rabbit_queue_index:pre_publish(
                    Payload, SeqId, MsgProps, IsPersistent, IsDelivered,
                    TargetRamCount, IndexState),
    {MsgStatus#msg_status{index_on_disk = true},
     State#vqstate{index_state      = IndexState1,
                   disk_write_count = DiskWriteCount1}};
maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
%% Index persistence through rabbit_queue_index:publish/6, followed by
%% maybe_write_delivered/3. The entry is written when it is not on
%% disk yet and either Force is true or the message is persistent.
%% When persist_to/1 says queue_index the prepared message itself is
%% handed to the index (and counted as a disk write); otherwise only
%% the message id is. Returns {MsgStatus, State}.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                                    index_on_disk = true }, State) ->
    {MsgStatus, State};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
                                   msg           = Msg,
                                   msg_id        = MsgId,
                                   seq_id        = SeqId,
                                   is_persistent = IsPersistent,
                                   is_delivered  = IsDelivered,
                                   msg_props     = MsgProps},
                          State = #vqstate{target_ram_count = TargetRamCount,
                                           disk_write_count = DiskWriteCount,
                                           index_state      = IndexState})
  when Force orelse IsPersistent ->
    {MsgOrId, DiskWriteCount1} =
        case persist_to(MsgStatus) of
            msg_store   -> {MsgId, DiskWriteCount};
            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
        end,
    IndexState1 = rabbit_queue_index:publish(
                    MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
                    IndexState),
    IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
    {MsgStatus#msg_status{index_on_disk = true},
     State#vqstate{index_state      = IndexState2,
                   disk_write_count = DiskWriteCount1}};
maybe_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.

%% Persist fun: message via maybe_write_msg_to_disk/3, then the index
%% entry via maybe_write_index_to_disk/3.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).

%% Persist fun: message via maybe_write_msg_to_disk/3, then the index
%% entry via maybe_batch_write_index_to_disk/3.
maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).

%% Decides where a message is persisted: msg_store, or queue_index
%% when its estimated size is below IndexMaxSize.
determine_persist_to(#basic_message{
                        content = #content{properties     = Props,
                                           properties_bin = PropsBin}},
                     #message_properties{size = BodySize},
                     IndexMaxSize) ->
    %% The >= is so that you can set the env to 0 and never persist
    %% to the index.
    %%
    %% We want this to be fast, so we avoid size(term_to_binary())
    %% here, or using the term size estimation from truncate.erl, both
    %% of which are too slow. So instead, if the message body size
    %% goes over the limit then we avoid any other checks.
    %%
    %% If it doesn't we need to decide if the properties will push
    %% it past the limit. If we have the encoded properties (usual
    %% case) we can just check their size. If we don't (message came
    %% via the direct client), we make a guess based on the number of
    %% headers.
    case BodySize >= IndexMaxSize of
        true  -> msg_store;
        false -> Est = case is_binary(PropsBin) of
                           %% PropsBin is known to be a binary here,
                           %% so use the type-specific byte_size/1
                           %% rather than the generic size/1.
                           true  -> BodySize + byte_size(PropsBin);
                           false -> #'P_basic'{headers = Hs} = Props,
                                    case Hs of
                                        undefined -> 0;
                                        _         -> length(Hs)
                                    end * ?HEADER_GUESS_SIZE + BodySize
                       end,
                 case Est >= IndexMaxSize of
                     true  -> msg_store;
                     false -> queue_index
                 end
    end.

%% Accessor for the persist_to field of a #msg_status{}.
persist_to(#msg_status{persist_to = To}) -> To.

%% Returns Msg with its content passed through
%% rabbit_binary_parser:clear_decoded_content/1.
prepare_to_store(Msg) ->
    Msg#basic_message{
      %% don't persist any recoverable decoded properties
      content = rabbit_binary_parser:clear_decoded_content(
                  Msg #basic_message.content)}.
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------

%% Stores MsgStatus under its sequence id in exactly one pending ack
%% tree and bumps ack_in_counter:
%%  - message not in RAM                     -> disk_pending_ack
%%  - in RAM, persisted to the queue index   -> qi_pending_ack
%%  - in RAM, persisted to the message store -> ram_pending_ack
%% gb_trees:insert/3 is used, so the sequence id must not already be
%% present in the chosen tree.
record_pending_ack(#msg_status{seq_id = SeqId} = MsgStatus,
                   State = #vqstate{ram_pending_ack  = RPA,
                                    disk_pending_ack = DPA,
                                    qi_pending_ack   = QPA,
                                    ack_in_counter   = AckInCount}) ->
    Counted = State#vqstate{ack_in_counter = AckInCount + 1},
    case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
        {false, _} ->
            Counted#vqstate{
              disk_pending_ack = gb_trees:insert(SeqId, MsgStatus, DPA)};
        {_, queue_index} ->
            Counted#vqstate{
              qi_pending_ack = gb_trees:insert(SeqId, MsgStatus, QPA)};
        {_, msg_store} ->
            Counted#vqstate{
              ram_pending_ack = gb_trees:insert(SeqId, MsgStatus, RPA)}
    end.

%% Looks SeqId up in the ram, then disk, then queue-index pending ack
%% tree. The last lookup uses gb_trees:get/2, so a sequence id missing
%% from all three trees crashes.
lookup_pending_ack(SeqId, #vqstate{ram_pending_ack  = RPA,
                                   disk_pending_ack = DPA,
                                   qi_pending_ack   = QPA}) ->
    case gb_trees:lookup(SeqId, RPA) of
        {value, InRam} ->
            InRam;
        none ->
            case gb_trees:lookup(SeqId, DPA) of
                {value, OnDisk} -> OnDisk;
                none            -> gb_trees:get(SeqId, QPA)
            end
    end.
%% First parameter = UpdateStats
%%
%% Removes SeqId from whichever pending ack tree holds it (ram, disk,
%% queue index, searched in that order). Returns {MsgStatus, State},
%% or {none, State} with the state unchanged when SeqId is unknown.
%% With UpdateStats = true the stats are adjusted for one pending ack
%% going away.
remove_pending_ack(true, SeqId, State) ->
    case remove_pending_ack(false, SeqId, State) of
        {none, _} ->
            {none, State};
        {Removed, State1} ->
            {Removed, stats({0, -1}, {Removed, none}, 0, State1)}
    end;
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack  = RPA,
                                                  disk_pending_ack = DPA,
                                                  qi_pending_ack   = QPA}) ->
    case gb_trees:lookup(SeqId, RPA) of
        {value, InRam} ->
            {InRam,
             State#vqstate{ram_pending_ack = gb_trees:delete(SeqId, RPA)}};
        none ->
            case gb_trees:lookup(SeqId, DPA) of
                {value, OnDisk} ->
                    {OnDisk,
                     State#vqstate{
                       disk_pending_ack = gb_trees:delete(SeqId, DPA)}};
                none ->
                    case gb_trees:lookup(SeqId, QPA) of
                        {value, InIndex} ->
                            {InIndex,
                             State#vqstate{
                               qi_pending_ack = gb_trees:delete(SeqId, QPA)}};
                        none ->
                            {none, State}
                    end
            end
    end.

%% Drops every pending ack from the state. With KeepPersistent = true
%% only transient messages are removed from the message store and the
%% queue index is left alone; otherwise all on-disk index entries are
%% acked and all stored messages are removed.
purge_pending_ack(KeepPersistent,
                  State = #vqstate{index_state       = IndexState,
                                   msg_store_clients = MSCState}) ->
    {IndexOnDiskSeqIds, MsgIdsByStore, Emptied} = purge_pending_ack1(State),
    case KeepPersistent of
        false ->
            IndexState1 =
                rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
            remove_msgs_by_id(MsgIdsByStore, MSCState),
            Emptied#vqstate{index_state = IndexState1};
        true ->
            remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
            Emptied
    end.

%% Drops every pending ack from the state, calls
%% rabbit_queue_index:delete_and_terminate/1 on the queue index and
%% removes all stored messages that were pending ack.
purge_pending_ack_delete_and_terminate(
  State = #vqstate{index_state       = IndexState,
                   msg_store_clients = MSCState}) ->
    {_, MsgIdsByStore, Emptied} = purge_pending_ack1(State),
    IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    Emptied#vqstate{index_state = IndexState1}.
%% Folds accumulate_ack/2 over the ram, disk and queue-index pending
%% ack trees (in that order) and returns
%% {IndexOnDiskSeqIds, MsgIdsByStore, State} where State has all
%% three trees emptied.
purge_pending_ack1(State = #vqstate{ram_pending_ack  = RPA,
                                    disk_pending_ack = DPA,
                                    qi_pending_ack   = QPA}) ->
    Step = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
    {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
        lists:foldl(fun (Tree, Acc) ->
                            rabbit_misc:gb_trees_fold(Step, Acc, Tree)
                    end, accumulate_ack_init(), [RPA, DPA, QPA]),
    Emptied = State#vqstate{ram_pending_ack  = gb_trees:empty(),
                            disk_pending_ack = gb_trees:empty(),
                            qi_pending_ack   = gb_trees:empty()},
    {IndexOnDiskSeqIds, MsgIdsByStore, Emptied}.

%% MsgIdsByStore is a map with two keys:
%%
%% true: holds a list of Persistent Message Ids.
%% false: holds a list of Transient Message Ids.
%%
%% When we call maps:to_list/1 we get two sets of msg ids, where
%% IsPersistent is either true for persistent messages or false for
%% transient ones. The msg_store_remove/3 function takes this boolean
%% flag to determine from which store the messages should be removed
%% from.
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
    lists:map(fun ({IsPersistent, MsgIds}) ->
                      ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
              end, maps:to_list(MsgIdsByStore)).

%% Removes only the message ids stored under the `false' (transient)
%% key of MsgIdsByStore, if there are any.
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
    case maps:find(false, MsgIdsByStore) of
        {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds);
        error        -> ok
    end.

%% Initial accumulator for accumulate_ack/2:
%% {IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}.
accumulate_ack_init() -> {[], maps:new(), []}.

%% Adds one #msg_status{} to the accumulator: its sequence id when the
%% index entry is on disk, its message id keyed by IsPersistent when
%% the message is in the store, and its message id unconditionally to
%% the list of all ids.
accumulate_ack(#msg_status{seq_id        = SeqId,
                           msg_id        = MsgId,
                           is_persistent = IsPersistent,
                           msg_in_store  = MsgInStore,
                           index_on_disk = IndexOnDisk},
               {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
    SeqIds1 = cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
    MsgIdsByStore1 =
        case MsgInStore of
            true  -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
            false -> MsgIdsByStore
        end,
    {SeqIds1, MsgIdsByStore1, [MsgId | AllMsgIds]}.
%%----------------------------------------------------------------------------
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------

%% Moves the ids in MsgIdSet into `confirmed' and removes them from
%% msgs_on_disk, msg_indices_on_disk and unconfirmed.
record_confirms(MsgIdSet, State = #vqstate{msgs_on_disk        = MOD,
                                           msg_indices_on_disk = MIOD,
                                           unconfirmed         = UC,
                                           confirmed           = C}) ->
    Without = fun (Set) -> rabbit_misc:gb_sets_difference(Set, MsgIdSet) end,
    State#vqstate{msgs_on_disk        = Without(MOD),
                  msg_indices_on_disk = Without(MIOD),
                  unconfirmed         = Without(UC),
                  confirmed           = gb_sets:union(C, MsgIdSet)}.

%% Message store notification. For `ignored' every id is confirmed
%% right away. For `written' only the ids whose index entry is already
%% known to be on disk are confirmed; the ids that are still
%% unconfirmed are remembered in msgs_on_disk.
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
    Callback(?MODULE,
             fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
    Callback(?MODULE,
             fun (?MODULE, State = #vqstate{msgs_on_disk        = MOD,
                                            msg_indices_on_disk = MIOD,
                                            unconfirmed         = UC}) ->
                     StillUnconfirmed = gb_sets:intersection(UC, MsgIdSet),
                     Ready = gb_sets:intersection(MsgIdSet, MIOD),
                     record_confirms(
                       Ready,
                       State#vqstate{
                         msgs_on_disk = gb_sets:union(MOD, StillUnconfirmed)})
             end).

%% Queue index notification: mirror image of the `written' case
%% above. Ids whose message is already known to be on disk are
%% confirmed; the still-unconfirmed ids are remembered in
%% msg_indices_on_disk.
msg_indices_written_to_disk(Callback, MsgIdSet) ->
    Callback(?MODULE,
             fun (?MODULE, State = #vqstate{msgs_on_disk        = MOD,
                                            msg_indices_on_disk = MIOD,
                                            unconfirmed         = UC}) ->
                     StillUnconfirmed = gb_sets:intersection(UC, MsgIdSet),
                     Ready = gb_sets:intersection(MsgIdSet, MOD),
                     record_confirms(
                       Ready,
                       State#vqstate{
                         msg_indices_on_disk =
                             gb_sets:union(MIOD, StillUnconfirmed)})
             end).

%% Both message and index are on disk: confirm every id in MsgIdSet.
msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
    Callback(?MODULE,
             fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
%%----------------------------------------------------------------------------
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------

%% Publish fun for queue_merge: makes sure the message body is loaded
%% (reading it when msg is undefined) and moves the stats from
%% "pending ack" to "ready".
publish_alpha(#msg_status{msg = undefined} = MsgStatus, State) ->
    {Msg, State1} = read_msg(MsgStatus, State),
    Loaded = MsgStatus#msg_status{msg = Msg},
    {Loaded, stats({1, -1}, {MsgStatus, Loaded}, 0, State1)};
publish_alpha(MsgStatus, State) ->
    {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}.

%% Publish fun for queue_merge: forces the message (not the index) to
%% disk, trims the status and moves the stats from "pending ack" to
%% "ready".
publish_beta(MsgStatus, State) ->
    {Written, State1} =
        maybe_prepare_write_to_disk(true, false, MsgStatus, State),
    Trimmed = m(trim_msg_status(Written)),
    {Trimmed, stats({1, -1}, {MsgStatus, Trimmed}, 0, State1)}.

%% Rebuild queue, inserting sequence ids to maintain ordering
%%
%% SeqIds at or above Limit (when Limit is not undefined) are not
%% merged; they are returned as the first element of the result
%% {RemainingSeqIds, Q, MsgIds, State}.
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
    queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
                Limit, PubFun, State).

queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
            Limit, PubFun, State)
  when Limit == undefined orelse SeqId < Limit ->
    case ?QUEUE:out(Q) of
        {{value, #msg_status{seq_id = SeqIdQ} = Queued}, Q1}
          when SeqIdQ < SeqId ->
            %% enqueue from the remaining queue
            queue_merge(SeqIds, Q1, ?QUEUE:in(Queued, Front), MsgIds,
                        Limit, PubFun, State);
        {_, _Q1} ->
            %% enqueue from the remaining list of sequence ids
            case msg_from_pending_ack(SeqId, State) of
                {none, _} ->
                    queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
                {Pending, State1} ->
                    {#msg_status{msg_id = MsgId} = Published, State2} =
                        PubFun(Pending, State1),
                    queue_merge(Rest, Q, ?QUEUE:in(Published, Front),
                                [MsgId | MsgIds], Limit, PubFun, State2)
            end
    end;
queue_merge(SeqIds, Q, Front, MsgIds,
            _Limit, _PubFun, State) ->
    {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
%% Requeues the pending acks named by SeqIds into delta: each one is
%% forced to disk (message and index), delta is expanded to cover its
%% sequence id and its message id is collected. Unknown sequence ids
%% are skipped. Returns {Delta, MsgIds, State}.
delta_merge([], Delta, MsgIds, State) ->
    {Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, State) ->
    MergeOne =
        fun (SeqId, {DeltaAcc, MsgIdsAcc, StateAcc} = Acc) ->
                case msg_from_pending_ack(SeqId, StateAcc) of
                    {none, _} ->
                        Acc;
                    {#msg_status{msg_id        = MsgId,
                                 is_persistent = IsPersistent} = Pending,
                     State1} ->
                        {_Written, State2} =
                            maybe_prepare_write_to_disk(true, true,
                                                        Pending, State1),
                        {expand_delta(SeqId, DeltaAcc, IsPersistent),
                         [MsgId | MsgIdsAcc],
                         stats({1, -1}, {Pending, none}, 1, State2)}
                end
        end,
    lists:foldl(MergeOne, {Delta, MsgIds, State}, SeqIds).

%% Mostly opposite of record_pending_ack/2
%%
%% The returned status has needs_confirming reset to false. Returns
%% {none, State} when SeqId is not a pending ack.
msg_from_pending_ack(SeqId, State) ->
    case remove_pending_ack(false, SeqId, State) of
        {none, _} ->
            {none, State};
        {#msg_status{msg_props = MsgProps} = Pending, State1} ->
            MsgProps1 = MsgProps#message_properties{needs_confirming = false},
            {Pending#msg_status{msg_props = MsgProps1}, State1}
    end.

%% Sequence id of the first entry of Q, or undefined when Q is empty.
beta_limit(Q) ->
    case ?QUEUE:peek(Q) of
        empty                                 -> undefined;
        {value, #msg_status{seq_id = SeqId}} -> SeqId
    end.

%% Start sequence id of Delta, or undefined for a blank delta.
delta_limit(?BLANK_DELTA_PATTERN(_))              -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.

%%----------------------------------------------------------------------------
%% Iterator
%%----------------------------------------------------------------------------

%% Each of the three functions below wraps a gb_trees iterator over
%% one of the pending ack trees as {ack, Iterator}.
ram_ack_iterator(#vqstate{ram_pending_ack = RPA}) ->
    {ack, gb_trees:iterator(RPA)}.

disk_ack_iterator(#vqstate{disk_pending_ack = DPA}) ->
    {ack, gb_trees:iterator(DPA)}.

qi_ack_iterator(#vqstate{qi_pending_ack = QPA}) ->
    {ack, gb_trees:iterator(QPA)}.

%% Iterator over the ready messages, starting at q4 (see istate/2).
msg_iterator(State) -> istate(start, State).
%% Iterator stage transitions: start -> q4 -> q3 -> delta -> q2 -> q1
%% -> done. Each stage is {Key, QueueOrDelta, State}.
istate(start, State = #vqstate{q4 = Q4})       -> {q4,    Q4,    State};
istate(q4,    State = #vqstate{q3 = Q3})       -> {q3,    Q3,    State};
istate(q3,    State = #vqstate{delta = Delta}) -> {delta, Delta, State};
istate(delta, State = #vqstate{q2 = Q2})       -> {q2,    Q2,    State};
istate(q2,    State = #vqstate{q1 = Q1})       -> {q1,    Q1,    State};
istate(q1,   _State)                           -> done.

%% Advances an iterator. Returns {empty, IndexState} when exhausted,
%% or {value, MsgStatus, Unacked, NextIterator, IndexState}, where
%% Unacked is true for entries coming from a pending ack tree and
%% false otherwise.
next({ack, It}, IndexState) ->
    case gb_trees:next(It) of
        {_SeqId, MsgStatus, It1} ->
            {value, MsgStatus, true, {ack, It1}, IndexState};
        none ->
            {empty, IndexState}
    end;
next(done, IndexState) ->
    {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
                    end_seq_id   = SeqId}, State}, IndexState) ->
    %% Delta fully consumed: move on to the stage after delta.
    next(istate(delta, State), IndexState);
next({delta, #delta{start_seq_id = SeqId,
                    end_seq_id   = SeqIdEnd} = Delta, State}, IndexState) ->
    %% Read from the index up to the next segment boundary, or to the
    %% end of delta, whichever comes first.
    Boundary = rabbit_queue_index:next_segment_boundary(SeqId),
    ReadEnd = lists:min([Boundary, SeqIdEnd]),
    {Entries, IndexState1} =
        rabbit_queue_index:read(SeqId, ReadEnd, IndexState),
    next({delta, Delta#delta{start_seq_id = ReadEnd}, Entries, State},
         IndexState1);
next({delta, Delta, [], State}, IndexState) ->
    next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = Entry | Rest], State},
     IndexState) ->
    %% Entries that are currently pending ack are skipped here.
    case is_msg_in_pending_acks(SeqId, State) of
        true ->
            next({delta, Delta, Rest, State}, IndexState);
        false ->
            {value, beta_msg_status(Entry), false,
             {delta, Delta, Rest, State}, IndexState}
    end;
next({Key, Q, State}, IndexState) ->
    case ?QUEUE:out(Q) of
        {{value, MsgStatus}, Q1} ->
            {value, MsgStatus, false, {Key, Q1, State}, IndexState};
        {empty, _Q} ->
            next(istate(Key, State), IndexState)
    end.
%% Advances iterator It and, when it yields a value, prepends
%% {MsgStatus, Unacked, NextIterator} to Its. Returns
%% {Its1, IndexState1}.
inext(It, {Its, IndexState}) ->
    case next(It, IndexState) of
        {value, MsgStatus, Unacked, It1, IndexState1} ->
            {[{MsgStatus, Unacked, It1} | Its], IndexState1};
        {empty, IndexState1} ->
            {Its, IndexState1}
    end.

%% Folds Fun over the heads of several iterators in ascending
%% sequence id order. Fun(Msg, MsgProps, Unacked, Acc) returns
%% {cont, Acc1} to carry on or {stop, Acc1} to finish early. Returns
%% {Acc, State}.
ifold(_Fun, Acc, [], State0) ->
    {Acc, State0};
ifold(Fun, Acc, Its0, State0) ->
    BySeqId = fun ({#msg_status{seq_id = A}, _, _},
                   {#msg_status{seq_id = B}, _, _}) ->
                      A =< B
              end,
    [{MsgStatus, Unacked, It} | Rest] = lists:sort(BySeqId, Its0),
    {Msg, State1} = read_msg(MsgStatus, State0),
    case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
        {cont, Acc1} ->
            #vqstate{index_state = IndexState0} = State1,
            {Its1, IndexState1} = inext(It, {Rest, IndexState0}),
            ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1});
        {stop, Acc1} ->
            {Acc1, State1}
    end.

%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------

%% Calls reduce_memory_use/1 only once the run counter has reached
%% ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode), resetting the counter when it
%% does; otherwise just increments the counter.
maybe_reduce_memory_use(State = #vqstate{memory_reduction_run_count = Runs,
                                         mode = Mode}) ->
    case Runs >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
        false ->
            State#vqstate{memory_reduction_run_count = Runs + 1};
        true ->
            Reduced = reduce_memory_use(State),
            Reduced#vqstate{memory_reduction_run_count = 0}
    end.
%% Pages messages and pending acks out of RAM to get back under
%% target_ram_count. Nothing is done when the target is infinity.
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
    State;
reduce_memory_use(State = #vqstate {
                    mode             = default,
                    ram_pending_ack  = RPA,
                    ram_msg_count    = RamMsgCount,
                    target_ram_count = TargetRamCount,
                    io_batch_size    = IoBatchSize,
                    rates = #rates { in      = AvgIngress,
                                     out     = AvgEgress,
                                     ack_in  = AvgAckIngress,
                                     ack_out = AvgAckEgress } }) ->
    {CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
                                               msg_store_credit_disc_bound,
                                               ?CREDIT_DISC_BOUND),
    {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
            0 ->
                {false, State};
            %% Reduce memory of pending acks and alphas. The order is
            %% determined based on which is growing faster. Whichever
            %% comes second may very well get a quota of 0 if the
            %% first manages to push out the max number of messages.
            A2BChunk ->
                %% In case there are few messages to be sent to a message
                %% store and many messages to be embedded to the queue
                %% index, we should limit the number of messages to be
                %% flushed to avoid blocking the process.
                A2BChunkActual =
                    case A2BChunk > CreditDiscBound * 2 of
                        true  -> CreditDiscBound * 2;
                        false -> A2BChunk
                    end,
                AcksGrowFaster = (AvgAckIngress - AvgAckEgress) >
                                     (AvgIngress - AvgEgress),
                {ReduceFirst, ReduceSecond} =
                    case AcksGrowFaster of
                        true  -> {fun limit_ram_acks/2,
                                  fun push_alphas_to_betas/2};
                        false -> {fun push_alphas_to_betas/2,
                                  fun limit_ram_acks/2}
                    end,
                {QuotaLeft, StateFirst} = ReduceFirst(A2BChunkActual, State),
                {Quota, State2} = ReduceSecond(QuotaLeft, StateFirst),
                {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
        end,
    Permitted = permitted_beta_count(State1),
    {NeedResumeB2D, State3} =
        %% If there are more messages with their queue position held in
        %% RAM, a.k.a. betas, in Q2 & Q3 than IoBatchSize,
        %% write their queue position to disk, a.k.a. push_betas_to_deltas
        case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                        Permitted) of
            B2DChunk when B2DChunk >= IoBatchSize ->
                %% Same as for alphas to betas. Limit a number of messages
                %% to be flushed to disk at once to avoid blocking the
                %% process.
                B2DChunkActual =
                    case B2DChunk > CreditDiscBound * 2 of
                        true  -> CreditDiscBound * 2;
                        false -> B2DChunk
                    end,
                StateBD = push_betas_to_deltas(B2DChunkActual, State1),
                {B2DChunk > B2DChunkActual, StateBD};
            _ ->
                {false, State1}
        end,
    %% We can be blocked by the credit flow, or limited by a batch size,
    %% or finished with flushing.
    %% If blocked by the credit flow - the credit grant will resume
    %% processing, if limited by a batch - the batch continuation message
    %% should be sent.
    %% The continuation message will be prioritised over publishes,
    %% but not consumptions, so the queue can make progress.
    Blocked = credit_flow:blocked(),
    NeedResume = NeedResumeA2B orelse NeedResumeB2D,
    case {Blocked, NeedResume} of
        %% Credit bump will continue paging
        {true, _} ->
            State3;
        %% Finished with paging
        {false, false} ->
            State3;
        %% Planning next batch
        {false, true} ->
            %% We don't want to use self-credit-flow, because it's harder
            %% to reason about. So the process sends a (prioritised)
            %% message to itself and sets a waiting_bump value to keep
            %% the message box clean
            maybe_bump_reduce_memory_use(State3)
    end;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
                    mode             = lazy,
                    ram_pending_ack  = RPA,
                    ram_msg_count    = RamMsgCount,
                    target_ram_count = TargetRamCount }) ->
    State1 = #vqstate { q3 = Q3 } =
        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
            0 ->
                State;
            AckChunk ->
                {_, State2} = limit_ram_acks(AckChunk, State),
                State2
        end,

    State3 =
        case chunk_size(?QUEUE:len(Q3), permitted_beta_count(State1)) of
            0         -> State1;
            BetaChunk -> push_betas_to_deltas(BetaChunk, State1)
        end,
    garbage_collect(),
    State3.

%% Sends bump_reduce_memory_use to self() unless one is already
%% outstanding (waiting_bump = true), and records that fact.
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
    State;
maybe_bump_reduce_memory_use(State) ->
    self() ! bump_reduce_memory_use,
    State#vqstate{ waiting_bump = true }.

%% Moves up to Quota pending acks, largest sequence id first, from
%% ram_pending_ack to disk_pending_ack, forcing each message (not its
%% index entry) to disk and trimming its status. Returns
%% {RemainingQuota, State}.
limit_ram_acks(0, State) ->
    {0, ui(State)};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack  = RPA,
                                         disk_pending_ack = DPA }) ->
    case gb_trees:is_empty(RPA) of
        true ->
            {Quota, ui(State)};
        false ->
            {SeqId, InRam, RPA1} = gb_trees:take_largest(RPA),
            {Written, State1} =
                maybe_prepare_write_to_disk(true, false, InRam, State),
            Trimmed = m(trim_msg_status(Written)),
            DPA1 = gb_trees:insert(SeqId, Trimmed, DPA),
            limit_ram_acks(Quota - 1,
                           stats({0, 0}, {InRam, Trimmed}, 0,
                                 State1 #vqstate { ram_pending_ack  = RPA1,
                                                   disk_pending_ack = DPA1 }))
    end.
%% Returns the number of betas the queue is currently allowed to hold;
%% reduce_memory_use/1 passes the result to chunk_size/2 as Permitted.
%%
%%  * empty queue (len = 0): infinity;
%%  * lazy mode: target_ram_count;
%%  * target_ram_count = 0: the smaller of the q3 length and
%%    rabbit_queue_index:next_segment_boundary(0);
%%  * otherwise: the larger of next_segment_boundary(0) and
%%    BetaDelta - BetaDelta^2 div (BetaDelta + TargetRamCount), where
%%    BetaDelta = len - len(q1) - len(q4).
%%
%% Clause order matters: the len = 0 and lazy clauses win over the
%% later, more general ones.
permitted_beta_count(#vqstate { len = 0 }) ->
    infinity;
permitted_beta_count(#vqstate { mode             = lazy,
                                target_ram_count = TargetRamCount}) ->
    TargetRamCount;
permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
    lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
permitted_beta_count(#vqstate { q1               = Q1,
                                q4               = Q4,
                                target_ram_count = TargetRamCount,
                                len              = Len }) ->
    BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
    lists:max([rabbit_queue_index:next_segment_boundary(0),
               BetaDelta - ((BetaDelta * BetaDelta) div
                                (BetaDelta + TargetRamCount))]).

%% Returns by how much Current exceeds Permitted, or 0 when Permitted
%% is the atom infinity or is not smaller than Current.
chunk_size(Current, Permitted)
  when Permitted =:= infinity orelse Permitted >= Current ->
    0;
chunk_size(Current, Permitted) ->
    Current - Permitted.

%% Takes the message status at the head of q3.
%%
%% Returns {empty, State} when there is nothing to fetch, otherwise
%% {loaded, {MsgStatus, State1}} with the entry removed from q3.
%%
%% Default mode: when removing the entry leaves q3 empty, either q1 is
%% moved wholesale into q4 (delta is empty as well) or more entries are
%% loaded from delta via maybe_deltas_to_betas/1.
fetch_from_q3(State = #vqstate { mode  = default,
                                 q1    = Q1,
                                 q2    = Q2,
                                 delta = #delta { count = DeltaCount },
                                 q3    = Q3,
                                 q4    = Q4 }) ->
    case ?QUEUE:out(Q3) of
        {empty, _Q3} ->
            {empty, State};
        {{value, MsgStatus}, Q3a} ->
            State1 = State #vqstate { q3 = Q3a },
            State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             %% q3 is now empty, it wasn't before;
                             %% delta is still empty. So q2 must be
                             %% empty, and we know q4 is empty
                             %% otherwise we wouldn't be loading from
                             %% q3. As such, we can just set q4 to Q1.
                             true = ?QUEUE:is_empty(Q2), %% ASSERTION
                             true = ?QUEUE:is_empty(Q4), %% ASSERTION
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
                         {true, false} ->
                             maybe_deltas_to_betas(State1);
                         {false, _} ->
                             %% q3 still isn't empty, we've not
                             %% touched delta, so the invariants
                             %% between q1, q2, delta and q3 are
                             %% maintained
                             State1
                     end,
            {loaded, {MsgStatus, State2}}
    end;
%% lazy queues
%% Here q3 is refilled from delta only once it has been found empty:
%% if delta still has entries the function loads them and retries,
%% otherwise it reports empty.
fetch_from_q3(State = #vqstate { mode  = lazy,
                                 delta = #delta { count = DeltaCount },
                                 q3    = Q3 }) ->
    case ?QUEUE:out(Q3) of
        {empty, _Q3} when DeltaCount =:= 0 ->
            {empty, State};
        {empty, _Q3} ->
            fetch_from_q3(maybe_deltas_to_betas(State));
        {{value, MsgStatus}, Q3a} ->
            State1 = State #vqstate { q3 = Q3a },
            {loaded, {MsgStatus, State1}}
    end.

%% Convenience wrapper around maybe_deltas_to_betas/2 that uses the fun
%% returned by process_delivers_and_acks_fun(deliver_and_ack).
maybe_deltas_to_betas(State) ->
    AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
    maybe_deltas_to_betas(AfterFun, State).

%% Reads one stretch of queue index entries described by delta and
%% appends the resulting betas to q3.
%%
%% The stretch starts at the delta's start_seq_id and ends at the next
%% segment boundary or at the delta's end_seq_id, whichever is smaller.
%% A state whose delta matches ?BLANK_DELTA_PATTERN is returned as is.
%%
%% DelsAndAcksFun is passed straight through to
%% betas_from_index_entries/4.
maybe_deltas_to_betas(_DelsAndAcksFun,
                      State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
    State;
maybe_deltas_to_betas(DelsAndAcksFun,
                      State = #vqstate {
                        q2                    = Q2,
                        delta                 = Delta,
                        q3                    = Q3,
                        index_state           = IndexState,
                        ram_msg_count         = RamMsgCount,
                        ram_bytes             = RamBytes,
                        disk_read_count       = DiskReadCount,
                        delta_transient_bytes = DeltaTransientBytes,
                        transient_threshold   = TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count        = DeltaCount,
             transient    = Transient,
             end_seq_id   = DeltaSeqIdEnd } = Delta,
    %% Upper bound of the read: never past the end of the delta.
    DeltaSeqId1 =
        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                   DeltaSeqIdEnd]),
    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
                                                  IndexState),
    {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
        betas_from_index_entries(List, TransientThreshold,
                                 DelsAndAcksFun,
                                 State #vqstate { index_state = IndexState1 }),
    %% The counters are computed from the values captured in the
    %% function head, not from State1.
    State2 = State1 #vqstate { ram_msg_count     = RamMsgCount + RamCountsInc,
                               ram_bytes         = RamBytes + RamBytesInc,
                               disk_read_count   = DiskReadCount + RamCountsInc },
    case ?QUEUE:len(Q3a) of
        0 ->
            %% we ignored every message in the segment due to it being
            %% transient and below the threshold
            %% Move the delta start forward and try the next stretch.
            maybe_deltas_to_betas(
              DelsAndAcksFun,
              State2 #vqstate {
                delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
        Q3aLen ->
            Q3b = ?QUEUE:join(Q3, Q3a),
            case DeltaCount - Q3aLen of
                0 ->
                    %% delta is now empty, but it wasn't before, so
                    %% can now join q2 onto q3
                    State2 #vqstate { q2    = ?QUEUE:new(),
                                      delta = ?BLANK_DELTA,
                                      q3    = ?QUEUE:join(Q3b, Q2),
                                      delta_transient_bytes = 0};
                %% Any other outcome (a negative remainder) has no
                %% matching clause and crashes with case_clause.
                N when N > 0 ->
                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
                                        count        = N,
                                        transient    = Transient - TransientCount,
                                        end_seq_id   = DeltaSeqIdEnd }),
                    State2 #vqstate { delta = Delta1,
                                      q3    = Q3b,
                                      delta_transient_bytes = DeltaTransientBytes - TransientBytes }
            end
    end.

%% Converts up to Quota alphas into betas and returns
%% {RemainingQuota, State}.
%%
%% q1 is processed first, taking entries from its front; each entry is
%% appended to q3 when delta has count = 0 and transient = 0, and to q2
%% otherwise. Whatever quota is left is then spent on q4, taking
%% entries from its rear and putting them at the front of q3.
push_alphas_to_betas(Quota, State) ->
    {Quota1, State1} =
        push_alphas_to_betas(
          fun ?QUEUE:out/1,
          fun (MsgStatus, Q1a,
               State0 = #vqstate { q3 = Q3, delta = #delta { count = 0,
                                                            transient = 0 } }) ->
                  State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
              (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
                  State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
          end, Quota, State #vqstate.q1, State),
    {Quota2, State2} =
        push_alphas_to_betas(
          fun ?QUEUE:out_r/1,
          fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
                  State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
          end, Quota1, State1 #vqstate.q4, State1),
    {Quota2, State2}.
%% Worker behind push_alphas_to_betas/2.
%%
%% Generator takes the next entry off Q, Consumer stores the converted
%% entry (and the shortened queue) back into the state. Returns
%% {RemainingQuota, State} with the index caches flushed by ui/1.
%%
%% Nothing is converted when the quota is used up, when there is no
%% RAM target, or when the RAM message count is already within target.
push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
                     #vqstate { ram_msg_count    = RamCount,
                                target_ram_count = Target } = State)
  when Quota =:= 0;
       Target =:= infinity;
       Target >= RamCount ->
    {Quota, ui(State)};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
    %% Persisting a message costs message store credit (see
    %% rabbit_variable_queue:msg_store_write/4), so a blocked credit
    %% flow may mean the store is throttling this queue: stop here.
    case credit_flow:blocked() of
        true  ->
            {Quota, ui(State)};
        false ->
            push_alpha_to_beta_step(Generator(Q), Generator, Consumer,
                                    Quota, State)
    end.

%% Handles one result of the generator: finishes on an exhausted queue,
%% otherwise converts the entry and carries on with the rest.
push_alpha_to_beta_step({empty, _Q}, _Generator, _Consumer, Quota, State) ->
    {Quota, ui(State)};
push_alpha_to_beta_step({{value, Alpha}, Rest}, Generator, Consumer,
                        Quota, State) ->
    {Prepared, StateWritten} =
        maybe_prepare_write_to_disk(true, false, Alpha, State),
    Beta = m(trim_msg_status(Prepared)),
    StateCounted = stats(ready0, {Alpha, Beta}, 0, StateWritten),
    StateStored = Consumer(Beta, Rest, StateCounted),
    push_alphas_to_betas(Generator, Consumer, Quota - 1, Rest, StateStored).

%% Moves up to Quota betas into delta and returns the new state.
%%
%% Default mode: q3 is drained from its rear down to the segment
%% boundary following its first entry, then q2 is drained from its
%% front with whatever quota is left.
push_betas_to_deltas(Quota, #vqstate { mode  = default,
                                       q2    = Q2,
                                       delta = Delta,
                                       q3    = Q3 } = State) ->
    SegmentLimit = fun rabbit_queue_index:next_segment_boundary/1,
    NoLimit      = fun (Q2MinSeqId) -> Q2MinSeqId end,
    {Q3Rest, AfterQ3} =
        push_betas_to_deltas(fun ?QUEUE:out_r/1, SegmentLimit,
                             Q3, {Quota, Delta, State}),
    {Q2Rest, {_QuotaLeft, NewDelta, NewState}} =
        push_betas_to_deltas(fun ?QUEUE:out/1, NoLimit, Q2, AfterQ3),
    NewState #vqstate { q2    = Q2Rest,
                        delta = NewDelta,
                        q3    = Q3Rest };
%% Lazy mode: only q3 is considered, and it is paged out from the rear
%% without keeping the first segment back.
push_betas_to_deltas(Quota, #vqstate { mode  = lazy,
                                       delta = Delta,
                                       q3    = Q3 } = State) ->
    {Q3Rest, {_QuotaLeft, NewDelta, NewState}} =
        push_betas_to_deltas(fun ?QUEUE:out_r/1,
                             fun (Q2MinSeqId) -> Q2MinSeqId end,
                             Q3, {Quota, Delta, State}),
    NewState #vqstate { delta = NewDelta,
                        q3    = Q3Rest }.


%% Decides whether Q has anything to push at all. LimitFun maps the
%% smallest sequence id in Q to the lowest sequence id that may be
%% pushed; if even the largest id in Q is below it, Q is left alone.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
    case ?QUEUE:is_empty(Q) of
        true ->
            {Q, PushState};
        false ->
            {value, #msg_status { seq_id = FirstSeqId }} = ?QUEUE:peek(Q),
            {value, #msg_status { seq_id = LastSeqId }} = ?QUEUE:peek_r(Q),
            Limit = LimitFun(FirstSeqId),
            if
                LastSeqId < Limit ->
                    {Q, PushState};
                true ->
                    push_betas_to_deltas1(Generator, Limit, Q, PushState)
            end
    end.

%% Pushes entries one by one until the quota is spent, the queue runs
%% dry, or an entry below Limit comes up. Every way out flushes the
%% index caches through ui/1.
push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
    {Q, {0, Delta, ui(State)}};
push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
    case Generator(Q) of
        {{value, Beta = #msg_status { seq_id = SeqId }}, Rest}
          when SeqId >= Limit ->
            %% The index entry has to be on disk once this returns.
            {#msg_status { index_on_disk = true,
                           is_persistent = IsPersistent }, StateWritten} =
                maybe_batch_write_index_to_disk(true, Beta, State),
            StateCounted = stats(ready0, {Beta, none}, 1, StateWritten),
            WiderDelta = expand_delta(SeqId, Delta, IsPersistent),
            push_betas_to_deltas1(Generator, Limit, Rest,
                                  {Quota - 1, WiderDelta, StateCounted});
        {{value, #msg_status {}}, _Rest} ->
            %% Entry is below the limit: keep it, and the queue, as is.
            {Q, {Quota, Delta, ui(State)}};
        {empty, _Q} ->
            {Q, {Quota, Delta, ui(State)}}
    end.

%% Hands the queue index its pre-publish cache for flushing, together
%% with the current RAM target, and stores the index state it returns.
ui(State = #vqstate{index_state      = IndexState,
                    target_ram_count = TargetRamCount}) ->
    State#vqstate{
      index_state = rabbit_queue_index:flush_pre_publish_cache(
                      TargetRamCount, IndexState)}.

%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------

-spec multiple_routing_keys() -> 'ok'.

%% Upgrade step: rewrites every stored message so that the single
%% routing key of a 6-element basic_message tuple becomes a one-element
%% list. Any other term is reported as {error, corrupt_message} to the
%% transformation.
multiple_routing_keys() ->
    transform_storage(
      fun ({basic_message, ExchangeName, Routing_Key, Content,
            MsgId, Persistent}) ->
              {ok, {basic_message, ExchangeName, [Routing_Key], Content,
                    MsgId, Persistent}};
          (_) -> {error, corrupt_message}
      end),
    ok.


%% Assumes message store is not running
%% Applies TransformFun to the persistent and then the transient store.
transform_storage(TransformFun) ->
    transform_store(?PERSISTENT_MSG_STORE, TransformFun),
    transform_store(?TRANSIENT_MSG_STORE, TransformFun).

%% Forces recovery of Store under the mnesia directory and then
%% transforms its contents with TransformFun.
transform_store(Store, TransformFun) ->
    rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
    rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).

%% Upgrade step: migrates the messages of all queues returned by
%% list_persistent_queues/0 to the per-vhost stores, then removes the
%% old store data and the global recovery terms. The clean-up runs even
%% when there are no queues to migrate.
move_messages_to_vhost_store() ->
    case list_persistent_queues() of
        []     ->
            log_upgrade("No durable queues found."
                        " Skipping message store migration"),
            ok;
        Queues ->
            move_messages_to_vhost_store(Queues)
    end,
    ok = delete_old_store(),
    ok = rabbit_queue_index:cleanup_global_recovery_terms().

%% Migrates the given queues. The old (global) store and one new store
%% per vhost are started for the duration of the migration and stopped
%% again afterwards. Queues are migrated in batches whose size comes
%% from the rabbit application env key queue_migration_batch_size
%% (default ?QUEUE_MIGRATION_BATCH_SIZE).
move_messages_to_vhost_store(Queues) ->
    log_upgrade("Moving messages to per-vhost message store"),
    %% Move the queue index for each persistent queue to the new store
    lists:foreach(
        fun(Queue) ->
            QueueName = amqqueue:get_name(Queue),
            rabbit_queue_index:move_to_per_vhost_stores(QueueName)
        end,
        Queues),
    %% Legacy (global) msg_store may require recovery.
    %% This upgrade step should only be started
    %% if we are upgrading from a pre-3.7.0 version.
    {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues),

    OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),

    VHosts = rabbit_vhost:list_names(),

    %% New store should not be recovered.
    NewMsgStore = start_new_store(VHosts),
    %% Recovery terms should be started for all vhosts for new store.
    [ok = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],

    MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
                                             ?QUEUE_MIGRATION_BATCH_SIZE),
    in_batches(MigrationBatchSize,
        {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
        QueuesWithTerms,
        "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ",
        "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),

    log_upgrade("Message store migration finished"),
    ok = rabbit_sup:stop_child(OldStore),
    [ok = rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
    ok = stop_new_store(NewMsgStore).

%% Applies {M, F, A} to every element of List, Size elements at a time.
%% See in_batches/6.
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
    in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).

%% Processes List in consecutive batches of at most Size elements.
%%
%% Each element El of a batch is handled by an asynchronous local call
%% to M:F(El, A...); the batch is complete once every call has been
%% yielded. A {badrpc, Err} result is rethrown as Err. MessageStart and
%% MessageEnd are format strings logged before and after each batch.
%%
%% A non-positive numeric Size raises error({invalid_batch_size, Size}):
%% a size of 0 would otherwise never consume the list (lists:split(0, L)
%% returns {[], L}) and loop forever.
in_batches(_, _, _, [], _, _) -> ok;
in_batches(Size, _, _, _, _, _) when is_number(Size), Size =< 0 ->
    error({invalid_batch_size, Size});
in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
    Length = length(List),
    {Batch, Tail} = case Size > Length of
        true  -> {List, []};
        false -> lists:split(Size, List)
    end,
    %% Elements handled by earlier batches plus what is still in List.
    ProcessedLength = (BatchNum - 1) * Size,
    rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]),
    {M, F, A} = MFA,
    Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
    lists:foreach(fun(Key) ->
        case rpc:yield(Key) of
            {badrpc, Err} -> throw(Err);
            _             -> ok
        end
    end,
    Keys),
    rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
    in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).

%% Copies the messages of one queue from the old global store into the
%% store of the queue's vhost.
%%
%% Takes a {QueueName, RecoveryTerm} pair plus the old store and the
%% list of new per-vhost stores. Returns {QueueName, NewClientRef},
%% where NewClientRef is the ref of the client used to write to the new
%% store. When the recovery term is a list, its persistent_ref entry is
%% replaced by that ref and the term is written back.
migrate_queue({#resource{virtual_host = VHost, name = Name} = QueueName,
               RecoveryTerm},
              OldStore, NewStore) ->
    log_upgrade_verbose(
        "Migrating messages in queue ~s in vhost ~s to per-vhost message store",
        [Name, VHost]),
    GlobalClient = get_global_store_client(OldStore),
    VHostClient  = get_per_vhost_store_client(QueueName, NewStore),
    %% Only entries that are persistent, not yet acked and carry a
    %% binary message id are copied; everything else leaves the
    %% accumulated old-store client untouched.
    CopyEntry =
        fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, Client)
              when is_binary(MsgId) ->
                migrate_message(MsgId, Client, VHostClient);
            (_SeqId, _MsgId, _MsgProps,
             _IsPersistent, _IsDelivered, _IsAcked, Client) ->
                Client
        end,
    %% WARNING: scan_queue_segments recovers and terminates the queue
    %% index state while it runs, which can have side effects.
    %% NOTE(review): the value returned by the scan is dropped and the
    %% initial client state is the one terminated below - confirm that
    %% this is intended.
    rabbit_queue_index:scan_queue_segments(CopyEntry, GlobalClient, QueueName),
    rabbit_msg_store:client_terminate(GlobalClient),
    rabbit_msg_store:client_terminate(VHostClient),
    NewClientRef = rabbit_msg_store:client_ref(VHostClient),
    case RecoveryTerm of
        non_clean_shutdown ->
            ok;
        Terms when is_list(Terms) ->
            rabbit_queue_index:update_recovery_term(
              QueueName,
              lists:keyreplace(persistent_ref, 1, Terms,
                               {persistent_ref, NewClientRef}))
    end,
    log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]),
    {QueueName, NewClientRef}.

%% Copies a single message from the old store to the new one and
%% returns the old-store client state to continue with. A message that
%% cannot be read is skipped and the client state passed in is
%% returned unchanged.
migrate_message(MsgId, OldC, NewC) ->
    ReadResult = rabbit_msg_store:read(MsgId, OldC),
    case ReadResult of
        {{ok, Msg}, OldC1} ->
            ok = rabbit_msg_store:write(MsgId, Msg, NewC),
            OldC1;
        _ ->
            OldC
    end.

%% Opens a client on the store belonging to the vhost of the given
%% resource. NewStore is a list of {VHost, StorePid} pairs; a vhost
%% that is missing from it makes the match fail. Both callbacks handed
%% to the client do nothing.
get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
    {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
    Ref = rabbit_guid:gen(),
    IgnoreTwo = fun(_, _) -> ok end,
    IgnoreNone = fun() -> ok end,
    rabbit_msg_store:client_init(StorePid, Ref, IgnoreTwo, IgnoreNone).

%% Opens a client on the old global store under a freshly generated
%% ref. Both callbacks handed to the client do nothing.
get_global_store_client(OldStore) ->
    Ref = rabbit_guid:gen(),
    IgnoreTwo = fun(_, _) -> ok end,
    IgnoreNone = fun() -> ok end,
    rabbit_msg_store:client_init(OldStore, Ref, IgnoreTwo, IgnoreNone).

%% Lists the classic queues recorded in rabbit_durable_queue that live
%% on this node and have no entry in rabbit_queue.
list_persistent_queues() ->
    ThisNode = node(),
    Query =
        fun () ->
                qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
                                  ?amqqueue_is_classic(Q),
                                  amqqueue:qnode(Q) == ThisNode,
                                  mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []]))
        end,
    mnesia:async_dirty(Query).

%% Reads the global recovery terms of the given queues.
%%
%% Returns {NamesWithTerms, Refs, StartFunState}: the queue names
%% paired with their terms, the persistent_ref values found in terms
%% other than non_clean_shutdown, and the start fun state reported by
%% the queue index. Without queues nothing is read.
read_old_recovery_terms([]) ->
    {[], [], ?EMPTY_START_FUN_STATE};
read_old_recovery_terms(Queues) ->
    Names = [amqqueue:get_name(Queue) || Queue <- Queues],
    {TermsPerQueue, StartFunState} =
        rabbit_queue_index:read_global_recovery_terms(Names),
    PersistentRefs =
        [Ref || Terms <- TermsPerQueue,
                Terms /= non_clean_shutdown,
                Ref <- [proplists:get_value(persistent_ref, Terms)],
                Ref =/= undefined],
    {lists:zip(Names, TermsPerQueue), PersistentRefs, StartFunState}.

%% Starts the old global persistent store as a child of rabbit_sup and
%% returns the name it was started under.
run_old_persistent_store(Refs, StartFunState) ->
    StoreName = ?PERSISTENT_MSG_STORE,
    StartArgs = [StoreName, rabbit_mnesia:dir(), Refs, StartFunState],
    ok = rabbit_sup:start_child(StoreName, rabbit_msg_store,
                                start_global_store_link, StartArgs),
    StoreName.

%% Starts one linked persistent store per vhost, each in the vhost's
%% own message store directory, and returns the {VHost, Pid} pairs.
start_new_store(VHosts) ->
    StartOne =
        fun(VHost) ->
                Dir = rabbit_vhost:msg_store_dir_path(VHost),
                {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
                                                        Dir,
                                                        undefined,
                                                        ?EMPTY_START_FUN_STATE),
                {VHost, Pid}
        end,
    lists:map(StartOne, VHosts).

%% Unlinks from every store started by start_new_store/1 and sends it a
%% shutdown exit signal. Does not wait for the stores to terminate.
stop_new_store(NewStore) ->
    StopOne =
        fun({_VHost, StorePid}) ->
                unlink(StorePid),
                exit(StorePid, shutdown)
        end,
    lists:foreach(StopOne, NewStore),
    ok.

%% Removes the directories of the old persistent and transient stores
%% from the mnesia directory. The results of the deletions are not
%% inspected; the function always returns ok.
delete_old_store() ->
    log_upgrade("Removing the old message store data"),
    %% Persistent store first, then the transient one.
    lists:foreach(
      fun(StoreName) ->
              rabbit_file:recursive_delete(
                [filename:join([rabbit_mnesia:dir(), StoreName])])
      end,
      [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE]),
    ok.

%% Logs an upgrade message that takes no format arguments.
log_upgrade(Msg) ->
    log_upgrade(Msg, []).

%% Logs Msg at info level, prefixed with "message_store upgrades: ".
log_upgrade(Msg, Args) ->
    Format = "message_store upgrades: " ++ Msg,
    rabbit_log:info(Format, Args).

%% Logs a verbose upgrade message that takes no format arguments.
log_upgrade_verbose(Msg) ->
    log_upgrade_verbose(Msg, []).

%% Logs Msg, unprefixed, at info level through rabbit_log_upgrade.
log_upgrade_verbose(Msg, Args) ->
    rabbit_log_upgrade:info(Msg, Args).

%% Terminates a message store client on a best-effort basis: whatever
%% the termination raises is discarded and ok is returned instead.
%%
%% The queue may be stopping at its supervisor's request and has to
%% shut down cleanly for the supervision strategy to work; reaching the
%% maximum number of restarts might bring the vhost down.
maybe_client_terminate(MSCStateP) ->
    try rabbit_msg_store:client_terminate(MSCStateP)
    catch
        _Class:_Reason -> ok
    end.