
-type option(T) :: T | undefined.

-type raw_msg() :: term().
%% The raw message. rabbit_fifo treats it as an opaque term.

-type msg_in_id() :: non_neg_integer().
% A queue-scoped, monotonically incrementing integer. It is used to enforce
% ordering in the map of unassigned messages.

-type msg_id() :: non_neg_integer().
%% A consumer-scoped, monotonically incrementing integer that is included
%% with a {@link delivery/0.}. Deliveries are settled with it using
%% {@link rabbit_fifo_client:settle/3.}

-type msg_seqno() :: non_neg_integer().
%% A monotonically incrementing integer scoped to the sending process and
%% included in enqueue messages. Used to ensure the ordering of messages
%% sent from the same process.

-type msg_header() ::
        #{size := msg_size(),
          delivery_count => non_neg_integer()} |
        msg_size().
%% The message header:
%% delivery_count: the number of unsuccessful delivery attempts.
%%                 A non-zero value indicates a previous attempt.
%% When the header would only contain the size it can be condensed to the
%% bare integer.

-type msg() :: {msg_header(), raw_msg()}.
%% A raw message paired with its header.

-type msg_size() :: non_neg_integer().
%% The size, in bytes, of the message payload.

-type indexed_msg() :: {ra:index(), msg()}.

-type prefix_msg() :: {'$prefix_msg', msg_header()}.

-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple of the message id and the headered message.

-type consumer_tag() :: binary().
%% An arbitrary binary tag that distinguishes between different consumers
%% set up by the same process. See: {@link rabbit_fifo_client:checkout/3.}

-type delivery() :: {delivery, consumer_tag(), [delivery_msg()]}.
%% The delivery of one or more rabbit_fifo messages.

-type consumer_id() :: {consumer_tag(), pid()}.
%% The entity that receives messages. Uniquely identifies a consumer.

-type credit_mode() :: simple_prefetch | credited.
%% determines how credit is replenished

-type checkout_spec() :: {once | auto, Num :: non_neg_integer(),
                          credit_mode()} |
                         {dequeue, settled | unsettled} |
                         cancel.

-type consumer_meta() :: #{ack => boolean(),
                           username => binary(),
                           prefetch => non_neg_integer(),
                           args => list()}.
%% static meta data associated with a consumer


-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call

-define(RELEASE_CURSOR_EVERY, 64000).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB; this limit is
%% 2000000 bytes, i.e. ~20 times that, which should be relatively safe.
-define(GC_MEM_LIMIT_B, 2000000).

-define(MB, 1048576).
-define(STATE, rabbit_fifo).

