%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
%% Handles command tracking and other non-functional concerns.
-module(rabbit_fifo_client).

-export([
         init/2,
         init/3,
         init/5,
         checkout/5,
         cancel_checkout/2,
         enqueue/2,
         enqueue/3,
         dequeue/3,
         settle/3,
         return/3,
         discard/3,
         credit/4,
         handle_ra_event/3,
         untracked_enqueue/2,
         purge/1,
         cluster_name/1,
         update_machine_state/2,
         pending_size/1,
         stat/1,
         stat/2,
         query_single_active_consumer/1
        ]).

-include_lib("rabbit_common/include/rabbit.hrl").

%% Default soft limit on the number of pending (sent but not yet applied)
%% commands; once reached, send_command/5 reports `slow'.
-define(SOFT_LIMIT, 32).
%% Delay in milliseconds (erlang:send_after/3) before the `timeout' queue
%% event armed by set_timer/1 is delivered to this process.
-define(TIMER_TIME, 10000).
%% Timeout in milliseconds for synchronous ra commands and queries that do
%% not use the per-session timeout stored in #cfg.timeout.
-define(COMMAND_TIMEOUT, 30000).

-type seq() :: non_neg_integer().
-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
                  {send_drained, CTagCredit ::
                   {rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].

-type cluster_name() :: rabbit_types:r(queue).

%% Per-consumer delivery tracking, stored per consumer tag.
%% last_msg_id is the id of the last delivered message (-1 before the first
%% delivery); ack selects manual (true) or client-side automatic (false)
%% settlement of deliveries.
-record(consumer, {last_msg_id :: seq() | -1,
                   ack = false :: boolean(),
                   delivery_count = 0 :: non_neg_integer()}).

%% Session configuration, set by init/3 and init/5.
-record(cfg, {cluster_name :: cluster_name(),
              servers = [] :: [ra:server_id()],
              soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
              %% the return values of both flow-control handlers are ignored
              %% by this module, hence the same term() return type for both
              block_handler = fun() -> ok end :: fun(() -> term()),
              unblock_handler = fun() -> ok end :: fun(() -> term()),
              %% milliseconds; derived from kernel's net_ticktime in init
              timeout :: non_neg_integer(),
              version = 0 :: non_neg_integer()}).
-record(state, {cfg :: #cfg{},
                leader :: undefined | ra:server_id(),
                queue_status :: undefined | go | reject_publish,
                next_seq = 0 :: seq(),
                %% Last applied is initialised to -1 to note that no command
                %% has yet been applied, while still allowing messages to be
                %% resent if the first ones in the sequence are lost
                %% (messages are resent from last_applied + 1)
                last_applied = -1 :: maybe_seq(),
                next_enqueue_seq = 1 :: seq(),
                %% indicates that we've exceeded the soft limit
                slow = false :: boolean(),
                %% settles, returns and discards (message ids) stashed per
                %% consumer while slow, sent once we are no longer slow
                unsent_commands = #{} :: #{rabbit_fifo:consumer_id() =>
                                           {[rabbit_fifo:msg_id()],
                                            [rabbit_fifo:msg_id()],
                                            [rabbit_fifo:msg_id()]}},
                %% commands sent but not yet applied, keyed by sequence number
                pending = #{} :: #{seq() =>
                                   {term(), rabbit_fifo:command()}},
                consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
                                               #consumer{}},
                %% reference returned by erlang:send_after/3, or undefined
                %% when no timer is armed
                timer_state :: undefined | reference()
               }).

-opaque state() :: #state{}.

-export_type([
              state/0,
              actions/0
             ]).


%% @doc Create the initial state for a new rabbit_fifo session. A state is
%% needed to interact with a rabbit_fifo queue using {@module}.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is
%% known ensure the leader node is at the head of the list.
-spec init(cluster_name(), [ra:server_id()]) -> state().
init(ClusterName, Servers) ->
    init(ClusterName, Servers, ?SOFT_LIMIT).

