1-module(rabbit_queue_type).
2-include("amqqueue.hrl").
3-include_lib("rabbit_common/include/resource.hrl").
4
5-export([
6         init/0,
7         close/1,
8         discover/1,
9         default/0,
10         is_enabled/1,
11         declare/2,
12         delete/4,
13         is_recoverable/1,
14         recover/2,
15         purge/1,
16         policy_changed/1,
17         stat/1,
18         remove/2,
19         info/2,
20         state_info/1,
21         info_down/2,
22         info_down/3,
23         %% stateful client API
24         new/2,
25         consume/3,
26         cancel/5,
27         handle_down/3,
28         handle_event/3,
29         module/2,
30         deliver/3,
31         settle/5,
32         credit/5,
33         dequeue/5,
34         fold_state/3,
35         is_policy_applicable/2,
36         is_server_named_allowed/1,
37         notify_decorators/1
38         ]).
39
%% Name of a queue: a resource of kind `queue`.
-type queue_name() :: rabbit_types:r(queue).
%% Key under which a queue's context is stored in the session state
%% (see the `ctxs` field of the state record).
-type queue_ref() :: queue_name() | atom().
%% Queue-type specific client state. It is produced by the init/1 callback and
%% threaded through the stateful callbacks; this module never inspects it.
-type queue_state() :: term().
%% Tag identifying a message in `settled` actions.
-type msg_tag() :: term().

%% The session state record is named after this module.
-define(STATE, ?MODULE).

%% Info keys reported for a queue that is down (see info_down/2,3).
%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until
%% mirrored queues are deprecated.
-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]).

%% Guard-safe test for a queue reference: true for tuples tagged `resource`
%% and for atoms.
-define(QREF(QueueReference),
    (is_tuple(QueueReference) andalso element(1, QueueReference) == resource)
    orelse is_atom(QueueReference)).
%% Anything that the host process needs to do on behalf of the queue type
%% session, like knowing when to notify on monitor down.
%% `monitor` actions are consumed by return_ok/2 in this module (it sets up
%% the monitor and records the pid); all other actions are handed back to
%% the caller unchanged.
-type action() ::
    {monitor, Pid :: pid(), queue_ref()} |
    %% indicate to the queue type module that a message has been delivered
    %% fully to the queue
    {settled, Success :: boolean(), [msg_tag()]} |
    {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]}.

-type actions() :: [action()].

%% Event passed to a queue type's handle_event/2 callback.
%% handle_down/3 builds the `{down, Pid, Info}` form; anything else is
%% queue-type specific.
-type event() ::
    {down, pid(), Info :: term()} |
    term().
68
%% Per-queue context held in the session state: the implementing module, the
%% queue name and the queue-type specific client state.
-record(ctx, {module :: module(),
              name :: queue_name(),
              %% "publisher confirm queue accounting"
              %% queue type implementation should emit a:
              %% {settle, Success :: boolean(), msg_tag()}
              %% to either settle or reject the delivery of a
              %% message to the queue instance
              %% The queue type module will then emit a {confirm | reject, [msg_tag()]}
              %% action to the channel or channel like process when a msg_tag
              %% has reached its conclusion
              %% NOTE(review): the action() type spells the first tuple as
              %% {settled, boolean(), [msg_tag()]} - confirm which form is current.
              state :: queue_state()}).
80
81
%% Session state owned by the host process.
%%   ctxs             - per-queue contexts keyed by queue reference
%%   monitor_registry - pids monitored via `monitor` actions, mapped to the
%%                      queue reference that requested the monitor
-record(?STATE, {ctxs = #{} :: #{queue_ref() => #ctx{} | queue_ref()},
                 monitor_registry = #{} :: #{pid() => queue_ref()}
                }).

