1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_variable_queue).
9
10-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
11         purge/1, purge_acks/1,
12         publish/6, publish_delivered/5,
13         batch_publish/4, batch_publish_delivered/4,
14         discard/4, drain_confirmed/1,
15         dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
16         ackfold/4, fold/3, len/1, is_empty/1, depth/1,
17         set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
18         handle_pre_hibernate/1, resume/1, msg_rates/1,
19         info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
20         zip_msgs_and_acks/4,  multiple_routing_keys/0, handle_info/2]).
21
22-export([start/2, stop/1]).
23
24%% exported for testing only
25-export([start_msg_store/3, stop_msg_store/1, init/6]).
26
27-export([move_messages_to_vhost_store/0]).
28
29-export([migrate_queue/3, migrate_message/3, get_per_vhost_store_client/2,
30         get_global_store_client/1, log_upgrade_verbose/1,
31         log_upgrade_verbose/2]).
32
33-include_lib("stdlib/include/qlc.hrl").
34
35-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
36-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
37
38%%----------------------------------------------------------------------------
39%% Messages, and their position in the queue, can be in memory or on
40%% disk, or both. Persistent messages will have both message and
41%% position pushed to disk as soon as they arrive; transient messages
42%% can be written to disk (and thus both types can be evicted from
43%% memory) under memory pressure. The question of whether a message is
44%% in RAM and whether it is persistent are orthogonal.
45%%
46%% Messages are persisted using the queue index and the message
47%% store. Normally the queue index holds the position of the message
48%% *within this queue* along with a couple of small bits of metadata,
49%% while the message store holds the message itself (including headers
50%% and other properties).
51%%
52%% However, as an optimisation, small messages can be embedded
53%% directly in the queue index and bypass the message store
54%% altogether.
55%%
56%% Definitions:
57%%
58%% alpha: this is a message where both the message itself, and its
59%%        position within the queue are held in RAM
60%%
61%% beta:  this is a message where the message itself is only held on
62%%        disk (if persisted to the message store) but its position
63%%        within the queue is held in RAM.
64%%
65%% gamma: this is a message where the message itself is only held on
66%%        disk, but its position is both in RAM and on disk.
67%%
68%% delta: this is a collection of messages, represented by a single
69%%        term, where the messages and their position are only held on
70%%        disk.
71%%
72%% Note that for persistent messages, the message and its position
73%% within the queue are always held on disk, *in addition* to being in
74%% one of the above classifications.
75%%
%% Also note that within this code, the term gamma seldom
%% appears. It's frequently the case that gammas are defined by betas
%% which have had their queue position recorded on disk.
79%%
80%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
81%% many of these steps are frequently skipped. q1 and q4 only hold
82%% alphas, q2 and q3 hold both betas and gammas. When a message
83%% arrives, its classification is determined. It is then added to the
84%% rightmost appropriate queue.
85%%
86%% If a new message is determined to be a beta or gamma, q1 is
87%% empty. If a new message is determined to be a delta, q1 and q2 are
88%% empty (and actually q4 too).
89%%
90%% When removing messages from a queue, if q4 is empty then q3 is read
91%% directly. If q3 becomes empty then the next segment's worth of
92%% messages from delta are read into q3, reducing the size of
93%% delta. If the queue is non empty, either q4 or q3 contain
94%% entries. It is never permitted for delta to hold all the messages
95%% in the queue.
96%%
97%% The duration indicated to us by the memory_monitor is used to
98%% calculate, given our current ingress and egress rates, how many
99%% messages we should hold in RAM (i.e. as alphas). We track the
100%% ingress and egress rates for both messages and pending acks and
101%% rates for both are considered when calculating the number of
102%% messages to hold in RAM. When we need to push alphas to betas or
103%% betas to gammas, we favour writing out messages that are further
104%% from the head of the queue. This minimises writes to disk, as the
105%% messages closer to the tail of the queue stay in the queue for
106%% longer, thus do not need to be replaced as quickly by sending other
107%% messages to disk.
108%%
109%% Whilst messages are pushed to disk and forgotten from RAM as soon
110%% as requested by a new setting of the queue RAM duration, the
111%% inverse is not true: we only load messages back into RAM as
112%% demanded as the queue is read from. Thus only publishes to the
113%% queue will take up available spare capacity.
114%%
115%% When we report our duration to the memory monitor, we calculate
116%% average ingress and egress rates over the last two samples, and
117%% then calculate our duration based on the sum of the ingress and
118%% egress rates. More than two samples could be used, but it's a
119%% balance between responding quickly enough to changes in
120%% producers/consumers versus ignoring temporary blips. The problem
121%% with temporary blips is that with just a few queues, they can have
122%% substantial impact on the calculation of the average duration and
123%% hence cause unnecessary I/O. Another alternative is to increase the
124%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
125%% seconds. However, that then runs the risk of being too slow to
126%% inform the memory monitor of changes. Thus a 5 second interval,
127%% plus a rolling average over the last two samples seems to work
128%% well in practice.
129%%
130%% The sum of the ingress and egress rates is used because the egress
131%% rate alone is not sufficient. Adding in the ingress rate means that
132%% queues which are being flooded by messages are given more memory,
133%% resulting in them being able to process the messages faster (by
134%% doing less I/O, or at least deferring it) and thus helping keep
135%% their mailboxes empty and thus the queue as a whole is more
136%% responsive. If such a queue also has fast but previously idle
137%% consumers, the consumer can then start to be driven as fast as it
138%% can go, whereas if only egress rate was being used, the incoming
139%% messages may have to be written to disk and then read back in,
140%% resulting in the hard disk being a bottleneck in driving the
141%% consumers. Generally, we want to give Rabbit every chance of
142%% getting rid of messages as fast as possible and remaining
143%% responsive, and using only the egress rate impacts that goal.
144%%
145%% Once the queue has more alphas than the target_ram_count, the
146%% surplus must be converted to betas, if not gammas, if not rolled
147%% into delta. The conditions under which these transitions occur
148%% reflect the conflicting goals of minimising RAM cost per msg, and
149%% minimising CPU cost per msg. Once the msg has become a beta, its
150%% payload is no longer in RAM, thus a read from the msg_store must
151%% occur before the msg can be delivered, but the RAM cost of a beta
152%% is the same as a gamma, so converting a beta to gamma will not free
153%% up any further RAM. To reduce the RAM cost further, the gamma must
154%% be rolled into delta. Whilst recovering a beta or a gamma to an
155%% alpha requires only one disk read (from the msg_store), recovering
156%% a msg from within delta will require two reads (queue_index and
157%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
158%% conflict is between using delta more, which will free up more
159%% memory, but require additional CPU and disk ops, versus using delta
160%% less and gammas and betas more, which will cost more memory, but
161%% require fewer disk ops and less CPU overhead.
162%%
163%% In the case of a persistent msg published to a durable queue, the
164%% msg is immediately written to the msg_store and queue_index. If
165%% then additionally converted from an alpha, it'll immediately go to
166%% a gamma (as it's already in queue_index), and cannot exist as a
167%% beta. Thus a durable queue with a mixture of persistent and
168%% transient msgs in it which has more messages than permitted by the
169%% target_ram_count may contain an interspersed mixture of betas and
170%% gammas in q2 and q3.
171%%
172%% There is then a ratio that controls how many betas and gammas there
173%% can be. This is based on the target_ram_count and thus expresses
174%% the fact that as the number of permitted alphas in the queue falls,
175%% so should the number of betas and gammas fall (i.e. delta
176%% grows). If q2 and q3 contain more than the permitted number of
177%% betas and gammas, then the surplus are forcibly converted to gammas
178%% (as necessary) and then rolled into delta. The ratio is that
179%% delta/(betas+gammas+delta) equals
180%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
181%% the target_ram_count shrinks to 0, so must betas and gammas.
182%%
183%% The conversion of betas to deltas is done if there are at least
184%% ?IO_BATCH_SIZE betas in q2 & q3. This value should not be too small,
185%% otherwise the frequent operations on the queues of q2 and q3 will not be
186%% effectively amortised (switching the direction of queue access defeats
187%% amortisation). Note that there is a natural upper bound due to credit_flow
188%% limits on the alpha to beta conversion.
189%%
190%% The conversion from alphas to betas is chunked due to the
191%% credit_flow limits of the msg_store. This further smooths the
192%% effects of changes to the target_ram_count and ensures the queue
193%% remains responsive even when there is a large amount of IO work to
194%% do. The 'resume' callback is utilised to ensure that conversions
195%% are done as promptly as possible whilst ensuring the queue remains
196%% responsive.
197%%
198%% In the queue we keep track of both messages that are pending
199%% delivery and messages that are pending acks. In the event of a
200%% queue purge, we only need to load qi segments if the queue has
201%% elements in deltas (i.e. it came under significant memory
202%% pressure). In the event of a queue deletion, in addition to the
203%% preceding, by keeping track of pending acks in RAM, we do not need
204%% to search through qi segments looking for messages that are yet to
205%% be acknowledged.
206%%
207%% Pending acks are recorded in memory by storing the message itself.
208%% If the message has been sent to disk, we do not store the message
209%% content. During memory reduction, pending acks containing message
210%% content have that content removed and the corresponding messages
211%% are pushed out to disk.
212%%
213%% Messages from pending acks are returned to q4, q3 and delta during
214%% requeue, based on the limits of seq_id contained in each. Requeued
215%% messages retain their original seq_id, maintaining order
216%% when requeued.
217%%
218%% The order in which alphas are pushed to betas and pending acks
219%% are pushed to disk is determined dynamically. We always prefer to
220%% push messages for the source (alphas or acks) that is growing the
221%% fastest (with growth measured as avg. ingress - avg. egress).
222%%
223%% Notes on Clean Shutdown
224%% (This documents behaviour in variable_queue, queue_index and
225%% msg_store.)
226%%
227%% In order to try to achieve as fast a start-up as possible, if a
228%% clean shutdown occurs, we try to save out state to disk to reduce
229%% work on startup. In the msg_store this takes the form of the
230%% index_module's state, plus the file_summary ets table, and client
231%% refs. In the VQ, this takes the form of the count of persistent
232%% messages in the queue and references into the msg_stores. The
233%% queue_index adds to these terms the details of its segments and
234%% stores the terms in the queue directory.
235%%
236%% Two message stores are used. One is created for persistent messages
237%% to durable queues that must survive restarts, and the other is used
238%% for all other messages that just happen to need to be written to
239%% disk. On start up we can therefore nuke the transient message
240%% store, and be sure that the messages in the persistent store are
241%% all that we need.
242%%
243%% The references to the msg_stores are there so that the msg_store
244%% knows to only trust its saved state if all of the queues it was
%% previously talking to come up cleanly. Likewise, the queues
%% themselves (esp queue_index) skip work in init if all the queues
%% and msg_store were shut down cleanly. This gives both good speed
248%% improvements and also robustness so that if anything possibly went
249%% wrong in shutdown (or there was subsequent manual tampering), all
250%% messages and queues that can be recovered are recovered, safely.
251%%
252%% To delete transient messages lazily, the variable_queue, on
253%% startup, stores the next_seq_id reported by the queue_index as the
254%% transient_threshold. From that point on, whenever it's reading a
255%% message off disk via the queue_index, if the seq_id is below this
256%% threshold and the message is transient then it drops the message
257%% (the message itself won't exist on disk because it would have been
258%% stored in the transient msg_store which would have had its saved
259%% state nuked on startup). This avoids the expensive operation of
260%% scanning the entire queue on startup in order to delete transient
261%% messages that were only pushed to disk to save memory.
262%%
263%%----------------------------------------------------------------------------
264
265-behaviour(rabbit_backing_queue).
266
267-record(vqstate,
268        { q1,
269          q2,
270          delta,
271          q3,
272          q4,
273          next_seq_id,
274          ram_pending_ack,    %% msgs using store, still in RAM
275          disk_pending_ack,   %% msgs in store, paged out
276          qi_pending_ack,     %% msgs using qi, *can't* be paged out
277          index_state,
278          msg_store_clients,
279          durable,
280          transient_threshold,
281          qi_embed_msgs_below,
282
283          len,                %% w/o unacked
284          bytes,              %% w/o unacked
285          unacked_bytes,
286          persistent_count,   %% w   unacked
287          persistent_bytes,   %% w   unacked
288          delta_transient_bytes,        %%
289
290          target_ram_count,
291          ram_msg_count,      %% w/o unacked
292          ram_msg_count_prev,
293          ram_ack_count_prev,
294          ram_bytes,          %% w   unacked
295          out_counter,
296          in_counter,
297          rates,
298          msgs_on_disk,
299          msg_indices_on_disk,
300          unconfirmed,
301          confirmed,
302          ack_out_counter,
303          ack_in_counter,
304          %% Unlike the other counters these two do not feed into
305          %% #rates{} and get reset
306          disk_read_count,
307          disk_write_count,
308
309          io_batch_size,
310
311          %% default queue or lazy queue
312          mode,
313          %% number of reduce_memory_usage executions, once it
314          %% reaches a threshold the queue will manually trigger a runtime GC
315          %% see: maybe_execute_gc/1
316          memory_reduction_run_count,
317          %% Queue data is grouped by VHost. We need to store it
318          %% to work with queue index.
319          virtual_host,
320          waiting_bump = false
321        }).
322
323-record(rates, { in, out, ack_in, ack_out, timestamp }).
324
325-record(msg_status,
326        { seq_id,
327          msg_id,
328          msg,
329          is_persistent,
330          is_delivered,
331          msg_in_store,
332          index_on_disk,
333          persist_to,
334          msg_props
335        }).
336
337-record(delta,
338        { start_seq_id, %% start_seq_id is inclusive
339          count,
340          transient,
341          end_seq_id    %% end_seq_id is exclusive
342        }).
343
344-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
345-define(PERSISTENT_MSG_STORE, msg_store_persistent).
346-define(TRANSIENT_MSG_STORE,  msg_store_transient).
347
348-define(QUEUE, lqueue).
349
350-include_lib("rabbit_common/include/rabbit.hrl").
351-include_lib("rabbit_common/include/rabbit_framing.hrl").
352-include("amqqueue.hrl").
353
354%%----------------------------------------------------------------------------
355
356-rabbit_upgrade({multiple_routing_keys, local, []}).
357-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).
358
359-type seq_id()  :: non_neg_integer().
360
361-type rates() :: #rates { in        :: float(),
362                          out       :: float(),
363                          ack_in    :: float(),
364                          ack_out   :: float(),
365                          timestamp :: rabbit_types:timestamp()}.
366
367-type delta() :: #delta { start_seq_id :: non_neg_integer(),
368                          count        :: non_neg_integer(),
369                          end_seq_id   :: non_neg_integer() }.
370
371%% The compiler (rightfully) complains that ack() and state() are
372%% unused. For this reason we duplicate a -spec from
373%% rabbit_backing_queue with the only intent being to remove
374%% warnings. The problem here is that we can't parameterise the BQ
375%% behaviour by these two types as we would like to. We still leave
376%% these here for documentation purposes.
377-type ack() :: seq_id().
378-type state() :: #vqstate {
379             q1                    :: ?QUEUE:?QUEUE(),
380             q2                    :: ?QUEUE:?QUEUE(),
381             delta                 :: delta(),
382             q3                    :: ?QUEUE:?QUEUE(),
383             q4                    :: ?QUEUE:?QUEUE(),
384             next_seq_id           :: seq_id(),
385             ram_pending_ack       :: gb_trees:tree(),
386             disk_pending_ack      :: gb_trees:tree(),
387             qi_pending_ack        :: gb_trees:tree(),
388             index_state           :: any(),
389             msg_store_clients     :: 'undefined' | {{any(), binary()},
390                                                    {any(), binary()}},
391             durable               :: boolean(),
392             transient_threshold   :: non_neg_integer(),
393             qi_embed_msgs_below   :: non_neg_integer(),
394
395             len                   :: non_neg_integer(),
396             bytes                 :: non_neg_integer(),
397             unacked_bytes         :: non_neg_integer(),
398
399             persistent_count      :: non_neg_integer(),
400             persistent_bytes      :: non_neg_integer(),
401
402             target_ram_count      :: non_neg_integer() | 'infinity',
403             ram_msg_count         :: non_neg_integer(),
404             ram_msg_count_prev    :: non_neg_integer(),
405             ram_ack_count_prev    :: non_neg_integer(),
406             ram_bytes             :: non_neg_integer(),
407             out_counter           :: non_neg_integer(),
408             in_counter            :: non_neg_integer(),
409             rates                 :: rates(),
410             msgs_on_disk          :: gb_sets:set(),
411             msg_indices_on_disk   :: gb_sets:set(),
412             unconfirmed           :: gb_sets:set(),
413             confirmed             :: gb_sets:set(),
414             ack_out_counter       :: non_neg_integer(),
415             ack_in_counter        :: non_neg_integer(),
416             disk_read_count       :: non_neg_integer(),
417             disk_write_count      :: non_neg_integer(),
418
419             io_batch_size         :: pos_integer(),
420             mode                  :: 'default' | 'lazy',
421             memory_reduction_run_count :: non_neg_integer()}.
422
423-define(BLANK_DELTA, #delta { start_seq_id = undefined,
424                              count        = 0,
425                              transient    = 0,
426                              end_seq_id   = undefined }).
427-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
428                                         count        = 0,
429                                         transient    = 0,
430                                         end_seq_id   = Z }).
431
432-define(MICROS_PER_SECOND, 1000000.0).
433
434%% We're sampling every 5s for RAM duration; a half life that is of
435%% the same order of magnitude is probably about right.
436-define(RATE_AVG_HALF_LIFE, 5.0).
437
438%% We will recalculate the #rates{} every time we get asked for our
439%% RAM duration, or every N messages published, whichever is
440%% sooner. We do this since the priority calculations in
441%% rabbit_amqqueue_process need fairly fresh rates.
442-define(MSGS_PER_RATE_CALC, 100).
443
%% Threshold for explicit garbage collection: once the number of
%% `reduce_memory_use` runs reaches this value, the queue manually
%% triggers a runtime GC (see maybe_execute_gc/1).
%% see: rabbitmq-server-973 and rabbitmq-server-964
447-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
448-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
449    case get(explicit_gc_run_operation_threshold) of
450        undefined ->
451            Val = explicit_gc_run_operation_threshold_for_mode(Mode),
452            put(explicit_gc_run_operation_threshold, Val),
453            Val;
454        Val       -> Val
455    end).
456
%% Reads the explicit-GC run threshold for a queue of the given mode
%% from the `rabbit` application environment via rabbit_misc:get_env/3.
%% Lazy queues use the `lazy_queue_explicit_gc_run_operation_threshold`
%% key, every other mode uses `queue_explicit_gc_run_operation_threshold`.
%% In both cases ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD is passed as the
%% default value.
explicit_gc_run_operation_threshold_for_mode(lazy) ->
    rabbit_misc:get_env(rabbit,
                        lazy_queue_explicit_gc_run_operation_threshold,
                        ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD);
explicit_gc_run_operation_threshold_for_mode(_Mode) ->
    rabbit_misc:get_env(rabbit,
                        queue_explicit_gc_run_operation_threshold,
                        ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD).
