1-module(rabbit_queue_type). 2-include("amqqueue.hrl"). 3-include_lib("rabbit_common/include/resource.hrl"). 4 5-export([ 6 init/0, 7 close/1, 8 discover/1, 9 default/0, 10 is_enabled/1, 11 declare/2, 12 delete/4, 13 is_recoverable/1, 14 recover/2, 15 purge/1, 16 policy_changed/1, 17 stat/1, 18 remove/2, 19 info/2, 20 state_info/1, 21 info_down/2, 22 info_down/3, 23 %% stateful client API 24 new/2, 25 consume/3, 26 cancel/5, 27 handle_down/3, 28 handle_event/3, 29 module/2, 30 deliver/3, 31 settle/5, 32 credit/5, 33 dequeue/5, 34 fold_state/3, 35 is_policy_applicable/2, 36 is_server_named_allowed/1, 37 notify_decorators/1 38 ]). 39 40-type queue_name() :: rabbit_types:r(queue). 41-type queue_ref() :: queue_name() | atom(). 42-type queue_state() :: term(). 43-type msg_tag() :: term(). 44 45-define(STATE, ?MODULE). 46 47%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until 48%% mirrored queues are deprecated. 49-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]). 50 51-define(QREF(QueueReference), 52 (is_tuple(QueueReference) andalso element(1, QueueReference) == resource) 53 orelse is_atom(QueueReference)). 54%% anything that the host process needs to do on behalf of the queue type 55%% session, like knowing when to notify on monitor down 56-type action() :: 57 {monitor, Pid :: pid(), queue_ref()} | 58 %% indicate to the queue type module that a message has been delivered 59 %% fully to the queue 60 {settled, Success :: boolean(), [msg_tag()]} | 61 {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]}. 62 63-type actions() :: [action()]. 64 65-type event() :: 66 {down, pid(), Info :: term()} | 67 term(). 68 69-record(ctx, {module :: module(), 70 name :: queue_name(), 71 %% "publisher confirm queue accounting" 72 %% queue type implementation should emit a: 73 %% {settle, Success :: boolean(), msg_tag()} 74 %% to either settle or reject the delivery of a 75 %% message to the queue instance 76 %% The queue type module will then emit a {confirm | reject, [msg_tag()} 77 %% action to the channel or channel like process when a msg_tag 78 %% has reached its conclusion 79 state :: queue_state()}). 80 81 82-record(?STATE, {ctxs = #{} :: #{queue_ref() => #ctx{} | queue_ref()}, 83 monitor_registry = #{} :: #{pid() => queue_ref()} 84 }). 85 86-opaque state() :: #?STATE{}. 87 88-type consume_spec() :: #{no_ack := boolean(), 89 channel_pid := pid(), 90 limiter_pid => pid(), 91 limiter_active => boolean(), 92 prefetch_count => non_neg_integer(), 93 consumer_tag := rabbit_types:ctag(), 94 exclusive_consume => boolean(), 95 args => rabbit_framing:amqp_table(), 96 ok_msg := term(), 97 acting_user := rabbit_types:username()}. 98 99 100 101% copied from rabbit_amqqueue 102-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout. 103 104-type settle_op() :: 'complete' | 'requeue' | 'discard'. 105 106-export_type([state/0, 107 consume_spec/0, 108 action/0, 109 actions/0, 110 settle_op/0]). 111 112%% is the queue type feature enabled 113-callback is_enabled() -> boolean(). 114 115-callback declare(amqqueue:amqqueue(), node()) -> 116 {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | 117 {'absent', amqqueue:amqqueue(), absent_reason()} | 118 {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. 119 120-callback delete(amqqueue:amqqueue(), 121 boolean(), 122 boolean(), 123 rabbit_types:username()) -> 124 rabbit_types:ok(non_neg_integer()) | 125 rabbit_types:error(in_use | not_empty) | 126 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 127 128-callback recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) -> 129 {Recovered :: [amqqueue:amqqueue()], 130 Failed :: [amqqueue:amqqueue()]}. 131 132%% checks if the queue should be recovered 133-callback is_recoverable(amqqueue:amqqueue()) -> 134 boolean(). 135 136-callback purge(amqqueue:amqqueue()) -> 137 {ok, non_neg_integer()} | {error, term()}. 138 139-callback policy_changed(amqqueue:amqqueue()) -> ok. 140 141%% stateful 142%% intitialise and return a queue type specific session context 143-callback init(amqqueue:amqqueue()) -> {ok, queue_state()} | {error, Reason :: term()}. 144 145-callback close(queue_state()) -> ok. 146%% update the queue type state from amqqrecord 147-callback update(amqqueue:amqqueue(), queue_state()) -> queue_state(). 148 149-callback consume(amqqueue:amqqueue(), 150 consume_spec(), 151 queue_state()) -> 152 {ok, queue_state(), actions()} | {error, term()} | 153 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 154 155-callback cancel(amqqueue:amqqueue(), 156 rabbit_types:ctag(), 157 term(), 158 rabbit_types:username(), 159 queue_state()) -> 160 {ok, queue_state()} | {error, term()}. 161 162%% any async events returned from the queue system should be processed through 163%% this 164-callback handle_event(Event :: event(), 165 queue_state()) -> 166 {ok, queue_state(), actions()} | {error, term()} | eol | 167 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 168 169-callback deliver([{amqqueue:amqqueue(), queue_state()}], 170 Delivery :: term()) -> 171 {[{amqqueue:amqqueue(), queue_state()}], actions()}. 172 173-callback settle(settle_op(), rabbit_types:ctag(), [non_neg_integer()], queue_state()) -> 174 {queue_state(), actions()} | 175 {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. 176 177-callback credit(rabbit_types:ctag(), 178 non_neg_integer(), Drain :: boolean(), queue_state()) -> 179 {queue_state(), actions()}. 180 181-callback dequeue(NoAck :: boolean(), LimiterPid :: pid(), 182 rabbit_types:ctag(), queue_state()) -> 183 {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} | 184 {empty, queue_state()} | 185 {error, term()} | 186 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 187 188%% return a map of state summary information 189-callback state_info(queue_state()) -> 190 #{atom() := term()}. 191 192%% general queue info 193-callback info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) -> 194 rabbit_types:infos(). 195 196-callback stat(amqqueue:amqqueue()) -> 197 {'ok', non_neg_integer(), non_neg_integer()}. 198 199-callback capabilities() -> 200 #{atom() := term()}. 201 202-callback notify_decorators(amqqueue:amqqueue()) -> 203 ok. 204 205%% TODO: this should be controlled by a registry that is populated on boot 206discover(<<"quorum">>) -> 207 rabbit_quorum_queue; 208discover(<<"classic">>) -> 209 rabbit_classic_queue; 210discover(<<"stream">>) -> 211 rabbit_stream_queue. 212 213default() -> 214 rabbit_classic_queue. 215 216-spec is_enabled(module()) -> boolean(). 217is_enabled(Type) -> 218 Type:is_enabled(). 219 220-spec declare(amqqueue:amqqueue(), node()) -> 221 {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | 222 {'absent', amqqueue:amqqueue(), absent_reason()} | 223 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 224declare(Q0, Node) -> 225 Q = rabbit_queue_decorator:set(rabbit_policy:set(Q0)), 226 Mod = amqqueue:get_type(Q), 227 Mod:declare(Q, Node). 228 229-spec delete(amqqueue:amqqueue(), boolean(), 230 boolean(), rabbit_types:username()) -> 231 rabbit_types:ok(non_neg_integer()) | 232 rabbit_types:error(in_use | not_empty) | 233 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 234delete(Q, IfUnused, IfEmpty, ActingUser) -> 235 Mod = amqqueue:get_type(Q), 236 Mod:delete(Q, IfUnused, IfEmpty, ActingUser). 237 238-spec purge(amqqueue:amqqueue()) -> 239 {'ok', non_neg_integer()} | {error, term()}. 240purge(Q) -> 241 Mod = amqqueue:get_type(Q), 242 Mod:purge(Q). 243 244-spec policy_changed(amqqueue:amqqueue()) -> 'ok'. 245policy_changed(Q) -> 246 Mod = amqqueue:get_type(Q), 247 Mod:policy_changed(Q). 248 249-spec stat(amqqueue:amqqueue()) -> 250 {'ok', non_neg_integer(), non_neg_integer()}. 251stat(Q) -> 252 Mod = amqqueue:get_type(Q), 253 Mod:stat(Q). 254 255-spec remove(queue_ref(), state()) -> state(). 256remove(QRef, #?STATE{ctxs = Ctxs0} = State) -> 257 case maps:take(QRef, Ctxs0) of 258 error -> 259 State; 260 {_, Ctxs} -> 261 State#?STATE{ctxs = Ctxs} 262 end. 263 264-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) -> 265 rabbit_types:infos(). 266info(Q, Items) when ?amqqueue_state_is(Q, crashed) -> 267 info_down(Q, Items, crashed); 268info(Q, Items) when ?amqqueue_state_is(Q, stopped) -> 269 info_down(Q, Items, stopped); 270info(Q, Items) -> 271 Mod = amqqueue:get_type(Q), 272 Mod:info(Q, Items). 273 274fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) -> 275 maps:fold(Fun, Acc, Ctxs). 276 277state_info(#ctx{state = S, 278 module = Mod}) -> 279 Mod:state_info(S); 280state_info(_) -> 281 #{}. 282 283down_keys() -> ?DOWN_KEYS. 284 285info_down(Q, DownReason) -> 286 info_down(Q, down_keys(), DownReason). 287 288info_down(Q, all_keys, DownReason) -> 289 info_down(Q, down_keys(), DownReason); 290info_down(Q, Items, DownReason) -> 291 [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. 292 293i_down(name, Q, _) -> amqqueue:get_name(Q); 294i_down(durable, Q, _) -> amqqueue:is_durable(Q); 295i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q); 296i_down(arguments, Q, _) -> amqqueue:get_arguments(Q); 297i_down(pid, Q, _) -> amqqueue:get_pid(Q); 298i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); 299i_down(type, Q, _) -> amqqueue:get_type(Q); 300i_down(state, _Q, DownReason) -> DownReason; 301i_down(_K, _Q, _DownReason) -> ''. 302 303is_policy_applicable(Q, Policy) -> 304 Mod = amqqueue:get_type(Q), 305 Capabilities = Mod:capabilities(), 306 NotApplicable = maps:get(unsupported_policies, Capabilities, []), 307 lists:all(fun({P, _}) -> 308 not lists:member(P, NotApplicable) 309 end, Policy). 310 311is_server_named_allowed(Type) -> 312 Capabilities = Type:capabilities(), 313 maps:get(server_named, Capabilities, false). 314 315notify_decorators(Q) -> 316 Mod = amqqueue:get_type(Q), 317 Mod:notify_decorators(Q). 318 319-spec init() -> state(). 320init() -> 321 #?STATE{}. 322 323-spec close(state()) -> ok. 324close(#?STATE{ctxs = Contexts}) -> 325 _ = maps:map( 326 fun (_, #ctx{module = Mod, 327 state = S}) -> 328 ok = Mod:close(S) 329 end, Contexts), 330 ok. 331 332-spec new(amqqueue:amqqueue(), state()) -> state(). 333new(Q, State) when ?is_amqqueue(Q) -> 334 Ctx = get_ctx(Q, State), 335 set_ctx(Q, Ctx, State). 336 337-spec consume(amqqueue:amqqueue(), consume_spec(), state()) -> 338 {ok, state(), actions()} | {error, term()}. 339consume(Q, Spec, State) -> 340 #ctx{state = CtxState0} = Ctx = get_ctx(Q, State), 341 Mod = amqqueue:get_type(Q), 342 case Mod:consume(Q, Spec, CtxState0) of 343 {ok, CtxState, Actions} -> 344 return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); 345 Err -> 346 Err 347 end. 348 349%% TODO switch to cancel spec api 350-spec cancel(amqqueue:amqqueue(), 351 rabbit_types:ctag(), 352 term(), 353 rabbit_types:username(), 354 state()) -> 355 {ok, state()} | {error, term()}. 356cancel(Q, Tag, OkMsg, ActiveUser, Ctxs) -> 357 #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs), 358 Mod = amqqueue:get_type(Q), 359 case Mod:cancel(Q, Tag, OkMsg, ActiveUser, State0) of 360 {ok, State} -> 361 {ok, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; 362 Err -> 363 Err 364 end. 365 366-spec is_recoverable(amqqueue:amqqueue()) -> 367 boolean(). 368is_recoverable(Q) -> 369 Mod = amqqueue:get_type(Q), 370 Mod:is_recoverable(Q). 371 372-spec recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) -> 373 {Recovered :: [amqqueue:amqqueue()], 374 Failed :: [amqqueue:amqqueue()]}. 375recover(VHost, Qs) -> 376 ByType = lists:foldl( 377 fun (Q, Acc) -> 378 T = amqqueue:get_type(Q), 379 maps:update_with(T, fun (X) -> 380 [Q | X] 381 end, Acc) 382 %% TODO resolve all registered queue types from registry 383 end, #{rabbit_classic_queue => [], 384 rabbit_quorum_queue => [], 385 rabbit_stream_queue => []}, Qs), 386 maps:fold(fun (Mod, Queues, {R0, F0}) -> 387 {Taken, {R, F}} = timer:tc(Mod, recover, [VHost, Queues]), 388 rabbit_log:info("Recovering ~b queues of type ~s took ~bms", 389 [length(Queues), Mod, Taken div 1000]), 390 {R0 ++ R, F0 ++ F} 391 end, {[], []}, ByType). 392 393-spec handle_down(pid(), term(), state()) -> 394 {ok, state(), actions()} | {eol, queue_ref()} | {error, term()}. 395handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) -> 396 %% lookup queue ref in monitor registry 397 case maps:take(Pid, Reg0) of 398 {QRef, Reg} -> 399 case handle_event(QRef, {down, Pid, Info}, State0) of 400 {ok, State, Actions} -> 401 {ok, State#?STATE{monitor_registry = Reg}, Actions}; 402 eol -> 403 {eol, State0#?STATE{monitor_registry = Reg}, QRef}; 404 Err -> 405 Err 406 end; 407 error -> 408 {ok, State0, []} 409 end. 410 411%% messages sent from queues 412-spec handle_event(queue_ref(), term(), state()) -> 413 {ok, state(), actions()} | eol | {error, term()} | 414 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 415handle_event(QRef, Evt, Ctxs) -> 416 %% events can arrive after a queue state has been cleared up 417 %% so need to be defensive here 418 case get_ctx(QRef, Ctxs, undefined) of 419 #ctx{module = Mod, 420 state = State0} = Ctx -> 421 case Mod:handle_event(Evt, State0) of 422 {ok, State, Actions} -> 423 return_ok(set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions); 424 Err -> 425 Err 426 end; 427 undefined -> 428 {ok, Ctxs, []} 429 end. 430 431-spec module(queue_ref(), state()) -> 432 {ok, module()} | {error, not_found}. 433module(QRef, Ctxs) -> 434 %% events can arrive after a queue state has been cleared up 435 %% so need to be defensive here 436 case get_ctx(QRef, Ctxs, undefined) of 437 #ctx{module = Mod} -> 438 {ok, Mod}; 439 undefined -> 440 {error, not_found} 441 end. 442 443-spec deliver([amqqueue:amqqueue()], Delivery :: term(), 444 stateless | state()) -> 445 {ok, state(), actions()} | {error, Reason :: term()}. 446deliver(Qs, Delivery, State) -> 447 try 448 deliver0(Qs, Delivery, State) 449 catch 450 exit:Reason -> 451 {error, Reason} 452 end. 453 454deliver0(Qs, Delivery, stateless) -> 455 _ = lists:map(fun(Q) -> 456 Mod = amqqueue:get_type(Q), 457 _ = Mod:deliver([{Q, stateless}], Delivery) 458 end, Qs), 459 {ok, stateless, []}; 460deliver0(Qs, Delivery, #?STATE{} = State0) -> 461 %% TODO: optimise single queue case? 462 %% sort by queue type - then dispatch each group 463 ByType = lists:foldl( 464 fun (Q, Acc) -> 465 T = amqqueue:get_type(Q), 466 Ctx = get_ctx(Q, State0), 467 maps:update_with( 468 T, fun (A) -> 469 [{Q, Ctx#ctx.state} | A] 470 end, [{Q, Ctx#ctx.state}], Acc) 471 end, #{}, Qs), 472 %%% dispatch each group to queue type interface? 473 {Xs, Actions} = maps:fold(fun(Mod, QSs, {X0, A0}) -> 474 {X, A} = Mod:deliver(QSs, Delivery), 475 {X0 ++ X, A0 ++ A} 476 end, {[], []}, ByType), 477 State = lists:foldl( 478 fun({Q, S}, Acc) -> 479 Ctx = get_ctx_with(Q, Acc, S), 480 set_ctx(qref(Q), Ctx#ctx{state = S}, Acc) 481 end, State0, Xs), 482 return_ok(State, Actions). 483 484 485-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(), 486 [non_neg_integer()], state()) -> 487 {ok, state(), actions()} | 488 {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. 489settle(QRef, Op, CTag, MsgIds, Ctxs) 490 when ?QREF(QRef) -> 491 case get_ctx(QRef, Ctxs, undefined) of 492 undefined -> 493 %% if we receive a settlement and there is no queue state it means 494 %% the queue was deleted with active consumers 495 {ok, Ctxs, []}; 496 #ctx{state = State0, 497 module = Mod} = Ctx -> 498 case Mod:settle(Op, CTag, MsgIds, State0) of 499 {State, Actions} -> 500 {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions}; 501 Err -> 502 Err 503 end 504 end. 505 506-spec credit(amqqueue:amqqueue() | queue_ref(), 507 rabbit_types:ctag(), non_neg_integer(), 508 boolean(), state()) -> {ok, state(), actions()}. 509credit(Q, CTag, Credit, Drain, Ctxs) -> 510 #ctx{state = State0, 511 module = Mod} = Ctx = get_ctx(Q, Ctxs), 512 {State, Actions} = Mod:credit(CTag, Credit, Drain, State0), 513 {ok, set_ctx(Q, Ctx#ctx{state = State}, Ctxs), Actions}. 514 515-spec dequeue(amqqueue:amqqueue(), boolean(), 516 pid(), rabbit_types:ctag(), state()) -> 517 {ok, non_neg_integer(), term(), state()} | 518 {empty, state()}. 519dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> 520 #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs), 521 Mod = amqqueue:get_type(Q), 522 case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of 523 {ok, Num, Msg, State} -> 524 {ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; 525 {empty, State} -> 526 {empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; 527 {error, _} = Err -> 528 Err; 529 {protocol_error, _, _, _} = Err -> 530 Err 531 end. 532 533get_ctx(QOrQref, State) -> 534 get_ctx_with(QOrQref, State, undefined). 535 536get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) 537 when ?is_amqqueue(Q) -> 538 Ref = qref(Q), 539 case Contexts of 540 #{Ref := #ctx{module = Mod, 541 state = State} = Ctx} -> 542 Ctx#ctx{state = Mod:update(Q, State)}; 543 _ when InitState == undefined -> 544 %% not found and no initial state passed - initialize new state 545 Mod = amqqueue:get_type(Q), 546 Name = amqqueue:get_name(Q), 547 case Mod:init(Q) of 548 {error, Reason} -> 549 exit({Reason, Ref}); 550 {ok, QState} -> 551 #ctx{module = Mod, 552 name = Name, 553 state = QState} 554 end; 555 _ -> 556 %% not found - initialize with supplied initial state 557 Mod = amqqueue:get_type(Q), 558 Name = amqqueue:get_name(Q), 559 #ctx{module = Mod, 560 name = Name, 561 state = InitState} 562 end; 563get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) -> 564 case get_ctx(QRef, Contexts, undefined) of 565 undefined -> 566 exit({queue_context_not_found, QRef}); 567 Ctx -> 568 Ctx 569 end. 570 571get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) -> 572 Ref = qref(QRef), 573 %% if we use a QRef it should always be initialised 574 case maps:get(Ref, Contexts, undefined) of 575 #ctx{} = Ctx -> 576 Ctx; 577 undefined -> 578 Default 579 end. 580 581set_ctx(Q, Ctx, #?STATE{ctxs = Contexts} = State) 582 when ?is_amqqueue(Q) -> 583 Ref = qref(Q), 584 State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)}; 585set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) -> 586 Ref = qref(QRef), 587 State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)}. 588 589qref(#resource{kind = queue} = QName) -> 590 QName; 591qref(Q) when ?is_amqqueue(Q) -> 592 amqqueue:get_name(Q). 593 594return_ok(State0, []) -> 595 {ok, State0, []}; 596return_ok(State0, Actions0) -> 597 {State, Actions} = 598 lists:foldl( 599 fun({monitor, Pid, QRef}, 600 {#?STATE{monitor_registry = M0} = S0, A0}) -> 601 case M0 of 602 #{Pid := QRef} -> 603 %% already monitored by the qref 604 {S0, A0}; 605 #{Pid := _} -> 606 %% TODO: allow multiple Qrefs to monitor the same pid 607 exit(return_ok_duplicate_monitored_pid); 608 _ -> 609 _ = erlang:monitor(process, Pid), 610 M = M0#{Pid => QRef}, 611 {S0#?STATE{monitor_registry = M}, A0} 612 end; 613 (Act, {S, A0}) -> 614 {S, [Act | A0]} 615 end, {State0, []}, Actions0), 616 {ok, State, lists:reverse(Actions)}. 617