%% Callers must treat the session state as opaque and go through this module.
-opaque state() :: #?STATE{}.
87
%% Options map passed to consume/3 and on to the queue type's consume/3
%% callback. Keys written with `:=` are mandatory, keys written with `=>`
%% are optional.
-type consume_spec() :: #{no_ack := boolean(),
                          channel_pid := pid(),
                          limiter_pid => pid(),
                          limiter_active => boolean(),
                          prefetch_count => non_neg_integer(),
                          consumer_tag := rabbit_types:ctag(),
                          exclusive_consume => boolean(),
                          args => rabbit_framing:amqp_table(),
                          ok_msg := term(),
                          acting_user :=  rabbit_types:username()}.
98
99
100
%% Reason reported alongside an `absent` result of declare/2.
%% Copied from rabbit_amqqueue.
-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.

%% Operation passed as the first argument of the settle/4 callback.
-type settle_op() :: 'complete' | 'requeue' | 'discard'.

-export_type([state/0,
              consume_spec/0,
              action/0,
              actions/0,
              settle_op/0]).
111
%% Callbacks that operate on the queue record only (no session state).

%% is the queue type feature enabled
-callback is_enabled() -> boolean().

%% declare the queue; the node() argument is whatever the caller of
%% declare/2 in this module supplied
-callback declare(amqqueue:amqqueue(), node()) ->
    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
    {'absent', amqqueue:amqqueue(), absent_reason()} |
    {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.

%% delete the queue; the two booleans are the IfUnused and IfEmpty arguments
%% of delete/4 in this module, the username is the acting user
-callback delete(amqqueue:amqqueue(),
                 boolean(),
                 boolean(),
                 rabbit_types:username()) ->
    rabbit_types:ok(non_neg_integer()) |
    rabbit_types:error(in_use | not_empty) |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.

%% recover the given queues of this type in a vhost; returns the queues that
%% were recovered and those that failed
-callback recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) ->
    {Recovered :: [amqqueue:amqqueue()],
     Failed :: [amqqueue:amqqueue()]}.

%% checks if the queue should be recovered
-callback is_recoverable(amqqueue:amqqueue()) ->
    boolean().

%% purge the queue
-callback purge(amqqueue:amqqueue()) ->
    {ok, non_neg_integer()} | {error, term()}.

%% invoked through policy_changed/1 in this module
-callback policy_changed(amqqueue:amqqueue()) -> ok.
140
%% stateful
%% initialise and return a queue type specific session context
-callback init(amqqueue:amqqueue()) -> {ok, queue_state()} | {error, Reason :: term()}.

%% release a queue type specific session context
-callback close(queue_state()) -> ok.
%% update the queue type state from the amqqueue record
-callback update(amqqueue:amqqueue(), queue_state()) -> queue_state().