465
466%%----------------------------------------------------------------------------
467%% Public API
468%%----------------------------------------------------------------------------
469
%% Starts the queue index for VHost and then both message stores.
%% Returns {ok, AllTerms}, where AllTerms is the list of recovery terms
%% handed back by rabbit_queue_index:start/2.
start(VHost, DurableQueues) ->
    {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
    %% Extract the persistent msg_store client ref from each set of
    %% recovery terms. Entries that are `non_clean_shutdown`, or that
    %% carry no persistent_ref, contribute nothing.
    PersistentRef =
        fun (non_clean_shutdown) ->
                false;
            (Terms) ->
                case proplists:get_value(persistent_ref, Terms) of
                    undefined -> false;
                    Ref       -> {true, Ref}
                end
        end,
    ClientRefs = lists:filtermap(PersistentRef, AllTerms),
    start_msg_store(VHost, ClientRefs, StartFunState),
    {ok, AllTerms}.
481
%% Stops the message stores for VHost and then the queue index.
%% Both steps are asserted to return `ok`; anything else crashes the
%% caller with a badmatch.
stop(VHost) ->
    ok = stop_msg_store(VHost),
    IndexStopped = rabbit_queue_index:stop(VHost),
    ok = IndexStopped.
485
%% Starts the transient and then the persistent message store for
%% VHost. The transient store is always started with no client refs
%% and ?EMPTY_START_FUN_STATE; the persistent store receives Refs and
%% StartFunState as given. Refs must be a list or `undefined`.
%% Always returns `ok` (do_start_msg_store/4 exits on failure).
start_msg_store(VHost, Refs, StartFunState)
  when Refs == undefined; is_list(Refs) ->
    rabbit_log:info("Starting message stores for vhost '~s'", [VHost]),
    Stores = [{?TRANSIENT_MSG_STORE,  undefined, ?EMPTY_START_FUN_STATE},
              {?PERSISTENT_MSG_STORE, Refs,      StartFunState}],
    lists:foreach(
      fun ({Type, StoreRefs, FunState}) ->
              do_start_msg_store(VHost, Type, StoreRefs, FunState)
      end, Stores),
    ok.
491
%% Starts a single message store of the given Type for VHost through
%% rabbit_vhost_msg_store:start/4 and logs the outcome. On any
%% {error, _} result the error is logged and the calling process exits
%% with that same {error, _} term.
do_start_msg_store(VHost, Type, Refs, StartFunState) ->
    StartResult = rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState),
    case StartResult of
        {error, {no_such_vhost, VHost}} ->
            rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!",
                             [Type, VHost]),
            exit(StartResult);
        {error, Reason} ->
            rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p",
                             [Type, VHost, Reason]),
            exit(StartResult);
        {ok, _} ->
            rabbit_log:info("Started message store of type ~s for vhost '~s'", [abbreviated_type(Type), VHost])
    end.
505
%% Maps a message store type to the short name used in log output.
%% Only the two known store types are accepted; anything else fails
%% with function_clause.
abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent;
abbreviated_type(?TRANSIENT_MSG_STORE)  -> transient.
508
%% Stops the transient and then the persistent message store for
%% VHost. The results of the individual stop calls are ignored and
%% `ok` is always returned.
stop_msg_store(VHost) ->
    lists:foreach(
      fun (Type) -> rabbit_vhost_msg_store:stop(VHost, Type) end,
      [?TRANSIENT_MSG_STORE, ?PERSISTENT_MSG_STORE]).
513
%% Convenience entry point: builds the three "written to disk"
%% callbacks, each closing over Callback, and delegates to init/6.
init(Queue, Recover, Callback) ->
    MsgOnDiskFun =
        fun (MsgIds, ActionTaken) ->
                msgs_written_to_disk(Callback, MsgIds, ActionTaken)
        end,
    MsgIdxOnDiskFun =
        fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
    MsgAndIdxOnDiskFun =
        fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end,
    init(Queue, Recover, Callback,
         MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun).
522
%% init/6: sets up the queue index and the msg_store clients for queue
%% Q, then hands over to init/8.
%%
%% First clause (`new`): a fresh queue. The index is initialised empty
%% and init/8 is called with zero delta count/bytes and no recovery
%% terms. A persistent msg_store client is only created for durable
%% queues; otherwise `undefined` is passed in its place.
init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
    QueueName = amqqueue:get_name(Q),
    IsDurable = amqqueue:is_durable(Q),
    IndexState = rabbit_queue_index:init(QueueName,
                                         MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
    VHost = QueueName#resource.virtual_host,
    PersistentClient =
        case IsDurable of
            true  -> msg_store_client_init(?PERSISTENT_MSG_STORE,
                                           MsgOnDiskFun, AsyncCallback, VHost);
            false -> undefined
        end,
    TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
                                            AsyncCallback, VHost),
    init(IsDurable, IndexState, 0, 0, [],
         PersistentClient, TransientClient, VHost);

%% Second clause: recovery from Terms. We can be recovering a
%% transient queue if it crashed, hence the IsDurable checks below.
init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
    QueueName = amqqueue:get_name(Q),
    IsDurable = amqqueue:is_durable(Q),
    {PRef, RecoveryTerms} = process_recovery_terms(Terms),
    VHost = QueueName#resource.virtual_host,
    PersistentClient =
        case IsDurable of
            true  -> msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
                                           MsgOnDiskFun, AsyncCallback,
                                           VHost);
            false -> undefined
        end,
    %% Passed to the queue index recovery: given a msg id it asks the
    %% persistent store whether it contains the message; given a whole
    %% #basic_message{} it answers with the message's is_persistent
    %% flag. Non-durable queues always answer false.
    ContainsCheckFun =
        case IsDurable of
            true ->
                fun (MsgId) when is_binary(MsgId) ->
                        rabbit_msg_store:contains(MsgId, PersistentClient);
                    (#basic_message{is_persistent = Persistent}) ->
                        Persistent
                end;
            false ->
                fun (_MsgId) -> false end
        end,
    TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
                                            undefined, AsyncCallback,
                                            VHost),
    StoreRecovered = rabbit_vhost_msg_store:successfully_recovered_state(
                       VHost, ?PERSISTENT_MSG_STORE),
    {DeltaCount, DeltaBytes, IndexState} =
        rabbit_queue_index:recover(QueueName, RecoveryTerms, StoreRecovered,
                                   ContainsCheckFun,
                                   MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
    init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
         PersistentClient, TransientClient, VHost).
568
%% Splits recovery terms into {PersistentRef, TermsToRecoverWith}.
%%  - `non_clean_shutdown`: a new ref is generated and the atom is
%%    passed through unchanged.
%%  - terms without a persistent_ref: a new ref is generated and the
%%    terms are replaced by [].
%%  - otherwise the stored ref is returned with the terms untouched.
process_recovery_terms(non_clean_shutdown) ->
    {rabbit_guid:gen(), non_clean_shutdown};
process_recovery_terms(Terms) ->
    StoredRef = proplists:get_value(persistent_ref, Terms),
    if
        StoredRef =:= undefined -> {rabbit_guid:gen(), []};
        true                    -> {StoredRef, Terms}
    end.
576
%% Shuts the queue down while keeping its on-disk state.
%% After purge_pending_ack(true, State), the persistent msg_store
%% client (if there is one) is terminated and its ref kept, and the
%% transient client is deleted and terminated. The ref, together with
%% the persistent message count and bytes, forms the recovery terms
%% passed to rabbit_queue_index:terminate/3. The returned state has no
%% msg_store clients.
terminate(_Reason, State) ->
    Purged = purge_pending_ack(true, State),
    #vqstate { virtual_host      = VHost,
               persistent_count  = PCount,
               persistent_bytes  = PBytes,
               index_state       = IndexState,
               msg_store_clients = {PersistentClient, TransientClient} } =
        Purged,
    PRef = case PersistentClient of
               undefined ->
                   undefined;
               _ ->
                   ok = maybe_client_terminate(PersistentClient),
                   rabbit_msg_store:client_ref(PersistentClient)
           end,
    ok = rabbit_msg_store:client_delete_and_terminate(TransientClient),
    RecoveryTerms = [{persistent_ref,   PRef},
                     {persistent_count, PCount},
                     {persistent_bytes, PBytes}],
    IndexState1 = rabbit_queue_index:terminate(VHost, RecoveryTerms, IndexState),
    a(Purged #vqstate { index_state       = IndexState1,
                        msg_store_clients = undefined }).
596
597%% the only difference between purge and delete is that delete also
598%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
    %% Normally when we purge messages we interact with the qi by
    %% issuing delivers and acks for every purged message. In this
    %% case we don't need to do that, so we just delete the qi.
    Reset = purge_and_index_reset(State),
    Purged = purge_pending_ack_delete_and_terminate(Reset),
    #vqstate { msg_store_clients = {PersistentClient, TransientClient} } =
        Purged,
    %% The persistent client is `undefined` when none was created
    %% (init/6 only creates one for durable queues).
    if
        PersistentClient =:= undefined ->
            ok;
        true ->
            rabbit_msg_store:client_delete_and_terminate(PersistentClient)
    end,
    rabbit_msg_store:client_delete_and_terminate(TransientClient),
    a(Purged #vqstate { msg_store_clients = undefined }).
612
%% Erases the queue index of a crashed queue. Asserts that
%% rabbit_queue_index:erase/1 returns `ok`.
delete_crashed(Q) when ?is_amqqueue(Q) ->
    ok = rabbit_queue_index:erase(amqqueue:get_name(Q)).
616
%% Purges the queue and returns {Len, NewState}, where Len is the
%% `len` field of the state before the purge. When there are neither
%% pending acks nor unconfirmed messages the index is reset as part of
%% the purge; otherwise purge_when_pending_acks/1 is used.
purge(State = #vqstate { len = Len }) ->
    NoPendingAcks = is_pending_ack_empty(State),
    NoUnconfirmed = is_unconfirmed_empty(State),
    Purged = case NoPendingAcks and NoUnconfirmed of
                 true  -> purge_and_index_reset(State);
                 false -> purge_when_pending_acks(State)
             end,
    {Len, Purged}.
624
%% Purges pending acks via purge_pending_ack(false, State) and passes
%% the result through a/1.
purge_acks(State) ->
    Purged = purge_pending_ack(false, State),
    a(Purged).
626
%% Publishes a single message via publish1/7, using
%% maybe_write_to_disk/4 as the persistence function, then refreshes
%% the rates and runs the memory-reduction check.
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
    Published = publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                         fun maybe_write_to_disk/4, State),
    Rated = maybe_update_rates(Published),
    a(maybe_reduce_memory_use(Rated)).
633
%% Publishes a list of messages by folding batch_publish1/2 over
%% Publishes. The fold accumulator carries ChPid and Flow alongside
%% the state; both are asserted to come back unchanged. The folded
%% state then goes through ui/1, the rate update and the
%% memory-reduction check.
batch_publish(Publishes, ChPid, Flow, State) ->
    Acc0 = {ChPid, Flow, State},
    {ChPid, Flow, Folded} = lists:foldl(fun batch_publish1/2, Acc0, Publishes),
    Rated = maybe_update_rates(ui(Folded)),
    a(maybe_reduce_memory_use(Rated)).
639
%% Publishes a message through publish_delivered1/6, using
%% maybe_write_to_disk/4 as the persistence function. Returns
%% {SeqId, NewState}, where SeqId is whatever publish_delivered1/6
%% reports for the message.
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
    {SeqId, Published} =
        publish_delivered1(Msg, MsgProps, ChPid, Flow,
                           fun maybe_write_to_disk/4, State),
    Rated = maybe_update_rates(Published),
    {SeqId, a(maybe_reduce_memory_use(Rated))}.
646
%% Batch variant of publish_delivered/5: folds
%% batch_publish_delivered1/2 over Publishes. The fold accumulates the
%% seq ids in a list which is reversed before being returned.
%% Returns {SeqIds, NewState}.
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
    Acc0 = {ChPid, Flow, [], State},
    {ChPid, Flow, RevSeqIds, Folded} =
        lists:foldl(fun batch_publish_delivered1/2, Acc0, Publishes),
    Indexed = ui(Folded),
    SeqIds = lists:reverse(RevSeqIds),
    {SeqIds, a(maybe_reduce_memory_use(maybe_update_rates(Indexed)))}.
653
%% Discarding a message is a no-op for this backing queue: all
%% arguments other than the state are ignored and the state is
%% returned unchanged.
discard(_MsgId, _ChPid, _Flow, State) ->
    State.
655
%% Returns {Confirmed, NewState}: the contents of the `confirmed` set
%% as a list, and a state in which that set has been emptied. When the
%% set is already empty (the common case) the state is returned as-is.
drain_confirmed(State = #vqstate { confirmed = Confirmed }) ->
    case gb_sets:is_empty(Confirmed) of
        false ->
            Drained = gb_sets:to_list(Confirmed),
            {Drained, State #vqstate { confirmed = gb_sets:new() }};
        true ->
            {[], State}
    end.
662
%% Delegates to remove_by_predicate/2 and returns its first result
%% element together with the resulting state passed through a/1.
dropwhile(Pred, State) ->
    {MsgProps, Remaining} = remove_by_predicate(Pred, State),
    {MsgProps, a(Remaining)}.
667
%% Delegates to fetch_by_predicate/4 and returns its first two result
%% elements (message properties and accumulator) together with the
%% resulting state passed through a/1.
fetchwhile(Pred, Fun, Acc, State) ->
    {MsgProps, AccOut, Remaining} = fetch_by_predicate(Pred, Fun, Acc, State),
    {MsgProps, AccOut, a(Remaining)}.
672
%% Takes the next message status off the queue. Returns `empty` when
%% there is none, otherwise {Msg, IsDelivered, AckTag}; in both cases
%% paired with the invariant-checked state.
fetch(AckRequired, State) ->
    case queue_out(State) of
        {{value, MsgStatus}, Dequeued} ->
            %% The message body may not have been read from disk yet,
            %% so load it before removing the entry.
            {Msg, Loaded}     = read_msg(MsgStatus, Dequeued),
            {AckTag, Removed} = remove(AckRequired, MsgStatus, Loaded),
            IsDelivered       = MsgStatus#msg_status.is_delivered,
            {{Msg, IsDelivered, AckTag}, a(Removed)};
        {empty, Unchanged} ->
            {empty, a(Unchanged)}
    end.
684
%% Like fetch/2 but without reading the message body: returns `empty`
%% or {MsgId, AckTag}, paired with the invariant-checked state.
drop(AckRequired, State) ->
    case queue_out(State) of
        {{value, MsgStatus}, Dequeued} ->
            {AckTag, Removed} = remove(AckRequired, MsgStatus, Dequeued),
            MsgId = MsgStatus#msg_status.msg_id,
            {{MsgId, AckTag}, a(Removed)};
        {empty, Unchanged} ->
            {empty, a(Unchanged)}
    end.
693
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.

%% Acknowledges the given ack tags (sequence ids). Each tag is removed
%% from the pending acks; for removed entries the queue index is acked
%% (when the index entry is on disk) and the message is removed from
%% the message store (when it is stored there). Returns the message
%% ids of the entries that were actually removed, plus the new state.
%% Tags for which remove_pending_ack/3 reports `none` are ignored.
ack([], State) ->
    {[], State};
%% optimisation: this head is essentially a partial evaluation of the
%% general case below, for the single-ack case.
ack([SeqId], State) ->
    case remove_pending_ack(true, SeqId, State) of
        {none, _} ->
            {[], State};
        {#msg_status { msg_id        = MsgId,
                       is_persistent = IsPersistent,
                       msg_in_store  = MsgInStore,
                       index_on_disk = IndexOnDisk },
         State1 = #vqstate { index_state       = IndexState,
                             msg_store_clients = MSCState,
                             ack_out_counter   = AckOutCount }} ->
            IndexState1 = case IndexOnDisk of
                              true  -> rabbit_queue_index:ack([SeqId], IndexState);
                              false -> IndexState
                          end,
            case MsgInStore of
                true  -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
                false -> ok
            end,
            {[MsgId],
             a(State1 #vqstate { index_state      = IndexState1,
                                 ack_out_counter  = AckOutCount + 1 })}
    end;
ack(AckTags, State) ->
    %% AckedCount tracks how many tags were really removed, so that
    %% ack_out_counter is only bumped for those - exactly what the
    %% single-ack clause above does. Unknown tags must not inflate it.
    {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, AckedCount,
     State1 = #vqstate { index_state       = IndexState,
                         msg_store_clients = MSCState,
                         ack_out_counter   = AckOutCount }} =
        lists:foldl(
          fun (SeqId, {Acc, Count, State2}) ->
                  case remove_pending_ack(true, SeqId, State2) of
                      {none, _} ->
                          {Acc, Count, State2};
                      {MsgStatus, State3} ->
                          {accumulate_ack(MsgStatus, Acc), Count + 1, State3}
                  end
          end, {accumulate_ack_init(), 0, State}, AckTags),
    IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    {lists:reverse(AllMsgIds),
     a(State1 #vqstate { index_state      = IndexState1,
                         ack_out_counter  = AckOutCount + AckedCount })}.
743
%% Puts the messages identified by AckTags back into the queue. The
%% sorted tags are merged into the in-memory queues first (q4 then q3
%% in default mode, only q3 in lazy mode) and whatever is left over is
%% merged into delta. Returns the requeued message ids and the new
%% state; in_counter and len both grow by the number of those ids.
requeue(AckTags, #vqstate { mode       = default,
                            delta      = Delta,
                            q3         = Q3,
                            q4         = Q4,
                            in_counter = InCounter,
                            len        = Len } = State) ->
    Sorted = lists:sort(AckTags),
    {LeftAfterQ4, Q4a, IdsAfterQ4, StateQ4} =
        queue_merge(Sorted, Q4, [], beta_limit(Q3),
                    fun publish_alpha/2, State),
    {LeftAfterQ3, Q3a, IdsAfterQ3, StateQ3} =
        queue_merge(LeftAfterQ4, Q3, IdsAfterQ4, delta_limit(Delta),
                    fun publish_beta/2, StateQ4),
    {Delta1, RequeuedIds, StateDelta} =
        delta_merge(LeftAfterQ3, Delta, IdsAfterQ3, StateQ3),
    Requeued = length(RequeuedIds),
    Merged = StateDelta #vqstate { delta      = Delta1,
                                   q3         = Q3a,
                                   q4         = Q4a,
                                   in_counter = InCounter + Requeued,
                                   len        = Len + Requeued },
    {RequeuedIds,
     a(maybe_reduce_memory_use(maybe_update_rates(ui(Merged))))};
requeue(AckTags, #vqstate { mode       = lazy,
                            delta      = Delta,
                            q3         = Q3,
                            in_counter = InCounter,
                            len        = Len } = State) ->
    Sorted = lists:sort(AckTags),
    {LeftAfterQ3, Q3a, IdsAfterQ3, StateQ3} =
        queue_merge(Sorted, Q3, [], delta_limit(Delta),
                    fun publish_beta/2, State),
    {Delta1, RequeuedIds, StateDelta} =
        delta_merge(LeftAfterQ3, Delta, IdsAfterQ3, StateQ3),
    Requeued = length(RequeuedIds),
    Merged = StateDelta #vqstate { delta      = Delta1,
                                   q3         = Q3a,
                                   in_counter = InCounter + Requeued,
                                   len        = Len + Requeued },
    {RequeuedIds,
     a(maybe_reduce_memory_use(maybe_update_rates(ui(Merged))))}.
783
%% Folds MsgFun over the pending-ack messages named by AckTags. For
%% every tag the pending ack is looked up, its message is read, and
%% MsgFun(Msg, SeqId, Acc) produces the next accumulator.
ackfold(MsgFun, Acc, State, AckTags) ->
    Step = fun (SeqId, {AccIn, StateIn}) ->
                   Pending = lookup_pending_ack(SeqId, StateIn),
                   {Msg, StateOut} = read_msg(Pending, StateIn),
                   {MsgFun(Msg, SeqId, AccIn), StateOut}
           end,
    {AccOut, Final} = lists:foldl(Step, {Acc, State}, AckTags),
    {AccOut, a(Final)}.
792
%% Folds Fun over the queue contents using four iterators (ready
%% messages, disk acks, RAM acks and queue-index acks). Each iterator
%% is primed with inext/2 before the combined fold is run by ifold/4.
fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
    Iterators = [msg_iterator(State),
                 disk_ack_iterator(State),
                 ram_ack_iterator(State),
                 qi_ack_iterator(State)],
    {Its, IndexState1} =
        lists:foldl(fun inext/2, {[], IndexState}, Iterators),
    ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
800
%% Returns the `len` field of the queue state.
len(#vqstate {} = State) ->
    State#vqstate.len.
802
%% True when len/1 reports zero for the state.
is_empty(State) ->
    len(State) == 0.
804
%% Queue depth: len/1 plus the number of pending acks.
depth(State) ->
    Ready   = len(State),
    Unacked = count_pending_acks(State),
    Ready + Unacked.
807
%% Stores a new target_ram_count derived from DurationTarget and the
%% sum of the four average rates. Memory use is reduced unless the new
%% target is `infinity`, or the previous target was finite and the new
%% one is at least as large.
set_ram_duration_target(
  DurationTarget, State = #vqstate {
                    rates = #rates { in      = AvgIngressRate,
                                     out     = AvgEgressRate,
                                     ack_in  = AvgAckIngressRate,
                                     ack_out = AvgAckEgressRate },
                    target_ram_count = OldTarget }) ->
    Rate =
        AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
    NewTarget =
        case DurationTarget of
            infinity -> infinity;
            _        -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
        end,
    Retargeted = State #vqstate { target_ram_count = NewTarget },
    KeepAsIs = NewTarget == infinity orelse
        (OldTarget =/= infinity andalso NewTarget >= OldTarget),
    a(case KeepAsIs of
          false -> reduce_memory_use(Retargeted);
          true  -> Retargeted
      end).
