1
-type option(Value) :: undefined | Value.
%% Either `undefined' or a value of the given type.

-type raw_msg() :: term().
%% The raw message. rabbit_fifo treats it as opaque.

-type msg_in_id() :: non_neg_integer().
%% A queue-scoped, monotonically incrementing integer used to enforce order
%% in the unassigned messages map.

-type msg_id() :: non_neg_integer().
%% A consumer-scoped, monotonically incrementing integer included with a
%% {@link delivery/0.}. Used to settle deliveries using
%% {@link rabbit_fifo_client:settle/3.}

-type msg_seqno() :: non_neg_integer().
%% A sender-process-scoped, monotonically incrementing integer included
%% in enqueue messages. Used to ensure the ordering of messages sent from
%% the same process.

-type msg_size() :: non_neg_integer().
%% The size in bytes of the message payload.

-type msg_header() ::
        msg_size()
      | #{size := msg_size(),
          delivery_count => non_neg_integer()}.
%% The message header.
%% delivery_count: the number of unsuccessful delivery attempts.
%%                 A non-zero value indicates a previous attempt.
%% A header that only carries the size can be condensed to the bare
%% integer size.

-type msg() :: {msg_header(), raw_msg()}.
%% A raw message paired with its header.

-type indexed_msg() :: {ra:index(), msg()}.
%% A message paired with a ra index.

-type prefix_msg() :: {'$prefix_msg', msg_header()}.
%% A tagged tuple that carries only a message header (no payload).

-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
41
-type consumer_tag() :: binary().
%% An arbitrary binary tag that tells apart the different consumers
%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}

-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
%% The delivery of one or more rabbit_fifo messages to the consumer
%% identified by the tag.

-type consumer_id() :: {consumer_tag(), pid()}.
%% The entity that receives messages. Uniquely identifies a consumer.

-type credit_mode() :: simple_prefetch | credited.
%% Determines how credit is replenished.

-type checkout_spec() ::
        {once | auto, Num :: non_neg_integer(), credit_mode()}
      | {dequeue, settled | unsettled}
      | cancel.
%% The accepted forms of a checkout request: a lifetime with a count and a
%% credit mode, a single dequeue, or a cancellation.

-type consumer_meta() ::
        #{ack => boolean(),
          username => binary(),
          prefetch => non_neg_integer(),
          args => list()}.
%% Static meta data associated with a consumer. All keys are optional.


-type applied_mfa() :: {module(), atom(), list()}.
%% Represents a partially applied module call.
69
%% NOTE(review): judging by the names these are the base release cursor
%% interval and its upper bound - confirm against the code using them.
-define(RELEASE_CURSOR_EVERY, 64000).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).

-define(USE_AVG_HALF_LIFE, 10000.0).

%% An average QQ without any messages uses about 100KB. The limit below is
%% 2000000 bytes, i.e. roughly 20 times that figure, which should be
%% relatively safe.
%% NOTE(review): earlier wording said "~10 times" - confirm the intended
%% limit.
-define(GC_MEM_LIMIT_B, 2000000).

%% 1024 * 1024
-define(MB, 1048576).

%% Name of the state record defined in this header.
-define(STATE, rabbit_fifo).
79
%% State kept for a single consumer.
-record(consumer,
        {%% static meta data associated with the consumer
         meta = #{} :: consumer_meta(),
         %% checked out messages, keyed by the consumer-scoped msg_id()
         checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
         next_msg_id = 0 :: msg_id(), % part of snapshot data
         %% max number of messages that can be sent
         %% decremented for each delivery
         credit = 0 :: non_neg_integer(),
         %% total number of checked out messages - ever
         %% incremented for each delivery
         delivery_count = 0 :: non_neg_integer(),
         %% the mode of how credit is incremented
         %% simple_prefetch: credit is re-filled as deliveries are settled
         %% or returned.
         %% credited: credit can only be changed by receiving a consumer_credit
         %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
         credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
         lifetime = once :: once | auto,
         status = up :: up | suspected_down | cancelled
        }).

-type consumer() :: #consumer{}.
%% The per-consumer state record.

-type consumer_strategy() :: competing | single_active.
%% How consumers share the queue; `single_active' corresponds to the
%% single active consumer feature.
103
%% State kept for a single enqueuing process.
-record(enqueuer,
        {next_seqno = 1 :: msg_seqno(),
         %% enqueues received out of order, kept as a sorted list of
         %% {SeqNo, RaIndex, RawMsg}
         pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
         status = up :: up | suspected_down
        }).