%% register a consumer described by the consume spec
-callback consume(amqqueue:amqqueue(),
                  consume_spec(),
                  queue_state()) ->
    {ok, queue_state(), actions()} | {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.

%% cancel a consumer; the term() is the OkMsg and the username the acting
%% user, as passed to cancel/5 in this module
-callback cancel(amqqueue:amqqueue(),
                 rabbit_types:ctag(),
                 term(),
                 rabbit_types:username(),
                 queue_state()) ->
    {ok, queue_state()} | {error, term()}.

%% any async events returned from the queue system should be processed through
%% this
-callback handle_event(Event :: event(),
                       queue_state()) ->
    {ok, queue_state(), actions()} | {error, term()} | eol |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.

%% deliver to every {Queue, QueueState} pair of this type; returns the pairs
%% with their new states plus any actions
-callback deliver([{amqqueue:amqqueue(), queue_state()}],
                  Delivery :: term()) ->
    {[{amqqueue:amqqueue(), queue_state()}], actions()}.

%% settle the given message ids for a consumer tag
-callback settle(settle_op(), rabbit_types:ctag(), [non_neg_integer()], queue_state()) ->
    {queue_state(), actions()} |
    {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.

%% grant credit to a consumer tag
-callback credit(rabbit_types:ctag(),
                 non_neg_integer(), Drain :: boolean(), queue_state()) ->
    {queue_state(), actions()}.

%% fetch a single message on behalf of a consumer tag
-callback dequeue(NoAck :: boolean(), LimiterPid :: pid(),
                  rabbit_types:ctag(), queue_state()) ->
    {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} |
    {empty, queue_state()} |
    {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
187
%% return a map of state summary information
-callback state_info(queue_state()) ->
    #{atom() := term()}.

%% general queue info
-callback info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
    rabbit_types:infos().

%% invoked through stat/1 in this module
-callback stat(amqqueue:amqqueue()) ->
    {'ok', non_neg_integer(), non_neg_integer()}.

%% static description of the queue type; this module reads the
%% `unsupported_policies` and `server_named` keys
-callback capabilities() ->
    #{atom() := term()}.

%% invoked through notify_decorators/1 in this module
-callback notify_decorators(amqqueue:amqqueue()) ->
    ok.
204
%% Maps the binary name of a queue type to the module implementing it.
%% Only the three names below are known; any other argument fails with a
%% function_clause error.
%% TODO: this should be controlled by a registry that is populated on boot
discover(<<"quorum">>)  -> rabbit_quorum_queue;
discover(<<"classic">>) -> rabbit_classic_queue;
discover(<<"stream">>)  -> rabbit_stream_queue.
212
%% Module of the default queue type.
default() -> rabbit_classic_queue.
215
%% Asks the given queue type module whether it is enabled by calling its
%% is_enabled/0 callback.
-spec is_enabled(module()) -> boolean().
is_enabled(Mod) ->
    Mod:is_enabled().
219
%% Declares a queue. The record is first passed through rabbit_policy:set/1
%% and then rabbit_queue_decorator:set/1; the result is handed to the
%% declare/2 callback of the module returned by amqqueue:get_type/1.
-spec declare(amqqueue:amqqueue(), node()) ->
    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
    {'absent', amqqueue:amqqueue(), absent_reason()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
declare(Q0, Node) ->
    WithPolicy = rabbit_policy:set(Q0),
    Q = rabbit_queue_decorator:set(WithPolicy),
    (amqqueue:get_type(Q)):declare(Q, Node).
228
%% Deletes a queue by delegating to the delete/4 callback of the queue's
%% type module; all arguments are passed through unchanged.
-spec delete(amqqueue:amqqueue(), boolean(),
             boolean(), rabbit_types:username()) ->
    rabbit_types:ok(non_neg_integer()) |
    rabbit_types:error(in_use | not_empty) |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, IfUnused, IfEmpty, ActingUser) ->
    (amqqueue:get_type(Q)):delete(Q, IfUnused, IfEmpty, ActingUser).
237
%% Purges a queue by delegating to the purge/1 callback of its type module.
-spec purge(amqqueue:amqqueue()) ->
    {'ok', non_neg_integer()} | {error, term()}.
purge(Q) ->
    (amqqueue:get_type(Q)):purge(Q).
243
%% Forwards the queue record to the policy_changed/1 callback of its type
%% module.
-spec policy_changed(amqqueue:amqqueue()) -> 'ok'.
policy_changed(Q) ->
    (amqqueue:get_type(Q)):policy_changed(Q).
248
%% Returns whatever the stat/1 callback of the queue's type module returns.
-spec stat(amqqueue:amqqueue()) ->
    {'ok', non_neg_integer(), non_neg_integer()}.
stat(Q) ->
    (amqqueue:get_type(Q)):stat(Q).
254
%% Drops the context stored under QRef from the session state. The state is
%% returned untouched when no such context exists. The monitor registry is
%% not modified here.
-spec remove(queue_ref(), state()) -> state().
remove(QRef, #?STATE{ctxs = Contexts} = State) ->
    case maps:is_key(QRef, Contexts) of
        true ->
            State#?STATE{ctxs = maps:remove(QRef, Contexts)};
        false ->
            State
    end.
263
%% Returns info items for a queue. Queues whose recorded state is `crashed`
%% or `stopped` are answered locally through info_down/3 with that state as
%% the down reason; every other queue is delegated to the info/2 callback of
%% its type module.
-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
    rabbit_types:infos().
info(Q, Keys) when ?amqqueue_state_is(Q, crashed) ->
    info_down(Q, Keys, crashed);
info(Q, Keys) when ?amqqueue_state_is(Q, stopped) ->
    info_down(Q, Keys, stopped);
info(Q, Keys) ->
    (amqqueue:get_type(Q)):info(Q, Keys).
273
%% Folds Fun over the per-queue contexts of the session state using
%% maps:fold/3, so Fun is called as Fun(QueueRef, Context, Acc).
fold_state(Fun, Acc0, #?STATE{ctxs = Contexts}) ->
    maps:fold(Fun, Acc0, Contexts).
276
%% Summary information for a single queue context, obtained from the
%% state_info/1 callback of the context's module. Anything that is not a
%% #ctx{} record yields an empty map.
state_info(#ctx{module = Mod, state = QState}) ->
    Mod:state_info(QState);
state_info(_NotACtx) ->
    #{}.
282
%% Keys reported for a queue that is down.
down_keys() ->
    ?DOWN_KEYS.

%% Info for a down queue using the full set of down keys.
info_down(Q, DownReason) ->
    info_down(Q, all_keys, DownReason).

%% Info for a down queue restricted to the requested keys; `all_keys` expands
%% to down_keys(). Each key is resolved by i_down/3.
info_down(Q, all_keys, DownReason) ->
    info_down(Q, down_keys(), DownReason);
info_down(Q, Keys, DownReason) ->
    [{Key, i_down(Key, Q, DownReason)} || Key <- Keys].
292
%% Resolves a single info item for a down queue. Most items are read from
%% the queue record, `state` is the down reason itself, and any item not
%% listed here is reported as the empty atom ''.
i_down(Item, Q, DownReason) ->
    case Item of
        name               -> amqqueue:get_name(Q);
        durable            -> amqqueue:is_durable(Q);
        auto_delete        -> amqqueue:is_auto_delete(Q);
        arguments          -> amqqueue:get_arguments(Q);
        pid                -> amqqueue:get_pid(Q);
        recoverable_slaves -> amqqueue:get_recoverable_slaves(Q);
        type               -> amqqueue:get_type(Q);
        state              -> DownReason;
        _Unknown           -> ''
    end.
302
%% True when none of the {Key, Value} pairs in Policy has a key listed under
%% `unsupported_policies` in the capabilities of the queue's type module.
%% A missing `unsupported_policies` entry is treated as an empty list.
is_policy_applicable(Q, Policy) ->
    Capabilities = (amqqueue:get_type(Q)):capabilities(),
    Unsupported = maps:get(unsupported_policies, Capabilities, []),
    not lists:any(fun({Key, _Value}) ->
                          lists:member(Key, Unsupported)
                  end, Policy).
310
%% Reads the `server_named` entry of the queue type module's capabilities;
%% defaults to false when the entry is absent.
is_server_named_allowed(Type) ->
    maps:get(server_named, Type:capabilities(), false).
314
%% Forwards the queue record to the notify_decorators/1 callback of its type
%% module.
notify_decorators(Q) ->
    (amqqueue:get_type(Q)):notify_decorators(Q).
318
%% Creates a fresh session state: the record with all fields at their
%% declared defaults (no contexts, no monitored pids).
-spec init() -> state().
init() -> #?STATE{}.
322
%% Closes every queue context held in the session state by calling the
%% close/1 callback of its module. Each callback must return `ok`; anything
%% else fails the match and crashes the caller.
-spec close(state()) -> ok.
close(#?STATE{ctxs = Contexts}) ->
    maps:fold(fun (_QRef, #ctx{module = Mod, state = QState}, ok) ->
                      ok = Mod:close(QState)
              end, ok, Contexts).
331
%% Ensures the session state holds a context for the queue: the context is
%% looked up (or initialised by get_ctx/2 when missing) and stored back.
-spec new(amqqueue:amqqueue(), state()) -> state().
new(Q, State) when ?is_amqqueue(Q) ->
    set_ctx(Q, get_ctx(Q, State), State).
336
%% Registers a consumer on the queue through the consume/3 callback of the
%% queue's type module.
%%
%% On success the updated queue state is stored in the session state and the
%% returned actions are post-processed by return_ok/2. Any other callback
%% result is returned to the caller unchanged. The callback may return a
%% `protocol_error` tuple as well as `{error, _}`, so the spec lists both.
-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
    {ok, state(), actions()} | {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
consume(Q, Spec, State) ->
    #ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
    Mod = amqqueue:get_type(Q),
    case Mod:consume(Q, Spec, CtxState0) of
        {ok, CtxState, Actions} ->
            return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
        Err ->
            %% {error, _} or {protocol_error, _, _, _}: session state is unchanged
            Err
    end.
348
%% Cancels the consumer identified by Tag through the cancel/5 callback of
%% the queue's type module. On `{ok, QueueState}` the new queue state is
%% stored in the session state; any other result is returned unchanged.
%% TODO switch to cancel spec api
-spec cancel(amqqueue:amqqueue(),
             rabbit_types:ctag(),
             term(),
             rabbit_types:username(),
             state()) ->
    {ok, state()} | {error, term()}.
cancel(Q, Tag, OkMsg, ActiveUser, Ctxs) ->
    Ctx = get_ctx(Q, Ctxs),
    #ctx{state = QState0} = Ctx,
    Mod = amqqueue:get_type(Q),
    case Mod:cancel(Q, Tag, OkMsg, ActiveUser, QState0) of
        {ok, QState} ->
            {ok, set_ctx(Q, Ctx#ctx{state = QState}, Ctxs)};
        Other ->
            Other
    end.
365
%% Asks the queue's type module, through its is_recoverable/1 callback,
%% whether the queue should be recovered.
-spec is_recoverable(amqqueue:amqqueue()) ->
    boolean().
is_recoverable(Q) ->
    (amqqueue:get_type(Q)):is_recoverable(Q).
371
%% Recovers the given queues of a vhost, grouped by queue type.
%%
%% Queues are bucketed by the module returned from amqqueue:get_type/1 and
%% each bucket is handed to that module's recover/2 callback. The time taken
%% per type is logged. Returns the concatenated lists of recovered and
%% failed queues across all types.
%%
%% The three built-in types are seeded with empty buckets so that their
%% recover/2 callback runs even when there is nothing to recover. A queue of
%% any other type gets its own bucket on first sight instead of failing the
%% whole recovery with a `{badkey, Type}` error.
-spec recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) ->
    {Recovered :: [amqqueue:amqqueue()],
     Failed :: [amqqueue:amqqueue()]}.
recover(VHost, Qs) ->
   %% TODO resolve all registered queue types from registry
   Seed = #{rabbit_classic_queue => [],
            rabbit_quorum_queue => [],
            rabbit_stream_queue => []},
   ByType = lists:foldl(
              fun (Q, Acc) ->
                      T = amqqueue:get_type(Q),
                      %% update_with/4: start a new bucket for unseeded types
                      maps:update_with(T, fun (X) ->
                                                  [Q | X]
                                          end, [Q], Acc)
              end, Seed, Qs),
   maps:fold(fun (Mod, Queues, {R0, F0}) ->
                     %% timer:tc/3 reports the elapsed time in microseconds
                     {Taken, {R, F}} = timer:tc(Mod, recover, [VHost, Queues]),
                     rabbit_log:info("Recovering ~b queues of type ~s took ~bms",
                                    [length(Queues), Mod, Taken div 1000]),
                     {R0 ++ R, F0 ++ F}
             end, {[], []}, ByType).
392
%% Handles the `DOWN` notification of a monitored pid.
%%
%% The pid is looked up in (and removed from) the monitor registry. If it was
%% registered, a `{down, Pid, Info}` event is dispatched to the owning queue
%% through handle_event/3:
%%   - `{ok, State, Actions}`: the updated state (minus the registry entry)
%%     and the actions are returned;
%%   - `eol`: `{eol, State, QRef}` is returned, where State is the incoming
%%     state minus the registry entry and QRef is the queue that ended;
%%   - anything else (`{error, _}` or a `protocol_error` tuple) is returned
%%     unchanged.
%% A pid that is not in the registry is ignored.
%%
%% The spec lists the three-element `eol` tuple that the code returns.
-spec handle_down(pid(), term(), state()) ->
    {ok, state(), actions()} | {eol, state(), queue_ref()} | {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) ->
    %% lookup queue ref in monitor registry
    case maps:take(Pid, Reg0) of
        {QRef, Reg} ->
            case handle_event(QRef, {down, Pid, Info}, State0) of
                {ok, State, Actions} ->
                    {ok, State#?STATE{monitor_registry = Reg}, Actions};
                eol ->
                    {eol, State0#?STATE{monitor_registry = Reg}, QRef};
                Err ->
                    Err
            end;
        error ->
            {ok, State0, []}
    end.
410
%% Processes a message sent from a queue.
%% The event is passed to the handle_event/2 callback of the module recorded
%% in the queue's context. On `{ok, QueueState, Actions}` the new queue state
%% is stored and the actions go through return_ok/2; any other callback
%% result is returned unchanged.
-spec handle_event(queue_ref(), term(), state()) ->
    {ok, state(), actions()} | eol | {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
handle_event(QRef, Evt, Ctxs) ->
    case get_ctx(QRef, Ctxs, undefined) of
        undefined ->
            %% events can arrive after a queue state has been cleared up
            %% so need to be defensive here
            {ok, Ctxs, []};
        #ctx{module = Mod, state = QState0} = Ctx ->
            case Mod:handle_event(Evt, QState0) of
                {ok, QState, Actions} ->
                    Updated = set_ctx(QRef, Ctx#ctx{state = QState}, Ctxs),
                    return_ok(Updated, Actions);
                Other ->
                    Other
            end
    end.
430
%% Looks up the module recorded in the context of the given queue reference.
%% Returns `{error, not_found}` when the session state holds no context for
%% it, e.g. because the queue state has already been cleared up.
-spec module(queue_ref(), state()) ->
    {ok, module()} | {error, not_found}.
module(QRef, Ctxs) ->
    case get_ctx(QRef, Ctxs, undefined) of
        undefined ->
            {error, not_found};
        #ctx{module = Mod} ->
            {ok, Mod}
    end.
442
%% Delivers a message to the given queues.
%%
%% The third argument is either a session state or the atom `stateless`.
%% In the stateless case the same atom comes back in the result tuple, which
%% the spec reflects. Exits raised while delivering (for example when a
%% queue context cannot be initialised) are converted into `{error, Reason}`;
%% errors and throws are not caught.
-spec deliver([amqqueue:amqqueue()], Delivery :: term(),
              stateless | state()) ->
    {ok, stateless | state(), actions()} | {error, Reason :: term()}.
deliver(Qs, Delivery, State) ->
    try
        deliver0(Qs, Delivery, State)
    catch
        exit:Reason ->
            {error, Reason}
    end.
453
%% Worker behind deliver/3.
%% Stateless: each queue's type module is called on its own with the atom
%% `stateless` in place of a queue state, and the results are discarded.
%% Stateful: queues are grouped by type module together with their current
%% queue state, each group is dispatched with a single deliver/2 call, the
%% returned queue states are written back into the session state, and the
%% collected actions go through return_ok/2.
deliver0(Qs, Delivery, stateless) ->
    lists:foreach(fun(Q) ->
                          Mod = amqqueue:get_type(Q),
                          _ = Mod:deliver([{Q, stateless}], Delivery)
                  end, Qs),
    {ok, stateless, []};
deliver0(Qs, Delivery, #?STATE{} = State0) ->
    %% TODO: optimise single queue case?
    %% sort by queue type - then dispatch each group
    Grouped = lists:foldl(
                fun (Q, Acc) ->
                        Type = amqqueue:get_type(Q),
                        Ctx = get_ctx(Q, State0),
                        Entry = {Q, Ctx#ctx.state},
                        maps:update_with(Type,
                                         fun (Entries) -> [Entry | Entries] end,
                                         [Entry], Acc)
                end, #{}, Qs),
    %% dispatch each group to queue type interface?
    {Updated, Actions} =
        maps:fold(fun (Mod, QStates, {UpdAcc, ActAcc}) ->
                          {Upd, Acts} = Mod:deliver(QStates, Delivery),
                          {UpdAcc ++ Upd, ActAcc ++ Acts}
                  end, {[], []}, Grouped),
    State = lists:foldl(
              fun ({Q, QState}, StateAcc) ->
                      Ctx = get_ctx_with(Q, StateAcc, QState),
                      set_ctx(qref(Q), Ctx#ctx{state = QState}, StateAcc)
              end, State0, Updated),
    return_ok(State, Actions).
483
484
%% Settles the given message ids for a consumer tag through the settle/4
%% callback of the module recorded in the queue's context. A
%% `{QueueState, Actions}` result is stored and returned as
%% `{ok, State, Actions}`; any other callback result is returned unchanged.
-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(),
             [non_neg_integer()], state()) ->
          {ok, state(), actions()} |
          {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
settle(QRef, Op, CTag, MsgIds, Ctxs)
  when ?QREF(QRef) ->
    case get_ctx(QRef, Ctxs, undefined) of
        #ctx{module = Mod, state = QState0} = Ctx ->
            case Mod:settle(Op, CTag, MsgIds, QState0) of
                {QState, Actions} ->
                    Updated = set_ctx(QRef, Ctx#ctx{state = QState}, Ctxs),
                    {ok, Updated, Actions};
                Other ->
                    Other
            end;
        undefined ->
            %% if we receive a settlement and there is no queue state it means
            %% the queue was deleted with active consumers
            {ok, Ctxs, []}
    end.
505
%% Grants credit to a consumer tag through the credit/4 callback of the
%% module recorded in the queue's context, then stores the returned queue
%% state. The actions are returned as-is (they do not go through
%% return_ok/2).
-spec credit(amqqueue:amqqueue() | queue_ref(),
             rabbit_types:ctag(), non_neg_integer(),
             boolean(), state()) -> {ok, state(), actions()}.
credit(Q, CTag, Credit, Drain, Ctxs) ->
    Ctx = get_ctx(Q, Ctxs),
    #ctx{module = Mod, state = QState0} = Ctx,
    {QState, Actions} = Mod:credit(CTag, Credit, Drain, QState0),
    {ok, set_ctx(Q, Ctx#ctx{state = QState}, Ctxs), Actions}.
514
%% Fetches a single message from the queue through the dequeue/4 callback of
%% the queue's type module.
%%
%% For `{ok, Count, Msg, QueueState}` and `{empty, QueueState}` the new queue
%% state is stored in the session state. `{error, _}` and `protocol_error`
%% results are returned unchanged and leave the session state as it was; the
%% spec lists them because the body returns them explicitly.
-spec dequeue(amqqueue:amqqueue(), boolean(),
              pid(), rabbit_types:ctag(), state()) ->
    {ok, non_neg_integer(), term(), state()}  |
    {empty, state()} |
    {error, term()} |
    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
    #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
    Mod = amqqueue:get_type(Q),
    case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of
        {ok, Num, Msg, State} ->
            {ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
        {empty, State} ->
            {empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
        {error, _} = Err ->
            Err;
        {protocol_error, _, _, _} = Err ->
            Err
    end.
532
%% Fetches the context for a queue record or queue reference with no initial
%% queue state supplied; see get_ctx_with/3 for what happens when the
%% context is missing.
get_ctx(QOrQRef, State) ->
    get_ctx_with(QOrQRef, State, undefined).
535
%% Returns the context for a queue.
%%
%% Given a queue record: an existing context is returned with its queue
%% state refreshed through the module's update/2 callback. A missing context
%% is built from the record, using InitState as the queue state, or the
%% result of the module's init/1 callback when InitState is `undefined`
%% (exiting with {Reason, QueueRef} if init/1 fails).
%%
%% Given a queue reference (only with InitState = undefined): the context
%% must already exist, otherwise the process exits with
%% {queue_context_not_found, QRef}.
get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
  when ?is_amqqueue(Q) ->
    Ref = qref(Q),
    case Contexts of
        #{Ref := #ctx{module = Mod, state = QState0} = Ctx} ->
            Ctx#ctx{state = Mod:update(Q, QState0)};
        _ ->
            %% not found - build a fresh context from the queue record
            Mod = amqqueue:get_type(Q),
            Name = amqqueue:get_name(Q),
            #ctx{module = Mod,
                 name = Name,
                 state = new_ctx_queue_state(Mod, Q, Ref, InitState)}
    end;
get_ctx_with(QRef, State, undefined) when ?QREF(QRef) ->
    case get_ctx(QRef, State, undefined) of
        #ctx{} = Ctx ->
            Ctx;
        undefined ->
            exit({queue_context_not_found, QRef})
    end.

%% Queue state for a freshly built context: the supplied initial state, or
%% the result of the module's init/1 callback when none was supplied.
new_ctx_queue_state(Mod, Q, Ref, undefined) ->
    case Mod:init(Q) of
        {ok, QState} ->
            QState;
        {error, Reason} ->
            exit({Reason, Ref})
    end;
new_ctx_queue_state(_Mod, _Q, _Ref, InitState) ->
    InitState.
570
%% Looks up the context stored for a queue record or queue name and returns
%% Default when there is none. Never initialises a context.
get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) ->
    %% if we use a QRef it should always be initialised
    case maps:get(qref(QRef), Contexts, undefined) of
        undefined ->
            Default;
        #ctx{} = Ctx ->
            Ctx
    end.
580
%% Stores Ctx in the session state under the queue's reference. The first
%% argument may be a queue record or a queue name; qref/1 normalises both.
set_ctx(QOrRef, Ctx, #?STATE{ctxs = Contexts} = State) ->
    Ref = qref(QOrRef),
    State#?STATE{ctxs = Contexts#{Ref => Ctx}}.
588
%% Normalises a queue record or queue name to the key used in the context
%% map: a queue-kind resource is returned as-is, a queue record yields its
%% name. Any other argument fails with a function_clause error.
qref(#resource{kind = queue} = Name) ->
    Name;
qref(Queue) when ?is_amqqueue(Queue) ->
    amqqueue:get_name(Queue).
593
%% Builds the `{ok, State, Actions}` result for the caller after processing
%% the actions emitted by a queue type. `monitor` actions are consumed here
%% (see return_ok_action/2); all other actions are returned in their
%% original order.
return_ok(State, []) ->
    {ok, State, []};
return_ok(State0, Actions0) ->
    {State, RevActions} =
        lists:foldl(fun return_ok_action/2, {State0, []}, Actions0),
    {ok, State, lists:reverse(RevActions)}.

%% Fold step for return_ok/2.
%% A `{monitor, Pid, QRef}` action starts monitoring Pid and records it in
%% the monitor registry, unless Pid is already registered for the same QRef.
%% A pid registered for a different QRef exits the process. Every other
%% action is pushed onto the (reversed) accumulator.
return_ok_action({monitor, Pid, QRef},
                 {#?STATE{monitor_registry = Registry} = State, Acc}) ->
    case maps:find(Pid, Registry) of
        {ok, QRef} ->
            %% already monitored by the qref
            {State, Acc};
        {ok, _OtherQRef} ->
            %% TODO: allow multiple Qrefs to monitor the same pid
            exit(return_ok_duplicate_monitored_pid);
        error ->
            _ = erlang:monitor(process, Pid),
            {State#?STATE{monitor_registry = Registry#{Pid => QRef}}, Acc}
    end;
return_ok_action(Action, {State, Acc}) ->
    {State, [Action | Acc]}.
617