829
%% Recomputes the rates only once in_counter + out_counter exceeds
%% ?MSGS_PER_RATE_CALC; otherwise the state is returned unchanged.
maybe_update_rates(State) ->
    case State of
        #vqstate{ in_counter = InCount, out_counter = OutCount }
          when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
            update_rates(State);
        _ ->
            State
    end.
836
%% Feeds the four event counters into update_rate/4 to obtain new
%% average rates stamped with the current monotonic time, and resets
%% the counters to zero.
update_rates(State = #vqstate{ in_counter      = InCount,
                               out_counter     = OutCount,
                               ack_in_counter  = AckInCount,
                               ack_out_counter = AckOutCount,
                               rates = #rates{ in        = InRate,
                                               out       = OutRate,
                                               ack_in    = AckInRate,
                                               ack_out   = AckOutRate,
                                               timestamp = TS }}) ->
    Now = erlang:monotonic_time(),
    Average = fun (Count, Rate) -> update_rate(Now, TS, Count, Rate) end,
    NewRates = #rates { in        = Average(InCount,     InRate),
                        out       = Average(OutCount,    OutRate),
                        ack_in    = Average(AckInCount,  AckInRate),
                        ack_out   = Average(AckOutCount, AckOutRate),
                        timestamp = Now },
    State#vqstate{ rates           = NewRates,
                   in_counter      = 0,
                   out_counter     = 0,
                   ack_in_counter  = 0,
                   ack_out_counter = 0 }.
859
%% Computes a new average rate from Count events observed between the
%% native-unit timestamps TS and Now, blending it with the previous
%% Rate via rabbit_misc:moving_average/4. When no time has elapsed the
%% previous Rate is returned, which also avoids dividing by zero.
update_rate(Now, TS, Count, Rate) ->
    %% `microsecond` replaces the deprecated `micro_seconds` unit name.
    Time = erlang:convert_time_unit(Now - TS, native, microsecond) /
        ?MICROS_PER_SECOND,
    if
        %% Time is a float, so compare with == rather than =:=.
        Time == 0 -> Rate;
        true      -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE,
                                                Count / Time, Rate)
    end.
868
%% Refreshes the rates and estimates how long the in-RAM messages and
%% acks represent at the current rates. The duration is `infinity`
%% when all four average rates are below 0.01.
ram_duration(State) ->
    Updated = #vqstate { rates = #rates { in      = AvgIngressRate,
                                          out     = AvgEgressRate,
                                          ack_in  = AvgAckIngressRate,
                                          ack_out = AvgAckEgressRate },
                         ram_msg_count      = RamMsgCount,
                         ram_msg_count_prev = RamMsgCountPrev,
                         ram_pending_ack    = RPA,
                         qi_pending_ack     = QPA,
                         ram_ack_count_prev = RamAckCountPrev } =
        update_rates(State),

    RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
    Rates = [AvgEgressRate, AvgIngressRate,
             AvgAckEgressRate, AvgAckIngressRate],

    Duration = %% msgs+acks / (msgs+acks/sec) == sec
        case lists:any(fun (R) -> R >= 0.01 end, Rates) of
            false -> infinity;
            true  -> (RamMsgCountPrev + RamMsgCount +
                          RamAckCount + RamAckCountPrev) /
                         (4 * (AvgEgressRate + AvgIngressRate +
                                   AvgAckEgressRate + AvgAckIngressRate))
        end,

    {Duration, Updated}.
