1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
9%% state machine implementation running inside a `ra' raft system.
10%%
11%% Handles command tracking and other non-functional concerns.
12-module(rabbit_fifo_client).
13
14-export([
15         init/2,
16         init/3,
17         init/5,
18         checkout/5,
19         cancel_checkout/2,
20         enqueue/2,
21         enqueue/3,
22         dequeue/3,
23         settle/3,
24         return/3,
25         discard/3,
26         credit/4,
27         handle_ra_event/3,
28         untracked_enqueue/2,
29         purge/1,
30         cluster_name/1,
31         update_machine_state/2,
32         pending_size/1,
33         stat/1,
34         stat/2,
35         query_single_active_consumer/1
36         ]).
37
38-include_lib("rabbit_common/include/rabbit.hrl").
39
%% Default soft limit on the number of pending (sent but not yet applied)
%% commands; used by init/2.
-define(SOFT_LIMIT, 32).
%% NOTE(review): presumably the interval (ms) of the timer armed by
%% set_timer/1 while the client is in the slow state — confirm.
-define(TIMER_TIME, 10000).
%% Fixed timeout (ms) for synchronous ra calls that do not use the
%% configured #cfg.timeout (purge/1, update_machine_state/2,
%% query_single_active_consumer/1).
-define(COMMAND_TIMEOUT, 30000).

%% Sequence number assigned to an issued raft command.
-type seq() :: non_neg_integer().
%% Like seq() but may be -1, meaning "nothing yet" (see #state.last_applied).
-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
                  {send_drained, CTagCredit ::
                   {rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].

-type cluster_name() :: rabbit_types:r(queue).

%% Per-consumer delivery tracking, stored in #state.consumer_deliveries
%% keyed by consumer tag.
%%   last_msg_id    - id of the last message received by the consumer,
%%                    -1 before any delivery
%%   ack            - true when the caller settles deliveries itself; when
%%                    false deliveries are settled automatically
%%                    (maybe_auto_ack/3)
%%   delivery_count - NOTE(review): maintained via update_consumer/5;
%%                    confirm exact semantics there
-record(consumer, {last_msg_id :: seq() | -1,
                   ack = false :: boolean(),
                   delivery_count = 0 :: non_neg_integer()}).

%% Static configuration of a client session (set by init/3 and init/5).
%%   servers         - known ra servers of the queue
%%   soft_limit      - number of pending commands at which the client slows
%%                     down; it leaves the slow state once the pending count
%%                     drops below this value (handle_ra_event/3)
%%   block_handler   - called when an enqueue first puts the client into
%%                     the slow state
%%   unblock_handler - called when the client leaves the slow state
%%   timeout         - timeout in milliseconds for synchronous ra commands,
%%                     derived from kernel net_ticktime plus 5 seconds
%%   version         - NOTE(review): purpose not evident here — confirm
-record(cfg, {cluster_name :: cluster_name(),
              servers = [] :: [ra:server_id()],
              soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
              block_handler = fun() -> ok end :: fun(() -> term()),
              unblock_handler = fun() -> ok end :: fun(() -> ok),
              timeout :: non_neg_integer(),
              version = 0 :: non_neg_integer()}).

%% Mutable client session state.
%%   leader              - last known leader, updated from replies and events
%%   queue_status        - undefined until the first enqueue resolves it,
%%                         then go or reject_publish
%%   next_seq            - NOTE(review): presumably the next raft command
%%                         sequence number, assigned in send_command — confirm
%%   next_enqueue_seq    - sequence for enqueue commands; 1 means no enqueue
%%                         has been issued yet
%%   unsent_commands     - per consumer {Settles, Returns, Discards} msg ids
%%                         stashed while slow, flushed by handle_ra_event/3
%%   pending             - seq => {Correlation, Command} for commands not yet
%%                         seen as applied
%%   consumer_deliveries - consumer tag => #consumer{}
%%   timer_state         - NOTE(review): managed by set_timer/1 and
%%                         cancel_timer/1 — confirm contents
-record(state, {cfg :: #cfg{},
                leader :: undefined | ra:server_id(),
                queue_status  :: undefined | go | reject_publish,
                next_seq = 0 :: seq(),
                %% Last applied is initialised to -1 to note that no command has yet
                %% been applied, while still allowing messages to be resent if the
                %% first ones in the sequence are lost (messages are resent from
                %% last_applied + 1)
                last_applied = -1 :: maybe_seq(),
                next_enqueue_seq = 1 :: seq(),
                %% indicates that we've exceeded the soft limit
                slow = false :: boolean(),
                unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
                                           {[seq()], [seq()], [seq()]}},
                pending = #{} :: #{seq() =>
                                   {term(), rabbit_fifo:command()}},
                consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
                                               #consumer{}},
                timer_state :: term()
               }).

-opaque state() :: #state{}.

-export_type([
              state/0,
              actions/0
             ]).
91
92
%% @doc Create the initial state for a new rabbit_fifo session, using the
%% default soft limit. A state is needed to interact with a rabbit_fifo
%% queue using @module.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is
%% known, ensure the leader node is at the head of the list.
-spec init(cluster_name(), [ra:server_id()]) -> state().
init(ClusterName, Servers) ->
    DefaultSoftLimit = ?SOFT_LIMIT,
    init(ClusterName, Servers, DefaultSoftLimit).
101
%% @doc Create the initial state for a new rabbit_fifo session. A state is
%% needed to interact with a rabbit_fifo queue using @module.
%% No-op block/unblock handlers are installed; use init/5 to supply them.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is
%% known, ensure the leader node is at the head of the list.
%% @param SoftLimit the number of pending (not yet applied) commands at which
%% the client enters the slow state.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
    %% delegate so the timeout derivation lives in a single place
    init(ClusterName, Servers, SoftLimit, fun() -> ok end, fun() -> ok end).
115
%% @doc Create the initial state for a new rabbit_fifo session with custom
%% flow-control handlers.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers the known servers of the queue, leader first if known.
%% @param SoftLimit the number of pending commands at which the client
%% enters the slow state.
%% @param BlockFun called when an enqueue first puts the client into the
%% slow state.
%% @param UnblockFun called when the client leaves the slow state.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
           fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
    %% net_ticktime is configured in seconds; the command timeout is kept in
    %% milliseconds and allows 5 seconds on top of the tick time
    TimeoutMs = (application:get_env(kernel, net_ticktime, 60) + 5) * 1000,
    Cfg = #cfg{cluster_name = ClusterName,
               servers = Servers,
               soft_limit = SoftLimit,
               block_handler = BlockFun,
               unblock_handler = UnblockFun,
               timeout = TimeoutMs},
    #state{cfg = Cfg}.
127
128
%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
%% command when it has been applied.
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow, State}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate.
%% `{reject_publish, State}' if the queue is rejecting publishes, or if the
%% first enqueue could not determine the machine version or register the
%% enqueuer. In both of those cases nothing was sent.
%% {@module} assigns a sequence number to every raft command it issues. The
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/3. handle_ra_event/3} function.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Correlation, Msg,
        #state{queue_status = undefined,
               next_enqueue_seq = 1,
               cfg = #cfg{servers = Servers,
                          timeout = Timeout}} = State0) ->
    %% it is the first enqueue, check the version.
    %% The rpc is bounded by the command timeout so an unresponsive node
    %% cannot block the caller indefinitely.
    {_, Node} = pick_server(State0),
    case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}],
                  Timeout) of
        0 ->
            %% the leader is running the old version
            enqueue(Correlation, Msg, State0#state{queue_status = go});
        1 ->
            %% we're running the new version on the leader: do a synchronous
            %% initialisation of the enqueuer session
            Reg = rabbit_fifo:make_register_enqueuer(self()),
            case ra:process_command(Servers, Reg, Timeout) of
                {ok, reject_publish, Leader} ->
                    {reject_publish, State0#state{leader = Leader,
                                                  queue_status = reject_publish}};
                {ok, ok, Leader} ->
                    enqueue(Correlation, Msg, State0#state{leader = Leader,
                                                           queue_status = go});
                {error, {no_more_servers_to_try, _Errs}} ->
                    %% if we are not able to process the register command
                    %% it is safe to reject the message as we never attempted
                    %% to send it
                    {reject_publish, State0};
                %% TODO: not convinced this can ever happen when using
                %% a list of servers
                {timeout, _} ->
                    {reject_publish, State0};
                Err ->
                    exit(Err)
            end;
        {badrpc, _Reason} ->
            %% the version could not be determined (e.g. nodedown, rpc
            %% timeout or a remote error). Nothing has been sent yet so it
            %% is safe to reject the message.
            {reject_publish, State0}
    end;
