1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_backing_queue).
9
10-export([info_keys/0]).
11
%% Info item keys returned by info_keys/0.
-define(INFO_KEYS,
        [messages_ram,
         messages_ready_ram,
         messages_unacknowledged_ram,
         messages_persistent,
         message_bytes,
         message_bytes_ready,
         message_bytes_unacknowledged,
         message_bytes_ram,
         message_bytes_persistent,
         head_message_timestamp,
         disk_reads,
         disk_writes,
         backing_queue_status,
         messages_paged_out,
         message_bytes_paged_out]).
19
%% Callback signatures cannot express a per-queue ack/state type, so
%% both are declared as arbitrary terms.
-type ack() :: term().
-type state() :: term().
23
%% Flow argument accepted by the publish, publish_delivered and
%% discard callbacks.
-type flow() :: flow | noflow.

-type msg_ids() :: [rabbit_types:msg_id()].

%% Element type of the list handed to batch_publish/4.
-type publish() ::
        {rabbit_types:basic_message(),
         rabbit_types:message_properties(),
         boolean()}.

%% Element type of the list handed to batch_publish_delivered/4.
-type delivered_publish() ::
        {rabbit_types:basic_message(),
         rabbit_types:message_properties()}.

%% First element of the result of fetch/2, parameterised by the type
%% of the ack tag.
-type fetch_result(Ack) ::
        empty | {rabbit_types:basic_message(), boolean(), Ack}.

%% First element of the result of drop/2, parameterised by the type
%% of the ack tag.
-type drop_result(Ack) ::
        empty | {rabbit_types:msg_id(), Ack}.

%% Recovery terms as returned by start/2; recovery_info() is the
%% second argument of init/3.
-type recovery_terms() :: [term()] | non_clean_shutdown.
-type recovery_info() :: new | recovery_terms().

%% Message count returned by purge/1.
-type purged_msg_count() :: non_neg_integer().

%% Third argument of init/3.
-type async_callback() ::
        fun((atom(), fun((atom(), state()) -> state())) -> ok).

-type duration() :: undefined | infinity | number().
40
%% Function applied to a message, its ack tag and an accumulator; it
%% returns the new accumulator.
-type msg_fun(Acc) ::
        fun((rabbit_types:basic_message(), ack(), Acc) -> Acc).

%% Predicate over message properties.
-type msg_pred() ::
        fun((rabbit_types:message_properties()) -> boolean()).

%% Argument of set_queue_mode/2.
-type queue_mode() :: atom().
45
%% Invoked at startup with a vhost and the names of the durable queues
%% on that vhost. No queue is being started at this point; the call
%% gives the backing queue a chance to perform whatever checks are
%% needed for the consistency of those queues, or to initialise any
%% other shared resources.
%%
%% The queue recovery terms returned as {ok, Terms} must follow the
%% same order as the queue names that were supplied.
-callback start(VHost :: rabbit_types:vhost(),
                QueueNames :: [rabbit_amqqueue:name()]) ->
    rabbit_types:ok(recovery_terms()).

%% Invoked to tear down any state/resources held for a vhost. NB:
%% implementations should not rely on this function being called on
%% shutdown; they should hook into the rabbit supervision hierarchy
%% instead.
-callback stop(VHost :: rabbit_types:vhost()) -> ok.
59
%% Sets up the backing queue and its state.
%%
%% Arguments:
%% 1. The amqqueue record.
%% 2. A term telling whether an existing queue is being recovered.
%%    'new' means no recovery is taking place; otherwise the term is
%%    either a list of recovery terms, or the atom
%%    'non_clean_shutdown' when no recovery terms are available.
%% 3. An asynchronous callback that accepts a function from
%%    backing-queue-state to backing-queue-state. The callback can be
%%    invoked safely from any process, which makes it useful for
%%    passing messages back into the backing queue, especially as the
%%    backing queue does not have control of its own mailbox.
-callback init(Queue :: amqqueue:amqqueue(),
               Recovery :: recovery_info(),
               AsyncCallback :: async_callback()) ->
    state().
76
%% Invoked on queue shutdown when the queue is not being deleted.
-callback terminate(term(), State :: state()) -> state().

%% Invoked when the queue is terminating and has to delete all of its
%% content.
-callback delete_and_terminate(term(), State :: state()) -> state().