110
%% Queue configuration held inside the state record.
-record(cfg,
        {name :: atom(),
         resource :: rabbit_types:r('queue'),
         %% either a single interval or a pair of integers
         release_cursor_interval ::
             option(non_neg_integer() |
                    {non_neg_integer(), non_neg_integer()}),
         dead_letter_handler :: option(applied_mfa()),
         become_leader_handler :: option(applied_mfa()),
         max_length :: option(non_neg_integer()),
         max_bytes :: option(non_neg_integer()),
         %% whether single active consumer is on or not for this queue
         consumer_strategy = competing :: consumer_strategy(),
         %% the maximum number of unsuccessful delivery attempts permitted
         delivery_limit :: option(non_neg_integer()),
         max_in_memory_length :: option(non_neg_integer()),
         max_in_memory_bytes :: option(non_neg_integer())
        }).
128
-type prefix_msgs() ::
        {list(), list()}
      | {non_neg_integer(), list(), non_neg_integer(), list()}.
%% Either a pair of lists, or a 4-tuple in which each list is preceded by a
%% non-negative integer (presumably the length of that list - confirm).
132
%% The queue state record (named by the ?STATE macro).
-record(?STATE,
        {cfg :: #cfg{},
         %% unassigned messages
         messages = #{} :: #{msg_in_id() => indexed_msg()},
         %% the lowest msg_in_id available in the messages map that is
         %% not a return
         low_msg_num :: option(msg_in_id()),
         %% the msg_in_id of the next message to be added to the
         %% messages map
         next_msg_num = 1 :: msg_in_id(),
         %% queue of returned messages - checkout picks from here first
         %% before taking low_msg_num
         returns = lqueue:new() ::
             lqueue:lqueue(prefix_msg() | {msg_in_id(), indexed_msg()}),
         %% a counter of enqueues - used to trigger shadow copy points
         enqueue_count = 0 :: non_neg_integer(),
         %% all the live processes that have ever enqueued a message to
         %% this queue, together with a cached value of the smallest
         %% ra_index of all pending enqueues
         enqueuers = #{} :: #{pid() => #enqueuer{}},
         %% master index of all enqueue raft indexes, pending enqueues
         %% included.
         %% rabbit_fifo_index can be slow when calculating the smallest
         %% index when there are large gaps but should be faster than
         %% gb_trees for normal appending operations as it's backed by a map
         ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
         release_cursors = lqueue:new() ::
             lqueue:lqueue({release_cursor, ra:index(), #?STATE{}}),
         %% consumers need to reflect consumer state at time of snapshot
         %% needs to be part of snapshot
         consumers = #{} :: #{consumer_id() => #consumer{}},
         %% consumers that require further service are queued here
         %% needs to be part of snapshot
         service_queue = queue:new() :: queue:queue(consumer_id()),
         %% A special field that is only used for snapshots.
         %% It represents the queued messages at the time the
         %% dehydrated snapshot state was cached.
         %% As release_cursors are only emitted for raft indexes where all
         %% prior messages no longer contribute to the current state, all
         %% message payloads can be replaced with their sizes (to be used
         %% for overflow calculations).
         %% This is done so that consumers are still served in a
         %% deterministic order on recovery.
         prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
         msg_bytes_enqueue = 0 :: non_neg_integer(),
         msg_bytes_checkout = 0 :: non_neg_integer(),
         %% waiting consumers - one is picked when the active consumer is
         %% cancelled or dies.
         %% used only when single active consumer is on
         waiting_consumers = [] :: [{consumer_id(), consumer()}],
         msg_bytes_in_memory = 0 :: non_neg_integer(),
         msgs_ready_in_memory = 0 :: non_neg_integer()
        }).
184
-type config() ::
        #{%% mandatory keys
          name := atom(),
          queue_resource := rabbit_types:r('queue'),
          %% optional keys
          dead_letter_handler => applied_mfa(),
          become_leader_handler => applied_mfa(),
          release_cursor_interval => non_neg_integer(),
          max_length => non_neg_integer(),
          max_bytes => non_neg_integer(),
          max_in_memory_length => non_neg_integer(),
          max_in_memory_bytes => non_neg_integer(),
          single_active_consumer_on => boolean(),
          delivery_limit => non_neg_integer()}.
%% The configuration map: `name' and `queue_resource' are required, every
%% other key is optional.
196