enqueue(_Correlation, _Msg,
        #state{queue_status = reject_publish,
               cfg = #cfg{}} = State) ->
    {reject_publish, State};
enqueue(Correlation, Msg,
        #state{slow = Slow,
               queue_status = go,
               cfg = #cfg{block_handler = BlockFun}} = State0) ->
    Node = pick_server(State0),
    {Next, State1} = next_enqueue_seq(State0),
    Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
    case send_command(Node, Correlation, Cmd, low, State1) of
        {slow, State} when not Slow ->
            %% only the transition into the slow state blocks the caller
            %% and arms the timer
            BlockFun(),
            {slow, set_timer(State)};
        Any ->
            Any
    end.
199
%% @doc Enqueues a message without a correlation term.
%% Equivalent to `enqueue(undefined, Msg, State)'.
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow, State}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate. `{reject_publish, State}' if the queue rejects publishes.
%% {@module} assigns a sequence number to every raft command it issues. The
%% SequenceNumber can be correlated to the applied sequence numbers returned
%% by the {@link handle_ra_event/3. handle_ra_event/3} function.
-spec enqueue(Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
    NoCorrelation = undefined,
    enqueue(NoCorrelation, Msg, State).
215
%% @doc Dequeue a message from the queue.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Settlement either `settled' or `unsettled'. When `settled' no
%% further settlement needs to be done.
%% @param State The {@module} state.
%%
%% @returns
%% `{ok, MsgsReady, {QName, QRef, MsgId, IsDelivered, Msg}, State}' when a
%% message was dequeued, `{empty, State}' when there was nothing to dequeue,
%% or `{error | timeout, term()}' otherwise.
-spec dequeue(rabbit_fifo:consumer_tag(),
              Settlement :: settled | unsettled, state()) ->
    {ok, non_neg_integer(), term(), state()}
     | {empty, state()} | {error | timeout, term()}.
dequeue(ConsumerTag, Settlement,
        #state{cfg = #cfg{timeout = Timeout,
                          cluster_name = QName}} = State0) ->
    Node = pick_server(State0),
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_checkout(ConsumerId, {dequeue, Settlement}, #{}),
    case ra:process_command(Node, Cmd, Timeout) of
        {ok, {dequeue, empty}, Leader} ->
            {empty, State0#state{leader = Leader}};
        {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
            %% headers without a delivery_count entry (including non-map
            %% headers) are treated as a first delivery
            Count = case MsgHeader of
                        #{delivery_count := C} -> C;
                        _ -> 0
                    end,
            IsDelivered = Count > 0,
            Msg = add_delivery_count_header(Msg0, Count),
            {ok, MsgsReady,
             {QName, qref(Leader), MsgId, IsDelivered, Msg},
             State0#state{leader = Leader}};
        {ok, {error, _} = Err, _Leader} ->
            %% the command was processed but the machine replied with an
            %% error: unwrap it
            Err;
        Err ->
            Err
    end.
258
%% Adds an `x-delivery-count' long header to #basic_message{} records when
%% the count is an integer; any other message term is returned unchanged.
add_delivery_count_header(Msg, Count)
  when is_record(Msg, basic_message) andalso is_integer(Count) ->
    rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg);
add_delivery_count_header(Other, _Count) ->
    Other.
264
265
%% @doc Settle messages. Permanently removes them from the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the (non-empty) list of message ids received with the
%% {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, Actions}' where `Actions' is always `[]'. Outside the slow state
%% the settle command is sent straight away. In the slow state the ids are
%% stashed and sent once the number of pending commands drops below the
%% soft limit.
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
settle(ConsumerTag, [_ | _] = MsgIds, #state{slow = false} = State0) ->
    Server = pick_server(State0),
    SettleCmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
    %% callers of settle/3 do not care whether the send made us slow
    case send_command(Server, undefined, SettleCmd, normal, State0) of
        {_Tag, State} -> {State, []}
    end;
settle(ConsumerTag, [_ | _] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    %% over the soft limit: stash the ids to be sent once enough applied
    %% notifications have been seen
    AppendSettles = fun ({Settles, Returns, Discards}) ->
                            {Settles ++ MsgIds, Returns, Discards}
                    end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), AppendSettles,
                              {MsgIds, [], []}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.
295
%% @doc Return messages to the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the (non-empty) list of message ids to return, as received
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, Actions}' where `Actions' is always `[]'. Outside the slow state
%% the return command is sent straight away. In the slow state the ids are
%% stashed and sent once the number of pending commands drops below the
%% soft limit.
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
return(ConsumerTag, [_ | _] = MsgIds, #state{slow = false} = State0) ->
    % TODO: make rabbit_fifo return support lists of message ids
    ReturnCmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
    {_Tag, State} = send_command(pick_server(State0), undefined, ReturnCmd,
                                 normal, State0),
    {State, []};
return(ConsumerTag, [_ | _] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    %% over the soft limit: stash the ids to be sent once enough applied
    %% notifications have been seen
    AppendReturns = fun ({Settles, Returns, Discards}) ->
                            {Settles, Returns ++ MsgIds, Discards}
                    end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), AppendReturns,
                              {[], MsgIds, []}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.
325
%% @doc Discards checked out messages.
%% If the queue has a dead_letter_handler configured this will be called.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the (non-empty) list of message ids to discard, as received
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, Actions}' where `Actions' is always `[]'. Outside the slow state
%% the discard command is sent straight away. In the slow state the ids are
%% stashed and sent once the number of pending commands drops below the
%% soft limit.
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
discard(ConsumerTag, [_ | _] = MsgIds, #state{slow = false} = State0) ->
    Server = pick_server(State0),
    DiscardCmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
    %% callers of discard/3 do not care whether the send made us slow
    case send_command(Server, undefined, DiscardCmd, normal, State0) of
        {_Tag, State} -> {State, []}
    end;
discard(ConsumerTag, [_ | _] = MsgIds,
        #state{unsent_commands = Unsent0} = State0) ->
    %% over the soft limit: stash the ids to be sent once enough applied
    %% notifications have been seen
    AppendDiscards = fun ({Settles, Returns, Discards}) ->
                             {Settles, Returns, Discards ++ MsgIds}
                     end,
    Unsent = maps:update_with(consumer_id(ConsumerTag), AppendDiscards,
                              {[], [], MsgIds}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.
356
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param NumUnsettled the maximum number of in-flight messages. Once this
%% number of messages has been received but not settled no further messages
%% will be delivered to the consumer.
%% @param CreditMode The credit mode to use for the checkout.
%%     simple_prefetch: credit is auto topped up as deliveries are settled
%%     credited: credit is only increased by sending credit to the queue
%% @param Meta consumer metadata passed along with the checkout command. Its
%% optional `ack' key (default `true') says whether the caller settles
%% deliveries itself. When `false', deliveries are settled automatically
%% as they arrive.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(),
               NumUnsettled :: non_neg_integer(),
               CreditMode :: rabbit_fifo:credit_mode(),
               Meta :: rabbit_fifo:consumer_meta(),
               state()) -> {ok, state()} | {error | timeout, term()}.
checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
         #state{consumer_deliveries = CDels0} = State0) ->
    %% true = manual acknowledgement; false = auto-settle on delivery
    Ack = maps:get(ack, Meta, true),
    %% keep any existing delivery tracking for this tag, only refresh `ack'
    CDels = maps:update_with(ConsumerTag,
                             fun (Existing) -> Existing#consumer{ack = Ack} end,
                             #consumer{last_msg_id = -1, ack = Ack},
                             CDels0),
    CheckoutCmd = rabbit_fifo:make_checkout({ConsumerTag, self()},
                                            {auto, NumUnsettled, CreditMode},
                                            Meta),
    try_process_command(sorted_servers(State0), CheckoutCmd,
                        State0#state{consumer_deliveries = CDels}).
395
396
%% @doc Asks the last known leader for the single active consumer.
%% Returns `{error, leader_not_known}' when no leader is recorded in the
%% state yet.
-spec query_single_active_consumer(state()) ->
    {ok, term()} | {error, term()} | {timeout, term()}.
query_single_active_consumer(#state{leader = undefined}) ->
    {error, leader_not_known};
query_single_active_consumer(#state{leader = Leader}) ->
    QueryFun = fun rabbit_fifo:query_single_active_consumer/1,
    Result = ra:local_query(Leader, QueryFun, ?COMMAND_TIMEOUT),
    case Result of
        {ok, {_, Reply}, _} -> {ok, Reply};
        Other -> Other
    end.
409
%% @doc Provide credit to the queue
%%
%% This only has an effect if the consumer uses credit mode: credited
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Credit the amount of credit to provide to the queue
%% @param Drain tells the queue to use up any credit that cannot be immediately
%% fulfilled. (i.e. there are not enough messages on queue to use up all the
%% provided credit).
%% @returns `{State, Actions}' where `Actions' is always `[]'.
-spec credit(rabbit_fifo:consumer_tag(),
             Credit :: non_neg_integer(),
             Drain :: boolean(),
             state()) ->
          {state(), actions()}.
credit(ConsumerTag, Credit, Drain,
       #state{consumer_deliveries = CDels} = State0) ->
    Consumer = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
    %% message ids are 0-indexed, so the last received id + 1 gives the
    %% delivery count
    DeliveryCount = Consumer#consumer.last_msg_id + 1,
    CreditCmd = rabbit_fifo:make_credit(consumer_id(ConsumerTag), Credit,
                                        DeliveryCount, Drain),
    %% callers of credit/4 do not care whether the send made us slow
    case send_command(pick_server(State0), undefined, CreditCmd, normal,
                      State0) of
        {_Tag, State} -> {State, []}
    end.
437
%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' (with the consumer's delivery tracking removed) or
%% `{error | timeout, term()}'
-spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) ->
    {ok, state()} | {error | timeout, term()}.
cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels0} = State0) ->
    CDels = maps:remove(ConsumerTag, CDels0),
    CancelCmd = rabbit_fifo:make_checkout({ConsumerTag, self()}, cancel, #{}),
    try_process_command(sorted_servers(State0), CancelCmd,
                        State0#state{consumer_deliveries = CDels}).
455
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Server) ->
    PurgeCmd = rabbit_fifo:make_purge(),
    Result = ra:process_command(Server, PurgeCmd, ?COMMAND_TIMEOUT),
    case Result of
        {ok, {purge, NumPurged}, _Leader} -> {ok, NumPurged};
        Other -> Other
    end.
466
%% @doc Number of issued commands that have not yet been seen as applied.
-spec pending_size(state()) -> non_neg_integer().
pending_size(#state{pending = PendingBySeq}) ->
    maps:size(PendingBySeq).
470
%% @doc Queries the queue statistics from the given server with a short
%% (250 ms) timeout.
-spec stat(ra:server_id()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader) ->
    %% short timeout as we don't want to spend too long if it is going to
    %% fail anyway
    ShortTimeout = 250,
    stat(Leader, ShortTimeout).

%% @doc Queries the queue statistics from the given server using a local
%% query with the given timeout.
-spec stat(ra:server_id(), non_neg_integer()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader, Timeout) ->
    case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of
        {ok, {_, {R, C}}, _} ->
            {ok, R, C};
        {Tag, _} = Error when Tag =:= error; Tag =:= timeout ->
            Error
    end.
490
%% @doc Returns the cluster name this client state was created for.
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cfg = Cfg}) ->
    Cfg#cfg.cluster_name.