%% @doc Create the initial state for a new rabbit_fifo session. A state is
%% needed to interact with a rabbit_fifo queue using {@module}.
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is
%% known ensure the leader node is at the head of the list.
%% @param SoftLimit the number of pending commands at which the client starts
%% reporting `slow'.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
    %% no-op flow control handlers, same as the #cfg{} record defaults
    init(ClusterName, Servers, SoftLimit,
         fun() -> ok end, fun() -> ok end).

%% @doc Like init/3 but also installs flow control callbacks.
%% @param BlockFun called when the client enters the `slow' state.
%% @param UnblockFun called when the client leaves the `slow' state.
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
           fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
    %% net ticktime is in seconds; add a 5 second margin, then convert to ms
    Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
    #state{cfg = #cfg{cluster_name = ClusterName,
                      servers = Servers,
                      block_handler = BlockFun,
                      unblock_handler = UnblockFun,
                      soft_limit = SoftLimit,
                      timeout = Timeout * 1000}}.


%% @doc Enqueues a message.
%% @param Correlation an arbitrary erlang term used to correlate this
%% command when it has been applied.
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow, State}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate.
%% `{reject_publish, State}' if the message was not sent, either because the
%% queue is rejecting publishes or because the enqueuer could not be
%% registered.
%% Once the command has been applied the Correlation term is reported in a
%% `{settled, QRef, Correlations}' action returned by
%% {@link handle_ra_event/3. handle_ra_event/3}.
-spec enqueue(Correlation :: term(), Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Correlation, Msg,
        #state{queue_status = undefined,
               next_enqueue_seq = 1,
               cfg = #cfg{servers = Servers,
                          timeout = Timeout}} = State0) ->
    %% it is the first enqueue, check the version
    {_, Node} = pick_server(State0),
    %% bound the rpc by the session timeout so an unresponsive node cannot
    %% block the caller indefinitely
    case rpc:call(Node, ra_machine, version,
                  [{machine, rabbit_fifo, #{}}], Timeout) of
        0 ->
            %% the leader is running the old version
            enqueue(Correlation, Msg, State0#state{queue_status = go});
        1 ->
            %% we're running the new version on the leader: do a sync
            %% initialisation of the enqueuer session
            Reg = rabbit_fifo:make_register_enqueuer(self()),
            case ra:process_command(Servers, Reg, Timeout) of
                {ok, reject_publish, Leader} ->
                    {reject_publish, State0#state{leader = Leader,
                                                  queue_status = reject_publish}};
                {ok, ok, Leader} ->
                    enqueue(Correlation, Msg, State0#state{leader = Leader,
                                                           queue_status = go});
                {error, {no_more_servers_to_try, _Errs}} ->
                    %% if we are not able to process the register command
                    %% it is safe to reject the message as we never attempted
                    %% to send it
                    {reject_publish, State0};
                %% TODO: not convinced this can ever happen when using
                %% a list of servers
                {timeout, _} ->
                    {reject_publish, State0};
                Err ->
                    exit(Err)
            end;
        {badrpc, _Reason} ->
            %% the machine version could not be determined (node down, rpc
            %% timeout, remote exception, ...). Nothing has been sent yet so
            %% it is safe to reject the message.
            {reject_publish, State0}
    end;
enqueue(_Correlation, _Msg,
        #state{queue_status = reject_publish,
               cfg = #cfg{}} = State) ->
    {reject_publish, State};
enqueue(Correlation, Msg,
        #state{slow = Slow,
               queue_status = go,
               cfg = #cfg{block_handler = BlockFun}} = State0) ->
    Node = pick_server(State0),
    {Next, State1} = next_enqueue_seq(State0),
    % by default there is no correlation id
    Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
    case send_command(Node, Correlation, Cmd, low, State1) of
        {slow, State} when not Slow ->
            %% we just crossed the soft limit: notify the owner and arm the
            %% timer whose `timeout' event re-checks the leader
            BlockFun(),
            {slow, set_timer(State)};
        Any ->
            Any
    end.

%% @doc Enqueues a message.
%% @param Msg an arbitrary erlang term representing the message.
%% @param State the current {@module} state.
%% @returns
%% `{ok | slow, State}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate.
%% `{reject_publish, State}' if the message was not sent (see enqueue/3).
%% No correlation term is attached, so this enqueue is never reported in a
%% `settled' action returned by {@link handle_ra_event/3. handle_ra_event/3}.
%%
-spec enqueue(Msg :: term(), State :: state()) ->
    {ok | slow | reject_publish, state()}.
enqueue(Msg, State) ->
    enqueue(undefined, Msg, State).

%% @doc Dequeue a message from the queue.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Settlement either `settled' or `unsettled'. When `settled' no
%% further settlement needs to be done.
%% @param State The {@module} state.
%%
%% @returns
%% `{ok, MsgsReady, {QName, QRef, MsgId, IsDelivered, Msg}, State}' when a
%% message was dequeued, `{empty, State}' when the queue had no message, or
%% `{error | timeout, term()}' otherwise.
-spec dequeue(rabbit_fifo:consumer_tag(),
              Settlement :: settled | unsettled, state()) ->
    {ok, non_neg_integer(), term(), state()}
    | {empty, state()} | {error | timeout, term()}.
