%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% Behaviour definition for backing queue implementations: this module
%% declares the callbacks an implementation must provide and exports
%% info_keys/0, the list of info item keys declared in ?INFO_KEYS.

-module(rabbit_backing_queue).

-export([info_keys/0]).

-define(INFO_KEYS, [messages_ram, messages_ready_ram,
                    messages_unacknowledged_ram, messages_persistent,
                    message_bytes, message_bytes_ready,
                    message_bytes_unacknowledged, message_bytes_ram,
                    message_bytes_persistent, head_message_timestamp,
                    disk_reads, disk_writes, backing_queue_status,
                    messages_paged_out, message_bytes_paged_out]).

%% We can't specify a per-queue ack/state with callback signatures
-type ack()   :: any().
-type state() :: any().

-type flow() :: 'flow' | 'noflow'.
-type msg_ids() :: [rabbit_types:msg_id()].
-type publish() :: {rabbit_types:basic_message(),
                    rabbit_types:message_properties(), boolean()}.
-type delivered_publish() :: {rabbit_types:basic_message(),
                              rabbit_types:message_properties()}.
-type fetch_result(Ack) ::
        ('empty' | {rabbit_types:basic_message(), boolean(), Ack}).
-type drop_result(Ack) ::
        ('empty' | {rabbit_types:msg_id(), Ack}).
-type recovery_terms() :: [term()] | 'non_clean_shutdown'.
-type recovery_info() :: 'new' | recovery_terms().
-type purged_msg_count() :: non_neg_integer().
-type async_callback() ::
        fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok').
-type duration() :: ('undefined' | 'infinity' | number()).

-type msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A).
-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()).

-type queue_mode() :: atom().

%% Called on startup with a vhost and a list of durable queue names on this vhost.
%% The queues aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
%%
%% The list of queue recovery terms returned as {ok, Terms} must be given
%% in the same order as the list of queue names supplied.
-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).

%% Called to tear down any state/resources for vhost. NB: Implementations should
%% not depend on this function being called on shutdown and instead
%% should hook into the rabbit supervision hierarchy.
-callback stop(rabbit_types:vhost()) -> 'ok'.

%% Initialise the backing queue and its state.
%%
%% Takes
%% 1. the amqqueue record
%% 2. a term indicating whether the queue is an existing queue that
%%    should be recovered or not. When 'new' is given, no recovery is
%%    taking place, otherwise a list of recovery terms is given, or
%%    the atom 'non_clean_shutdown' if no recovery terms are available.
%% 3. an asynchronous callback which accepts a function of type
%%    backing-queue-state to backing-queue-state. This callback
%%    function can be safely invoked from any process, which makes it
%%    useful for passing messages back into the backing queue,
%%    especially as the backing queue does not have control of its own
%%    mailbox.
-callback init(amqqueue:amqqueue(), recovery_info(),
               async_callback()) -> state().

%% Called on queue shutdown when queue isn't being deleted.
-callback terminate(any(), state()) -> state().

%% Called when the queue is terminating and needs to delete all its
%% content.
-callback delete_and_terminate(any(), state()) -> state().

%% Called to clean up after a crashed queue. In this case we don't
%% have a process and thus a state(), we are just removing on-disk data.
-callback delete_crashed(amqqueue:amqqueue()) -> 'ok'.

%% Remove all 'fetchable' messages from the queue, i.e. all messages
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.

%% Remove all messages in the queue which have been fetched and are
%% pending acks.
-callback purge_acks(state()) -> state().

%% Publish a message.
-callback publish(rabbit_types:basic_message(),
                  rabbit_types:message_properties(), boolean(), pid(), flow(),
                  state()) -> state().

%% Like publish/6 but for batches of publishes.
-callback batch_publish([publish()], pid(), flow(), state()) -> state().

%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
-callback publish_delivered(rabbit_types:basic_message(),
                            rabbit_types:message_properties(), pid(), flow(),
                            state())
                           -> {ack(), state()}.

%% Like publish_delivered/5 but for batches of publishes.
-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
                                  state())
                                 -> {[ack()], state()}.

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().

%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
%%
%% Message ids should only appear in the result of drain_confirmed
%% under the following circumstances:
%%
%% 1. The message appears in a call to publish_delivered/4 and the
%%    first argument (ack_required) is false; or
%% 2. The message is fetched from the queue with fetch/2 and the first
%%    argument (ack_required) is false; or
%% 3. The message is acked (ack/2 is called for the message); or
%% 4. The message is fully fsync'd to disk in such a way that the
%%    recovery of the message is guaranteed in the event of a crash of
%%    this rabbit node (excluding hardware failure).
%%
%% NOTE(review): condition 1 looks stale. The callback declared in this
%% module is publish_delivered/5 and its first argument is the message,
%% not an ack_required flag. Confirm which call (possibly discard/4)
%% now covers the "no ack required" case and update the wording.
%%
%% In addition to the above conditions, a message id may only appear
%% in the result of drain_confirmed if
%% #message_properties.needs_confirming = true when the msg was
%% published (through whichever means) to the backing queue.
%%
%% It is legal for the same message id to appear in the results of
%% multiple calls to drain_confirmed, which means that the backing
%% queue is not required to keep track of which messages it has
%% already confirmed. The confirm will be issued to the publisher the
%% first time the message id appears in the result of
%% drain_confirmed. All subsequent appearances of that message id will
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.