895
%% Translates rabbit_queue_index:needs_sync/1 into the timeout kind:
%% `confirms` -> timed, `other` -> idle, `false` -> false.
needs_timeout(#vqstate { index_state = IndexState }) ->
    SyncNeed = rabbit_queue_index:needs_sync(IndexState),
    case SyncNeed of
        false    -> false;
        other    -> idle;
        confirms -> timed
    end.
902
%% On timeout, syncs the queue index and stores the result.
timeout(State = #vqstate { index_state = IndexState }) ->
    Synced = rabbit_queue_index:sync(IndexState),
    State #vqstate { index_state = Synced }.
905
%% Flushes the queue index and stores the result.
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
    Flushed = rabbit_queue_index:flush(IndexState),
    State #vqstate { index_state = Flushed }.
908
%% Handles the bump_reduce_memory_use message: clears the
%% waiting_bump flag when it is set, otherwise leaves the state alone.
handle_info(bump_reduce_memory_use, State) ->
    case State of
        #vqstate{ waiting_bump = true } ->
            State#vqstate{ waiting_bump = false };
        _ ->
            State
    end.
913
%% Resumes by running reduce_memory_use/1 and checking invariants.
resume(State) ->
    Reduced = reduce_memory_use(State),
    a(Reduced).
915
%% Returns {AvgIngressRate, AvgEgressRate} from the rates record.
msg_rates(#vqstate { rates = #rates {} = Rates }) ->
    {Rates#rates.in, Rates#rates.out}.
919
%% Returns a single statistic about the queue state, selected by the
%% first argument. Items without a matching clause yield ''.
info(messages_ready_ram, #vqstate{} = State) ->
    State#vqstate.ram_msg_count;
info(messages_unacknowledged_ram, #vqstate{} = State) ->
    gb_trees:size(State#vqstate.ram_pending_ack) +
        gb_trees:size(State#vqstate.qi_pending_ack);
info(messages_ram, State) ->
    Ready   = info(messages_ready_ram, State),
    Unacked = info(messages_unacknowledged_ram, State),
    Ready + Unacked;
info(messages_persistent, #vqstate{} = State) ->
    State#vqstate.persistent_count;
info(messages_paged_out, #vqstate{delta = #delta{} = Delta}) ->
    Delta#delta.transient;
info(message_bytes, #vqstate{} = State) ->
    State#vqstate.bytes + State#vqstate.unacked_bytes;
info(message_bytes_ready, #vqstate{} = State) ->
    State#vqstate.bytes;
info(message_bytes_unacknowledged, #vqstate{} = State) ->
    State#vqstate.unacked_bytes;
info(message_bytes_ram, #vqstate{} = State) ->
    State#vqstate.ram_bytes;
info(message_bytes_persistent, #vqstate{} = State) ->
    State#vqstate.persistent_bytes;
info(message_bytes_paged_out, #vqstate{} = State) ->
    State#vqstate.delta_transient_bytes;
info(head_message_timestamp, #vqstate{} = State) ->
    head_message_timestamp(State#vqstate.q3,
                           State#vqstate.q4,
                           State#vqstate.ram_pending_ack,
                           State#vqstate.qi_pending_ack);
info(disk_reads, #vqstate{} = State) ->
    State#vqstate.disk_read_count;
info(disk_writes, #vqstate{} = State) ->
    State#vqstate.disk_write_count;
info(backing_queue_status, #vqstate{rates = #rates{} = Rates} = State) ->
    [ {mode                , State#vqstate.mode},
      {q1                  , ?QUEUE:len(State#vqstate.q1)},
      {q2                  , ?QUEUE:len(State#vqstate.q2)},
      {delta               , State#vqstate.delta},
      {q3                  , ?QUEUE:len(State#vqstate.q3)},
      {q4                  , ?QUEUE:len(State#vqstate.q4)},
      {len                 , State#vqstate.len},
      {target_ram_count    , State#vqstate.target_ram_count},
      {next_seq_id         , State#vqstate.next_seq_id},
      {avg_ingress_rate    , Rates#rates.in},
      {avg_egress_rate     , Rates#rates.out},
      {avg_ack_ingress_rate, Rates#rates.ack_in},
      {avg_ack_egress_rate , Rates#rates.ack_out} ];
info(_, _) ->
    ''.
980
%% Applies Fun to the state only when the invocation is addressed to
%% this module; invocations for any other module are ignored.
invoke(Mod, Fun, State) ->
    case Mod of
        ?MODULE -> Fun(?MODULE, State);
        _       -> State
    end.
983
%% This backing queue never reports a message as a duplicate: the
%% message is ignored and the state is returned unchanged.
is_duplicate(_Message, VQState) ->
    {false, VQState}.
985
%% Switches the queue between `default` and `lazy` mode. Requesting
%% the mode the queue is already in, or any unrecognised mode, leaves
%% the state unchanged.
set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
    State;
set_queue_mode(lazy, State = #vqstate {}) ->
    %% To become a lazy queue we need to page everything to disk first.
    Paged = convert_to_lazy(State),
    %% convert_to_lazy/1 changes target_ram_count, so put back the
    %% value the queue had before the conversion.
    OriginalTarget = State#vqstate.target_ram_count,
    a(Paged #vqstate { mode = lazy, target_ram_count = OriginalTarget });
set_queue_mode(default, State) ->
    %% becoming a default queue means loading messages from disk like
    %% when a queue is recovered.
    Switched = State #vqstate { mode = default },
    a(maybe_deltas_to_betas(Switched));
set_queue_mode(_, State) ->
    State.
1000
%% Pairs the id of every message in Msgs with the ack tag at the same
%% position in AckTags and pushes the {Id, AckTag} pairs onto
%% Accumulator, so the last pair ends up at the head of the result.
zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
    Pairs = lists:zipwith(
              fun ({#basic_message{ id = Id }, _Props}, AckTag) ->
                      {Id, AckTag}
              end, Msgs, AckTags),
    lists:reverse(Pairs, Accumulator).
1005
%% Pages messages out by setting the RAM duration target to 0, and
%% repeats until delta plus q3 account for the whole queue length.
convert_to_lazy(State) ->
    Paged = set_ram_duration_target(0, State),
    #vqstate { delta = Delta, q3 = Q3, len = Len } = Paged,
    Accounted = Delta#delta.count + ?QUEUE:len(Q3),
    if
        Accounted == Len ->
            Paged;
        true ->
            %% When pushing messages to disk, we might have been
            %% blocked by the msg_store, so we need to see if we have
            %% to wait for more credit, and then keep paging messages.
            %%
            %% The amqqueue_process could have taken care of this, but
            %% between the time it receives the bump_credit msg and
            %% calls BQ:resume to keep paging messages to disk, some
            %% other request may arrive to the BQ which at this moment
            %% is not in a proper state for a lazy BQ (unless all
            %% messages have been paged to disk already).
            wait_for_msg_store_credit(),
            convert_to_lazy(resume(Paged))
    end.
1026
%% When credit flow reports this process as blocked, waits (without a
%% timeout) for one bump_credit message and hands it to credit_flow.
%% Returns `ok` straight away when not blocked.
wait_for_msg_store_credit() ->
    case credit_flow:blocked() of
        false ->
            ok;
        true ->
            receive
                {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg)
            end
    end.
1035
%% Get the Timestamp property of the first msg, if present. This is
%% the one with the oldest timestamp among the heads of the pending
%% acks and unread queues.  We can't check disk_pending_acks as these
%% are paged out - we assume some will soon be paged in rather than
%% forcing it to happen.  Pending ack msgs are included as they are
%% regarded as unprocessed until acked, this also prevents the result
%% apparently oscillating during repeated rejects.  Q3 is only checked
%% when Q4 is empty as any Q4 msg will be earlier.
%% Returns '' when none of the candidate heads carries a timestamp.
head_message_timestamp(Q3, Q4, RPA, QPA) ->
    Candidates = [get_qs_head([Q4, Q3]),
                  get_pa_head(RPA),
                  get_pa_head(QPA)],

    Timestamps =
        [Timestamp ||
            Status <- Candidates,
            Status /= undefined,
            Msg <- [Status#msg_status.msg],
            Msg /= undefined,
            Timestamp <- [rabbit_basic:extract_timestamp(
                            Msg#basic_message.content)],
            Timestamp /= undefined],

    case Timestamps of
        []      -> '';
        [_ | _] -> lists:min(Timestamps)
    end.
1064
%% Returns the head of the first queue in Qs for which get_q_head/1
%% yields something other than `undefined`, or `undefined` when there
%% is no such queue. Plain recursion is used instead of a fold wrapped
%% in an old-style `catch`, so that a genuine error raised while
%% inspecting a queue propagates instead of being handed back to the
%% caller as an {'EXIT', _} value.
get_qs_head([]) ->
    undefined;
get_qs_head([Q | Qs]) ->
    case get_q_head(Q) of
        undefined -> get_qs_head(Qs);
        Head      -> Head
    end.
1073
%% Head of a ?QUEUE, or `undefined` when the queue is empty.
get_q_head(Q) ->
    IsEmpty = fun (Queue) -> ?QUEUE:is_empty(Queue) end,
    Peek    = fun (Queue) -> ?QUEUE:peek(Queue) end,
    get_collection_head(Q, IsEmpty, Peek).
1076
%% Value stored under the smallest key of a pending-ack gb_tree, or
%% `undefined` when the tree is empty.
get_pa_head(PA) ->
    IsEmpty  = fun (Tree) -> gb_trees:is_empty(Tree) end,
    Smallest = fun (Tree) -> gb_trees:smallest(Tree) end,
    get_collection_head(PA, IsEmpty, Smallest).
1079
%% Generic head accessor: returns `undefined` when IsEmpty(Col) is
%% true; otherwise GetVal(Col) must return a 2-tuple, and its second
%% element is returned.
get_collection_head(Col, IsEmpty, GetVal) ->
    case IsEmpty(Col) of
        true ->
            undefined;
        false ->
            {_, _} = Pair = GetVal(Col),
            element(2, Pair)
    end.
1087
1088%%----------------------------------------------------------------------------
1089%% Minor helpers
1090%%----------------------------------------------------------------------------
%% Asserts the structural invariants of the queue state and returns
%% the state unchanged. A violated invariant fails a `true = ...`
%% match. There is one clause per queue mode.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode             = default,
                     len              = Len,
                     bytes            = Bytes,
                     unacked_bytes    = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count    = RamMsgCount,
                     ram_bytes        = RamBytes}) ->
    Q1Empty    = ?QUEUE:is_empty(Q1),
    Q2Empty    = ?QUEUE:is_empty(Q2),
    DeltaEmpty = Delta#delta.count == 0,
    Q3Empty    = ?QUEUE:is_empty(Q3),
    Q4Empty    = ?QUEUE:is_empty(Q4),
    LenZero    = Len == 0,

    %% q1 may only hold messages while q3 is non-empty. See publish/6.
    true = Q1Empty orelse not Q3Empty,
    %% q2 may only hold messages while delta is non-empty (messages
    %% paged to disk). See push_alphas_to_betas/2.
    true = Q2Empty orelse not DeltaEmpty,
    %% delta may only hold messages while q3 is non-empty. This is
    %% enforced by paging, where min([segment_entry_count(), len(q3)])
    %% messages are always kept on RAM.
    true = DeltaEmpty orelse not Q3Empty,
    %% the queue length is 0 exactly when both q3 and q4 are empty.
    true = LenZero == (Q3Empty andalso Q4Empty),

    %% counters are never negative, and the RAM figures never exceed
    %% the totals they are part of.
    true = Len             >= 0,
    true = Bytes           >= 0,
    true = UnackedBytes    >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount     >= 0,
    true = RamMsgCount     =< Len,
    true = RamBytes        >= 0,
    true = RamBytes        =< Bytes + UnackedBytes,

    State;
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode             = lazy,
                     len              = Len,
                     bytes            = Bytes,
                     unacked_bytes    = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count    = RamMsgCount,
                     ram_bytes        = RamBytes}) ->
    Q1Empty    = ?QUEUE:is_empty(Q1),
    Q2Empty    = ?QUEUE:is_empty(Q2),
    DeltaEmpty = Delta#delta.count == 0,
    Q3Empty    = ?QUEUE:is_empty(Q3),
    Q4Empty    = ?QUEUE:is_empty(Q4),
    LenZero    = Len == 0,
    Q3Len      = ?QUEUE:len(Q3),

    %% q1 must always be empty, since q1 only gets messages during
    %% publish, but for lazy queues messages go straight to delta.
    true = Q1Empty,

    %% q2 only gets messages from q1 when push_alphas_to_betas is
    %% called for a non empty delta, which won't be the case for a
    %% lazy queue. This means q2 must always be empty.
    true = Q2Empty,

    %% q4 must always be empty for a lazy queue as well.
    true = Q4Empty,

    %% the queue is empty exactly when both delta and q3 are empty.
    true = LenZero == (DeltaEmpty andalso Q3Empty),

    %% With q1, q2 and q4 empty, delta and q3 account for every
    %% message in the queue.
    true = Delta#delta.count + Q3Len == Len,

    %% counters are never negative, and the RAM figures never exceed
    %% the totals they are part of.
    true = Len             >= 0,
    true = Bytes           >= 0,
    true = UnackedBytes    >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount     >= 0,
    true = RamMsgCount     =< Len,
    true = RamBytes        >= 0,
    true = RamBytes        =< Bytes + UnackedBytes,

    State.
1177
%% Delta assertion: only accepts (and returns) a delta whose count
%% fits between its start and end sequence ids.
d(Delta = #delta { start_seq_id = First, count = N, end_seq_id = Limit })
  when Limit >= First + N ->
    Delta.
1181
%% Message status assertion: a persistent message must have its index
%% entry on disk, and the message must be in RAM or in the message
%% store. Returns the status unchanged.
m(MsgStatus = #msg_status { is_persistent = IsPersistent,
                            msg_in_store  = MsgInStore,
                            index_on_disk = IndexOnDisk }) ->
    PersistentIsIndexed = (not IsPersistent) or IndexOnDisk,
    true = PersistentIsIndexed,
    BodyAvailable = msg_in_ram(MsgStatus) or MsgInStore,
    true = BodyAvailable,
    MsgStatus.
1188
%% Maps a boolean to an integer: 1 for true, 0 for false.
one_if(Flag) when is_boolean(Flag) ->
    case Flag of
        true  -> 1;
        false -> 0
    end.
1191
%% Prepends E to L when the condition is true; otherwise returns L.
cons_if(Cond, E, L) when is_boolean(Cond) ->
    case Cond of
        true  -> [E | L];
        false -> L
    end.
1194
%% Adds Val to the gb_set when the flag is true; otherwise returns the
%% set unchanged.
gb_sets_maybe_insert(Insert, Val, Set) when is_boolean(Insert) ->
    case Insert of
        true  -> gb_sets:add(Val, Set);
        false -> Set
    end.
1197
%% Builds the status record for a message that is not yet in the
%% message store and whose index entry is not yet on disk. Where it is
%% to be persisted is decided by determine_persist_to/3.
msg_status(IsPersistent, IsDelivered, SeqId,
           Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
    PersistTo = determine_persist_to(Msg, MsgProps, IndexMaxSize),
    #msg_status{msg_id        = MsgId,
                seq_id        = SeqId,
                msg           = Msg,
                msg_props     = MsgProps,
                is_persistent = IsPersistent,
                is_delivered  = IsDelivered,
                persist_to    = PersistTo,
                msg_in_store  = false,
                index_on_disk = false}.
1209
%% Builds a message status from a 5-tuple entry. When the entry
%% carries the whole message, the status is marked as persisted to the
%% queue index and not in the message store; when it carries only the
%% message id, the status is marked as stored in the message store
%% with no message attached.
beta_msg_status({Msg = #basic_message{id = MsgId},
                 SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    Base = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    Base#msg_status{msg          = Msg,
                    msg_id       = MsgId,
                    msg_in_store = false,
                    persist_to   = queue_index};

beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    Base = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    Base#msg_status{msg          = undefined,
                    msg_id       = MsgId,
                    msg_in_store = true,
                    persist_to   = msg_store}.
1224
%% Common part of beta_msg_status/1: a status with its index entry on
%% disk and no message attached yet.
beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
    #msg_status{index_on_disk = true,
                msg           = undefined,
                seq_id        = SeqId,
                msg_props     = MsgProps,
                is_persistent = IsPersistent,
                is_delivered  = IsDelivered}.
1232
%% Drops the message from the status when it is persisted to the
%% message store; statuses persisted to the queue index keep it.
trim_msg_status(MsgStatus) ->
    Destination = persist_to(MsgStatus),
    case Destination of
        queue_index -> MsgStatus;
        msg_store   -> MsgStatus#msg_status{msg = undefined}
    end.
1238
%% Runs Fun on one element of the client state pair - the first when
%% IsPersistent is true, the second when false. Fun must return
%% {Result, NewClientState}; the result is returned together with the
%% pair in which that element has been replaced.
with_msg_store_state({MSCStateP, MSCStateT}, IsPersistent, Fun)
  when is_boolean(IsPersistent) ->
    case IsPersistent of
        true ->
            {Result, NewP} = Fun(MSCStateP),
            {Result, {NewP, MSCStateT}};
        false ->
            {Result, NewT} = Fun(MSCStateT),
            {Result, {MSCStateP, NewT}}
    end.
1245
%% Variant of with_msg_store_state/3 for funs that only return a
%% result and leave the client state untouched.
with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
    KeepState = fun (ClientState) -> {Fun(ClientState), ClientState} end,
    %% MSCState is already bound, so this match asserts that the pair
    %% comes back exactly as it went in.
    {Res, MSCState} =
        with_msg_store_state(MSCState, IsPersistent, KeepState),
    Res.
1252
%% Initialises a message store client under a freshly generated guid
%% as its reference.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
    Ref = rabbit_guid:gen(),
    msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost).
1256
%% Initialises a message store client for VHost with the given
%% reference. The zero-arity fun handed to the store calls Callback
%% with this module and the close-fds fun built for this store.
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
    IsPersistentStore = MsgStore =:= ?PERSISTENT_MSG_STORE,
    CloseFDsFun = msg_store_close_fds_fun(IsPersistentStore),
    InvokeCloseFDs = fun () -> Callback(?MODULE, CloseFDsFun) end,
    rabbit_vhost_msg_store:client_init(VHost, MsgStore, Ref,
                                       MsgOnDiskFun, InvokeCloseFDs).
1264
%% Writes Msg under MsgId through the client selected by IsPersistent,
%% using rabbit_msg_store:write_flow/3.
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
    Write = fun (ClientState) ->
                    rabbit_msg_store:write_flow(MsgId, Msg, ClientState)
            end,
    with_immutable_msg_store_state(MSCState, IsPersistent, Write).
1271
%% Reads MsgId through the client selected by IsPersistent. Returns
%% the read result together with the updated client state pair.
msg_store_read(MSCState, IsPersistent, MsgId) ->
    Read = fun (ClientState) ->
                   rabbit_msg_store:read(MsgId, ClientState)
           end,
    with_msg_store_state(MSCState, IsPersistent, Read).
1278
%% Removes MsgIds through the client selected by IsPersistent.
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
    Remove = fun (ClientState) ->
                     rabbit_msg_store:remove(MsgIds, ClientState)
             end,
    with_immutable_msg_store_state(MSCState, IsPersistent, Remove).
1285
%% Calls rabbit_msg_store:close_all_indicated/1 on the client selected
%% by IsPersistent and returns its result with the updated pair.
msg_store_close_fds(MSCState, IsPersistent) ->
    CloseIndicated = fun (ClientState) ->
                             rabbit_msg_store:close_all_indicated(ClientState)
                     end,
    with_msg_store_state(MSCState, IsPersistent, CloseIndicated).
1290
%% Returns a fun, addressed to this module, that closes the indicated
%% file descriptors of the client selected by IsPersistent and stores
%% the updated client pair back into the queue state.
msg_store_close_fds_fun(IsPersistent) ->
    fun (?MODULE, State = #vqstate { msg_store_clients = Clients }) ->
            {ok, Clients1} = msg_store_close_fds(Clients, IsPersistent),
            State #vqstate { msg_store_clients = Clients1 }
    end.
1296
%% Records SeqId as delivered in the queue index when the flag is
%% true; otherwise the index state is returned unchanged.
maybe_write_delivered(IsDelivered, SeqId, IndexState)
  when is_boolean(IsDelivered) ->
    case IsDelivered of
        true  -> rabbit_queue_index:deliver([SeqId], IndexState);
        false -> IndexState
    end.
1301
%% Turns a list of index entries into a queue of message statuses.
%% Transient entries below TransientThreshold are not kept: their
%% sequence ids are collected for acking (and for delivering, when not
%% yet delivered) and passed to DelsAndAcksFun. Entries already found
%% among the pending acks are skipped. Besides the queue, returns the
%% RAM message count and bytes, the result of DelsAndAcksFun, and the
%% transient count and bytes of the kept entries.
betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
    Collect =
        fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = Entry,
             {Kept, Dels, AckIds, RamCnt, RamSz, TransCnt, TransSz} = Acc) ->
                IsTransient = not IsPersistent,
                case SeqId < TransientThreshold andalso IsTransient of
                    true ->
                        {Kept,
                         cons_if(not IsDelivered, SeqId, Dels),
                         [SeqId | AckIds],
                         RamCnt, RamSz, TransCnt, TransSz};
                    false ->
                        MsgStatus = m(beta_msg_status(Entry)),
                        HaveMsg = msg_in_ram(MsgStatus),
                        Size = msg_size(MsgStatus),
                        case is_msg_in_pending_acks(SeqId, State) of
                            true ->
                                Acc; %% [0]
                            false ->
                                InRam = one_if(HaveMsg),
                                Transient = one_if(IsTransient),
                                {?QUEUE:in_r(MsgStatus, Kept),
                                 Dels, AckIds,
                                 RamCnt + InRam,
                                 RamSz + InRam * Size,
                                 TransCnt + Transient,
                                 TransSz + Transient * Size}
                        end
                end
        end,
    {Filtered, Delivers, Acks, RamReadyCount, RamBytes,
     TransientCount, TransientBytes} =
        lists:foldr(Collect, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
    {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
     TransientCount, TransientBytes}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.
1331
%% True when SeqId is a key in any of the three pending-ack trees
%% (RAM, disk or queue index), checked in that order.
is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack  = RPA,
                                         disk_pending_ack = DPA,
                                         qi_pending_ack   = QPA }) ->
    lists:any(fun (PendingAcks) ->
                      gb_trees:is_defined(SeqId, PendingAcks)
              end, [RPA, DPA, QPA]).
1338
%% Grows a delta by one message with sequence id SeqId. A blank delta
%% becomes the single-element range [SeqId, SeqId + 1). Otherwise the
%% count (and the transient count, for non-persistent messages) is
%% incremented, and the range is widened when SeqId falls before its
%% start or at/after its end.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) ->
    d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1,
               transient = one_if(not IsPersistent)});
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
                             count        = Count,
                             end_seq_id   = EndSeqId,
                             transient    = Transient } = Delta,
             IsPersistent) ->
    Counted = Delta #delta { count     = Count + 1,
                             transient = Transient +
                                 one_if(not IsPersistent) },
    Widened =
        if
            SeqId < StartSeqId -> Counted #delta { start_seq_id = SeqId };
            SeqId >= EndSeqId  -> Counted #delta { end_seq_id = SeqId + 1 };
            true               -> Counted
        end,
    d(Widened).
1361
1362%%----------------------------------------------------------------------------
1363%% Internal major helpers for Public API
1364%%----------------------------------------------------------------------------
1365
%% init/8: builds the initial #vqstate{}.
%%
%% The sequence id bounds come from rabbit_queue_index:bounds/1.
%% Unless Terms is 'non_clean_shutdown', the persistent_count and
%% persistent_bytes entries of Terms override DeltaCount and
%% DeltaBytes (which remain the defaults when the keys are absent).
%% The resulting count/bytes seed len, persistent_count, bytes and
%% persistent_bytes. The freshly built state goes through
%% maybe_deltas_to_betas/1 and a/1 before being returned.
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
     PersistentClient, TransientClient, VHost) ->
    {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),

    {Count, Bytes} =
        case Terms of
            non_clean_shutdown ->
                {DeltaCount, DeltaBytes};
            _ ->
                {proplists:get_value(persistent_count, Terms, DeltaCount),
                 proplists:get_value(persistent_bytes, Terms, DeltaBytes)}
        end,

    %% A blank delta is only used when the count is zero and the
    %% caller did provide a DeltaCount.
    UseBlankDelta = Count == 0 andalso DeltaCount /= undefined,
    Delta = case UseBlankDelta of
                true ->
                    ?BLANK_DELTA;
                false ->
                    d(#delta { start_seq_id = LowSeqId,
                               end_seq_id   = NextSeqId,
                               count        = Count,
                               transient    = 0 })
            end,

    Now = erlang:monotonic_time(),
    IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
                                      ?IO_BATCH_SIZE),
    {ok, IndexMaxSize} =
        application:get_env(rabbit, queue_index_embed_msgs_below),

    Initial = #vqstate {
      %% queues and delta
      q1                  = ?QUEUE:new(),
      q2                  = ?QUEUE:new(),
      delta               = Delta,
      q3                  = ?QUEUE:new(),
      q4                  = ?QUEUE:new(),
      next_seq_id         = NextSeqId,

      %% pending acks
      ram_pending_ack     = gb_trees:empty(),
      disk_pending_ack    = gb_trees:empty(),
      qi_pending_ack      = gb_trees:empty(),

      %% storage handles and settings
      index_state         = IndexState1,
      msg_store_clients   = {PersistentClient, TransientClient},
      durable             = IsDurable,
      transient_threshold = NextSeqId,
      qi_embed_msgs_below = IndexMaxSize,
      io_batch_size       = IoBatchSize,

      %% message and byte counters
      len                 = Count,
      persistent_count    = Count,
      bytes               = Bytes,
      persistent_bytes    = Bytes,
      delta_transient_bytes = 0,

      %% ram accounting
      target_ram_count    = infinity,
      ram_msg_count       = 0,
      ram_msg_count_prev  = 0,
      ram_ack_count_prev  = 0,
      ram_bytes           = 0,
      unacked_bytes       = 0,

      %% rates and flow counters
      out_counter         = 0,
      in_counter          = 0,
      ack_out_counter     = 0,
      ack_in_counter      = 0,
      rates               = blank_rates(Now),
      disk_read_count     = 0,
      disk_write_count    = 0,

      %% confirms
      msgs_on_disk        = gb_sets:new(),
      msg_indices_on_disk = gb_sets:new(),
      unconfirmed         = gb_sets:new(),
      confirmed           = gb_sets:new(),

      mode                = default,
      memory_reduction_run_count = 0,
      virtual_host        = VHost },

    Loaded = maybe_deltas_to_betas(Initial),
    a(Loaded).
1437
%% A #rates{} record whose four rates are all 0.0 and whose timestamp
%% is Now.
blank_rates(Now) ->
    Idle = 0.0,
    #rates { timestamp = Now,
             in        = Idle,
             out       = Idle,
             ack_in    = Idle,
             ack_out   = Idle }.
1444
%% in_r(MsgStatus, State) -> State1
%%
%% Puts a message status back at the front of the queue.
%%
%% default mode:
%%  - message body not in the status and q4 empty: front of q3;
%%  - message body not in the status and q4 non-empty: the body is
%%    read first, then the status goes to the front of q4;
%%  - message body present: front of q4.
%% lazy mode:
%%  - q3 non-empty: front of q3;
%%  - q3 empty: the message is forced to disk and the delta is
%%    expanded to include it.
in_r(MsgStatus = #msg_status { msg = undefined },
     State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
    case ?QUEUE:is_empty(Q4) of
        false ->
            {Msg, State1 = #vqstate { q4 = Q4a }} =
                read_msg(MsgStatus, State),
            Loaded = MsgStatus #msg_status { msg = Msg },
            State2 = State1 #vqstate { q4 = ?QUEUE:in_r(Loaded, Q4a) },
            stats(ready0, {MsgStatus, Loaded}, 0, State2);
        true ->
            State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
    end;
in_r(MsgStatus, State = #vqstate { mode = default, q4 = Q4 }) ->
    Q4a = ?QUEUE:in_r(MsgStatus, Q4),
    State #vqstate { q4 = Q4a };
%% lazy queues
in_r(MsgStatus = #msg_status { seq_id        = SeqId,
                               is_persistent = IsPersistent },
     State = #vqstate { mode = lazy, q3 = Q3, delta = Delta }) ->
    case ?QUEUE:is_empty(Q3) of
        false ->
            State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
        true ->
            {_Written, State1} =
                maybe_write_to_disk(true, true, MsgStatus, State),
            State2 = stats(ready0, {MsgStatus, none}, 1, State1),
            State2 #vqstate {
              delta = expand_delta(SeqId, Delta, IsPersistent) }
    end.
1471
%% queue_out(State) -> {{value, MsgStatus}, State1} | {empty, State1}
%%
%% Takes the next message status off the queue. In default mode q4 is
%% tried first and q3 is the fallback; in lazy mode only q3 is used.
queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
    case ?QUEUE:out(Q4) of
        {{value, _MsgStatus} = Head, Q4a} ->
            {Head, State #vqstate { q4 = Q4a }};
        {empty, _Q4} ->
            queue_out_from_q3(State)
    end;
%% lazy queues
queue_out(State = #vqstate { mode = lazy }) ->
    queue_out_from_q3(State).

%% Translates the result of fetch_from_q3/1 into the queue_out/1
%% return shape.
queue_out_from_q3(State) ->
    case fetch_from_q3(State) of
        {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1};
        {empty, _State1} = Empty      -> Empty
    end.
1488
%% read_msg(MsgStatus, State) -> {Msg, State1}
%%
%% Returns the message carried by the status when there is one;
%% otherwise reads it through read_msg/3 using the status' msg_id
%% and is_persistent flag.
read_msg(#msg_status { msg = Msg }, State) when Msg =/= undefined ->
    {Msg, State};
read_msg(#msg_status { msg_id        = MsgId,
                       is_persistent = IsPersistent }, State) ->
    read_msg(MsgId, IsPersistent, State).
1495
%% read_msg(MsgId, IsPersistent, State) -> {Msg, State1}
%%
%% Reads the message via msg_store_read/3, asserting that the read
%% succeeds and yields a #basic_message{}. The updated store clients
%% are put back in the state and disk_read_count is bumped by one.
read_msg(MsgId, IsPersistent,
         State = #vqstate { msg_store_clients = Clients,
                            disk_read_count   = Reads }) ->
    {{ok, Msg = #basic_message {}}, Clients1} =
        msg_store_read(Clients, IsPersistent, MsgId),
    State1 = State #vqstate { msg_store_clients = Clients1,
                              disk_read_count   = Reads + 1 },
    {Msg, State1}.
1502
%% Front end of stats0/4: Signs and Statuses are first expanded with
%% expand_signs/1 and expand_statuses/1.
stats(Signs, Statuses, DeltaPaged, State) ->
    SignTriple   = expand_signs(Signs),
    StatusTriple = expand_statuses(Statuses),
    stats0(SignTriple, StatusTriple, DeltaPaged, State).
1505
%% Expands the shorthand accepted by stats/4 into the triple
%% {DeltaReady, DeltaUnacked, ReadyMsgPaged} consumed by stats0/4.
expand_signs(ready0) ->
    {0, 0, true};
expand_signs(lazy_pub) ->
    {1, 0, true};
expand_signs({DeltaReady, DeltaUnacked}) ->
    {DeltaReady, DeltaUnacked, false}.
1509
%% Expands a {Before, After} pair into the triple
%% {InRamBefore, InRamAfter, MsgStatus} consumed by stats0/4.
%% 'none' on either side and 'lazy' on the left count as "not in
%% RAM"; the status placed in the triple is the 'After' one in those
%% left-hand cases and the 'Before' one otherwise. Clause order
%% matters since the last clause matches any pair.
expand_statuses({none, After}) ->
    {false, msg_in_ram(After), After};
expand_statuses({Before, none}) ->
    {msg_in_ram(Before), false, Before};
expand_statuses({lazy, After}) ->
    {false, false, After};
expand_statuses({Before, After}) ->
    {msg_in_ram(Before), msg_in_ram(After), Before}.
1514
%% Applies one accounting step to the counters held in #vqstate{}.
%%
%% Naming is strict in this function: a variable whose name contains
%% "Ready" or "Unacked" counts exactly that kind of message; a
%% variable with neither word in its name counts both kinds.
stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
       {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged,
       State = #vqstate { len                   = ReadyCount,
                          bytes                 = ReadyBytes,
                          ram_msg_count         = RamReadyCount,
                          ram_bytes             = RamBytes,
                          unacked_bytes         = UnackedBytes,
                          persistent_count      = PersistentCount,
                          persistent_bytes      = PersistentBytes,
                          delta_transient_bytes = DeltaBytes }) ->
    Size         = msg_size(MsgStatus),
    IsPersistent = MsgStatus#msg_status.is_persistent,
    DeltaTotal   = DeltaReady + DeltaUnacked,

    %% +1 when the message enters RAM, -1 when it leaves, 0 otherwise.
    DeltaRam = one_if(InRamAfter) - one_if(InRamBefore),

    %% The RAM-resident ready count follows the ready count, except
    %% that with an unchanged ready count it only moves when the
    %% caller flagged the message as a paged ready message.
    DeltaRamReady =
        case {DeltaReady, ReadyMsgPaged} of
            {1,  _}    -> one_if(InRamAfter);
            {-1, _}    -> -one_if(InRamBefore);
            {0,  true} -> DeltaRam;
            {0,  _}    -> 0
        end,

    DeltaPersistent = DeltaTotal * one_if(IsPersistent),
    DeltaTransientPaged = DeltaPaged * one_if(not IsPersistent),

    State #vqstate {
      len                   = ReadyCount      + DeltaReady,
      bytes                 = ReadyBytes      + DeltaReady * Size,
      ram_msg_count         = RamReadyCount   + DeltaRamReady,
      ram_bytes             = RamBytes        + DeltaRam * Size,
      unacked_bytes         = UnackedBytes    + DeltaUnacked * Size,
      persistent_count      = PersistentCount + DeltaPersistent,
      persistent_bytes      = PersistentBytes + DeltaPersistent * Size,
      delta_transient_bytes = DeltaBytes      + DeltaTransientPaged * Size }.
1551
%% The size recorded in the message properties of a message status.
msg_size(#msg_status { msg_props = #message_properties { size = Bytes } }) ->
    Bytes.
1553
%% A message counts as "in RAM" when its status carries the message
%% itself, i.e. when the msg field is not 'undefined'.
msg_in_ram(#msg_status { msg = undefined }) -> false;
msg_in_ram(#msg_status {})                  -> true.
1555
%% remove(AckRequired, MsgStatus, State) -> {AckTag, State1}
%%
%% Accounts for one message leaving the ready part of the queue; the
%% first parameter is AckRequired.
%%
%% AckRequired = true: the message is recorded as pending ack and
%% its SeqId is returned as the ack tag.
remove(true, MsgStatus = #msg_status { seq_id        = SeqId,
                                       is_delivered  = IsDelivered,
                                       index_on_disk = IndexOnDisk },
       State = #vqstate { out_counter = OutCount,
                          index_state = IndexState }) ->
    %% The delivery only has to be written for an index entry that is
    %% on disk and not yet marked as delivered.
    NeedsDeliver = IndexOnDisk andalso not IsDelivered,
    IndexState1 = maybe_write_delivered(NeedsDeliver, SeqId, IndexState),

    Delivered = MsgStatus #msg_status { is_delivered = true },
    State1 = record_pending_ack(Delivered, State),

    %% One ready message fewer, one unacked message more.
    State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1),

    State3 = State2 #vqstate { out_counter = OutCount + 1,
                               index_state = IndexState1 },
    {SeqId, maybe_update_rates(State3)};

%% AckRequired = false: the message is removed for good and the ack
%% tag is 'undefined'. This clause behaves like
%% remove_queue_entries/3, except that it handles the single message
%% referenced by MsgStatus rather than a whole ?QUEUE.
remove(false, MsgStatus = #msg_status { seq_id        = SeqId,
                                        msg_id        = MsgId,
                                        is_persistent = IsPersistent,
                                        is_delivered  = IsDelivered,
                                        msg_in_store  = MsgInStore,
                                        index_on_disk = IndexOnDisk },
       State = #vqstate { out_counter       = OutCount,
                          index_state       = IndexState,
                          msg_store_clients = MSCState }) ->
    NeedsDeliver = IndexOnDisk andalso not IsDelivered,
    IndexState1 = maybe_write_delivered(NeedsDeliver, SeqId, IndexState),

    %% Drop the message from the msg_store when it was written there.
    ok = case MsgInStore of
             true  -> msg_store_remove(MSCState, IsPersistent, [MsgId]);
             false -> ok
         end,

    %% Ack the queue index entry when there is one.
    IndexState2 =
        case IndexOnDisk of
            false -> IndexState1;
            true  -> rabbit_queue_index:ack([SeqId], IndexState1)
        end,

    %% One ready message fewer, nothing added to the unacked side.
    State1 = stats({-1, 0}, {MsgStatus, none}, 0, State),

    State2 = State1 #vqstate { out_counter = OutCount + 1,
                               index_state = IndexState2 },
    {undefined, maybe_update_rates(State2)}.
1613
%% Batched removal, written to improve dropwhile/2 performance: the
%% delivers and acks sent to rabbit_queue_index are batched instead
%% of being issued one message at a time.
%%
%% Rather than removing each message as it is popped from the queue,
%% the messages satisfying Pred are first accumulated and then
%% removed together with remove_queue_entries/3. Used with
%% process_delivers_and_acks_fun(deliver_and_ack), that call behaves
%% like remove(false, MsgStatus, State) applied to each message.
%%
%% remove/3 bumps out_counter and calls maybe_update_rates/1 for
%% every message; here both happen only once, at the end.
remove_by_predicate(Pred, State = #vqstate { out_counter = OutCount }) ->
    {MsgProps, Collected, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),
    DelsAndAcksFun = process_delivers_and_acks_fun(deliver_and_ack),
    State2 = remove_queue_entries(Collected, DelsAndAcksFun, State1),
    Removed = ?QUEUE:len(Collected),
    State3 = State2 #vqstate { out_counter = OutCount + Removed },
    {MsgProps, maybe_update_rates(State3)}.
1640
%% This function exists as a way to improve fetchwhile/4
%% performance. The idea is to optimise calls to rabbit_queue_index
%% by batching delivers, instead of sending them one by one.
%%
%% Pred is applied to the message properties of each message at the
%% head of the queue; collection stops at the first message for
%% which it returns false (or when the queue is empty).
%%
%% Fun is the function passed to fetchwhile/4 that is applied to
%% every fetched message and used to build the fetchwhile/4 result
%% accumulator FetchAcc.
%%
%% Returns {MsgProps, FetchAcc1, State1}, where MsgProps is whatever
%% collect_by_predicate/3 returned as its first element.
fetch_by_predicate(Pred, Fun, FetchAcc,
                   State = #vqstate { out_counter = OutCount }) ->
    {MsgProps, QAcc, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),

    %% The index state must be taken from the state returned by the
    %% steps above, not from the state we were called with: collecting
    %% the messages can itself update index_state (queue_out/1 and
    %% in_r/2 may read from or write to the queue index), and those
    %% updates would be discarded if the batched delivers were applied
    %% to the original index state.
    {Delivers, FetchAcc1, State2 = #vqstate { index_state = IndexState }} =
        process_queue_entries(QAcc, Fun, FetchAcc, State1),

    IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),

    %% out_counter is not touched by the steps above, so the value
    %% captured on entry is still current.
    {MsgProps, FetchAcc1, maybe_update_rates(
                            State2 #vqstate {
                              index_state = IndexState1,
                              out_counter = OutCount + ?QUEUE:len(QAcc)})}.
1665
%% Does for a whole ?QUEUE of message statuses what
%% remove(true, MsgStatus, State) does for a single one. The point
%% is to let the caller issue a single rabbit_queue_index:deliver/2
%% call with a list of SeqIds: process_queue_entries1/3 accumulates
%% the required deliveries, records a pending ack for each message
%% and updates the stats.
%%
%% Fun and FetchAcc are the function and accumulator described at
%% fetch_by_predicate/4. The result is {Delivers, FetchAcc1, State1}.
process_queue_entries(Q, Fun, FetchAcc, State) ->
    Step = fun (MsgStatus, Acc) ->
                   process_queue_entries1(MsgStatus, Fun, Acc)
           end,
    ?QUEUE:foldl(Step, {[], FetchAcc, State}, Q).
1680
%% One fold step of process_queue_entries/4: obtains the message,
%% records it as pending ack (flagged as delivered), conses SeqId
%% onto Delivers when the index entry is on disk and not yet
%% delivered, feeds the message to Fun and moves one message from
%% ready to unacked in the stats.
process_queue_entries1(
  MsgStatus = #msg_status { seq_id        = SeqId,
                            is_delivered  = IsDelivered,
                            index_on_disk = IndexOnDisk },
  Fun,
  {Delivers, FetchAcc, State}) ->
    {Msg, State1} = read_msg(MsgStatus, State),
    Delivered = MsgStatus #msg_status { is_delivered = true },
    State2 = record_pending_ack(Delivered, State1),
    Delivers1 = cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
    FetchAcc1 = Fun(Msg, SeqId, FetchAcc),
    State3 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2),
    {Delivers1, FetchAcc1, State3}.
1693
%% collect_by_predicate(Pred, QAcc, State) -> {MsgProps, QAcc1, State1}
%%
%% Pops message statuses off the queue for as long as Pred holds for
%% their message properties, appending them to QAcc. The first
%% status failing Pred is put back with in_r/2 and its properties
%% are returned; when the queue runs out, 'undefined' is returned in
%% their place.
collect_by_predicate(Pred, QAcc, State) ->
    case queue_out(State) of
        {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
            case Pred(MsgProps) of
                false ->
                    {MsgProps, QAcc, in_r(MsgStatus, State1)};
                true ->
                    QAcc1 = ?QUEUE:in(MsgStatus, QAcc),
                    collect_by_predicate(Pred, QAcc1, State1)
            end;
        {empty, State1} ->
            {undefined, QAcc, State1}
    end.
1705
1706%%----------------------------------------------------------------------------
1707%% Helpers for Public API purge/1 function
1708%%----------------------------------------------------------------------------
1709
%% purge_when_pending_acks/1 and purge_and_index_reset/1 differ in
%% how the queue index is treated: this function issues a deliver
%% and an ack to the queue index for every message being removed,
%% while purge_and_index_reset/1 just resets the queue index state.
purge_when_pending_acks(State) ->
    DelsAndAcksFun = process_delivers_and_acks_fun(deliver_and_ack),
    a(purge1(DelsAndAcksFun, State)).
1717
%% Purges the queue without sending delivers or acks to the queue
%% index; the index state is reset afterwards instead.
purge_and_index_reset(State) ->
    NoIndexOps = process_delivers_and_acks_fun(none),
    Purged = purge1(NoIndexOps, State),
    a(reset_qi_state(Purged)).
1721
%% Removes the messages held in each of {q1, q2, q3, q4}.
%%
%% q4 and q1 are emptied directly with remove_queue_entries/3; q2
%% and q3 are handled by purge_betas_and_deltas/2.
%%
%% purge_betas_and_deltas/2 loads messages from the queue index,
%% filling up q3 and in some cases moving messages from q2 to q3
%% while resetting q2 to an empty queue (see
%% maybe_deltas_to_betas/2). The messages loaded into q3 are removed
%% with remove_queue_entries/3, and this repeats until there is
%% nothing left to read from the queue index. Messages are read
%% from the index in batches.
purge1(AfterFun, State = #vqstate { q4 = Q4 }) ->
    WithoutQ4 = remove_queue_entries(Q4, AfterFun, State),

    WithoutBetas =
        purge_betas_and_deltas(AfterFun,
                               WithoutQ4 #vqstate { q4 = ?QUEUE:new() }),
    #vqstate { q1 = Q1 } = WithoutBetas,

    WithoutQ1 = remove_queue_entries(Q1, AfterFun, WithoutBetas),

    a(WithoutQ1 #vqstate { q1 = ?QUEUE:new() }).
1743
%% Replaces index_state with the result of
%% rabbit_queue_index:reset_state/1 applied to it.
reset_qi_state(State = #vqstate { index_state = IndexState }) ->
    FreshIndexState = rabbit_queue_index:reset_state(IndexState),
    State #vqstate { index_state = FreshIndexState }.
1747
%% True when no message is pending ack in any of the pending-ack
%% trees, as counted by count_pending_acks/1.
is_pending_ack_empty(State) ->
    case count_pending_acks(State) of
        0 -> true;
        _ -> false
    end.
1750
%% True when the 'unconfirmed' set of the state has no elements.
is_unconfirmed_empty(#vqstate { unconfirmed = Unconfirmed }) ->
    gb_sets:is_empty(Unconfirmed).
1753
%% Total number of entries across the ram, disk and queue-index
%% pending-ack trees.
count_pending_acks(#vqstate { ram_pending_ack  = RamAcks,
                              disk_pending_ack = DiskAcks,
                              qi_pending_ack   = IndexAcks }) ->
    lists:sum([gb_trees:size(PendingAcks)
               || PendingAcks <- [RamAcks, DiskAcks, IndexAcks]]).
1758
%% Repeatedly empties q3 with remove_queue_entries/3 and refills it
%% through maybe_deltas_to_betas/2, until q3 comes back empty. In
%% lazy mode maybe_deltas_to_betas/2 is also called once up front,
%% before q3 is first inspected.
purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
    Primed = case Mode of
                 lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
                 _    -> State
             end,
    #vqstate { q3 = Q3 } = Primed,

    case ?QUEUE:is_empty(Q3) of
        true ->
            Primed;
        false ->
            Emptied = remove_queue_entries(Q3, DelsAndAcksFun, Primed),
            Refilled = maybe_deltas_to_betas(
                         DelsAndAcksFun,
                         Emptied #vqstate { q3 = ?QUEUE:new() }),
            purge_betas_and_deltas(DelsAndAcksFun, Refilled)
    end.
1774
%% remove_queue_entries(Q, DelsAndAcksFun, State) -> State1
%%
%% Folds remove_queue_entries1/2 over Q to gather the message ids to
%% remove (grouped by store), the SeqIds to deliver and the SeqIds
%% to ack, updating the stats along the way. The gathered ids are
%% handed to remove_msgs_by_id/2 together with the store clients of
%% the incoming state, and DelsAndAcksFun decides what happens with
%% the delivers and acks.
remove_queue_entries(Q, DelsAndAcksFun,
                     State = #vqstate { msg_store_clients = MSCState }) ->
    Seed = {maps:new(), [], [], State},
    {MsgIdsByStore, Delivers, Acks, State1} =
        ?QUEUE:foldl(fun remove_queue_entries1/2, Seed, Q),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    DelsAndAcksFun(Delivers, Acks, State1).
1782
%% One fold step of remove_queue_entries/3. For the given message
%% status:
%%  - its msg_id is added under its is_persistent key when the
%%    message is in the store;
%%  - its SeqId is added to Delivers when the index entry is on disk
%%    and not yet delivered;
%%  - its SeqId is added to Acks when the index entry is on disk;
%%  - the stats lose one ready message.
remove_queue_entries1(
  MsgStatus = #msg_status { msg_id        = MsgId,
                            seq_id        = SeqId,
                            is_persistent = IsPersistent,
                            is_delivered  = IsDelivered,
                            msg_in_store  = MsgInStore,
                            index_on_disk = IndexOnDisk },
  {MsgIdsByStore, Delivers, Acks, State}) ->
    MsgIdsByStore1 =
        case MsgInStore of
            false -> MsgIdsByStore;
            true  -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore)
        end,
    Delivers1 = cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
    Acks1     = cons_if(IndexOnDisk, SeqId, Acks),
    State1    = stats({-1, 0}, {MsgStatus, none}, 0, State),
    {MsgIdsByStore1, Delivers1, Acks1, State1}.
1795
%% Returns the fun(Delivers, Acks, State) -> State1 used by
%% remove_queue_entries/3. For 'deliver_and_ack' the fun applies the
%% delivers and then the acks to the queue index; for any other
%% argument the fun returns the state untouched.
process_delivers_and_acks_fun(deliver_and_ack) ->
    fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
            Delivered = rabbit_queue_index:deliver(Delivers, IndexState),
            Acked     = rabbit_queue_index:ack(Acks, Delivered),
            State #vqstate { index_state = Acked }
    end;
process_delivers_and_acks_fun(_) ->
    fun (_Delivers, _Acks, State) -> State end.
1807
1808%%----------------------------------------------------------------------------
1809%% Internal gubbins for publishing
1810%%----------------------------------------------------------------------------
1811
%% publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, PersistFun, State)
%%   -> State1
%%
%% Adds one message to the queue under the next sequence id.
%%
%% A message is only treated as persistent when the queue is durable
%% AND the message itself is persistent (IsPersistent1 below); that
%% effective flag is what the msg_status is built with.
%%
%% PersistFun is called as PersistFun(ForceMsg, ForceIndex,
%% MsgStatus, State) and returns {MsgStatus1, State1}; both force
%% flags are false in default mode and true in lazy mode.
%%
%% default mode: the status is appended to q4 when q3 is empty and
%% to q1 otherwise.
%% lazy mode: the status is not queued in memory; the delta is
%% expanded to include the new sequence id.
%%
%% In both modes next_seq_id and in_counter are incremented and the
%% message id is added to 'unconfirmed' via gb_sets_maybe_insert/3.
publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
         MsgProps = #message_properties { needs_confirming = NeedsConfirming },
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                            mode                = default,
                            qi_embed_msgs_below = IndexMaxSize,
                            next_seq_id         = SeqId,
                            in_counter          = InCount,
                            durable             = IsDurable,
                            unconfirmed         = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
    State2 = case ?QUEUE:is_empty(Q3) of
                 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
                 true  -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
             end,
    InCount1 = InCount + 1,
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats({1, 0}, {none, MsgStatus1}, 0,
          State2#vqstate{ next_seq_id = SeqId + 1,
                          in_counter  = InCount1,
                          unconfirmed = UC1 });
publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
         MsgProps = #message_properties { needs_confirming = NeedsConfirming },
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate { mode                = lazy,
                            qi_embed_msgs_below = IndexMaxSize,
                            next_seq_id         = SeqId,
                            in_counter          = InCount,
                            durable             = IsDurable,
                            unconfirmed         = UC,
                            delta               = Delta}) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
    %% The delta's transient count must be driven by the effective
    %% persistence (IsPersistent1), the same flag the msg_status was
    %% built with. Using the raw message flag would leave a persistent
    %% message published to a non-durable queue out of the delta's
    %% transient count while stats/4 below still adds its size to
    %% delta_transient_bytes.
    Delta1 = expand_delta(SeqId, Delta, IsPersistent1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats(lazy_pub, {lazy, m(MsgStatus1)}, 1,
          State1#vqstate{ delta       = Delta1,
                          next_seq_id = SeqId + 1,
                          in_counter  = InCount + 1,
                          unconfirmed = UC1}).
1855
%% Fold step for batch publishing: publishes one
%% {Msg, MsgProps, IsDelivered} entry with publish1/7, using
%% maybe_prepare_write_to_disk/4 as the persist fun, and threads
%% {ChPid, Flow, State} through as the accumulator.
batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
    PersistFun = fun maybe_prepare_write_to_disk/4,
    State1 = publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                      PersistFun, State),
    {ChPid, Flow, State1}.
1859
%% publish_delivered1(Msg, MsgProps, ChPid, Flow, PersistFun, State)
%%   -> {SeqId, State1}
%%
%% Publishes a message that is already delivered: it is recorded
%% straight away as pending ack instead of being queued. The two
%% modes only differ in the force flags handed to PersistFun: both
%% false in default mode, both true in lazy mode.
publish_delivered1(Msg = #basic_message {},
                   MsgProps = #message_properties {},
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate { mode = default }) ->
    do_publish_delivered(false, Msg, MsgProps, PersistFun, State);
publish_delivered1(Msg = #basic_message {},
                   MsgProps = #message_properties {},
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate { mode = lazy }) ->
    do_publish_delivered(true, Msg, MsgProps, PersistFun, State).

%% Shared body of publish_delivered1/6. Force is used for both the
%% message and the index force flags of PersistFun. The message
%% takes the next sequence id, is counted as one more unacked
%% message, and next_seq_id, out_counter and in_counter are each
%% incremented by one.
do_publish_delivered(Force,
                     Msg = #basic_message { is_persistent = IsPersistent,
                                            id            = MsgId },
                     MsgProps = #message_properties {
                                   needs_confirming = NeedsConfirming },
                     PersistFun,
                     State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
                                        next_seq_id         = SeqId,
                                        out_counter         = OutCount,
                                        in_counter          = InCount,
                                        durable             = IsDurable,
                                        unconfirmed         = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps,
                           IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(Force, Force, MsgStatus, State),
    State2 = record_pending_ack(m(MsgStatus1), State1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    State3 = State2 #vqstate { next_seq_id = SeqId    + 1,
                               out_counter = OutCount + 1,
                               in_counter  = InCount  + 1,
                               unconfirmed = UC1 },
    {SeqId, stats({0, 1}, {none, MsgStatus1}, 0, State3)}.
1906
%% Fold step for batch publish-delivered: publishes one
%% {Msg, MsgProps} entry with publish_delivered1/6, using
%% maybe_prepare_write_to_disk/4 as the persist fun, and conses the
%% resulting SeqId onto the accumulated list.
batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
    PersistFun = fun maybe_prepare_write_to_disk/4,
    {SeqId, State1} =
        publish_delivered1(Msg, MsgProps, ChPid, Flow, PersistFun, State),
    {ChPid, Flow, [SeqId | SeqIds], State1}.
1913
%% maybe_write_msg_to_disk(Force, MsgStatus, State)
%%   -> {MsgStatus1, State1}
%%
%% Writes the message to the msg_store when all of these hold:
%%  - it is not in the store already;
%%  - Force is true or the message is persistent;
%%  - persist_to/1 says the message belongs in the msg_store.
%% A write flags the status with msg_in_store = true and increments
%% disk_write_count. In every other case status and state are
%% returned unchanged.
maybe_write_msg_to_disk(_Force,
                        MsgStatus = #msg_status { msg_in_store = true },
                        State) ->
    {MsgStatus, State};
maybe_write_msg_to_disk(Force,
                        MsgStatus = #msg_status {
                                       msg           = Msg,
                                       msg_id        = MsgId,
                                       is_persistent = IsPersistent },
                        State = #vqstate { msg_store_clients = MSCState,
                                           disk_write_count  = Writes })
  when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
        queue_index ->
            {MsgStatus, State};
        msg_store ->
            ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                 prepare_to_store(Msg)),
            {MsgStatus #msg_status { msg_in_store = true },
             State #vqstate { disk_write_count = Writes + 1 }}
    end;
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
1932
%% Because of certain optimisations made inside
%% rabbit_queue_index:pre_publish/7, index persistence needs two
%% separate functions. This one is only used when paging during
%% memory pressure; maybe_write_index_to_disk/3 was deliberately
%% left unmodified because it is used in other places.
%%
%% The index entry is written when it is not on disk yet and either
%% Force is true or the message is persistent. What gets written is
%% the message id when the message is persisted to the msg_store,
%% or the message itself (which also counts as one disk write) when
%% it is persisted to the queue index.
maybe_batch_write_index_to_disk(_Force,
                                MsgStatus = #msg_status {
                                               index_on_disk = true },
                                State) ->
    {MsgStatus, State};
maybe_batch_write_index_to_disk(Force,
                                MsgStatus = #msg_status {
                                               msg           = Msg,
                                               msg_id        = MsgId,
                                               seq_id        = SeqId,
                                               is_persistent = IsPersistent,
                                               is_delivered  = IsDelivered,
                                               msg_props     = MsgProps },
                                State = #vqstate {
                                           target_ram_count = TargetRamCount,
                                           disk_write_count = Writes,
                                           index_state      = IndexState })
  when Force orelse IsPersistent ->
    {MsgOrId, Writes1} =
        case persist_to(MsgStatus) of
            queue_index -> {prepare_to_store(Msg), Writes + 1};
            msg_store   -> {MsgId, Writes}
        end,
    IndexState1 = rabbit_queue_index:pre_publish(
                    MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
                    TargetRamCount, IndexState),
    MsgStatus1 = MsgStatus #msg_status { index_on_disk = true },
    State1 = State #vqstate { index_state      = IndexState1,
                              disk_write_count = Writes1 },
    {MsgStatus1, State1};
maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
1969
%% maybe_write_index_to_disk(Force, MsgStatus, State)
%%   -> {MsgStatus1, State1}
%%
%% Publishes the index entry with rabbit_queue_index:publish/6 when
%% it is not on disk yet and either Force is true or the message is
%% persistent; the delivered flag is then written through
%% maybe_write_delivered/3. What gets published is the message id
%% when the message is persisted to the msg_store, or the message
%% itself (which also counts as one disk write) when it is persisted
%% to the queue index. In every other case status and state are
%% returned unchanged.
maybe_write_index_to_disk(_Force,
                          MsgStatus = #msg_status { index_on_disk = true },
                          State) ->
    {MsgStatus, State};
maybe_write_index_to_disk(Force,
                          MsgStatus = #msg_status {
                                         msg           = Msg,
                                         msg_id        = MsgId,
                                         seq_id        = SeqId,
                                         is_persistent = IsPersistent,
                                         is_delivered  = IsDelivered,
                                         msg_props     = MsgProps },
                          State = #vqstate { target_ram_count = TargetRamCount,
                                             disk_write_count = Writes,
                                             index_state      = IndexState })
  when Force orelse IsPersistent ->
    {MsgOrId, Writes1} =
        case persist_to(MsgStatus) of
            queue_index -> {prepare_to_store(Msg), Writes + 1};
            msg_store   -> {MsgId, Writes}
        end,
    Published = rabbit_queue_index:publish(
                  MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
                  IndexState),
    IndexState1 = maybe_write_delivered(IsDelivered, SeqId, Published),
    MsgStatus1 = MsgStatus #msg_status { index_on_disk = true },
    State1 = State #vqstate { index_state      = IndexState1,
                              disk_write_count = Writes1 },
    {MsgStatus1, State1};
maybe_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
1999
%% Writes the message first (maybe_write_msg_to_disk/3), then its
%% index entry (maybe_write_index_to_disk/3), each with its own
%% force flag.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {WithMsg, AfterMsg} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_write_index_to_disk(ForceIndex, WithMsg, AfterMsg).
2003
%% Same as maybe_write_to_disk/4, except that the index entry goes
%% through maybe_batch_write_index_to_disk/3.
maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {WithMsg, AfterMsg} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_batch_write_index_to_disk(ForceIndex, WithMsg, AfterMsg).
2007
%% Decides whether a message is persisted embedded in the queue index
%% (queue_index) or in the message store (msg_store).
%%
%% Arguments: the #basic_message{}, its #message_properties{} (whose
%% size field is the body size) and IndexMaxSize, the size limit at or
%% above which a message goes to the message store.
%%
%% Returns the atom msg_store or queue_index.
determine_persist_to(#basic_message{
                        content = #content{properties     = Props,
                                           properties_bin = PropsBin}},
                     #message_properties{size = BodySize},
                     IndexMaxSize) ->
    %% The >= is so that you can set the env to 0 and never persist
    %% to the index.
    %%
    %% We want this to be fast, so we avoid size(term_to_binary())
    %% here, or using the term size estimation from truncate.erl, both
    %% of which are too slow. So instead, if the message body size
    %% goes over the limit then we avoid any other checks.
    case BodySize >= IndexMaxSize of
        true ->
            msg_store;
        false ->
            %% The body alone fits, so decide whether the properties
            %% push it past the limit. If we have the encoded
            %% properties (usual case) we can just check their size.
            %% If we don't (message came via the direct client), we
            %% make a guess based on the number of headers.
            PropsSize =
                case is_binary(PropsBin) of
                    true ->
                        byte_size(PropsBin);
                    false ->
                        #'P_basic'{headers = Hs} = Props,
                        HeaderCount = case Hs of
                                          undefined -> 0;
                                          _         -> length(Hs)
                                      end,
                        HeaderCount * ?HEADER_GUESS_SIZE
                end,
            case BodySize + PropsSize >= IndexMaxSize of
                true  -> msg_store;
                false -> queue_index
            end
    end.
2041
%% Accessor for the persist_to field of a #msg_status{}.
persist_to(#msg_status{persist_to = Destination}) ->
    Destination.
2043
%% Returns Msg with its content passed through
%% rabbit_binary_parser:clear_decoded_content/1, so that recoverable
%% decoded properties are not persisted.
prepare_to_store(Msg) ->
    Content = Msg#basic_message.content,
    Cleared = rabbit_binary_parser:clear_decoded_content(Content),
    Msg#basic_message{content = Cleared}.
2049
2050%%----------------------------------------------------------------------------
2051%% Internal gubbins for acks
2052%%----------------------------------------------------------------------------
2053
%% Files MsgStatus, keyed by its sequence id, in exactly one of the
%% three pending-ack trees and increments ack_in_counter:
%%   - msg_in_ram/1 is false                -> disk_pending_ack
%%   - otherwise, persist_to is queue_index -> qi_pending_ack
%%   - otherwise, persist_to is msg_store   -> ram_pending_ack
%% Returns the updated state.
record_pending_ack(#msg_status{seq_id = SeqId} = MsgStatus,
                   #vqstate{ram_pending_ack  = RamAcks,
                            disk_pending_ack = DiskAcks,
                            qi_pending_ack   = IndexAcks,
                            ack_in_counter   = AckIn} = State) ->
    %% gb_trees:insert/3 crashes if SeqId is already present.
    Filed =
        case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
            {false, _} ->
                State#vqstate{disk_pending_ack =
                                  gb_trees:insert(SeqId, MsgStatus, DiskAcks)};
            {_, queue_index} ->
                State#vqstate{qi_pending_ack =
                                  gb_trees:insert(SeqId, MsgStatus, IndexAcks)};
            {_, msg_store} ->
                State#vqstate{ram_pending_ack =
                                  gb_trees:insert(SeqId, MsgStatus, RamAcks)}
        end,
    Filed#vqstate{ack_in_counter = AckIn + 1}.
2070
%% Returns the message status pending under SeqId. The ram tree is
%% consulted first, then the disk tree, then the queue-index tree.
%% The final step uses gb_trees:get/2, so a SeqId found in none of
%% the trees crashes rather than returning a sentinel.
lookup_pending_ack(SeqId, #vqstate{ram_pending_ack  = RamAcks,
                                   disk_pending_ack = DiskAcks,
                                   qi_pending_ack   = IndexAcks}) ->
    case gb_trees:lookup(SeqId, RamAcks) of
        {value, InRam} ->
            InRam;
        none ->
            case gb_trees:lookup(SeqId, DiskAcks) of
                {value, OnDisk} -> OnDisk;
                none            -> gb_trees:get(SeqId, IndexAcks)
            end
    end.
2081
%% Removes the entry for SeqId from whichever pending-ack tree holds
%% it; the trees are tried in the order ram, disk, queue-index.
%%
%% The first parameter is UpdateStats: when true, the state returned
%% by a successful removal is additionally passed through stats/4.
%%
%% Returns {MsgStatus, State1}, or {none, State} when SeqId is not
%% present in any of the three trees.
remove_pending_ack(true, SeqId, State) ->
    case remove_pending_ack(false, SeqId, State) of
        {none, _Unchanged} ->
            {none, State};
        {Removed, Without} ->
            {Removed, stats({0, -1}, {Removed, none}, 0, Without)}
    end;
remove_pending_ack(false, SeqId,
                   #vqstate{ram_pending_ack  = RamAcks,
                            disk_pending_ack = DiskAcks,
                            qi_pending_ack   = IndexAcks} = State) ->
    case gb_trees:lookup(SeqId, RamAcks) of
        {value, InRam} ->
            {InRam,
             State#vqstate{ram_pending_ack = gb_trees:delete(SeqId, RamAcks)}};
        none ->
            case gb_trees:lookup(SeqId, DiskAcks) of
                {value, OnDisk} ->
                    {OnDisk,
                     State#vqstate{disk_pending_ack =
                                       gb_trees:delete(SeqId, DiskAcks)}};
                none ->
                    case gb_trees:lookup(SeqId, IndexAcks) of
                        {value, InIndex} ->
                            {InIndex,
                             State#vqstate{qi_pending_ack =
                                               gb_trees:delete(SeqId, IndexAcks)}};
                        none ->
                            {none, State}
                    end
            end
    end.
2110
%% Empties all pending-ack trees (via purge_pending_ack1/1) and then
%% cleans up the storage behind the purged entries:
%%   KeepPersistent = true  -> only the transient message ids are
%%                             removed; index_state is left as it was.
%%   KeepPersistent = false -> the on-disk sequence ids are acked in
%%                             the queue index and every collected
%%                             message id is removed.
%% Returns the updated state.
purge_pending_ack(KeepPersistent,
                  #vqstate{index_state       = Index0,
                           msg_store_clients = Clients} = State) ->
    {SeqIdsOnDisk, IdsByStore, Purged} = purge_pending_ack1(State),
    case KeepPersistent of
        false ->
            Index1 = rabbit_queue_index:ack(SeqIdsOnDisk, Index0),
            remove_msgs_by_id(IdsByStore, Clients),
            Purged#vqstate{index_state = Index1};
        true ->
            remove_transient_msgs_by_id(IdsByStore, Clients),
            Purged
    end.
2123
%% Empties all pending-ack trees, calls
%% rabbit_queue_index:delete_and_terminate/1 on the index state and
%% removes every collected message id. The list of on-disk sequence
%% ids produced by the purge is not used here.
%% Returns the updated state.
purge_pending_ack_delete_and_terminate(
  #vqstate{index_state       = Index0,
           msg_store_clients = Clients} = State) ->
    {_SeqIdsOnDisk, IdsByStore, Purged} = purge_pending_ack1(State),
    Index1 = rabbit_queue_index:delete_and_terminate(Index0),
    remove_msgs_by_id(IdsByStore, Clients),
    Purged#vqstate{index_state = Index1}.
2131
%% Folds accumulate_ack/2 over the ram, disk and queue-index
%% pending-ack trees (in that order) and replaces all three with
%% empty trees.
%% Returns {IndexOnDiskSeqIds, MsgIdsByStore, State1}; the third
%% component of the accumulator (all message ids) is dropped.
purge_pending_ack1(#vqstate{ram_pending_ack  = RamAcks,
                            disk_pending_ack = DiskAcks,
                            qi_pending_ack   = IndexAcks} = State) ->
    Collect = fun (_SeqId, MsgStatus, Acc) ->
                      accumulate_ack(MsgStatus, Acc)
              end,
    FoldTree = fun (Tree, Acc) ->
                       rabbit_misc:gb_trees_fold(Collect, Acc, Tree)
               end,
    {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
        lists:foldl(FoldTree, accumulate_ack_init(),
                    [RamAcks, DiskAcks, IndexAcks]),
    Purged = State#vqstate{ram_pending_ack  = gb_trees:empty(),
                           disk_pending_ack = gb_trees:empty(),
                           qi_pending_ack   = gb_trees:empty()},
    {IndexOnDiskSeqIds, MsgIdsByStore, Purged}.
2145
%% MsgIdsByStore is a map with up to two keys:
%%
%% true:  holds a list of persistent message ids.
%% false: holds a list of transient message ids.
%%
%% maps:to_list/1 therefore yields {IsPersistent, MsgIds} pairs, and
%% the IsPersistent flag is passed on to msg_store_remove/3 so that it
%% can tell which store the messages should be removed from.
%%
%% Returns a list with one 'ok' per map entry; any other result from
%% msg_store_remove/3 is a badmatch.
remove_msgs_by_id(MsgIdsByStore, MSCState) ->
    RemoveFromStore =
        fun ({IsPersistent, MsgIds}) ->
                ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
        end,
    lists:map(RemoveFromStore, maps:to_list(MsgIdsByStore)).
2159
%% Removes only the message ids stored under the 'false' (transient)
%% key of MsgIdsByStore. Returns ok, also when that key is absent.
remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
    case maps:find(false, MsgIdsByStore) of
        {ok, TransientIds} ->
            ok = msg_store_remove(MSCState, false, TransientIds);
        error ->
            ok
    end.
2165
%% Initial accumulator for accumulate_ack/2:
%% {IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, all empty.
accumulate_ack_init() ->
    {[], #{}, []}.
2167
%% Adds one message status to the accumulator started by
%% accumulate_ack_init/0:
%%   - its sequence id goes through cons_if/3, conditioned on
%%     index_on_disk;
%%   - its message id is added to MsgIdsByStore under the
%%     is_persistent key, but only when msg_in_store is true;
%%   - its message id is always prepended to the list of all ids.
accumulate_ack(#msg_status{seq_id        = SeqId,
                           msg_id        = MsgId,
                           is_persistent = IsPersistent,
                           msg_in_store  = MsgInStore,
                           index_on_disk = IndexOnDisk},
               {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
    SeqIds1 = cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
    ByStore1 =
        case MsgInStore of
            false -> MsgIdsByStore;
            true  -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore)
        end,
    {SeqIds1, ByStore1, [MsgId | AllMsgIds]}.
2180
2181%%----------------------------------------------------------------------------
2182%% Internal plumbing for confirms (aka publisher acks)
2183%%----------------------------------------------------------------------------
2184
%% Marks every id in MsgIdSet as confirmed: the ids are subtracted
%% from msgs_on_disk, msg_indices_on_disk and unconfirmed, and merged
%% into confirmed. Returns the updated state.
record_confirms(MsgIdSet, #vqstate{msgs_on_disk        = MOD,
                                   msg_indices_on_disk = MIOD,
                                   unconfirmed         = UC,
                                   confirmed           = C} = State) ->
    Without = fun (Set) -> rabbit_misc:gb_sets_difference(Set, MsgIdSet) end,
    State#vqstate{msgs_on_disk        = Without(MOD),
                  msg_indices_on_disk = Without(MIOD),
                  unconfirmed         = Without(UC),
                  confirmed           = gb_sets:union(C, MsgIdSet)}.
2194
%% Hands Callback a state-transforming fun (tagged with ?MODULE) that
%% reacts to a set of message ids reported by the message store.
%%   ignored -> every id in MsgIdSet is confirmed straight away.
%%   written -> ids from MsgIdSet that are still unconfirmed are added
%%              to msgs_on_disk, and the ids of MsgIdSet that are
%%              already in msg_indices_on_disk are confirmed.
%% The return value is whatever Callback returns.
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
    ConfirmAll = fun (?MODULE, State) ->
                         record_confirms(MsgIdSet, State)
                 end,
    Callback(?MODULE, ConfirmAll);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
    NoteWritten =
        fun (?MODULE, #vqstate{msgs_on_disk        = MOD,
                               msg_indices_on_disk = MIOD,
                               unconfirmed         = UC} = State) ->
                StillUnconfirmed = gb_sets:intersection(UC, MsgIdSet),
                IndexAlsoOnDisk  = gb_sets:intersection(MsgIdSet, MIOD),
                MOD1 = gb_sets:union(MOD, StillUnconfirmed),
                record_confirms(IndexAlsoOnDisk,
                                State#vqstate{msgs_on_disk = MOD1})
        end,
    Callback(?MODULE, NoteWritten).
2209
%% Counterpart of msgs_written_to_disk/3 for index writes: hands
%% Callback a fun (tagged with ?MODULE) that adds the still-unconfirmed
%% ids of MsgIdSet to msg_indices_on_disk and confirms the ids of
%% MsgIdSet that are already in msgs_on_disk.
%% The return value is whatever Callback returns.
msg_indices_written_to_disk(Callback, MsgIdSet) ->
    NoteIndexWritten =
        fun (?MODULE, #vqstate{msgs_on_disk        = MOD,
                               msg_indices_on_disk = MIOD,
                               unconfirmed         = UC} = State) ->
                StillUnconfirmed = gb_sets:intersection(UC, MsgIdSet),
                MsgAlsoOnDisk    = gb_sets:intersection(MsgIdSet, MOD),
                MIOD1 = gb_sets:union(MIOD, StillUnconfirmed),
                record_confirms(MsgAlsoOnDisk,
                                State#vqstate{msg_indices_on_disk = MIOD1})
        end,
    Callback(?MODULE, NoteIndexWritten).
2221
%% Hands Callback a fun (tagged with ?MODULE) that confirms every id
%% in MsgIdSet. The return value is whatever Callback returns.
msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
    ConfirmAll = fun (?MODULE, State) ->
                         record_confirms(MsgIdSet, State)
                 end,
    Callback(?MODULE, ConfirmAll).
2225
2226%%----------------------------------------------------------------------------
2227%% Internal plumbing for requeue
2228%%----------------------------------------------------------------------------
2229
%% Requeue helper: returns {MsgStatus1, State1} where MsgStatus1 is
%% guaranteed to carry the message. If the msg field is undefined the
%% message is first fetched with read_msg/2. In both cases the state
%% is passed through stats/4 with the {1, -1} adjustment.
publish_alpha(#msg_status{msg = undefined} = MsgStatus, State) ->
    {Msg, State1} = read_msg(MsgStatus, State),
    Loaded = MsgStatus#msg_status{msg = Msg},
    State2 = stats({1, -1}, {MsgStatus, Loaded}, 0, State1),
    {Loaded, State2};
publish_alpha(MsgStatus, State) ->
    State1 = stats({1, -1}, {MsgStatus, MsgStatus}, 0, State),
    {MsgStatus, State1}.
2236
%% Requeue helper: forces the message write step (ForceMsg = true,
%% ForceIndex = false), then trims the status with trim_msg_status/1
%% and passes it through m/1. The state is passed through stats/4
%% with the {1, -1} adjustment.
%% Returns {TrimmedMsgStatus, State1}.
publish_beta(MsgStatus, State) ->
    {Written, State1} =
        maybe_prepare_write_to_disk(true, false, MsgStatus, State),
    Trimmed = m(trim_msg_status(Written)),
    State2 = stats({1, -1}, {MsgStatus, Trimmed}, 0, State1),
    {Trimmed, State2}.
2241
%% Rebuild queue, inserting sequence ids to maintain ordering.
%% Entry point: starts queue_merge/7 with an empty front queue.
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
    EmptyFront = ?QUEUE:new(),
    queue_merge(SeqIds, Q, EmptyFront, MsgIds, Limit, PubFun, State).
2246
%% Merge loop behind queue_merge/6. Front collects the already merged
%% prefix; Q is what is left of the original queue.
%%
%% While the head of SeqIds is below Limit (or Limit is undefined):
%%   - a queue head with a smaller sequence id is moved to Front;
%%   - otherwise the sequence id is taken from the pending acks,
%%     published with PubFun and appended to Front, with its message
%%     id prepended to MsgIds. Ids that are not pending are skipped.
%%
%% Returns {RemainingSeqIds, MergedQueue, MsgIds, State}.
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
            Limit, PubFun, State)
  when Limit == undefined orelse SeqId < Limit ->
    case ?QUEUE:out(Q) of
        {{value, #msg_status{seq_id = QueuedSeqId} = Queued}, QTail}
          when QueuedSeqId < SeqId ->
            %% the queue head comes first
            Front1 = ?QUEUE:in(Queued, Front),
            queue_merge(SeqIds, QTail, Front1, MsgIds,
                        Limit, PubFun, State);
        {_, _QTail} ->
            %% the requeued sequence id comes first
            case msg_from_pending_ack(SeqId, State) of
                {none, _} ->
                    queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
                {Pending, State1} ->
                    {#msg_status{msg_id = MsgId} = Published, State2} =
                        PubFun(Pending, State1),
                    Front1 = ?QUEUE:in(Published, Front),
                    queue_merge(Rest, Q, Front1, [MsgId | MsgIds],
                                Limit, PubFun, State2)
            end
    end;
queue_merge(SeqIds, Q, Front, MsgIds,
            _Limit, _PubFun, State) ->
    %% No sequence ids left below the limit: glue the rest of Q back on.
    {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
2271
%% Requeues the given sequence ids into the delta. Each id that is
%% still pending is taken out of the pending acks, forced through the
%% write step (message and index), added to the delta with
%% expand_delta/3, and its message id is prepended to MsgIds. Ids that
%% are not pending are skipped.
%% Returns {Delta1, MsgIds1, State1}.
delta_merge([], Delta, MsgIds, State) ->
    {Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, State) ->
    Requeue =
        fun (SeqId, {DeltaAcc, MsgIdsAcc, StateAcc} = Acc) ->
                case msg_from_pending_ack(SeqId, StateAcc) of
                    {none, _} ->
                        Acc;
                    {#msg_status{msg_id        = MsgId,
                                 is_persistent = IsPersistent} = MsgStatus,
                     State1} ->
                        {_Written, State2} =
                            maybe_prepare_write_to_disk(true, true,
                                                        MsgStatus, State1),
                        Delta1 = expand_delta(SeqId, DeltaAcc, IsPersistent),
                        State3 = stats({1, -1}, {MsgStatus, none}, 1, State2),
                        {Delta1, [MsgId | MsgIdsAcc], State3}
                end
        end,
    lists:foldl(Requeue, {Delta, MsgIds, State}, SeqIds).
2287
%% Mostly opposite of record_pending_ack/2: takes SeqId out of the
%% pending acks (without the stats update) and clears
%% needs_confirming in the message properties of the result.
%% Returns {MsgStatus, State1}, or {none, State} if SeqId is not
%% pending.
msg_from_pending_ack(SeqId, State) ->
    case remove_pending_ack(false, SeqId, State) of
        {none, _} ->
            {none, State};
        {#msg_status{msg_props = Props} = MsgStatus, State1} ->
            Props1 = Props#message_properties{needs_confirming = false},
            {MsgStatus#msg_status{msg_props = Props1}, State1}
    end.
2298
%% Sequence id of the message at the head of Q, or 'undefined' when
%% Q is empty.
beta_limit(Q) ->
    case ?QUEUE:peek(Q) of
        empty ->
            undefined;
        {value, #msg_status{seq_id = HeadSeqId}} ->
            HeadSeqId
    end.
2304
%% Start sequence id of the delta, or 'undefined' when the delta
%% matches the blank-delta pattern.
delta_limit(?BLANK_DELTA_PATTERN(_)) ->
    undefined;
delta_limit(#delta{start_seq_id = FirstSeqId}) ->
    FirstSeqId.
2307
2308%%----------------------------------------------------------------------------
2309%% Iterator
2310%%----------------------------------------------------------------------------
2311
%% Builds an {ack, Iterator} over the ram_pending_ack tree, for use
%% with next/2.
ram_ack_iterator(State) ->
    RamAcks = State#vqstate.ram_pending_ack,
    {ack, gb_trees:iterator(RamAcks)}.
2314
%% Builds an {ack, Iterator} over the disk_pending_ack tree, for use
%% with next/2.
disk_ack_iterator(State) ->
    DiskAcks = State#vqstate.disk_pending_ack,
    {ack, gb_trees:iterator(DiskAcks)}.
2317
%% Builds an {ack, Iterator} over the qi_pending_ack tree, for use
%% with next/2.
qi_ack_iterator(State) ->
    IndexAcks = State#vqstate.qi_pending_ack,
    {ack, gb_trees:iterator(IndexAcks)}.
2320
%% Iterator over the queued messages; begins at the first stage
%% handed out by istate/2.
msg_iterator(State) ->
    istate(start, State).
2322
%% Stage transitions of the message iterator. Given the stage just
%% finished, returns {NextStage, Source, State} for the next one, in
%% the order q4, q3, delta, q2, q1, and 'done' after q1.
istate(start, State) ->
    {q4, State#vqstate.q4, State};
istate(q4, State) ->
    {q3, State#vqstate.q3, State};
istate(q3, State) ->
    {delta, State#vqstate.delta, State};
istate(delta, State) ->
    {q2, State#vqstate.q2, State};
istate(q2, State) ->
    {q1, State#vqstate.q1, State};
istate(q1, _State) ->
    done.
2329
%% Advances an iterator by one step.
%%
%% Returns {empty, IndexState} when the iterator is exhausted, and
%% otherwise {value, MsgStatus, Unacked, NextIterator, IndexState}.
%% Unacked is true for entries coming from an {ack, _} iterator and
%% false for everything else. IndexState is threaded through because
%% walking the delta reads from the queue index.
next({ack, TreeIt}, IndexState) ->
    case gb_trees:next(TreeIt) of
        {_SeqId, MsgStatus, TreeIt1} ->
            {value, MsgStatus, true, {ack, TreeIt1}, IndexState};
        none ->
            {empty, IndexState}
    end;
next(done, IndexState) ->
    {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
                    end_seq_id   = SeqId}, State}, IndexState) ->
    %% Start has caught up with end: move on to the stage after delta.
    next(istate(delta, State), IndexState);
next({delta, #delta{start_seq_id = From,
                    end_seq_id   = End} = Delta, State}, IndexState) ->
    %% Read up to the next segment boundary, but not past the delta end.
    Boundary = rabbit_queue_index:next_segment_boundary(From),
    To = erlang:min(Boundary, End),
    {Entries, IndexState1} = rabbit_queue_index:read(From, To, IndexState),
    next({delta, Delta#delta{start_seq_id = To}, Entries, State}, IndexState1);
next({delta, Delta, [], State}, IndexState) ->
    %% Current batch used up: go back to reading the delta.
    next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = Entry | Entries], State},
     IndexState) ->
    Remaining = {delta, Delta, Entries, State},
    case is_msg_in_pending_acks(SeqId, State) of
        true ->
            %% Entries that are pending ack are skipped here.
            next(Remaining, IndexState);
        false ->
            {value, beta_msg_status(Entry), false, Remaining, IndexState}
    end;
next({Key, Q, State}, IndexState) ->
    case ?QUEUE:out(Q) of
        {{value, MsgStatus}, QTail} ->
            {value, MsgStatus, false, {Key, QTail, State}, IndexState};
        {empty, _Q} ->
            next(istate(Key, State), IndexState)
    end.
2360
%% Advances It and, if it yields a value, prepends
%% {MsgStatus, Unacked, NextIt} to the list of live iterators Its.
%% An exhausted iterator is simply left out.
%% Returns {Its1, IndexState1}.
inext(It, {Its, IndexState}) ->
    case next(It, IndexState) of
        {value, MsgStatus, Unacked, It1, IndexState1} ->
            {[{MsgStatus, Unacked, It1} | Its], IndexState1};
        {empty, IndexState1} ->
            {Its, IndexState1}
    end.
2368
%% Folds Fun over the entries of several iterators in ascending
%% sequence-id order. Its0 is a list of {MsgStatus, Unacked, Iterator}
%% triples (as built by inext/2). On every step the triple with the
%% smallest sequence id is picked, its message is read, and
%% Fun(Msg, MsgProps, Unacked, Acc) is called; Fun answers
%% {cont, Acc1} to continue or {stop, Acc1} to finish early.
%% Returns {Acc, State}.
ifold(_Fun, Acc, [], State0) ->
    {Acc, State0};
ifold(Fun, Acc, Its0, State0) ->
    BySeqId = fun ({#msg_status{seq_id = SeqIdA}, _, _},
                   {#msg_status{seq_id = SeqIdB}, _, _}) ->
                      SeqIdA =< SeqIdB
              end,
    [{MsgStatus, Unacked, It} | Others] = lists:sort(BySeqId, Its0),
    {Msg, State1} = read_msg(MsgStatus, State0),
    case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
        {cont, Acc1} ->
            %% Replace the consumed entry by the next one from the
            %% same iterator, if it has any left.
            {Its1, Index1} = inext(It, {Others, State1#vqstate.index_state}),
            ifold(Fun, Acc1, Its1, State1#vqstate{index_state = Index1});
        {stop, Acc1} ->
            {Acc1, State1}
    end.
2387
2388%%----------------------------------------------------------------------------
2389%% Phase changes
2390%%----------------------------------------------------------------------------
2391
%% Rate-limits reduce_memory_use/1: it is only run once
%% memory_reduction_run_count has reached the threshold for the queue
%% mode, after which the counter restarts at 0; on all other calls
%% the counter is just incremented.
maybe_reduce_memory_use(#vqstate{memory_reduction_run_count = RunCount,
                                 mode = Mode} = State) ->
    case RunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
        false ->
            State#vqstate{memory_reduction_run_count = RunCount + 1};
        true ->
            Reduced = reduce_memory_use(State),
            Reduced#vqstate{memory_reduction_run_count = 0}
    end.
2399
%% Moves messages and pending acks out of RAM when the queue holds
%% more in RAM than target_ram_count allows. Returns the new state.
%%
%% Three cases:
%%   - target_ram_count = infinity: nothing to do.
%%   - mode = default: work is done in two stages (alphas/ram acks
%%     first, then betas to deltas), each capped at
%%     2 * CreditDiscBound per call; if a cap cut the work short and
%%     credit flow is not blocked, a continuation is requested via
%%     maybe_bump_reduce_memory_use/1.
%%   - mode = lazy: ram acks are limited and q3 is pushed to delta
%%     without batching, followed by an explicit garbage_collect().
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
    State;
reduce_memory_use(State = #vqstate {
                    mode             = default,
                    ram_pending_ack  = RPA,
                    ram_msg_count    = RamMsgCount,
                    target_ram_count = TargetRamCount,
                    io_batch_size    = IoBatchSize,
                    rates            = #rates { in      = AvgIngress,
                                                out     = AvgEgress,
                                                ack_in  = AvgAckIngress,
                                                ack_out = AvgAckEgress } }) ->
    %% Only the first element of the configured pair is used here.
    {CreditDiscBound, _} =rabbit_misc:get_env(rabbit,
                                              msg_store_credit_disc_bound,
                                              ?CREDIT_DISC_BOUND),
    %% Stage 1. Q2 and Q3 are taken from the state *after* this stage.
    {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
            0  -> {false, State};
            %% Reduce memory of pending acks and alphas. The order is
            %% determined based on which is growing faster. Whichever
            %% comes second may very well get a quota of 0 if the
            %% first manages to push out the max number of messages.
            A2BChunk ->
                %% In case there are few messages to be sent to a message store
                %% and many messages to be embedded to the queue index,
                %% we should limit the number of messages to be flushed
                %% to avoid blocking the process.
                A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
                    true  -> CreditDiscBound * 2;
                    false -> A2BChunk
                end,
                Funs = case ((AvgAckIngress - AvgAckEgress) >
                                   (AvgIngress - AvgEgress)) of
                             true  -> [fun limit_ram_acks/2,
                                       fun push_alphas_to_betas/2];
                             false -> [fun push_alphas_to_betas/2,
                                       fun limit_ram_acks/2]
                         end,
                %% Each fun consumes part of the quota and returns
                %% what is left of it together with the new state.
                {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                                    ReduceFun(QuotaN, StateN)
                                              end, {A2BChunkActual, State}, Funs),
                %% Resume is needed only if the capped quota was used
                %% up entirely and the cap was actually in effect.
                {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
        end,
    Permitted = permitted_beta_count(State1),
    {NeedResumeB2D, State3} =
        %% If there are more messages with their queue position held in RAM,
        %% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
        %% write their queue position to disk, a.k.a. push_betas_to_deltas
        case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                        Permitted) of
            B2DChunk when B2DChunk >= IoBatchSize ->
                %% Same as for alphas to betas. Limit a number of messages
                %% to be flushed to disk at once to avoid blocking the process.
                B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
                    true -> CreditDiscBound * 2;
                    false -> B2DChunk
                end,
                StateBD = push_betas_to_deltas(B2DChunkActual, State1),
                {B2DChunk > B2DChunkActual, StateBD};
            _  ->
                {false, State1}
        end,
    %% We can be blocked by the credit flow, or limited by a batch size,
    %% or finished with flushing.
    %% If blocked by the credit flow - the credit grant will resume processing,
    %% if limited by a batch - the batch continuation message should be sent.
    %% The continuation message will be prioritised over publishes,
    %% but not consumptions, so the queue can make progress.
    Blocked = credit_flow:blocked(),
    case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
        %% Credit bump will continue paging
        {true, _}      -> State3;
        %% Finished with paging
        {false, false} -> State3;
        %% Planning next batch
        {false, true}  ->
            %% We don't want to use self-credit-flow, because it's harder to
            %% reason about. So the process sends a (prioritised) message to
            %% itself and sets a waiting_bump value to keep the message box clean
            maybe_bump_reduce_memory_use(State3)
    end;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
                             mode = lazy,
                             ram_pending_ack  = RPA,
                             ram_msg_count    = RamMsgCount,
                             target_ram_count = TargetRamCount }) ->
    %% Q3 is taken from the state after the ram acks have been limited.
    State1 = #vqstate { q3 = Q3 } =
        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
            0  -> State;
            S1 -> {_, State2} = limit_ram_acks(S1, State),
                  State2
        end,

    State3 =
        case chunk_size(?QUEUE:len(Q3),
                        permitted_beta_count(State1)) of
            0  ->
                State1;
            S2 ->
                push_betas_to_deltas(S2, State1)
        end,
    garbage_collect(),
    State3.
2505
%% Asks for another round of memory reduction by sending
%% bump_reduce_memory_use to the calling process. The waiting_bump
%% flag ensures that this function does not send a second message
%% while the flag is still set.
maybe_bump_reduce_memory_use(#vqstate{waiting_bump = true} = State) ->
    State;
maybe_bump_reduce_memory_use(State) ->
    self() ! bump_reduce_memory_use,
    State#vqstate{waiting_bump = true}.
2511
%% Moves up to Quota entries from ram_pending_ack to disk_pending_ack,
%% always taking the entry with the largest sequence id first. Each
%% moved entry is forced through the message write step and trimmed
%% with trim_msg_status/1 before it is filed in the disk tree.
%%
%% Stops when the quota is used up or the ram tree is empty; in both
%% cases the final state is passed through ui/1.
%% Returns {RemainingQuota, State}.
limit_ram_acks(0, State) ->
    {0, ui(State)};
limit_ram_acks(Quota, #vqstate{ram_pending_ack  = RamAcks,
                               disk_pending_ack = DiskAcks} = State) ->
    case gb_trees:is_empty(RamAcks) of
        true ->
            {Quota, ui(State)};
        false ->
            {SeqId, InRam, RamAcks1} = gb_trees:take_largest(RamAcks),
            {Written, State1} =
                maybe_prepare_write_to_disk(true, false, InRam, State),
            Trimmed = m(trim_msg_status(Written)),
            DiskAcks1 = gb_trees:insert(SeqId, Trimmed, DiskAcks),
            State2 = State1#vqstate{ram_pending_ack  = RamAcks1,
                                    disk_pending_ack = DiskAcks1},
            limit_ram_acks(Quota - 1,
                           stats({0, 0}, {InRam, Trimmed}, 0, State2))
    end.
2530
%% Number of betas the queue is allowed to keep, as used by
%% reduce_memory_use/1 when sizing the betas-to-deltas chunk:
%%   - empty queue (len = 0): infinity;
%%   - lazy mode: target_ram_count;
%%   - target_ram_count = 0: the smaller of the length of q3 and
%%     rabbit_queue_index:next_segment_boundary(0);
%%   - otherwise: with BetaDelta = len - len(q1) - len(q4), the larger
%%     of next_segment_boundary(0) and
%%     BetaDelta - BetaDelta^2 div (BetaDelta + target_ram_count).
permitted_beta_count(#vqstate{len = 0}) ->
    infinity;
permitted_beta_count(#vqstate{mode             = lazy,
                              target_ram_count = TargetRamCount}) ->
    TargetRamCount;
permitted_beta_count(#vqstate{target_ram_count = 0, q3 = Q3}) ->
    erlang:min(?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0));
permitted_beta_count(#vqstate{q1               = Q1,
                              q4               = Q4,
                              target_ram_count = TargetRamCount,
                              len              = Len}) ->
    BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
    SegmentSize = rabbit_queue_index:next_segment_boundary(0),
    Scaled = BetaDelta - ((BetaDelta * BetaDelta) div
                              (BetaDelta + TargetRamCount)),
    erlang:max(SegmentSize, Scaled).
2546
%% By how much Current exceeds Permitted. Returns 0 when Permitted is
%% 'infinity' or is at least Current.
chunk_size(_Current, infinity) ->
    0;
chunk_size(Current, Permitted) when Permitted >= Current ->
    0;
chunk_size(Current, Permitted) ->
    Current - Permitted.
2552
%% Takes the next message status from the head of q3.
%% Returns {empty, State} when there is nothing to fetch, otherwise
%% {loaded, {MsgStatus, State1}}.
%%
%% Default mode: after taking the head, the queues are rebalanced if
%% q3 has just become empty (see the comments in the body).
%% Lazy mode: if q3 is empty but the delta is not, betas are loaded
%% from the delta first and the fetch is retried.
fetch_from_q3(#vqstate{mode  = default,
                       q1    = Q1,
                       q2    = Q2,
                       delta = #delta{count = DeltaCount},
                       q3    = Q3,
                       q4    = Q4} = State) ->
    case ?QUEUE:out(Q3) of
        {{value, MsgStatus}, Q3Tail} ->
            Popped = State#vqstate{q3 = Q3Tail},
            Rebalanced =
                case {?QUEUE:is_empty(Q3Tail), 0 == DeltaCount} of
                    {false, _} ->
                        %% q3 still has messages and delta was not
                        %% touched, so the invariants between q1, q2,
                        %% delta and q3 still hold.
                        Popped;
                    {true, false} ->
                        %% q3 ran dry but delta has more: refill.
                        maybe_deltas_to_betas(Popped);
                    {true, true} ->
                        %% q3 has just become empty and delta is
                        %% empty too. So q2 must be empty, and q4 is
                        %% known to be empty, otherwise we would not
                        %% be loading from q3. Hence q1 can simply
                        %% become the new q4.
                        true = ?QUEUE:is_empty(Q2), %% ASSERTION
                        true = ?QUEUE:is_empty(Q4), %% ASSERTION
                        Popped#vqstate{q1 = ?QUEUE:new(), q4 = Q1}
                end,
            {loaded, {MsgStatus, Rebalanced}};
        {empty, _Q3} ->
            {empty, State}
    end;
%% lazy queues
fetch_from_q3(#vqstate{mode  = lazy,
                       delta = #delta{count = DeltaCount},
                       q3    = Q3} = State) ->
    case ?QUEUE:out(Q3) of
        {{value, MsgStatus}, Q3Tail} ->
            {loaded, {MsgStatus, State#vqstate{q3 = Q3Tail}}};
        {empty, _Q3} when DeltaCount =:= 0 ->
            {empty, State};
        {empty, _Q3} ->
            fetch_from_q3(maybe_deltas_to_betas(State))
    end.
2598
%% Convenience wrapper: calls maybe_deltas_to_betas/2 with the fun
%% returned by process_delivers_and_acks_fun(deliver_and_ack).
maybe_deltas_to_betas(State) ->
    DelsAndAcksFun = process_delivers_and_acks_fun(deliver_and_ack),
    maybe_deltas_to_betas(DelsAndAcksFun, State).
2602
%% Loads the next stretch of the delta from the queue index into q3.
%%
%% The stretch runs from the delta's start_seq_id up to the next
%% segment boundary or the delta's end_seq_id, whichever is smaller.
%% The index entries read are turned into queue entries by
%% betas_from_index_entries/4, which is given DelsAndAcksFun.
%%
%% Returns the updated state; a state whose delta matches the
%% blank-delta pattern is returned unchanged.
maybe_deltas_to_betas(_DelsAndAcksFun,
                      State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
    State;
maybe_deltas_to_betas(DelsAndAcksFun,
                      State = #vqstate {
                        q2                   = Q2,
                        delta                = Delta,
                        q3                   = Q3,
                        index_state          = IndexState,
                        ram_msg_count        = RamMsgCount,
                        ram_bytes            = RamBytes,
                        disk_read_count      = DiskReadCount,
                        delta_transient_bytes = DeltaTransientBytes,
                        transient_threshold  = TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count        = DeltaCount,
             transient    = Transient,
             end_seq_id   = DeltaSeqIdEnd } = Delta,
    %% Upper bound of this read: never past the end of the delta.
    DeltaSeqId1 =
        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                   DeltaSeqIdEnd]),
    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
                                                  IndexState),
    {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
        betas_from_index_entries(List, TransientThreshold,
                                 DelsAndAcksFun,
                                 State #vqstate { index_state = IndexState1 }),
    %% Note that disk_read_count is advanced by RamCountsInc as well.
    State2 = State1 #vqstate { ram_msg_count     = RamMsgCount   + RamCountsInc,
                               ram_bytes         = RamBytes      + RamBytesInc,
                               disk_read_count   = DiskReadCount + RamCountsInc },
    case ?QUEUE:len(Q3a) of
        0 ->
            %% we ignored every message in the segment due to it being
            %% transient and below the threshold
            maybe_deltas_to_betas(
              DelsAndAcksFun,
              State2 #vqstate {
                delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
        Q3aLen ->
            Q3b = ?QUEUE:join(Q3, Q3a),
            case DeltaCount - Q3aLen of
                0 ->
                    %% delta is now empty, but it wasn't before, so
                    %% can now join q2 onto q3
                    State2 #vqstate { q2    = ?QUEUE:new(),
                                      delta = ?BLANK_DELTA,
                                      q3    = ?QUEUE:join(Q3b, Q2),
                                      delta_transient_bytes = 0};
                N when N > 0 ->
                    %% Part of the delta remains: shrink it by what
                    %% was just loaded. A negative N matches no clause
                    %% and so crashes with case_clause.
                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
                                        count        = N,
                                        transient    = Transient - TransientCount,
                                        end_seq_id   = DeltaSeqIdEnd }),
                    State2 #vqstate { delta = Delta1,
                                      q3    = Q3b,
                                      delta_transient_bytes = DeltaTransientBytes - TransientBytes }
            end
    end.
2661
%% Moves messages out of q1 and then q4, spending at most Quota moves
%% in total: q1 is processed first (taken with out/1), q4 gets whatever
%% quota is left (taken with out_r/1).
%% Returns {RemainingQuota, State}.
push_alphas_to_betas(Quota, State) ->
    %% A message leaving q1 is appended to q3 when delta holds nothing
    %% (count and transient are both 0), and to q2 otherwise.
    FromQ1 =
        fun (MsgStatus, Q1a,
             S = #vqstate { q3    = Q3,
                            delta = #delta { count     = 0,
                                             transient = 0 } }) ->
                S #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
            (MsgStatus, Q1a, S = #vqstate { q2 = Q2 }) ->
                S #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
        end,
    %% A message leaving q4 is re-inserted into q3 with in_r/2.
    FromQ4 =
        fun (MsgStatus, Q4a, S = #vqstate { q3 = Q3 }) ->
                S #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
        end,
    {QuotaAfterQ1, StateAfterQ1} =
        push_alphas_to_betas(fun ?QUEUE:out/1, FromQ1, Quota,
                             State #vqstate.q1, State),
    {QuotaAfterQ4, StateAfterQ4} =
        push_alphas_to_betas(fun ?QUEUE:out_r/1, FromQ4, QuotaAfterQ1,
                             StateAfterQ1 #vqstate.q4, StateAfterQ1),
    {QuotaAfterQ4, StateAfterQ4}.
2680
%% Worker loop for push_alphas_to_betas/2.
%%
%% Generator takes the next message status out of Q; Consumer stores the
%% trimmed status and the shrunken queue back into the state. The loop
%% ends, flushing the index cache through ui/1, when the quota is used
%% up, when the RAM target is infinity or already met, when credit flow
%% is blocked, or when Q is empty. Returns {RemainingQuota, State}.
push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
                     State = #vqstate { ram_msg_count    = RamMsgCount,
                                        target_ram_count = TargetRamCount })
  when Quota =:= 0;
       TargetRamCount =:= infinity;
       TargetRamCount >= RamMsgCount ->
    {Quota, ui(State)};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
    %% Persisting a message to disk consumes credit from the message
    %% store (see rabbit_variable_queue:msg_store_write/4), so a blocked
    %% credit flow may mean the msg_store is throttling this queue.
    Next = case credit_flow:blocked() of
               true  -> blocked;
               false -> Generator(Q)
           end,
    case Next of
        blocked ->
            {Quota, ui(State)};
        {empty, _} ->
            {Quota, ui(State)};
        {{value, MsgStatus}, Qa} ->
            {Written, State1} =
                maybe_prepare_write_to_disk(true, false, MsgStatus, State),
            Trimmed = m(trim_msg_status(Written)),
            State2 = stats(ready0, {MsgStatus, Trimmed}, 0, State1),
            State3 = Consumer(Trimmed, Qa, State2),
            push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State3)
    end.
2710
%% Pages message statuses out to delta, spending at most Quota moves,
%% and returns the updated #vqstate{}.
%%
%% In default mode q3 is processed first, using out_r/1 and the next
%% segment boundary as limit, then q2 using out/1 with its own first
%% seq id as limit.
push_betas_to_deltas(Quota, State = #vqstate { mode  = default,
                                               q2    = Q2,
                                               delta = Delta,
                                               q3    = Q3 }) ->
    {Q3a, AfterQ3} =
        push_betas_to_deltas(fun ?QUEUE:out_r/1,
                             fun rabbit_queue_index:next_segment_boundary/1,
                             Q3, {Quota, Delta, State}),
    {Q2a, {_, Delta1, State1}} =
        push_betas_to_deltas(fun ?QUEUE:out/1,
                             fun (FirstSeqId) -> FirstSeqId end,
                             Q2, AfterQ3),
    State1 #vqstate { q2    = Q2a,
                      delta = Delta1,
                      q3    = Q3a };
%% In the case of lazy queues we want to page as many messages as
%% possible from q3, so the limit is q3's own first seq id rather than
%% the next segment boundary. q2 is not touched in this mode.
push_betas_to_deltas(Quota, State = #vqstate { mode  = lazy,
                                               delta = Delta,
                                               q3    = Q3 }) ->
    {Q3a, {_, Delta1, State1}} =
        push_betas_to_deltas(fun ?QUEUE:out_r/1,
                             fun (FirstSeqId) -> FirstSeqId end,
                             Q3, {Quota, Delta, State}),
    State1 #vqstate { delta = Delta1,
                      q3    = Q3a }.
2741
2742
%% Decides whether anything in Q is eligible for paging at all.
%% The limit is derived from the seq id returned by peek/1; if even the
%% seq id returned by peek_r/1 is below that limit, or Q is empty, Q and
%% PushState are returned unchanged. Otherwise the work is handed to
%% push_betas_to_deltas1/4. Returns {Q1, PushState1}.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
    case ?QUEUE:is_empty(Q) of
        false ->
            {value, #msg_status { seq_id = FirstSeqId }} = ?QUEUE:peek(Q),
            {value, #msg_status { seq_id = LastSeqId }}  = ?QUEUE:peek_r(Q),
            Limit = LimitFun(FirstSeqId),
            if
                LastSeqId < Limit ->
                    {Q, PushState};
                true ->
                    push_betas_to_deltas1(Generator, Limit, Q, PushState)
            end;
        true ->
            {Q, PushState}
    end.
2756
%% Worker loop for push_betas_to_deltas/4.
%%
%% Takes message statuses out of Q with Generator, one per unit of
%% quota, and folds each one into Delta. The third argument is the
%% {Quota, Delta, State} push state. The loop stops when the quota
%% reaches 0, when Q is empty, or when the next message's seq id is
%% below Limit; every stopping path flushes the index cache via ui/1.
%% Returns {Q1, {Quota1, Delta1, State1}}.
push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
    {Q, {0, Delta, ui(State)}};
push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
    case Generator(Q) of
        {empty, _Q} ->
            {Q, {Quota, Delta, ui(State)}};
        %% The message below the limit stays in the queue: the original
        %% Q is returned, not the queue with that message removed.
        {{value, #msg_status { seq_id = SeqId }}, _Qa}
          when SeqId < Limit ->
            {Q, {Quota, Delta, ui(State)}};
        {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
            %% Assertive match: the returned status must report
            %% index_on_disk = true, otherwise this crashes with badmatch.
            {#msg_status { index_on_disk = true,
                           is_persistent = IsPersistent }, State1} =
                maybe_batch_write_index_to_disk(true, MsgStatus, State),
            State2 = stats(ready0, {MsgStatus, none}, 1, State1),
            Delta1 = expand_delta(SeqId, Delta, IsPersistent),
            push_betas_to_deltas1(Generator, Limit, Qa,
                                  {Quota - 1, Delta1, State2})
    end.
2775
%% Flushes the queue index pre-publish cache (passing the current
%% target_ram_count) and stores the resulting index state back into the
%% queue state.
ui(State = #vqstate { index_state      = IndexState,
                      target_ram_count = TargetRamCount }) ->
    State #vqstate {
      index_state = rabbit_queue_index:flush_pre_publish_cache(
                      TargetRamCount, IndexState) }.
2782
2783%%----------------------------------------------------------------------------
2784%% Upgrading
2785%%----------------------------------------------------------------------------
2786
%% Upgrade step: rewrites every stored 6-element basic_message tuple so
%% that its single routing key becomes a one-element list. Any term of
%% a different shape is reported as {error, corrupt_message}.
-spec multiple_routing_keys() -> 'ok'.

multiple_routing_keys() ->
    WrapRoutingKey =
        fun ({basic_message, ExchangeName, RoutingKey, Content,
              MsgId, Persistent}) ->
                {ok, {basic_message, ExchangeName, [RoutingKey], Content,
                      MsgId, Persistent}};
            (_) ->
                {error, corrupt_message}
        end,
    transform_storage(WrapRoutingKey),
    ok.
2798
2799
%% Applies Fun to the persistent message store and then to the
%% transient one; the result of the second transformation is returned.
%% Assumes the message store is not running.
transform_storage(Fun) ->
    transform_store(?PERSISTENT_MSG_STORE, Fun),
    transform_store(?TRANSIENT_MSG_STORE, Fun).
2804
%% Forces recovery of the named store under the mnesia directory and
%% then transforms its directory with Fun.
transform_store(StoreName, Fun) ->
    rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), StoreName),
    rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), StoreName, Fun).
2808
%% Upgrade step: migrates the messages of every queue returned by
%% list_persistent_queues/0 (if any), then removes the old store data
%% and cleans up the global recovery terms. The last two steps run even
%% when there were no queues to migrate.
move_messages_to_vhost_store() ->
    case list_persistent_queues() of
        [] ->
            log_upgrade(
              "No durable queues found. Skipping message store migration"),
            ok;
        DurableQueues ->
            move_messages_to_vhost_store(DurableQueues)
    end,
    ok = delete_old_store(),
    ok = rabbit_queue_index:cleanup_global_recovery_terms().
2820
%% Migrates the given queues from the legacy global message store into
%% per-vhost stores: moves the queue indices, starts the old store and
%% one new store per vhost, copies messages queue by queue in batches,
%% and finally stops all the stores again.
move_messages_to_vhost_store(Queues) ->
    log_upgrade("Moving messages to per-vhost message store"),
    %% Move the queue index for each persistent queue to the new store
    _ = [rabbit_queue_index:move_to_per_vhost_stores(amqqueue:get_name(Queue))
         || Queue <- Queues],
    %% Legacy (global) msg_store may require recovery.
    %% This upgrade step should only be started
    %% if we are upgrading from a pre-3.7.0 version.
    {QueuesWithTerms, RecoveryRefs, StartFunState} =
        read_old_recovery_terms(Queues),
    OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),

    VHosts = rabbit_vhost:list_names(),
    %% New store should not be recovered.
    NewMsgStore = start_new_store(VHosts),
    %% Recovery terms should be started for all vhosts for new store.
    lists:foreach(fun (VHost) ->
                          ok = rabbit_recovery_terms:open_table(VHost)
                  end, VHosts),

    BatchSize = application:get_env(rabbit, queue_migration_batch_size,
                                    ?QUEUE_MIGRATION_BATCH_SIZE),
    in_batches(BatchSize,
        {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
        QueuesWithTerms,
        "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ",
        "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),

    log_upgrade("Message store migration finished"),
    ok = rabbit_sup:stop_child(OldStore),
    lists:foreach(fun (VHost) ->
                          ok = rabbit_recovery_terms:close_table(VHost)
                  end, VHosts),
    ok = stop_new_store(NewMsgStore).
2856
%% Applies MFA to every element of List, Size elements at a time.
%%
%% MFA is {Module, Function, ExtraArgs}; each element is prepended to
%% ExtraArgs. All calls of one batch are started with rpc:async_call/4
%% on the local node and awaited before the next batch begins.
%% MessageStart and MessageEnd are format strings logged before and
%% after each batch with [BatchNum, Size, Count] as arguments.
%%
%% Returns ok. Throws the badrpc reason of the first failed call.
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
    in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).

in_batches(_, _, _, [], _, _) -> ok;
%% The batch size must be a positive integer. With a size of 0 every
%% batch would be empty and the list would never shrink, so the
%% recursion would never terminate; failing with function_clause makes
%% a misconfigured batch size visible instead.
in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd)
  when is_integer(Size), Size > 0 ->
    Length = length(List),
    %% lists:split/2 fails if asked for more elements than the list
    %% has, so the final (short) batch is capped at the list length.
    {Batch, Tail} = lists:split(min(Size, Length), List),
    ProcessedLength = (BatchNum - 1) * Size,
    rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]),
    {M, F, A} = MFA,
    Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
    lists:foreach(fun(Key) ->
        case rpc:yield(Key) of
            {badrpc, Err} -> throw(Err);
            _             -> ok
        end
    end,
    Keys),
    rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
    in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