%% Invoked to clean up after a crashed queue. There is no process in
%% that case, and therefore no state(): only on-disk data is removed.
-callback delete_crashed(Queue :: amqqueue:amqqueue()) -> ok.
87
%% Removes every 'fetchable' message from the queue, i.e. every
%% message apart from those already fetched and still pending acks.
-callback purge(State :: state()) -> {purged_msg_count(), state()}.

%% Removes every message in the queue that has been fetched and is
%% pending an ack.
-callback purge_acks(State :: state()) -> state().
95
%% Publishes a single message.
-callback publish(Msg :: rabbit_types:basic_message(),
                  Props :: rabbit_types:message_properties(),
                  boolean(),
                  pid(),
                  flow(),
                  State :: state()) ->
    state().

%% Same as publish/6, but for batches of publishes.
-callback batch_publish(Publishes :: [publish()],
                        pid(),
                        flow(),
                        State :: state()) ->
    state().
103
%% Invoked for messages that have already been passed straight out to
%% a client. The queue will be empty for these calls (i.e. this saves
%% the round trip through the backing queue).
-callback publish_delivered(Msg :: rabbit_types:basic_message(),
                            Props :: rabbit_types:message_properties(),
                            pid(),
                            flow(),
                            State :: state()) ->
    {ack(), state()}.

%% Same as publish_delivered/5, but for batches of publishes.
-callback batch_publish_delivered(Publishes :: [delivered_publish()],
                                  pid(),
                                  flow(),
                                  State :: state()) ->
    {[ack()], state()}.
116
%% Tells the BQ about messages that have reached the queue but will
%% not be passed any further to the BQ.
-callback discard(MsgId :: rabbit_types:msg_id(),
                  pid(),
                  flow(),
                  State :: state()) ->
    state().
120
%% Returns the ids of the messages that have been confirmed since this
%% function was last invoked (or since initialisation).
%%
%% A message id should only show up in the result of drain_confirmed
%% in one of the following situations:
%%
%% 1. The message appears in a call to publish_delivered/5 for which
%%    no ack is required (NOTE(review): publish_delivered/5 as
%%    declared in this behaviour takes no ack_required argument, so
%%    this condition may be outdated -- TODO confirm); or
%% 2. The message is fetched from the queue with fetch/2 and the first
%%    argument (ack_required) is false; or
%% 3. The message is acked (ack/2 is called for the message); or
%% 4. The message is fully fsync'd to disk in such a way that the
%%    recovery of the message is guaranteed in the event of a crash of
%%    this rabbit node (excluding hardware failure).
%%
%% On top of the conditions above, a message id may only appear in the
%% result of drain_confirmed if
%% #message_properties.needs_confirming = true when the msg was
%% published (through whichever means) to the backing queue.
%%
%% The same message id may legally appear in the results of several
%% calls to drain_confirmed, so the backing queue does not have to
%% keep track of which messages it has already confirmed. The confirm
%% is issued to the publisher the first time the message id appears in
%% the result of drain_confirmed; every later appearance of that
%% message id is ignored.
-callback drain_confirmed(State :: state()) -> {msg_ids(), state()}.
149
%% Drops messages from the head of the queue for as long as the
%% supplied predicate on message properties returns true. The result
%% carries the first message properties for which the predicate
%% returned false, or 'undefined' if the whole backing queue was
%% traversed without the predicate ever returning false.
-callback dropwhile(Pred :: msg_pred(), State :: state()) ->
    {rabbit_types:message_properties() | undefined, state()}.

%% Same as dropwhile, except that messages are fetched in "require
%% acknowledgement" mode and are handed, together with their ack tag,
%% to the supplied function. That function is also given an
%% accumulator. The result of fetchwhile is the same as for dropwhile,
%% plus the accumulator.
-callback fetchwhile(msg_pred(), msg_fun(Acc), Acc, state()) ->
    {rabbit_types:message_properties() | undefined, Acc, state()}.
166
%% Produces the next message. An ack tag is part of the result only
%% when the first argument is true.
-callback fetch(true, State :: state()) ->
              {fetch_result(ack()), state()};
          (false, State :: state()) ->
              {fetch_result(undefined), state()}.

%% Removes the next message. An ack tag is part of the result only
%% when the first argument is true.
-callback drop(true, State :: state()) ->
              {drop_result(ack()), state()};
          (false, State :: state()) ->
              {drop_result(undefined), state()}.