%% Drop messages from the head of the queue while the supplied
%% predicate on message properties returns true. Returns the first
%% message properties for which the predicate returned false, or
%% 'undefined' if the whole backing queue was traversed w/o the
%% predicate ever returning false.
-callback dropwhile(msg_pred(), state())
                   -> {rabbit_types:message_properties() | undefined, state()}.

%% Like dropwhile, except messages are fetched in "require
%% acknowledgement" mode and are passed, together with their ack tag,
%% to the supplied function. The function is also fed an
%% accumulator. The result of fetchwhile is as for dropwhile plus the
%% accumulator.
-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
                    -> {rabbit_types:message_properties() | undefined,
                        A, state()}.

%% Produce the next message. The first argument states whether an
%% acknowledgement is required: with 'true' the result carries an
%% ack(), with 'false' the ack position is 'undefined'.
-callback fetch(true,  state()) -> {fetch_result(ack()), state()};
               (false, state()) -> {fetch_result(undefined), state()}.

%% Remove the next message. As for fetch/2, the first argument states
%% whether an acknowledgement is required.
-callback drop(true,  state()) -> {drop_result(ack()), state()};
              (false, state()) -> {drop_result(undefined), state()}.

%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.

%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.

%% Fold over messages by ack tag. The supplied function is called with
%% each message, its ack tag, and an accumulator.
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.

%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
                    rabbit_types:message_properties(),
                    boolean(), A) -> {('stop' | 'cont'), A}),
               A, state()) -> {A, state()}.

%% How long is my queue?
-callback len(state()) -> non_neg_integer().

%% Is my queue empty?
-callback is_empty(state()) -> boolean().

%% What's the queue depth, where depth = length + number of pending acks
-callback depth(state()) -> non_neg_integer().

%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
%% the messages held in RAM given the current rates. If you want to
%% ignore all of this stuff, then do so, and return 0 in
%% ram_duration/1.

%% The target is to have no more messages in RAM than indicated by the
%% duration and the current queue rates.
-callback set_ram_duration_target(duration(), state()) -> state().

%% Optionally recalculate the duration internally (likely to be just
%% update your internal rates), and report how many seconds the
%% messages in RAM represent given the current rates of the queue.
-callback ram_duration(state()) -> {duration(), state()}.

%% Should 'timeout' be called as soon as the queue process can manage
%% (either on an empty mailbox, or when a timer fires)?
-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'.

%% Called (eventually) after needs_timeout returns 'idle' or 'timed'.
%% Note this may be called more than once for each 'idle' or 'timed'
%% returned from needs_timeout.
-callback timeout(state()) -> state().

%% Called immediately before the queue hibernates.
-callback handle_pre_hibernate(state()) -> state().

%% Called when more credit has become available for credit_flow.
-callback resume(state()) -> state().

%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.

%% Return the value of a single info item for this backing queue.
%% NOTE(review): presumably the item atoms are those returned by
%% info_keys/0; the result type is any(), so its shape per item is
%% implementation-defined - confirm against the implementations.
-callback info(atom(), state()) -> any().

%% Passed a function to be invoked with the relevant backing queue's
%% state. Useful for when the backing queue or other components need
%% to pass functions into the backing queue.
-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().

%% Called prior to a publish or publish_delivered call. Allows the BQ
%% to signal that it's already seen this message, (e.g. it was published
%% or discarded previously) specifying whether to drop the message or reject it.
-callback is_duplicate(rabbit_types:basic_message(), state())
                      -> {{true, drop} | {true, reject} | boolean(), state()}.

%% Set the queue mode and return the updated state. queue_mode() is
%% declared above as atom(); the set of accepted values is not visible
%% in this module - TODO confirm against the implementations.
-callback set_queue_mode(queue_mode(), state()) -> state().

%% Combine a list of delivered publishes and a list of ack tags into
%% the supplied accumulator and return it.
%% NOTE(review): the pairing semantics are inferred from the name and
%% signature only - confirm against the implementations.
-callback zip_msgs_and_acks([delivered_publish()],
                            [ack()], Acc, state())
                           -> Acc.

%% Called when rabbit_amqqueue_process receives a message via
%% handle_info and it should be processed by the backing
%% queue.
-callback handle_info(term(), state()) -> state().

%% Return the list of info item keys declared in ?INFO_KEYS.
-spec info_keys() -> rabbit_types:info_keys().

info_keys() -> ?INFO_KEYS.