2880
%% Copies the messages of one queue from the old global store into the
%% per-vhost store of the queue's vhost, and rewrites the queue's
%% recovery term (when there is one) to reference the new store client.
%%
%% The first argument is a {QueueName, RecoveryTerm} pair, where
%% RecoveryTerm is either non_clean_shutdown or a list.
%% Returns {QueueName, NewClientRef}.
migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
               RecoveryTerm},
              OldStore, NewStore) ->
    log_upgrade_verbose(
        "Migrating messages in queue ~s in vhost ~s to per-vhost message store",
        [Name, VHost]),
    OldStoreClient = get_global_store_client(OldStore),
    NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
    %% We migrate only persistent messages which are found in message store
    %% and are not acked yet; every other index entry leaves the old
    %% client state as it is.
    CopyIfUnacked =
        fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC)
              when is_binary(MsgId) ->
                migrate_message(MsgId, OldC, NewStoreClient);
            (_SeqId, _MsgId, _MsgProps,
             _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
                OldC
        end,
    %% WARNING: During scan_queue_segments queue index state is being recovered
    %% and terminated. This can cause side effects!
    %% NOTE(review): the value returned by the scan is discarded and the
    %% initial client state is the one terminated below - confirm that
    %% is intended.
    rabbit_queue_index:scan_queue_segments(CopyIfUnacked, OldStoreClient,
                                           QueueName),
    rabbit_msg_store:client_terminate(OldStoreClient),
    rabbit_msg_store:client_terminate(NewStoreClient),
    NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),
    case RecoveryTerm of
        non_clean_shutdown ->
            ok;
        _ when is_list(RecoveryTerm) ->
            UpdatedTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm,
                                           {persistent_ref, NewClientRef}),
            rabbit_queue_index:update_recovery_term(QueueName, UpdatedTerm)
    end,
    log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]),
    {QueueName, NewClientRef}.