dequeue(ConsumerTag, Settlement,
        #state{cfg = #cfg{timeout = Timeout,
                          cluster_name = QName}} = State0) ->
    Node = pick_server(State0),
    ConsumerId = consumer_id(ConsumerTag),
    case ra:process_command(Node,
                            rabbit_fifo:make_checkout(ConsumerId,
                                                      {dequeue, Settlement},
                                                      #{}),
                            Timeout) of
        {ok, {dequeue, empty}, Leader} ->
            {empty, State0#state{leader = Leader}};
        {ok, {dequeue, {MsgId, {MsgHeader, Msg0}}, MsgsReady}, Leader} ->
            %% headers without a delivery_count key count as never delivered
            Count = case MsgHeader of
                        #{delivery_count := C} -> C;
                        _ -> 0
                    end,
            IsDelivered = Count > 0,
            Msg = add_delivery_count_header(Msg0, Count),
            {ok, MsgsReady,
             {QName, qref(Leader), MsgId, IsDelivered, Msg},
             State0#state{leader = Leader}};
        {ok, {error, _} = Err, _Leader} ->
            %% the state machine applied the command but replied with an error
            Err;
        Err ->
            %% {error, _} or {timeout, _} from ra:process_command/3
            Err
    end.

%% Adds an `x-delivery-count' long header to #basic_message{} records;
%% any other message representation is returned unchanged.
add_delivery_count_header(#basic_message{} = Msg0, Count)
  when is_integer(Count) ->
    rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0);
add_delivery_count_header(Msg, _Count) ->
    Msg.


%% @doc Settle a message. Permanently removes message from the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids received with the {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. While the client is in the `slow' state the settlement is
%% not sent immediately: it is stashed and sent once the number of pending
%% commands drops below the soft limit.
%%
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Node = pick_server(State0),
    Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
    %% the ok | slow tag is not reported by this function, only the state
    {_Tag, State} = send_command(Node, undefined, Cmd, normal, State0),
    {State, []};
settle(ConsumerTag, [_|_] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    ConsumerId = consumer_id(ConsumerTag),
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Unsent = maps:update_with(ConsumerId,
                              fun ({Settles, Returns, Discards}) ->
                                      {Settles ++ MsgIds, Returns, Discards}
                              end, {MsgIds, [], []}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.

%% @doc Return a message to the queue.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids to return received
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. While the client is in the `slow' state the return is not
%% sent immediately: it is stashed and sent once the number of pending
%% commands drops below the soft limit.
%%
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Node = pick_server(State0),
    Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
    %% the ok | slow tag is not reported by this function, only the state
    {_Tag, State1} = send_command(Node, undefined, Cmd, normal, State0),
    {State1, []};
return(ConsumerTag, [_|_] = MsgIds,
       #state{unsent_commands = Unsent0} = State0) ->
    ConsumerId = consumer_id(ConsumerTag),
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Unsent = maps:update_with(ConsumerId,
                              fun ({Settles, Returns, Discards}) ->
                                      {Settles, Returns ++ MsgIds, Discards}
                              end, {[], MsgIds, []}, Unsent0),
    State1 = State0#state{unsent_commands = Unsent},
    {State1, []}.

%% @doc Discards a checked out message.
%% If the queue has a dead_letter_handler configured this will be called.
%% @param ConsumerTag the tag uniquely identifying the consumer.
%% @param MsgIds the message ids to discard
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
%% `{State, []}'. While the client is in the `slow' state the discard is not
%% sent immediately: it is stashed and sent once the number of pending
%% commands drops below the soft limit.
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
    {state(), list()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
    Node = pick_server(State0),
    Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
    %% the ok | slow tag is not reported by this function, only the state
    {_Tag, State} = send_command(Node, undefined, Cmd, normal, State0),
    {State, []};
discard(ConsumerTag, [_|_] = MsgIds,
        #state{unsent_commands = Unsent0} = State0) ->
    ConsumerId = consumer_id(ConsumerTag),
    %% we've reached the soft limit so will stash the command to be
    %% sent once we have seen enough notifications
    Unsent = maps:update_with(ConsumerId,
                              fun ({Settles, Returns, Discards}) ->
                                      {Settles, Returns, Discards ++ MsgIds}
                              end, {[], [], MsgIds}, Unsent0),
    {State0#state{unsent_commands = Unsent}, []}.

%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param NumUnsettled the maximum number of in-flight messages. Once this
%% number of messages has been received but not settled no further messages
%% will be delivered to the consumer.
%% @param CreditMode The credit mode to use for the checkout.
%%     simple_prefetch: credit is auto topped up as deliveries are settled
%%     credited: credit is only increased by sending credit to the queue
%% @param Meta consumer metadata passed to the state machine. Its `ack' key
%% (default `true') selects whether the caller settles deliveries itself
%% (`true') or this client settles them automatically on receipt (`false').
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(),
               NumUnsettled :: non_neg_integer(),
               CreditMode :: rabbit_fifo:credit_mode(),
               Meta :: rabbit_fifo:consumer_meta(),
               state()) -> {ok, state()} | {error | timeout, term()}.
checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
         #state{consumer_deliveries = CDels0} = State0) ->
    Servers = sorted_servers(State0),
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_checkout(ConsumerId,
                                    {auto, NumUnsettled, CreditMode},
                                    Meta),
    %% whether the caller settles deliveries itself; when false this client
    %% settles them on receipt (see maybe_auto_ack/3)
    Ack = maps:get(ack, Meta, true),
    %% keep the delivery tracking of an existing consumer with this tag and
    %% only update its ack mode; otherwise start tracking from id -1
    SDels = maps:update_with(ConsumerTag,
                             fun (V) ->
                                     V#consumer{ack = Ack}
                             end,
                             #consumer{last_msg_id = -1,
                                       ack = Ack}, CDels0),
    try_process_command(Servers, Cmd,
                        State0#state{consumer_deliveries = SDels}).


%% @doc Queries the last known leader for the queue's single active consumer.
%% @returns `{ok, Reply}' on success, `{error, leader_not_known}' when no
%% leader has been recorded in the state yet, or the error returned by
%% ra:local_query/3.
-spec query_single_active_consumer(state()) ->
    {ok, term()} | {error, term()} | {timeout, term()}.
query_single_active_consumer(#state{leader = undefined}) ->
    {error, leader_not_known};
query_single_active_consumer(#state{leader = Leader}) ->
    case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1,
                        ?COMMAND_TIMEOUT) of
        {ok, {_, Reply}, _} ->
            {ok, Reply};
        Err ->
            Err
    end.

%% @doc Provide credit to the queue
%%
%% This only has an effect if the consumer uses credit mode: credited
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param Credit the amount of credit to provide to the queue
%% @param Drain tells the queue to use up any credit that cannot be immediately
%% fulfilled. (i.e. there are not enough messages on queue to use up all the
%% provided credit).
%% @returns `{State, []}'
-spec credit(rabbit_fifo:consumer_tag(),
             Credit :: non_neg_integer(),
             Drain :: boolean(),
             state()) ->
    {state(), actions()}.
credit(ConsumerTag, Credit, Drain,
       #state{consumer_deliveries = CDels} = State0) ->
    ConsumerId = consumer_id(ConsumerTag),
    %% the last received msgid provides us with the delivery count if we
    %% add one as it is 0 indexed
    C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
    Node = pick_server(State0),
    Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
                                  C#consumer.last_msg_id + 1, Drain),
    %% the ok | slow tag is not reported by this function, only the state
    {_Tag, State} = send_command(Node, undefined, Cmd, normal, State0),
    {State, []}.

%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
%%
%% This is a synchronous call. I.e. the call will block until the command
%% has been accepted by the ra process or it times out.
%%
%% The consumer's local delivery tracking is dropped before the command is
%% sent.
%%
%% @param ConsumerTag a unique tag to identify this particular consumer.
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec cancel_checkout(rabbit_fifo:consumer_tag(), state()) ->
    {ok, state()} | {error | timeout, term()}.
cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
    Servers = sorted_servers(State0),
    ConsumerId = consumer_id(ConsumerTag),
    Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
    State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)},
    try_process_command(Servers, Cmd, State).

%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Server) ->
    case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of
        {ok, {purge, Reply}, _} ->
            {ok, Reply};
        Err ->
            Err
    end.

%% @doc Returns the number of commands sent that have not yet been applied.
-spec pending_size(state()) -> non_neg_integer().
pending_size(#state{pending = Pend}) ->
    maps:size(Pend).

%% @doc Queries the queue stats from Leader with a short (250ms) timeout.
%% @returns `{ok, R, C}' with the pair produced by rabbit_fifo:query_stat/1
%% (presumably ready message count and consumer count — confirm against
%% rabbit_fifo), or the ra error.
-spec stat(ra:server_id()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader) ->
    %% short timeout as we don't want to spend too long if it is going to
    %% fail anyway
    stat(Leader, 250).

%% @doc Like stat/1 but with a caller supplied timeout in milliseconds.
-spec stat(ra:server_id(), non_neg_integer()) ->
    {ok, non_neg_integer(), non_neg_integer()}
    | {error | timeout, term()}.
stat(Leader, Timeout) ->
    case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of
        {ok, {_, {R, C}}, _} -> {ok, R, C};
        {error, _} = Error -> Error;
        {timeout, _} = Error -> Error
    end.

%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
    ClusterName.

%% @doc Sends the command built by rabbit_fifo:make_update_config/1 to
%% Server and waits for it to be applied.
%% @returns `ok' when the machine replies `ok'; any other result of
%% ra:process_command/3 is returned unchanged.
-spec update_machine_state(ra:server_id(), term()) ->
    ok | {ok, term(), ra:server_id()} | {error | timeout, term()}.
update_machine_state(Server, Conf) ->
    case ra:process_command(Server, rabbit_fifo:make_update_config(Conf),
                            ?COMMAND_TIMEOUT) of
        {ok, ok, _} ->
            ok;
        Err ->
            Err
    end.

%% @doc Handles incoming `ra_events'. Events carry both internal "bookkeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
%% to ensure bookkeeping, resends and flow control is correctly handled.
%%
%% If the `ra_event' contains a `rabbit_fifo' generated message it will be
%% returned, as an action, for further processing.
%%
%% Example:
%%
%% ```
%%  receive
%%     {ra_event, From, Evt} ->
%%         case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
%%             {ok, State, Actions} ->
%%                 handle_actions(Actions),
%%                 ...;
%%             eol ->
%%                 ...
%%         end
%%  end
%% '''
%%
%% @param From the {@link ra:server_id().} of the sending process.
%% @param Event the body of the `ra_event'.
%% @param State the current {@module} state.
%%
%% @returns
%% `{ok, State, Actions}' for every event other than `{machine, eol}'.
%% `Actions' is a list which may contain:
%%
%% <li>`{settled, QRef, Correlations}' carrying the correlation terms passed
%% to {@link enqueue/3.} for commands that have now been applied.</li>
%% <li>`{deliver, ConsumerTag, Ack, Msgs}' where each element of `Msgs' is
%% `{QName, QRef, MsgId, Redelivered, Msg}'. `ConsumerTag' is the tag passed
%% to {@link checkout/5.}; `MsgId' is a consumer scoped monotonically
%% incrementing id that can be used to {@link settle/3.} (roughly: AMQP 0.9.1
%% ack) messages once finished with them.</li>
%% <li>actions emitted by the `rabbit_fifo' state machine alongside applied
%% commands, e.g. `{send_drained, {ConsumerTag, Credit}}'.</li>
%%
%% `eol' when the state machine emits `eol' (presumably the queue has been
%% deleted).
%%
%% `{protocol_error, internal_error, Format, Args}' when missing deliveries
%% could not be fetched from the leader.
-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
    {ok, state(), list()} | eol |
    {protocol_error, internal_error, string(), [term()]}.
handle_ra_event(From, {applied, Seqs},
                #state{cfg = #cfg{cluster_name = QRef,
                                  soft_limit = SftLmt,
                                  unblock_handler = UnblockFun}} = State0) ->

    {Corrs, Actions0, State1} = lists:foldl(fun seq_applied/2,
                                            {[], [], State0#state{leader = From}},
                                            Seqs),
    Actions = case Corrs of
                  [] ->
                      lists:reverse(Actions0);
                  _ ->
                      [{settled, QRef, Corrs}
                       | lists:reverse(Actions0)]
              end,
    case maps:size(State1#state.pending) < SftLmt of
        true when State1#state.slow == true ->
            % we have exited soft limit state
            % send any unsent commands and cancel the time as
            % TODO: really the timer should only be cancelled when the channel
            % exits flow state (which depends on the state of all queues the
            % channel is interacting with)
            % but the fact the queue has just applied suggests
            % it's ok to cancel here anyway
            State2 = cancel_timer(State1#state{slow = false,
                                               unsent_commands = #{}}),
            % build up a list of commands to issue
            Commands = maps:fold(
                         fun (Cid, {Settled, Returns, Discards}, Acc) ->
                                 add_command(Cid, settle, Settled,
                                             add_command(Cid, return, Returns,
                                                         add_command(Cid, discard,
                                                                     Discards, Acc)))
                         end, [], State1#state.unsent_commands),
            Node = pick_server(State2),
            %% send all the settlements and returns; send_command/5 only
            %% ever tags its result ok | slow, so only the state is kept
            State = lists:foldl(fun (C, S0) ->
                                        {_Tag, S} = send_command(Node, undefined,
                                                                 C, normal, S0),
                                        S
                                end, State2, Commands),
            UnblockFun(),
            {ok, State, Actions};
        _ ->
            {ok, State1, Actions}
    end;
handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
    handle_delivery(From, Del, State0);
handle_ra_event(_, {machine, {queue_status, Status}},
                #state{} = State) ->
    %% just set the queue status
    {ok, State#state{queue_status = Status}, []};
handle_ra_event(Leader, {machine, leader_change},
                #state{leader = Leader} = State) ->
    %% leader already known
    {ok, State, []};
handle_ra_event(Leader, {machine, leader_change}, State0) ->
    %% we need to update leader
    %% and resend any pending commands
    State = resend_all_pending(State0#state{leader = Leader}),
    {ok, cancel_timer(State), []};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
    % TODO: how should these be handled? re-sent on timer or try random
    {ok, State0, []};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
    State1 = State0#state{leader = Leader},
    State = resend(Seq, State1),
    {ok, State, []};
handle_ra_event(_, timeout, #state{cfg = #cfg{servers = Servers}} = State0) ->
    case find_leader(Servers) of
        undefined ->
            %% still no leader, set the timer again
            {ok, set_timer(State0), []};
        Leader ->
            State = resend_all_pending(State0#state{leader = Leader}),
            {ok, State, []}
    end;
handle_ra_event(_Leader, {machine, eol}, _State0) ->
    eol.

%% @doc Attempts to enqueue a message using cast semantics. This provides no
%% guarantees or retries if the message fails to achieve consensus or if the
%% server it is sent to happens not to be available. If the message is sent
%% to a follower it will attempt to deliver it to the leader, if known. Else
%% it will drop the message.
%%
%% NB: only use this for non-critical enqueues where a full rabbit_fifo_client
%% state cannot be maintained.
%%
%% @param Servers the known servers in the cluster; the command is pipelined
%% to the first one.
%% @param Msg the message to enqueue.
%%
%% @returns `ok'
-spec untracked_enqueue([ra:server_id()], term()) ->
    ok.
untracked_enqueue([Node | _], Msg) ->
    Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
    ok = ra:pipeline_command(Node, Cmd),
    ok.

%% Internal

%% Sends Cmd synchronously to each server in turn until one accepts it,
%% recording the leader that replied. If every server fails, the error of
%% the last one is returned.
try_process_command([Server | Rem], Cmd,
                    #state{cfg = #cfg{timeout = Timeout}} = State) ->
    case ra:process_command(Server, Cmd, Timeout) of
        {ok, _, Leader} ->
            {ok, State#state{leader = Leader}};
        Err when Rem =:= [] ->
            Err;
        _ ->
            try_process_command(Rem, Cmd, State)
    end.

%% Folds one applied {Seq, MaybeAction} pair into {Corrs, Actions, State}.
%% Pending commands with sequence numbers between the last applied one and
%% Seq are resent (they were skipped); sequence numbers already applied are
%% ignored.
seq_applied({Seq, MaybeAction},
            {Corrs, Actions0, #state{last_applied = Last} = State0})
  when Seq > Last ->
    State1 = do_resends(Last+1, Seq-1, State0),
    {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
    case maps:take(Seq, State#state.pending) of
        {{undefined, _}, Pending} ->
            {Corrs, Actions, State#state{pending = Pending,
                                         last_applied = Seq}};
        {{Corr, _}, Pending} ->
            {[Corr | Corrs], Actions, State#state{pending = Pending,
                                                  last_applied = Seq}};
        error ->
            % must have already been resent or removed for some other reason
            % still need to update last_applied or we may inadvertently resend
            % stuff later
            {Corrs, Actions, State#state{last_applied = Seq}}
    end;
seq_applied(_Seq, Acc) ->
    Acc.

maybe_add_action(ok, Acc, State) ->
    {Acc, State};
maybe_add_action({multi, Actions}, Acc0, State0) ->
    lists:foldl(fun (Act, {Acc, State}) ->
                        maybe_add_action(Act, Acc, State)
                end, {Acc0, State0}, Actions);
maybe_add_action({send_drained, {Tag, Credit}} = Action, Acc,
                 #state{consumer_deliveries = CDels} = State) ->
    case CDels of
        #{Tag := C} ->
            %% add credit to consumer delivery_count
            {[Action | Acc],
             State#state{consumer_deliveries =
                             update_consumer(Tag, C#consumer.last_msg_id,
                                             Credit, C, CDels)}};
        _ ->
            %% the consumer is no longer tracked locally (e.g. it was removed
            %% by cancel_checkout/2 before this was applied): there is no
            %% delivery count to update, but still surface the action
            {[Action | Acc], State}
    end;
maybe_add_action(Action, Acc, State) ->
    %% anything else is assumed to be an action
    {[Action | Acc], State}.

%% Resends the pending commands with sequence numbers From..To (inclusive).
do_resends(From, To, State) when From =< To ->
    lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
    State.

% resends a command with a new sequence number
resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
    case maps:take(OldSeq, Pending0) of
        {{Corr, Cmd}, Pending} ->
            %% resends aren't subject to flow control here
            resend_command(Leader, Corr, Cmd, State#state{pending = Pending});
        error ->
            State
    end.

%% Resends every pending command, lowest sequence number first.
resend_all_pending(#state{pending = Pend} = State) ->
    Seqs = lists:sort(maps:keys(Pend)),
    lists:foldl(fun resend/2, State, Seqs).

maybe_auto_ack(true, Deliver, State0) ->
    %% manual ack is enabled
    {ok, State0, [Deliver]};
maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
    %% we have to auto ack these deliveries
    MsgIds = [I || {_, _, I, _, _} <- Msgs],
    {State, Actions} = settle(Tag, MsgIds, State0),
    {ok, State, [Deliver] ++ Actions}.

%% Turns a `{delivery, Tag, IdMsgs}' machine event into a `deliver' action,
%% fetching any messages skipped since the consumer's last seen id and
%% dropping ids that were already seen. Deliveries for consumers that are no
%% longer tracked locally are returned to the queue.
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
                #state{cfg = #cfg{cluster_name = QName},
                       consumer_deliveries = CDels0} = State0)
  when is_map_key(Tag, CDels0) ->
    QRef = qref(Leader),
    {LastId, _} = lists:last(IdMsgs),
    Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
    %% format as a deliver action
    Del = {deliver, Tag, Ack, transform_msgs(QName, QRef, IdMsgs)},
    %% TODO: remove potential default allocation
    case Consumer of
        #consumer{last_msg_id = Prev} = C
          when FstId =:= Prev+1 ->
            maybe_auto_ack(Ack, Del,
                           State0#state{consumer_deliveries =
                                            update_consumer(Tag, LastId,
                                                            length(IdMsgs), C,
                                                            CDels0)});
        #consumer{last_msg_id = Prev} = C
          when FstId > Prev+1 ->
            %% ids Prev+1 .. FstId-1 were never received by this client
            NumMissing = FstId - Prev - 1,
            %% there may actually be fewer missing messages returned than expected
            %% This can happen when a node the channel is on gets disconnected
            %% from the node the leader is on and then reconnected afterwards.
            %% When the node is disconnected the leader will return all checked
            %% out messages to the main queue to ensure they don't get stuck in
            %% case the node never comes back.
            case get_missing_deliveries(Leader, Prev+1, FstId-1, Tag) of
                {protocol_error, _, _, _} = Err ->
                    Err;
                Missing ->
                    XDel = {deliver, Tag, Ack, transform_msgs(QName, QRef,
                                                              Missing ++ IdMsgs)},
                    maybe_auto_ack(Ack, XDel,
                                   State0#state{consumer_deliveries =
                                                    update_consumer(Tag, LastId,
                                                                    length(IdMsgs) + NumMissing,
                                                                    C, CDels0)})
            end;
        #consumer{last_msg_id = Prev}
          when FstId =< Prev ->
            %% drop messages we have already seen and retry with the rest
            case lists:dropwhile(fun({Id, _}) -> Id =< Prev end, IdMsgs) of
                [] ->
                    {ok, State0, []};
                IdMsgs2 ->
                    handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0)
            end;
        C when FstId =:= 0 ->
            % the very first delivery
            maybe_auto_ack(Ack, Del,
                           State0#state{consumer_deliveries =
                                            update_consumer(Tag, LastId,
                                                            length(IdMsgs),
                                                            C#consumer{last_msg_id = LastId},
                                                            CDels0)})
    end;
handle_delivery(_Leader, {delivery, Tag, [_ | _] = IdMsgs},
                #state{consumer_deliveries = CDels0} = State0)
  when not is_map_key(Tag, CDels0) ->
    %% Note:
    %% https://github.com/rabbitmq/rabbitmq-server/issues/3729
    %% If the consumer is no longer in the deliveries map,
    %% we should return all messages.
    MsgIntIds = [Id || {Id, _} <- IdMsgs],
    {State1, Deliveries} = return(Tag, MsgIntIds, State0),
    {ok, State1, Deliveries}.

%% Converts {MsgId, {MsgHeader, Msg}} pairs into the
%% {QName, QRef, MsgId, Redelivered, Msg} tuples used in deliver actions.
%% A message counts as redelivered when its header map has a delivery_count.
transform_msgs(QName, QRef, Msgs) ->
    lists:map(
      fun({MsgId, {MsgHeader, Msg0}}) ->
              {Msg, Redelivered} = case MsgHeader of
                                       #{delivery_count := C} ->
                                           {add_delivery_count_header(Msg0, C), true};
                                       _ ->
                                           {Msg0, false}
                                   end,
              {QName, QRef, MsgId, Redelivered, Msg}
      end, Msgs).

%% Stores LastId as the consumer's last seen message id and adds DelCntIncr
%% to its delivery count, returning the updated consumers map.
update_consumer(Tag, LastId, DelCntIncr,
                #consumer{delivery_count = Count0} = Consumer0, Consumers) ->
    Consumer = Consumer0#consumer{last_msg_id = LastId,
                                  delivery_count = Count0 + DelCntIncr},
    Consumers#{Tag => Consumer}.


%% Fetches, with a local query on Leader, the messages checked out to this
%% process's consumer ConsumerTag between ids From and To (presumably
%% inclusive: the caller passes the gap Prev+1 .. FstId-1). Returns the list
%% of {MsgId, {Header, Msg}} entries, or a protocol_error tuple when the
%% query fails or times out.
get_missing_deliveries(Leader, From, To, ConsumerTag) ->
    ConsumerId = consumer_id(ConsumerTag),
    CheckedOut = fun (MachineState) ->
                         rabbit_fifo:get_checked_out(ConsumerId, From, To,
                                                     MachineState)
                 end,
    case ra:local_query(Leader, CheckedOut, ?COMMAND_TIMEOUT) of
        {ok, {_, Found}, _} ->
            Found;
        {timeout, _} ->
            {protocol_error, internal_error,
             "Cannot query missing deliveries from ~p: timeout",
             [Leader]};
        {error, Reason} ->
            {protocol_error, internal_error,
             "Cannot query missing deliveries from ~p: ~p",
             [Leader, Reason]}
    end.

%% Server to send commands to: the known leader, or the first configured
%% server while no leader is known.
pick_server(#state{leader = undefined,
                   cfg = #cfg{servers = [First | _]}}) ->
    %% TODO: pick random rather that first?
    First;
pick_server(#state{leader = Known}) ->
    Known.

%% Configured servers, with the known leader (if any) moved to the front.
sorted_servers(#state{leader = undefined,
                      cfg = #cfg{servers = All}}) ->
    All;
sorted_servers(#state{leader = Known,
                      cfg = #cfg{servers = All}}) ->
    Others = lists:delete(Known, All),
    [Known | Others].

%% Returns the next command sequence number and advances the counter.
next_seq(#state{next_seq = Current} = State) ->
    Advanced = State#state{next_seq = Current + 1},
    {Current, Advanced}.

%% Returns the next enqueue sequence number and advances the counter.
next_enqueue_seq(#state{next_enqueue_seq = Current} = State) ->
    Advanced = State#state{next_enqueue_seq = Current + 1},
    {Current, Advanced}.

%% A consumer is identified by its tag together with the calling process.
consumer_id(ConsumerTag) ->
    Self = self(),
    {ConsumerTag, Self}.

%% Pipelines Command to Server under a fresh sequence number and records it
%% in `pending' together with Correlation. The result is tagged `slow' (and
%% the state marked slow) when the number of already pending commands had
%% reached the soft limit, `ok' otherwise.
send_command(Server, Correlation, Command, Priority,
             #state{pending = Pending,
                    cfg = #cfg{soft_limit = SftLmt}} = State0) ->
    {Seq, State} = next_seq(State0),
    ok = ra:pipeline_command(Server, Command, Seq, Priority),
    IsSlow = maps:size(Pending) >= SftLmt,
    Tag = if
              IsSlow -> slow;
              true -> ok
          end,
    Tracked = Pending#{Seq => {Correlation, Command}},
    {Tag, State#state{pending = Tracked, slow = IsSlow}}.

%% Re-pipelines an already tracked command under a new sequence number,
%% using ra's default priority and ignoring the soft limit.
resend_command(Node, Correlation, Command,
               #state{pending = Pending} = State0) ->
    {Seq, State} = next_seq(State0),
    ok = ra:pipeline_command(Node, Command, Seq),
    Tracked = Pending#{Seq => {Correlation, Command}},
    State#state{pending = Tracked}.

%% Prepends the settle/return/discard command for consumer Cid to Acc, or
%% leaves Acc untouched when there are no message ids.
add_command(_Cid, _Kind, [], Acc) ->
    Acc;
add_command(Cid, Kind, MsgIds, Acc) ->
    Cmd = case Kind of
              settle -> rabbit_fifo:make_settle(Cid, MsgIds);
              return -> rabbit_fifo:make_return(Cid, MsgIds);
              discard -> rabbit_fifo:make_discard(Cid, MsgIds)
          end,
    [Cmd | Acc].

%% Arms a timer that, after ?TIMER_TIME ms, casts a `timeout' queue event to
%% this process, addressed as coming from the known leader or, failing that,
%% from the first configured server.
set_timer(#state{leader = Leader0,
                 cfg = #cfg{servers = [Server | _],
                            cluster_name = QName}} = State) ->
    From = case Leader0 of
               undefined -> Server;
               _ -> Leader0
           end,
    Event = {'$gen_cast', {queue_event, QName, {From, timeout}}},
    TRef = erlang:send_after(?TIMER_TIME, self(), Event),
    State#state{timer_state = TRef}.

%% Cancels the armed timer (asynchronously, without waiting for a result),
%% if there is one.
cancel_timer(#state{timer_state = undefined} = State) ->
    State;
cancel_timer(#state{timer_state = TRef} = State) ->
    erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
    State#state{timer_state = undefined}.

%% Asks each server in turn for the cluster members and returns the first
%% leader reported, or undefined if none of them answers within 500ms.
find_leader([]) ->
    undefined;
find_leader([Server | Rest]) ->
    case ra:members(Server, 500) of
        {ok, _Members, Leader} ->
            Leader;
        _ ->
            find_leader(Rest)
    end.

%% Strips the node from a {Name, Node} server id; other terms pass through.
qref({Name, _Node}) -> Name;
qref(Other) -> Other.