495
%% @doc Sends an update-config command built from Conf to Server and waits
%% up to ?COMMAND_TIMEOUT ms. Returns `ok' when the machine replies `ok';
%% any other result from ra:process_command/3 is returned unchanged.
update_machine_state(Server, Conf) ->
    UpdateCmd = rabbit_fifo:make_update_config(Conf),
    Result = ra:process_command(Server, UpdateCmd, ?COMMAND_TIMEOUT),
    case Result of
        {ok, ok, _Leader} -> ok;
        Other -> Other
    end.
503
%% @doc Handles incoming `ra_events'. Events carry both internal "bookkeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
%% to ensure bookkeeping, resends and flow control are correctly handled.
%%
%% Example:
%%
%% ```
%%  receive
%%     {ra_event, From, Evt} ->
%%         case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
%%             {ok, State, Actions} ->
%%                  handle_actions(Actions),
%%                  ...;
%%             eol ->
%%                  ...
%%         end
%%  end
%% '''
%%
%% @param From the {@link ra:server_id().} of the sending process.
%% @param Event the body of the `ra_event'.
%% @param State the current {@module} state.
%%
%% @returns
%% `{ok, State, Actions}' for all events handled directly here, or `eol'
%% for `{machine, eol}'.
%% <ul>
%% <li>`{applied, Seqs}': `Actions' starts with
%% `{settled, ClusterName, Correlations}' when any applied command carried a
%% correlation (e.g. from `enqueue/3'). It is followed by the actions the
%% state machine attached to the applied commands. If the client was slow and
%% the pending count has dropped below the soft limit, stashed
%% settles/returns/discards are sent and the unblock handler is called.</li>
%% <li>`{machine, {delivery, ConsumerTag, [{MsgId, {MsgHeader, Msg}}]}}':
%% handled by handle_delivery/3, which for in-sequence deliveries produces a
%% `{deliver, ConsumerTag, Ack, Msgs}' action. `ConsumerTag' is the tag passed
%% to {@link checkout/5.}. `MsgId' is a consumer scoped monotonically
%% incrementing id that can be used to {@link settle/3.} (roughly: AMQP 0.9.1
%% ack) messages once finished with them.</li>
%% <li>leader changes, `not_leader' rejections and timeouts: the leader is
%% updated where known and pending commands are resent; `Actions' is `[]'.</li>
%% </ul>
%%
%% NOTE: the clauses below return `{ok, State, Actions}' or `eol'; the
%% remaining alternatives in the spec are retained from the earlier spec.
-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
    {ok, state(), list()} |
    {internal, Correlators :: [term()], actions(), state()} |
    {rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
                #state{cfg = #cfg{cluster_name = QRef,
                                  soft_limit = SftLmt,
                                  unblock_handler = UnblockFun}} = State0) ->

    {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2,
                                           {[], [], State0#state{leader = From}},
                                           Seqs),
    Actions = case Corrs of
                  [] ->
                      lists:reverse(Actions0);
                  _ ->
                      [{settled, QRef, Corrs}
                       | lists:reverse(Actions0)]
              end,
    case maps:size(State1#state.pending) < SftLmt of
        true when State1#state.slow == true ->
            % we have exited soft limit state
            % send any unsent commands and cancel the timer as
            % TODO: really the timer should only be cancelled when the channel
            % exits flow state (which depends on the state of all queues the
            % channel is interacting with)
            % but the fact the queue has just applied suggests
            % it's ok to cancel here anyway
            State2 = cancel_timer(State1#state{slow = false,
                                               unsent_commands = #{}}),
            % build up a list of commands to issue
            Commands = maps:fold(
                         fun (Cid, {Settled, Returns, Discards}, Acc) ->
                                 add_command(Cid, settle, Settled,
                                             add_command(Cid, return, Returns,
                                                         add_command(Cid, discard,
                                                                     Discards, Acc)))
                         end, [], State1#state.unsent_commands),
            Node = pick_server(State2),
            %% send all the settlements and returns
            State = lists:foldl(fun (C, S0) ->
                                        case send_command(Node, undefined,
                                                          C, normal, S0) of
                                            {T, S} when T =/= error ->
                                                S
                                        end
                                end, State2, Commands),
            UnblockFun(),
            {ok, State, Actions};
        _ ->
            {ok, State1, Actions}
    end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
    handle_delivery(From, Del, State0);
handle_ra_event(_, {machine, {queue_status, Status}},
                #state{} = State) ->
    %% just set the queue status
    {ok, State#state{queue_status = Status}, []};
handle_ra_event(Leader, {machine, leader_change},
                #state{leader = Leader} = State) ->
    %% leader already known
    {ok, State, []};
handle_ra_event(Leader, {machine, leader_change}, State0) ->
    %% we need to update leader
    %% and resend any pending commands
    State = resend_all_pending(State0#state{leader = Leader}),
    {ok, cancel_timer(State), []};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
    % TODO: how should these be handled? re-sent on timer or try random
    {ok, State0, []};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
    State1 = State0#state{leader = Leader},
    State = resend(Seq, State1),
    {ok, State, []};
handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
    case find_leader(Servers) of
        undefined ->
            %% still no leader, set the timer again
            {ok, set_timer(State0), []};
        Leader ->
            State = resend_all_pending(State0#state{leader = Leader}),
            {ok, State, []}
    end;
handle_ra_event(_Leader, {machine, eol}, _State0) ->
    eol.
631
%% @doc Attempts to enqueue a message using cast semantics. This provides no
%% guarantees or retries if the message fails to achieve consensus or if the
%% server it is sent to happens not to be available. If the message is sent
%% to a follower it will attempt to deliver it to the leader, if known.
%% Otherwise the message is dropped.
%%
%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client state
%% cannot be maintained.
%%
%% @param Servers the known servers in the cluster; the command is pipelined
%% to the first one.
%% @param Msg the message to enqueue.
%%
%% @returns `ok'
-spec untracked_enqueue([ra:server_id()], term()) ->
    ok.
untracked_enqueue([Server | _], Msg) ->
    %% no enqueuer pid and no sequence number: nothing is tracked
    EnqueueCmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
    ok = ra:pipeline_command(Server, EnqueueCmd).
652
653%% Internal
654
%% Sends Cmd synchronously to each server in turn until one accepts it and
%% records the reported leader in the returned state. If every server fails,
%% the result from the last server tried is returned as-is.
try_process_command([Server | Rem], Cmd,
                    #state{cfg = #cfg{timeout = Timeout}} = State) ->
    case ra:process_command(Server, Cmd, Timeout) of
        {ok, _, Leader} ->
            {ok, State#state{leader = Leader}};
        Err when Rem =:= [] ->
            %% no servers left to try
            Err;
        _ ->
            try_process_command(Rem, Cmd, State)
    end.
665
%% Fold step over the `{Seq, MaybeAction}' pairs of an applied event.
%% Sequence numbers at or below last_applied are ignored. For a newer Seq,
%% any pending commands numbered between last_applied + 1 and Seq - 1 are
%% resent, the machine's action is accumulated, and Seq is removed from
%% pending (collecting its correlation, if any). last_applied is advanced
%% to Seq.
seq_applied({Seq, MaybeAction},
            {Corrs0, Actions0, #state{last_applied = Last} = State0})
  when Seq > Last ->
    Resent = do_resends(Last + 1, Seq - 1, State0),
    {Actions, State} = maybe_add_action(MaybeAction, Actions0, Resent),
    {Corrs, Pending} =
        case maps:take(Seq, State#state.pending) of
            {{undefined, _Cmd}, Remaining} ->
                {Corrs0, Remaining};
            {{Corr, _Cmd}, Remaining} ->
                {[Corr | Corrs0], Remaining};
            error ->
                %% already resent or removed for some other reason; still
                %% advance last_applied so it is not resent later
                {Corrs0, State#state.pending}
        end,
    {Corrs, Actions, State#state{pending = Pending, last_applied = Seq}};
seq_applied(_Stale, Acc) ->
    Acc.
686
%% Accumulates the action(s) a state machine reply attached to an applied
%% command. `ok' carries nothing, `{multi, List}' is expanded in order, and
%% `send_drained' also updates the consumer's tracking. Anything else is
%% passed through as an action.
maybe_add_action(ok, Acc, State) ->
    {Acc, State};
maybe_add_action({multi, Nested}, Acc0, State0) ->
    lists:foldl(fun (Act, {AccIn, StateIn}) ->
                        maybe_add_action(Act, AccIn, StateIn)
                end, {Acc0, State0}, Nested);
maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc,
                 #state{consumer_deliveries = CDels} = State) ->
    %% add credit to consumer delivery_count
    Consumer = maps:get(Tag, CDels),
    CDels1 = update_consumer(Tag, Consumer#consumer.last_msg_id,
                             Credit, Consumer, CDels),
    {[Action | Acc], State#state{consumer_deliveries = CDels1}};
maybe_add_action(Other, Acc, State) ->
    {[Other | Acc], State}.
704
%% Resends the pending commands numbered From..To (inclusive), in ascending
%% order; an empty range leaves the state unchanged.
do_resends(From, To, State) when From =< To ->
    do_resends(From + 1, To, resend(From, State));
do_resends(_From, _To, State) ->
    State.
709
% Re-issues the pending command stored under OldSeq to the current leader
% under a fresh sequence number. Sequence numbers that are no longer pending
% are ignored.
resend(OldSeq, #state{pending = Pending0} = State) ->
    case maps:take(OldSeq, Pending0) of
        error ->
            State;
        {{Corr, Cmd}, Pending} ->
            %% resends are not subject to flow control here
            resend_command(State#state.leader, Corr, Cmd,
                           State#state{pending = Pending})
    end.
719
%% Resends every pending command, oldest sequence number first.
resend_all_pending(#state{pending = Pending} = State0) ->
    OrderedSeqs = lists:sort(maps:keys(Pending)),
    lists:foldl(fun (Seq, StateAcc) -> resend(Seq, StateAcc) end,
                State0, OrderedSeqs).
723
%% Wraps a deliver action into an `{ok, State, Actions}' result.
%% When the first argument is true (manual ack) the delivery is returned as
%% the only action. When it is false, every delivered message id is settled
%% immediately via settle/3 and the resulting actions follow the delivery.
maybe_auto_ack(true, Deliver, State) ->
    {ok, State, [Deliver]};
maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
    DeliveredIds = [MsgId || {_QName, _QRef, MsgId, _Redelivered, _Msg} <- Msgs],
    {State, SettleActions} = settle(Tag, DeliveredIds, State0),
    {ok, State, [Deliver | SettleActions]}.
732
%% Turns a `{delivery, Tag, IdMsgs}' event into a `{deliver, ...}' action for
%% consumer Tag and records the consumer's new last_msg_id / delivery_count.
%%
%% IdMsgs is a non-empty list of `{MsgId, {Header, Msg}}' entries. Its first
%% id (FstId) is compared with the consumer's last_msg_id (Prev):
%%   - FstId =:= Prev + 1: contiguous, delivered as-is;
%%   - FstId  >  Prev + 1: ids Prev+1..FstId-1 were never seen by this client;
%%     they are fetched from the leader with get_missing_deliveries/4 and
%%     prepended to the delivery. A failed query is returned as the
%%     `{protocol_error, ...}' tuple;
%%   - FstId =<  Prev:     already-seen ids are dropped and the remainder (if
%%     any) is handled again.
%% When Tag is not (or no longer) in consumer_deliveries all messages are
%% returned to the queue instead (see the second clause).
%% Returns `{ok, State, Actions}' or a protocol error tuple.
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
                #state{cfg = #cfg{cluster_name = QName},
                       consumer_deliveries = CDels0} = State0)
  when is_map_key(Tag, CDels0) ->
    QRef = qref(Leader),
    {LastId, _} = lists:last(IdMsgs),
    Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
    %% format as a deliver action
    Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)},
    %% TODO: remove potential default allocation
    case Consumer of
        #consumer{last_msg_id = Prev} = C
          when FstId =:= Prev+1 ->
            maybe_auto_ack(Ack, Del,
                           State0#state{consumer_deliveries =
                                        update_consumer(Tag, LastId,
                                                        length(IdMsgs), C,
                                                        CDels0)});
        #consumer{last_msg_id = Prev} = C
          when FstId > Prev+1 ->
            %% The gap is the ids Prev+1 .. FstId-1 (the same range queried
            %% below), i.e. FstId - Prev - 1 ids. delivery_count advances by
            %% the full gap even if the query returns fewer messages.
            NumMissing = FstId - Prev - 1,
            %% there may actually be fewer missing messages returned than expected
            %% This can happen when a node the channel is on gets disconnected
            %% from the node the leader is on and then reconnected afterwards.
            %% When the node is disconnected the leader will return all checked
            %% out messages to the main queue to ensure they don't get stuck in
            %% case the node never comes back.
            case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of
                {protocol_error, _, _, _} = Err ->
                    Err;
                Missing ->
                    XDel = {deliver, Tag, Ack, transform_msgs(QName, QRef,
                                                              Missing ++ IdMsgs)},
                    maybe_auto_ack(Ack, XDel,
                                   State0#state{consumer_deliveries =
                                                    update_consumer(Tag, LastId,
                                                                    length(IdMsgs) + NumMissing,
                                                                    C, CDels0)})
            end;
        #consumer{last_msg_id = Prev}
          when FstId =< Prev ->
            %% duplicate / overlapping delivery: skip ids already seen
            case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of
                [] ->
                    {ok, State0, []};
                IdMsgs2 ->
                    handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0)
            end;
        C when FstId =:= 0 ->
            % the very first delivery
            %% NOTE(review): for an integer last_msg_id one of the three
            %% guards above always matches first, so this clause looks
            %% unreachable; confirm before relying on it.
            maybe_auto_ack(Ack, Del,
                           State0#state{consumer_deliveries =
                                        update_consumer(Tag, LastId,
                                                        length(IdMsgs),
                                                        C#consumer{last_msg_id = LastId},
                                                        CDels0)})
    end;
handle_delivery(_Leader, {delivery, Tag, [_ | _] = IdMsgs},
                #state{consumer_deliveries = CDels0} = State0)
  when not is_map_key(Tag, CDels0) ->
    %% Note:
    %% https://github.com/rabbitmq/rabbitmq-server/issues/3729
    %% If the consumer is no longer in the deliveries map,
    %% we should return all messages.
    MsgIntIds = [Id || {Id, _} <- IdMsgs],
    {State1, Deliveries} = return(Tag, MsgIntIds, State0),
    {ok, State1, Deliveries}.
799
%% Converts `{MsgId, {Header, Msg}}' entries into the 5-tuples carried by a
%% deliver action: `{QName, QRef, MsgId, Redelivered, Msg}'.
%% A map header containing `delivery_count' marks the message as redelivered
%% and the count is attached via add_delivery_count_header/2; any other
%% header leaves the message untouched with Redelivered = false.
transform_msgs(QName, QRef, Msgs) ->
    ToDelivery =
        fun ({MsgId, {#{delivery_count := Count}, RawMsg}}) ->
                {QName, QRef, MsgId, true,
                 add_delivery_count_header(RawMsg, Count)};
            ({MsgId, {_Header, RawMsg}}) ->
                {QName, QRef, MsgId, false, RawMsg}
        end,
    lists:map(ToDelivery, Msgs).
811
%% Stores consumer Tag in Consumers with last_msg_id set to LastId and its
%% delivery_count increased by DelCntIncr.
update_consumer(Tag, LastId, DelCntIncr,
                #consumer{delivery_count = Count0} = Consumer0, Consumers) ->
    Consumer = Consumer0#consumer{last_msg_id = LastId,
                                  delivery_count = Count0 + DelCntIncr},
    Consumers#{Tag => Consumer}.
818
819
%% Runs a local query on Leader for the messages checked out to this
%% process's consumer ConsumerTag with ids From..To, using
%% rabbit_fifo:get_checked_out/4. Returns the query result on success, or a
%% `{protocol_error, internal_error, Format, Args}' tuple when the query
%% errors or times out (after ?COMMAND_TIMEOUT).
get_missing_deliveries(Leader, From, To, ConsumerTag) ->
    ConsumerId = consumer_id(ConsumerTag),
    CheckedOut = fun (MachineState) ->
                         rabbit_fifo:get_checked_out(ConsumerId, From, To,
                                                     MachineState)
                 end,
    case ra:local_query(Leader, CheckedOut, ?COMMAND_TIMEOUT) of
        {ok, {_IdxTerm, Missing}, _ReplyLeader} ->
            Missing;
        {timeout, _} ->
            {protocol_error, internal_error,
             "Cannot query missing deliveries from ~p: timeout",
             [Leader]};
        {error, Error} ->
            {protocol_error, internal_error,
             "Cannot query missing deliveries from ~p: ~p",
             [Leader, Error]}
    end.
837
%% Returns the known leader, or the first configured server when no leader
%% is known yet. With no leader and no servers it returns `undefined'.
pick_server(#state{leader = Leader, cfg = Cfg}) ->
    case {Leader, Cfg} of
        {undefined, #cfg{servers = [First | _]}} ->
            %% TODO: pick random rather than first?
            First;
        _ ->
            Leader
    end.
844
% Configured servers with the last known leader, if any, moved to the front.
sorted_servers(#state{leader = Leader, cfg = #cfg{servers = Servers}}) ->
    case Leader of
        undefined ->
            Servers;
        _ ->
            [Leader | lists:delete(Leader, Servers)]
    end.
852
%% Hands out the current command sequence number and advances the counter.
next_seq(#state{next_seq = Current} = State) ->
    Next = Current + 1,
    {Current, State#state{next_seq = Next}}.
855
%% Hands out the current enqueue sequence number and advances the counter.
next_enqueue_seq(#state{next_enqueue_seq = Current} = State) ->
    Next = Current + 1,
    {Current, State#state{next_enqueue_seq = Next}}.
858
%% A consumer is identified by its tag paired with the calling process.
consumer_id(ConsumerTag) ->
    Owner = self(),
    {ConsumerTag, Owner}.
861
%% Pipelines Command to Server under a fresh sequence number, with the given
%% Priority, and records it as pending together with Correlation.
%% Returns {slow, State} when the number of commands already pending (before
%% this one) has reached the soft limit, otherwise {ok, State}. The `slow'
%% flag in the state mirrors that result.
send_command(Server, Correlation, Command, Priority,
             #state{pending = Pending,
                    cfg = #cfg{soft_limit = SoftLimit}} = State0) ->
    {Seq, State} = next_seq(State0),
    ok = ra:pipeline_command(Server, Command, Seq, Priority),
    IsSlow = maps:size(Pending) >= SoftLimit,
    Tag = case IsSlow of
              true -> slow;
              false -> ok
          end,
    {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
                      slow = IsSlow}}.
873
%% Pipelines Command to Node under a fresh sequence number (default
%% priority) and records it as pending with Correlation. Unlike
%% send_command/5 no flow-control tag is computed.
resend_command(Node, Correlation, Command,
               #state{pending = Pending0} = State0) ->
    {NewSeq, State} = next_seq(State0),
    ok = ra:pipeline_command(Node, Command, NewSeq),
    Pending = maps:put(NewSeq, {Correlation, Command}, Pending0),
    State#state{pending = Pending}.
879
%% Prepends the rabbit_fifo command of the given kind (settle | return |
%% discard) for MsgIds onto Acc. An empty MsgIds list adds nothing.
add_command(_Cid, _Kind, [], Acc) ->
    Acc;
add_command(Cid, settle, MsgIds, Acc) ->
    [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, return, MsgIds, Acc) ->
    [rabbit_fifo:make_return(Cid, MsgIds) | Acc];
add_command(Cid, discard, MsgIds, Acc) ->
    [rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
888
%% Arms a ?TIMER_TIME ms timer that casts
%% `{queue_event, QName, {Target, timeout}}' back to the calling process,
%% where Target is the known leader or, if none, the first configured
%% server. Stores the timer reference in timer_state.
set_timer(#state{leader = Leader0,
                 cfg = #cfg{servers = [FirstServer | _],
                            cluster_name = QName}} = State) ->
    Target = case Leader0 of
                 undefined -> FirstServer;
                 _ -> Leader0
             end,
    TimeoutMsg = {'$gen_cast', {queue_event, QName, {Target, timeout}}},
    TRef = erlang:send_after(?TIMER_TIME, self(), TimeoutMsg),
    State#state{timer_state = TRef}.
901
%% Cancels the running timer, if any, without waiting for the result and
%% without requesting an info message, and clears timer_state.
cancel_timer(#state{timer_state = TRef} = State) ->
    case TRef of
        undefined ->
            State;
        _ ->
            erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
            State#state{timer_state = undefined}
    end.
907
%% Asks each server in turn (500 ms per call) for its cluster membership
%% and returns the leader from the first `{ok, _, Leader}' reply.
%% Returns `undefined' when no server answers that way.
find_leader([Server | Remaining]) ->
    case ra:members(Server, 500) of
        {ok, _Members, Leader} ->
            Leader;
        _NoAnswer ->
            find_leader(Remaining)
    end;
find_leader([]) ->
    undefined.
916
%% Extracts the name from a `{Name, Node}' server id; any other term is
%% returned unchanged.
qref(Ref) when tuple_size(Ref) =:= 2 -> element(1, Ref);
qref(Ref) -> Ref.
919