2915
%% Copies one message from the old store to the new one.
%% When the read yields {{ok, Msg}, Client1} the message is written
%% through NewClient and Client1 is returned; for any other read result
%% nothing is written and the client state passed in is returned.
migrate_message(MsgId, OldClient, NewClient) ->
    case rabbit_msg_store:read(MsgId, OldClient) of
        {{ok, Msg}, OldClient1} ->
            ok = rabbit_msg_store:write(MsgId, Msg, NewClient),
            OldClient1;
        _NotRead ->
            OldClient
    end.
2923
%% Initialises a message store client, with a fresh GUID as its ref and
%% no-op callbacks, on the store registered for the queue's vhost.
%% NewStore is a list of {VHost, StorePid} pairs; a vhost that is
%% missing from it makes the assertive match crash with badmatch.
get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
    {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
    IgnoreTwoArgs = fun (_, _) -> ok end,
    IgnoreNoArgs = fun () -> ok end,
    rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(),
                                 IgnoreTwoArgs, IgnoreNoArgs).
2928
%% Initialises a message store client on the old (global) store, with a
%% fresh GUID as its ref and no-op callbacks.
get_global_store_client(OldStore) ->
    IgnoreTwoArgs = fun (_, _) -> ok end,
    IgnoreNoArgs = fun () -> ok end,
    rabbit_msg_store:client_init(OldStore, rabbit_guid:gen(),
                                 IgnoreTwoArgs, IgnoreNoArgs).