%% Per-consumer state.
-record(consumer,
        {meta = #{} :: consumer_meta(),
         checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
         next_msg_id = 0 :: msg_id(), % part of snapshot data
         %% max number of messages that can be sent
         %% decremented for each delivery
         %% NB: the type separator must be `::'. With a single `:' the
         %% default value parses as the remote call `0:non_neg_integer()',
         %% which always fails when a record is built without an explicit
         %% credit and leaves the field untyped.
         credit = 0 :: non_neg_integer(),
         %% total number of checked out messages - ever
         %% incremented for each delivery
         delivery_count = 0 :: non_neg_integer(),
         %% the mode of how credit is incremented
         %% simple_prefetch: credit is re-filled as deliveries are settled
         %% or returned.
         %% credited: credit can only be changed by receiving a consumer_credit
         %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
         credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
         lifetime = once :: once | auto,
         status = up :: up | suspected_down | cancelled
        }).

-type consumer() :: #consumer{}.

-type consumer_strategy() :: competing | single_active.

%% Per-enqueuer (sending process) state.
-record(enqueuer,
        {next_seqno = 1 :: msg_seqno(),
         % out of order enqueues - sorted list
         pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
         status = up :: up | suspected_down
        }).

%% Queue configuration held inside the state record.
-record(cfg,
        {name :: atom(),
         resource :: rabbit_types:r(queue),
         release_cursor_interval ::
             option(non_neg_integer() |
                    {non_neg_integer(), non_neg_integer()}),
         dead_letter_handler :: option(applied_mfa()),
         become_leader_handler :: option(applied_mfa()),
         max_length :: option(non_neg_integer()),
         max_bytes :: option(non_neg_integer()),
         %% selects whether single active consumer is enabled for this queue
         consumer_strategy = competing :: consumer_strategy(),
         %% upper bound on the number of unsuccessful delivery attempts
         delivery_limit :: option(non_neg_integer()),
         max_in_memory_length :: option(non_neg_integer()),
         max_in_memory_bytes :: option(non_neg_integer())
        }).

-type prefix_msgs() ::
        {list(), list()} |
        {non_neg_integer(), list(), non_neg_integer(), list()}.

-record(?STATE,
        {cfg :: #cfg{},
         %% messages that have not been assigned to a consumer
         messages = #{} :: #{msg_in_id() => indexed_msg()},
         %% the lowest message in id present in the messages map
         %% that is not a return
         low_msg_num :: option(msg_in_id()),
         %% the message in id that the next addition to the messages map
         %% will receive
         next_msg_num = 1 :: msg_in_id(),
         %% returned msg_in_ids - checkout takes from here first and only
         %% then falls back to low_msg_num
         returns = lqueue:new() ::
             lqueue:lqueue(prefix_msg() | {msg_in_id(), indexed_msg()}),
         %% enqueue counter - used to trigger shadow copy points
         enqueue_count = 0 :: non_neg_integer(),
         %% every live process that has ever enqueued a message to this
         %% queue, together with a cached value of the smallest ra_index
         %% of all pending enqueues
         enqueuers = #{} :: #{pid() => #enqueuer{}},
         %% master index of all enqueue raft indexes, pending enqueues
         %% included.
         %% rabbit_fifo_index can be slow when calculating the smallest
         %% index when there are large gaps, but as it is backed by a map
         %% it should be faster than gb_trees for normal appending
         %% operations
         ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
         release_cursors = lqueue:new() ::
             lqueue:lqueue({release_cursor, ra:index(), #?STATE{}}),
         %% must reflect the consumer state at the time of the snapshot;
         %% needs to be part of snapshot
         consumers = #{} :: #{consumer_id() => #consumer{}},
         %% consumers that require further service are queued here;
         %% needs to be part of snapshot
         service_queue = queue:new() :: queue:queue(consumer_id()),
         %% A special field that is only used for snapshots.
         %% It holds the queued messages as they were when the dehydrated
         %% snapshot state was cached.
         %% release_cursors are only emitted for raft indexes where all
         %% prior messages no longer contribute to the current state, so
         %% every message payload can be replaced by its size (to be used
         %% for overflow calculations).
         %% Doing so keeps the order in which consumers are served
         %% deterministic on recovery.
         prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
         msg_bytes_enqueue = 0 :: non_neg_integer(),
         msg_bytes_checkout = 0 :: non_neg_integer(),
         %% waiting consumers: one is picked when the active consumer is
         %% cancelled or dies.
         %% used only when single active consumer is on
         waiting_consumers = [] :: [{consumer_id(), consumer()}],
         msg_bytes_in_memory = 0 :: non_neg_integer(),
         msgs_ready_in_memory = 0 :: non_neg_integer()
        }).

%% Configuration map: `name' and `queue_resource' are mandatory keys,
%% every other key is optional.
-type config() ::
        #{name := atom(),
          queue_resource := rabbit_types:r(queue),
          dead_letter_handler => applied_mfa(),
          become_leader_handler => applied_mfa(),
          release_cursor_interval => non_neg_integer(),
          max_length => non_neg_integer(),
          max_bytes => non_neg_integer(),
          max_in_memory_length => non_neg_integer(),
          max_in_memory_bytes => non_neg_integer(),
          single_active_consumer_on => boolean(),
          delivery_limit => non_neg_integer()}.