174
%% The supplied acktags belong to messages that can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack(Acks :: [ack()], State :: state()) ->
    {msg_ids(), state()}.

%% Puts back into the queue messages that had already been delivered
%% and were pending acknowledgement.
-callback requeue(Acks :: [ack()], State :: state()) ->
    {msg_ids(), state()}.

%% Folds over messages by ack tag. The supplied function is invoked
%% with each message, its ack tag, and an accumulator.
-callback ackfold(msg_fun(Acc), Acc, state(), [ack()]) ->
    {Acc, state()}.
186
%% Folds over all the messages in a queue and returns the accumulated
%% results; the queue itself is left undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
                    rabbit_types:message_properties(),
                    boolean(),
                    Acc) -> {stop | cont, Acc}),
               Acc,
               state()) ->
    {Acc, state()}.
193
%% Length of the queue.
-callback len(State :: state()) -> non_neg_integer().

%% Whether the queue is empty.
-callback is_empty(State :: state()) -> boolean().

%% Depth of the queue, where depth = length + number of pending acks.
-callback depth(State :: state()) -> non_neg_integer().
202
%% For the next three functions, the assumption is that you are
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is then the length of time represented by
%% the messages held in RAM given the current rates. If you would
%% rather ignore all of this, do so and return 0 in ram_duration/1.

%% The goal is to hold no more messages in RAM than indicated by the
%% duration and the current queue rates.
-callback set_ram_duration_target(Target :: duration(),
                                  State :: state()) ->
    state().

%% Optionally recalculates the duration internally (most likely this
%% just means updating your internal rates), and reports how many
%% seconds the messages in RAM represent given the current rates of
%% the queue.
-callback ram_duration(State :: state()) -> {duration(), state()}.
218
%% Tells whether 'timeout' should be called as soon as the queue
%% process can manage (either on an empty mailbox, or when a timer
%% fires).
-callback needs_timeout(State :: state()) -> false | timed | idle.

%% Invoked (eventually) after needs_timeout returns 'idle' or 'timed'.
%% Note that it may be invoked more than once for each 'idle' or
%% 'timed' returned from needs_timeout.
-callback timeout(State :: state()) -> state().
227
%% Invoked immediately before the queue hibernates.
-callback handle_pre_hibernate(State :: state()) -> state().

%% Invoked when more credit has become available for credit_flow.
-callback resume(State :: state()) -> state().
233
%% Helps prioritisation in rabbit_amqqueue_process: reports the rate
%% of inbound messages and outbound messages at the moment.
-callback msg_rates(State :: state()) -> {float(), float()}.

%% NOTE(review): undocumented in the original; presumably returns the
%% value of the info item named by the atom -- TODO confirm.
-callback info(atom(), State :: state()) -> term().
239
%% Receives a function that is to be invoked with the relevant backing
%% queue's state. Handy when the backing queue or other components
%% need to pass functions into the backing queue.
-callback invoke(atom(), fun((atom(), T) -> T), state()) -> state().
244
%% Invoked before a publish or publish_delivered call. Lets the BQ
%% signal that it has already seen this message (e.g. it was published
%% or discarded previously), specifying whether the message is to be
%% dropped or rejected.
-callback is_duplicate(Msg :: rabbit_types:basic_message(),
                       State :: state()) ->
    {{true, drop} | {true, reject} | boolean(), state()}.
250
%% NOTE(review): undocumented in the original; presumably switches the
%% backing queue to the given queue mode -- TODO confirm.
-callback set_queue_mode(Mode :: queue_mode(), State :: state()) ->
    state().

%% NOTE(review): undocumented in the original; takes a list of
%% delivered publishes, a list of ack tags, an accumulator and the
%% state, and returns a value of the accumulator's type. Presumably
%% pairs each message with its ack tag -- TODO confirm.
-callback zip_msgs_and_acks([delivered_publish()],
                            [ack()],
                            Acc,
                            state()) ->
    Acc.
256
%% Invoked when rabbit_amqqueue_process receives a message via
%% handle_info that should be processed by the backing queue.
-callback handle_info(Msg :: term(), State :: state()) -> state().
261
%% Returns the info item keys listed in the INFO_KEYS macro.
-spec info_keys() -> rabbit_types:info_keys().
info_keys() ->
    ?INFO_KEYS.
265