2934
%% Lists the records in rabbit_durable_queue that are classic queues,
%% are located on this node, and have no entry in rabbit_queue.
%% The query runs inside mnesia:async_dirty/1.
list_persistent_queues() ->
    ThisNode = node(),
    Select =
        fun () ->
                Query = qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
                                    ?amqqueue_is_classic(Q),
                                    amqqueue:qnode(Q) == ThisNode,
                                    mnesia:read(rabbit_queue,
                                                amqqueue:get_name(Q),
                                                read) =:= []]),
                qlc:e(Query)
        end,
    mnesia:async_dirty(Select).
2944
%% Reads the global recovery terms of the given queues.
%% Returns {QueuesWithTerms, Refs, StartFunState}: QueuesWithTerms pairs
%% each queue name with its recovery term, and Refs collects the
%% persistent_ref of every term that is not non_clean_shutdown and
%% actually carries one.
read_old_recovery_terms([]) ->
    {[], [], ?EMPTY_START_FUN_STATE};
read_old_recovery_terms(Queues) ->
    QueueNames = [amqqueue:get_name(Q) || Q <- Queues],
    {AllTerms, StartFunState} =
        rabbit_queue_index:read_global_recovery_terms(QueueNames),
    %% The single-element generator binds Ref so that it can be both
    %% tested and collected.
    Refs = [Ref || Terms <- AllTerms,
                   Terms /= non_clean_shutdown,
                   Ref <- [proplists:get_value(persistent_ref, Terms)],
                   Ref =/= undefined],
    {lists:zip(QueueNames, AllTerms), Refs, StartFunState}.
2957
%% Starts the old global persistent message store as a child of
%% rabbit_sup, rooted at the mnesia directory, and returns the name it
%% was started under.
run_old_persistent_store(Refs, StartFunState) ->
    StartArgs = [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
                 Refs, StartFunState],
    ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
                                start_global_store_link, StartArgs),
    ?PERSISTENT_MSG_STORE.
2964
%% Starts one persistent message store per vhost, each in that vhost's
%% message store directory, with `undefined` as the third start
%% argument and the empty start fun state.
%% The stores are started with start_link/4 from the calling process.
%% Returns a list of {VHost, StorePid} pairs in the order of VHosts.
start_new_store(VHosts) ->
    [begin
         VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
         {ok, StorePid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
                                                      VHostDir,
                                                      undefined,
                                                      ?EMPTY_START_FUN_STATE),
         {VHost, StorePid}
     end || VHost <- VHosts].
2976
%% Shuts down every store in NewStore, a list of {VHost, StorePid}
%% pairs, and returns ok once all of them have terminated.
%%
%% exit/2 only sends an exit signal, so without waiting this function
%% could return while stores are still shutting down. Each store is
%% therefore monitored before the signal is sent and the function
%% blocks until the matching 'DOWN' message has arrived.
stop_new_store(NewStore) ->
    %% Signal all stores first so that they shut down concurrently.
    MonitorRefs =
        [begin
             MRef = erlang:monitor(process, StorePid),
             %% Unlink first so the store's exit does not propagate to
             %% the calling process.
             unlink(StorePid),
             exit(StorePid, shutdown),
             MRef
         end || {_VHost, StorePid} <- NewStore],
    %% No timeout on purpose: the function only reports ok once every
    %% store is really gone. A store that is already dead produces an
    %% immediate 'DOWN', so this cannot block on it.
    lists:foreach(fun (MRef) ->
                          receive
                              {'DOWN', MRef, process, _Pid, _Reason} -> ok
                          end
                  end,
                  MonitorRefs),
    ok.
2984
%% Recursively deletes the directories of the old persistent and old
%% transient message stores under the mnesia directory. The results of
%% the deletions are ignored; always returns ok.
delete_old_store() ->
    log_upgrade("Removing the old message store data"),
    lists:foreach(
      fun (StoreName) ->
              StoreDir = filename:join([rabbit_mnesia:dir(), StoreName]),
              rabbit_file:recursive_delete([StoreDir])
      end,
      [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE]),
    ok.
2993
%% Logs an upgrade message at info level with no format arguments.
log_upgrade(Format) ->
    log_upgrade(Format, []).

%% Logs Format/Args at info level through rabbit_log, prefixing the
%% format string with "message_store upgrades: ".
log_upgrade(Format, Args) ->
    Prefixed = "message_store upgrades: " ++ Format,
    rabbit_log:info(Prefixed, Args).
2999
%% Logs a message through rabbit_log_upgrade at info level with no
%% format arguments.
log_upgrade_verbose(Format) ->
    log_upgrade_verbose(Format, []).

%% Logs Format/Args through rabbit_log_upgrade at info level; unlike
%% log_upgrade/2 no prefix is added to the format string.
log_upgrade_verbose(Format, Args) ->
    rabbit_log_upgrade:info(Format, Args).
3005
%% Best-effort termination of a message store client.
%% Returns whatever rabbit_msg_store:client_terminate/1 returns, or ok
%% if that call raises an exception of any class.
maybe_client_terminate(MSCStateP) ->
    %% Queue might have been asked to stop by the supervisor, it needs a clean
    %% shutdown in order for the supervising strategy to work - if it reaches max
    %% restarts might bring the vhost down. Failures are therefore
    %% deliberately swallowed here.
    try rabbit_msg_store:client_terminate(MSCStateP) of
        Result -> Result
    catch
        _Class:_Reason -> ok
    end.
3016