1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_amqqueue).
9
10-export([store_queue_ram_dirty/1]).
11-export([warn_file_limit/0]).
12-export([recover/1, stop/1, start/1, declare/6, declare/7,
13         delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
14         forget_all_durable/1]).
15-export([pseudo_queue/2, pseudo_queue/3, immutable/1]).
16-export([lookup/1, lookup_many/1, not_found_or_absent/1, not_found_or_absent_dirty/1,
17         with/2, with/3, with_or_die/2,
18         assert_equivalence/5,
19         check_exclusive_access/2, with_exclusive_access_or_die/3,
20         stat/1, deliver/2,
21         requeue/3, ack/3, reject/4]).
22-export([not_found/1, absent/2]).
23-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
24         emit_info_all/5, list_local/1, info_local/1,
25         emit_info_local/4, emit_info_down/4]).
26-export([count/0]).
27-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
28         list_local_names_down/0, list_with_possible_retry/1]).
29-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
30-export([force_event_refresh/1, notify_policy_changed/1]).
31-export([consumers/1, consumers_all/1,  emit_consumers_all/4, consumer_info_keys/0]).
32-export([basic_get/5, basic_consume/12, basic_cancel/5, notify_decorators/1]).
33-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
34-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
35-export([on_node_up/1, on_node_down/1]).
36-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
37-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
38-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
39-export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]).
40-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
41-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0,
42         list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0,
43         list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
44         list_local_mirrored_classic_without_synchronised_mirrors/0,
45         list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0,
46         list_local_quorum_queues_with_name_matching/1,
47         list_local_quorum_queues_with_name_matching/2]).
48-export([ensure_rabbit_queue_record_is_initialized/1]).
49-export([format/1]).
50-export([delete_immediately_by_resource/1]).
51-export([delete_crashed/1,
52         delete_crashed/2,
53         delete_crashed_internal/2]).
54
55-export([pid_of/1, pid_of/2]).
56-export([mark_local_durable_queues_stopped/1]).
57
58-export([rebalance/3]).
59-export([collect_info_all/2]).
60
61-export([is_policy_applicable/2, declare_args/0, consume_args/0]).
62-export([is_server_named_allowed/1]).
63
64-export([check_max_age/1]).
65-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
66
67-export([deactivate_limit_all/2]).
68
69%% internal
70-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
71         set_ram_duration_target/2, set_maximum_since_use/2,
72         emit_consumers_local/3, internal_delete/3]).
73
74-include_lib("rabbit_common/include/rabbit.hrl").
75-include_lib("stdlib/include/qlc.hrl").
76-include("amqqueue.hrl").
77
78-define(INTEGER_ARG_TYPES, [byte, short, signedint, long,
79                            unsignedbyte, unsignedshort, unsignedint]).
80
81-define(IS_CLASSIC(QPid), is_pid(QPid)).
82-define(IS_QUORUM(QPid), is_tuple(QPid)).
83%%----------------------------------------------------------------------------
84
85-export_type([name/0, qmsg/0, absent_reason/0]).
86
87-type name() :: rabbit_types:r('queue').
88
89-type qpids() :: [pid()].
90-type qlen() :: rabbit_types:ok(non_neg_integer()).
91-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
92-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(),
93                 boolean(), rabbit_types:message()}.
94-type msg_id() :: non_neg_integer().
95-type ok_or_errors() ::
96        'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
97-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.
98-type queue_not_found() :: not_found.
99-type queue_absent() :: {'absent', amqqueue:amqqueue(), absent_reason()}.
100-type not_found_or_absent() :: queue_not_found() | queue_absent().
101
102%%----------------------------------------------------------------------------
103
104-define(CONSUMER_INFO_KEYS,
105        [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
106         active, activity_status, arguments]).
107
108warn_file_limit() ->
109    DurableQueues = find_recoverable_queues(),
110    L = length(DurableQueues),
111
112    %% if there are not enough file handles, the server might hang
113    %% when trying to recover queues, warn the user:
114    case file_handle_cache:get_limit() < L of
115        true ->
116            rabbit_log:warning(
117              "Recovering ~p queues, available file handles: ~p. Please increase max open file handles limit to at least ~p!",
118              [L, file_handle_cache:get_limit(), L]);
119        false ->
120            ok
121    end.
122
123-spec recover(rabbit_types:vhost()) ->
124    {Recovered :: [amqqueue:amqqueue()],
125     Failed :: [amqqueue:amqqueue()]}.
126recover(VHost) ->
127    AllDurable = find_local_durable_queues(VHost),
128    rabbit_queue_type:recover(VHost, AllDurable).
129
130filter_pid_per_type(QPids) ->
131    lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
132
133filter_resource_per_type(Resources) ->
134    Queues = [begin
135                  {ok, Q} = lookup(Resource),
136                  QPid = amqqueue:get_pid(Q),
137                  {Resource, QPid}
138              end || Resource <- Resources],
139    lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
140
141-spec stop(rabbit_types:vhost()) -> 'ok'.
142stop(VHost) ->
143    %% Classic queues
144    ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
145    {ok, BQ} = application:get_env(rabbit, backing_queue_module),
146    ok = BQ:stop(VHost),
147    rabbit_quorum_queue:stop(VHost).
148
149-spec start([amqqueue:amqqueue()]) -> 'ok'.
150
151start(Qs) ->
152    %% At this point all recovered queues and their bindings are
153    %% visible to routing, so now it is safe for them to complete
154    %% their initialisation (which may involve interacting with other
155    %% queues).
156    _ = [amqqueue:get_pid(Q) ! {self(), go}
157         || Q <- Qs,
158            %% All queues are supposed to be classic here.
159            amqqueue:is_classic(Q)],
160    ok.
161
162mark_local_durable_queues_stopped(VHost) ->
163    ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
164       do_mark_local_durable_queues_stopped(VHost),
165       do_mark_local_durable_queues_stopped(VHost)).
166
167do_mark_local_durable_queues_stopped(VHost) ->
168    Qs = find_local_durable_queues(VHost),
169    rabbit_misc:execute_mnesia_transaction(
170        fun() ->
171            [ store_queue(amqqueue:set_state(Q, stopped))
172              || Q <- Qs, amqqueue:get_type(Q) =:= rabbit_classic_queue,
173                 amqqueue:get_state(Q) =/= stopped ]
174        end).
175
176find_local_durable_queues(VHost) ->
177    mnesia:async_dirty(
178      fun () ->
179              qlc:e(
180                qlc:q(
181                  [Q || Q <- mnesia:table(rabbit_durable_queue),
182                        amqqueue:get_vhost(Q) =:= VHost andalso
183                        rabbit_queue_type:is_recoverable(Q)
184                  ]))
185      end).
186
187find_recoverable_queues() ->
188    mnesia:async_dirty(
189      fun () ->
190              qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
191                                rabbit_queue_type:is_recoverable(Q)]))
192      end).
193
194-spec declare(name(),
195              boolean(),
196              boolean(),
197              rabbit_framing:amqp_table(),
198              rabbit_types:maybe(pid()),
199              rabbit_types:username()) ->
200    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
201    {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
202    {'absent', amqqueue:amqqueue(), absent_reason()} |
203    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
204declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
205    declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()).
206
207
208%% The Node argument suggests where the queue (master if mirrored)
209%% should be. Note that in some cases (e.g. with "nodes" policy in
210%% effect) this might not be possible to satisfy.
211
212-spec declare(name(),
213              boolean(),
214              boolean(),
215              rabbit_framing:amqp_table(),
216              rabbit_types:maybe(pid()),
217              rabbit_types:username(),
218              node()) ->
219    {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
220    {'absent', amqqueue:amqqueue(), absent_reason()} |
221    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
222declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
223        Owner, ActingUser, Node) ->
224    ok = check_declare_arguments(QueueName, Args),
225    Type = get_queue_type(Args),
226    case rabbit_queue_type:is_enabled(Type) of
227        true ->
228            Q = amqqueue:new(QueueName,
229                              none,
230                              Durable,
231                              AutoDelete,
232                              Owner,
233                              Args,
234                              VHost,
235                              #{user => ActingUser},
236                              Type),
237            rabbit_queue_type:declare(Q, Node);
238        false ->
239            {protocol_error, internal_error,
240             "Cannot declare a queue '~s' of type '~s' on node '~s': "
241             "the corresponding feature flag is disabled",
242              [rabbit_misc:rs(QueueName), Type, Node]}
243    end.
244
245get_queue_type(Args) ->
246    case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
247        undefined ->
248            rabbit_queue_type:default();
249        {_, V} ->
250            rabbit_queue_type:discover(V)
251    end.
252
253-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
254    {created | existing, amqqueue:amqqueue()} | queue_absent().
255
256internal_declare(Q, Recover) ->
257    ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
258        do_internal_declare(Q, Recover),
259        begin
260            Q1 = amqqueue:upgrade(Q),
261            do_internal_declare(Q1, Recover)
262        end).
263
264do_internal_declare(Q, true) ->
265    rabbit_misc:execute_mnesia_tx_with_tail(
266      fun () ->
267              ok = store_queue(amqqueue:set_state(Q, live)),
268              rabbit_misc:const({created, Q})
269      end);
270do_internal_declare(Q, false) ->
271    QueueName = amqqueue:get_name(Q),
272    rabbit_misc:execute_mnesia_tx_with_tail(
273      fun () ->
274              case mnesia:wread({rabbit_queue, QueueName}) of
275                  [] ->
276                      case not_found_or_absent(QueueName) of
277                          not_found           -> Q1 = rabbit_policy:set(Q),
278                                                 Q2 = amqqueue:set_state(Q1, live),
279                                                 ok = store_queue(Q2),
280                                                 fun () -> {created, Q2} end;
281                          {absent, _Q, _} = R -> rabbit_misc:const(R)
282                      end;
283                  [ExistingQ] ->
284                      rabbit_misc:const({existing, ExistingQ})
285              end
286      end).
287
288-spec update
289        (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
290            'not_found' | amqqueue:amqqueue().
291
292update(Name, Fun) ->
293    case mnesia:wread({rabbit_queue, Name}) of
294        [Q] ->
295            Durable = amqqueue:is_durable(Q),
296            Q1 = Fun(Q),
297            ok = mnesia:write(rabbit_queue, Q1, write),
298            case Durable of
299                true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
300                _    -> ok
301            end,
302            Q1;
303        [] ->
304            not_found
305    end.
306
307%% only really used for quorum queues to ensure the rabbit_queue record
308%% is initialised
309ensure_rabbit_queue_record_is_initialized(Q) ->
310    ?try_mnesia_tx_or_upgrade_amqqueue_and_retry(
311       do_ensure_rabbit_queue_record_is_initialized(Q),
312       begin
313           Q1 = amqqueue:upgrade(Q),
314           do_ensure_rabbit_queue_record_is_initialized(Q1)
315       end).
316
317do_ensure_rabbit_queue_record_is_initialized(Q) ->
318    rabbit_misc:execute_mnesia_tx_with_tail(
319      fun () ->
320              ok = store_queue(Q),
321              rabbit_misc:const({ok, Q})
322      end).
323
324-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
325
326store_queue(Q) when ?amqqueue_is_durable(Q) ->
327    Q1 = amqqueue:reset_mirroring_and_decorators(Q),
328    ok = mnesia:write(rabbit_durable_queue, Q1, write),
329    store_queue_ram(Q);
330store_queue(Q) when not ?amqqueue_is_durable(Q) ->
331    store_queue_ram(Q).
332
333store_queue_ram(Q) ->
334    ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
335
336store_queue_ram_dirty(Q) ->
337    ok = mnesia:dirty_write(rabbit_queue, rabbit_queue_decorator:set(Q)).
338
339-spec update_decorators(name()) -> 'ok'.
340
341update_decorators(Name) ->
342    rabbit_misc:execute_mnesia_transaction(
343      fun() ->
344              case mnesia:wread({rabbit_queue, Name}) of
345                  [Q] -> store_queue_ram(Q),
346                         ok;
347                  []  -> ok
348              end
349      end).
350
351-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) ->
352          'ok'.
353
354policy_changed(Q1, Q2) ->
355    Decorators1 = amqqueue:get_decorators(Q1),
356    Decorators2 = amqqueue:get_decorators(Q2),
357    rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
358    D1 = rabbit_queue_decorator:select(Decorators1),
359    D2 = rabbit_queue_decorator:select(Decorators2),
360    [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
361    %% Make sure we emit a stats event even if nothing
362    %% mirroring-related has changed - the policy may have changed anyway.
363    notify_policy_changed(Q2).
364
365is_policy_applicable(QName, Policy) ->
366    case lookup(QName) of
367        {ok, Q} ->
368            rabbit_queue_type:is_policy_applicable(Q, Policy);
369        _ ->
370            %% Defaults to previous behaviour. Apply always
371            true
372    end.
373
374is_server_named_allowed(Args) ->
375    Type = get_queue_type(Args),
376    rabbit_queue_type:is_server_named_allowed(Type).
377
378-spec lookup
379        (name()) ->
380            rabbit_types:ok(amqqueue:amqqueue()) |
381            rabbit_types:error('not_found');
382        ([name()]) ->
383            [amqqueue:amqqueue()].
384
385lookup([])     -> [];                             %% optimisation
386lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
387lookup(Names) when is_list(Names) ->
388    %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
389    %% expensive for reasons explained in rabbit_misc:dirty_read/1.
390    lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
391lookup(Name) ->
392    rabbit_misc:dirty_read({rabbit_queue, Name}).
393
394-spec lookup_many ([name()]) -> [amqqueue:amqqueue()].
395
396lookup_many(Names) when is_list(Names) ->
397    lookup(Names).
398
399-spec not_found_or_absent(name()) -> not_found_or_absent().
400
401not_found_or_absent(Name) ->
402    %% NB: we assume that the caller has already performed a lookup on
403    %% rabbit_queue and not found anything
404    case mnesia:read({rabbit_durable_queue, Name}) of
405        []  -> not_found;
406        [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
407    end.
408
409-spec not_found_or_absent_dirty(name()) -> not_found_or_absent().
410
411not_found_or_absent_dirty(Name) ->
412    %% We should read from both tables inside a tx, to get a
413    %% consistent view. But the chances of an inconsistency are small,
414    %% and only affect the error kind.
415    case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
416        {error, not_found} -> not_found;
417        {ok, Q}            -> {absent, Q, nodedown}
418    end.
419
420-spec get_rebalance_lock(pid()) ->
421    {true, {rebalance_queues, pid()}} | false.
422get_rebalance_lock(Pid) when is_pid(Pid) ->
423    Id = {rebalance_queues, Pid},
424    Nodes = [node()|nodes()],
425    %% Note that we're not re-trying. We want to immediately know
426    %% if a re-balance is taking place and stop accordingly.
427    case global:set_lock(Id, Nodes, 0) of
428        true ->
429            {true, Id};
430        false ->
431            false
432    end.
433
434-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) ->
435                       {ok, [{node(), pos_integer()}]} | {error, term()}.
436rebalance(Type, VhostSpec, QueueSpec) ->
437    %% We have not yet acquired the rebalance_queues global lock.
438    maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).
439
440maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
441    rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
442                    [Type, VhostSpec, QueueSpec]),
443    Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()),
444    NumRunning = length(Running),
445    ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
446                        filter_per_type(Type, Q),
447                        is_replicated(Q),
448                        is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
449                            is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
450    NumToRebalance = length(ToRebalance),
451    ByNode = group_by_node(ToRebalance),
452    Rem = case (NumToRebalance rem NumRunning) of
453            0 -> 0;
454            _ -> 1
455        end,
456    MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem,
457    Result = iterative_rebalance(ByNode, MaxQueuesDesired),
458    global:del_lock(Id),
459    rabbit_log:info("Finished queue rebalance operation"),
460    Result;
461maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) ->
462    rabbit_log:warning("Queue rebalance operation is in progress, please wait."),
463    {error, rebalance_in_progress}.
464
465%% Stream queues don't yet support rebalance
466filter_per_type(all, Q)  ->
467    ?amqqueue_is_quorum(Q) or ?amqqueue_is_classic(Q);
468filter_per_type(quorum, Q) ->
469    ?amqqueue_is_quorum(Q);
470filter_per_type(classic, Q) ->
471    ?amqqueue_is_classic(Q).
472
473rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
474    rabbit_quorum_queue;
475rebalance_module(Q) when ?amqqueue_is_classic(Q) ->
476    rabbit_mirror_queue_misc.
477
478get_resource_name(#resource{name = Name}) ->
479    Name.
480
481get_resource_vhost_name(#resource{virtual_host = VHostName}) ->
482    VHostName.
483
484is_match(Subj, RegEx) ->
485   nomatch /= re:run(Subj, RegEx).
486
487iterative_rebalance(ByNode, MaxQueuesDesired) ->
488    case maybe_migrate(ByNode, MaxQueuesDesired) of
489        {ok, Summary} ->
490            rabbit_log:info("All queue masters are balanced"),
491            {ok, Summary};
492        {migrated, Other} ->
493            iterative_rebalance(Other, MaxQueuesDesired);
494        {not_migrated, Other} ->
495            iterative_rebalance(Other, MaxQueuesDesired)
496    end.
497
498maybe_migrate(ByNode, MaxQueuesDesired) ->
499    maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).
500
501maybe_migrate(ByNode, _, []) ->
502    {ok, maps:fold(fun(K, V, Acc) ->
503                           {CQs, QQs} = lists:partition(fun({_, Q, _}) ->
504                                                                ?amqqueue_is_classic(Q)
505                                                        end, V),
506                           [[{<<"Node name">>, K}, {<<"Number of quorum queues">>, length(QQs)},
507                             {<<"Number of replicated classic queues">>, length(CQs)}] | Acc]
508                   end, [], ByNode)};
509maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
510    case maps:get(N, ByNode, []) of
511        [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
512            Name = amqqueue:get_name(Q),
513            Module = rebalance_module(Q),
514            Candidates = rabbit_maintenance:filter_out_drained_nodes_local_read(Module:get_replicas(Q) -- [N]),
515            case Candidates of
516                [] ->
517                    {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
518                _ ->
519                    [{Length, Destination} | _] = sort_by_number_of_queues(Candidates, ByNode),
520                    rabbit_log:info("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues",
521                                       [Name, N, length(All), Destination, Length]),
522                    case Module:transfer_leadership(Q, Destination) of
523                        {migrated, NewNode} ->
524                            rabbit_log:info("Queue ~p migrated to ~p", [Name, NewNode]),
525                            {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)};
526                        {not_migrated, Reason} ->
527                            rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]),
528                            {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}
529                    end
530            end;
531        [{_, _, true} | _] = All when length(All) > MaxQueuesDesired ->
532            rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. "
533                               "Do nothing", [N, length(All)]),
534            maybe_migrate(ByNode, MaxQueuesDesired, Nodes);
535        All ->
536            rabbit_log:debug("Node ~p only contains ~p queues, do nothing",
537                               [N, length(All)]),
538            maybe_migrate(ByNode, MaxQueuesDesired, Nodes)
539    end.
540
541update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) ->
542    maps:update(N, Queues ++ [{Entries, Q, true}], ByNode).
543
544update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) ->
545    maps:update_with(NewNode,
546                     fun(L) -> L ++ [{Entries, Q, true}] end,
547                     [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)).
548
549sort_by_number_of_queues(Nodes, ByNode) ->
550    lists:keysort(1,
551                  lists:map(fun(Node) ->
552                                    {num_queues(Node, ByNode), Node}
553                            end, Nodes)).
554
555num_queues(Node, ByNode) ->
556    length(maps:get(Node, ByNode, [])).
557
558group_by_node(Queues) ->
559    ByNode = lists:foldl(fun(Q, Acc) ->
560                                 Module = rebalance_module(Q),
561                                 Length = Module:queue_length(Q),
562                                 maps:update_with(amqqueue:qnode(Q),
563                                                  fun(L) -> [{Length, Q, false} | L] end,
564                                                  [{Length, Q, false}], Acc)
565                         end, #{}, Queues),
566    maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode).
567
568-spec with(name(),
569           qfun(A),
570           fun((not_found_or_absent()) -> rabbit_types:channel_exit())) ->
571    A | rabbit_types:channel_exit().
572
573with(Name, F, E) ->
574    with(Name, F, E, 2000).
575
576with(#resource{} = Name, F, E, RetriesLeft) ->
577    case lookup(Name) of
578        {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 ->
579            %% Something bad happened to that queue, we are bailing out
580            %% on processing current request.
581            E({absent, Q, timeout});
582        {ok, Q} when ?amqqueue_state_is(Q, stopped) andalso RetriesLeft =:= 0 ->
583            %% The queue was stopped and not migrated
584            E({absent, Q, stopped});
585        %% The queue process has crashed with unknown error
586        {ok, Q} when ?amqqueue_state_is(Q, crashed) ->
587            E({absent, Q, crashed});
588        %% The queue process has been stopped by a supervisor.
589        %% In that case a synchronised mirror can take over
590        %% so we should retry.
591        {ok, Q} when ?amqqueue_state_is(Q, stopped) ->
592            %% The queue process was stopped by the supervisor
593            rabbit_misc:with_exit_handler(
594              fun () -> retry_wait(Q, F, E, RetriesLeft) end,
595              fun () -> F(Q) end);
596        %% The queue is supposed to be active.
597        %% The master node can go away or queue can be killed
598        %% so we retry, waiting for a mirror to take over.
599        {ok, Q} when ?amqqueue_state_is(Q, live) ->
600            %% We check is_process_alive(QPid) in case we receive a
601            %% nodedown (for example) in F() that has nothing to do
602            %% with the QPid. F() should be written s.t. that this
603            %% cannot happen, so we bail if it does since that
604            %% indicates a code bug and we don't want to get stuck in
605            %% the retry loop.
606            rabbit_misc:with_exit_handler(
607              fun () -> retry_wait(Q, F, E, RetriesLeft) end,
608              fun () -> F(Q) end);
609        {error, not_found} ->
610            E(not_found_or_absent_dirty(Name))
611    end.
612
613-spec retry_wait(amqqueue:amqqueue(),
614                 qfun(A),
615                 fun((not_found_or_absent()) -> rabbit_types:channel_exit()),
616                 non_neg_integer()) ->
617    A | rabbit_types:channel_exit().
618
619retry_wait(Q, F, E, RetriesLeft) ->
620    Name = amqqueue:get_name(Q),
621    QPid = amqqueue:get_pid(Q),
622    QState = amqqueue:get_state(Q),
623    case {QState, is_replicated(Q)} of
624        %% We don't want to repeat an operation if
625        %% there are no mirrors to migrate to
626        {stopped, false} ->
627            E({absent, Q, stopped});
628        _ ->
629            case rabbit_mnesia:is_process_alive(QPid) of
630                true ->
631                    % rabbitmq-server#1682
632                    % The old check would have crashed here,
633                    % instead, log it and run the exit fun. absent & alive is weird,
634                    % but better than crashing with badmatch,true
635                    rabbit_log:debug("Unexpected alive queue process ~p", [QPid]),
636                    E({absent, Q, alive});
637                false ->
638                    ok % Expected result
639            end,
640            timer:sleep(30),
641            with(Name, F, E, RetriesLeft - 1)
642    end.
643
644-spec with(name(), qfun(A)) ->
645          A | rabbit_types:error(not_found_or_absent()).
646
647with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
648
649-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
650
651with_or_die(Name, F) ->
652    with(Name, F, die_fun(Name)).
653
654-spec die_fun(name()) ->
655    fun((not_found_or_absent()) -> rabbit_types:channel_exit()).
656
657die_fun(Name) ->
658    fun (not_found)           -> not_found(Name);
659        ({absent, Q, Reason}) -> absent(Q, Reason)
660    end.
661
662-spec not_found(name()) -> rabbit_types:channel_exit().
663
664not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]).
665
666-spec absent(amqqueue:amqqueue(), absent_reason()) ->
667    rabbit_types:channel_exit().
668
669absent(Q, AbsentReason) ->
670    QueueName = amqqueue:get_name(Q),
671    QPid = amqqueue:get_pid(Q),
672    IsDurable = amqqueue:is_durable(Q),
673    priv_absent(QueueName, QPid, IsDurable, AbsentReason).
674
675-spec priv_absent(name(), pid(), boolean(), absent_reason()) ->
676    rabbit_types:channel_exit().
677
678priv_absent(QueueName, QPid, true, nodedown) ->
679    %% The assertion of durability is mainly there because we mention
680    %% durability in the error message. That way we will hopefully
681    %% notice if at some future point our logic changes s.t. we get
682    %% here with non-durable queues.
683    rabbit_misc:protocol_error(
684      not_found,
685      "home node '~s' of durable ~s is down or inaccessible",
686      [amqqueue:qnode(QPid), rabbit_misc:rs(QueueName)]);
687
688priv_absent(QueueName, _QPid, _IsDurable, stopped) ->
689    rabbit_misc:protocol_error(
690      not_found,
691      "~s process is stopped by supervisor", [rabbit_misc:rs(QueueName)]);
692
693priv_absent(QueueName, _QPid, _IsDurable, crashed) ->
694    rabbit_misc:protocol_error(
695      not_found,
696      "~s has crashed and failed to restart", [rabbit_misc:rs(QueueName)]);
697
698priv_absent(QueueName, _QPid, _IsDurable, timeout) ->
699    rabbit_misc:protocol_error(
700      not_found,
701      "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]);
702
703priv_absent(QueueName, QPid, _IsDurable, alive) ->
704    rabbit_misc:protocol_error(
705      not_found,
706      "failed to perform operation on ~s: its master replica ~w may be stopping or being demoted",
707      [rabbit_misc:rs(QueueName), QPid]).
708
709-spec assert_equivalence
710        (amqqueue:amqqueue(), boolean(), boolean(),
711         rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
712            'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
713
714assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) ->
715    QName = amqqueue:get_name(Q),
716    DurableQ = amqqueue:is_durable(Q),
717    AutoDeleteQ = amqqueue:is_auto_delete(Q),
718    ok = check_exclusive_access(Q, Owner, strict),
719    ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable),
720    ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete),
721    ok = assert_args_equivalence(Q, Args1).
722
723-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
724          'ok' | rabbit_types:channel_exit().
725
726check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
727
728check_exclusive_access(Q, Owner, _MatchType)
729  when ?amqqueue_exclusive_owner_is(Q, Owner) ->
730    ok;
731check_exclusive_access(Q, _ReaderPid, lax)
732  when ?amqqueue_exclusive_owner_is(Q, none) ->
733    ok;
734check_exclusive_access(Q, _ReaderPid, _MatchType) ->
735    QueueName = amqqueue:get_name(Q),
736    rabbit_misc:protocol_error(
737      resource_locked,
738      "cannot obtain exclusive access to locked ~s. It could be originally "
739      "declared on another connection or the exclusive property value does not "
740      "match that of the original declaration.",
741      [rabbit_misc:rs(QueueName)]).
742
743-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
744          A | rabbit_types:channel_exit().
745
746with_exclusive_access_or_die(Name, ReaderPid, F) ->
747    with_or_die(Name,
748                fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
749
750assert_args_equivalence(Q, RequiredArgs) ->
751    QueueName = amqqueue:get_name(Q),
752    Args = amqqueue:get_arguments(Q),
753    rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
754                                        [Key || {Key, _Fun} <- declare_args()]).
755
756check_declare_arguments(QueueName, Args) ->
757    check_arguments(QueueName, Args, declare_args()).
758
759check_consume_arguments(QueueName, Args) ->
760    check_arguments(QueueName, Args, consume_args()).
761
762check_arguments(QueueName, Args, Validators) ->
763    [case rabbit_misc:table_lookup(Args, Key) of
764         undefined -> ok;
765         TypeVal   -> case Fun(TypeVal, Args) of
766                          ok             -> ok;
767                          {error, Error} -> rabbit_misc:protocol_error(
768                                              precondition_failed,
769                                              "invalid arg '~s' for ~s: ~255p",
770                                              [Key, rabbit_misc:rs(QueueName),
771                                               Error])
772                      end
773     end || {Key, Fun} <- Validators],
774    ok.
775
776declare_args() ->
777    [{<<"x-expires">>,                 fun check_expires_arg/2},
778     {<<"x-message-ttl">>,             fun check_message_ttl_arg/2},
779     {<<"x-dead-letter-exchange">>,    fun check_dlxname_arg/2},
780     {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
781     {<<"x-max-length">>,              fun check_non_neg_int_arg/2},
782     {<<"x-max-length-bytes">>,        fun check_non_neg_int_arg/2},
783     {<<"x-max-in-memory-length">>,    fun check_non_neg_int_arg/2},
784     {<<"x-max-in-memory-bytes">>,     fun check_non_neg_int_arg/2},
785     {<<"x-max-priority">>,            fun check_max_priority_arg/2},
786     {<<"x-overflow">>,                fun check_overflow/2},
787     {<<"x-queue-mode">>,              fun check_queue_mode/2},
788     {<<"x-single-active-consumer">>,  fun check_single_active_consumer_arg/2},
789     {<<"x-queue-type">>,              fun check_queue_type/2},
790     {<<"x-quorum-initial-group-size">>,     fun check_initial_cluster_size_arg/2},
791     {<<"x-max-age">>,                 fun check_max_age_arg/2},
792     {<<"x-stream-max-segment-size-bytes">>,        fun check_non_neg_int_arg/2},
793     {<<"x-initial-cluster-size">>,    fun check_initial_cluster_size_arg/2},
794     {<<"x-queue-leader-locator">>,    fun check_queue_leader_locator_arg/2}].
795
796consume_args() -> [{<<"x-priority">>,              fun check_int_arg/2},
797                   {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2},
798                   {<<"x-stream-offset">>, fun check_stream_offset_arg/2}].
799
800check_int_arg({Type, _}, _) ->
801    case lists:member(Type, ?INTEGER_ARG_TYPES) of
802        true  -> ok;
803        false -> {error, rabbit_misc:format("expected integer, got ~p", [Type])}
804    end;
805check_int_arg(Val, _) when is_integer(Val) ->
806    ok;
807check_int_arg(_Val, _) ->
808    {error, {unacceptable_type, "expected integer"}}.
809
810check_bool_arg({bool, _}, _) -> ok;
811check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
812check_bool_arg(true, _)  -> ok;
813check_bool_arg(false, _) -> ok;
814check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}.
815
816check_non_neg_int_arg({Type, Val}, Args) ->
817    case check_int_arg({Type, Val}, Args) of
818        ok when Val >= 0 -> ok;
819        ok               -> {error, {value_negative, Val}};
820        Error            -> Error
821    end;
822check_non_neg_int_arg(Val, Args) ->
823    case check_int_arg(Val, Args) of
824        ok when Val >= 0 -> ok;
825        ok               -> {error, {value_negative, Val}};
826        Error            -> Error
827    end.
828
829check_expires_arg({Type, Val}, Args) ->
830    case check_int_arg({Type, Val}, Args) of
831        ok when Val == 0 -> {error, {value_zero, Val}};
832        ok               -> rabbit_misc:check_expiry(Val);
833        Error            -> Error
834    end;
835check_expires_arg(Val, Args) ->
836    case check_int_arg(Val, Args) of
837        ok when Val == 0 -> {error, {value_zero, Val}};
838        ok               -> rabbit_misc:check_expiry(Val);
839        Error            -> Error
840    end.
841
842check_message_ttl_arg({Type, Val}, Args) ->
843    case check_int_arg({Type, Val}, Args) of
844        ok    -> rabbit_misc:check_expiry(Val);
845        Error -> Error
846    end;
847check_message_ttl_arg(Val, Args) ->
848    case check_int_arg(Val, Args) of
849        ok    -> rabbit_misc:check_expiry(Val);
850        Error -> Error
851    end.
852
853check_max_priority_arg({Type, Val}, Args) ->
854    case check_non_neg_int_arg({Type, Val}, Args) of
855        ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
856        ok                                     -> {error, {max_value_exceeded, Val}};
857        Error                                  -> Error
858    end;
859check_max_priority_arg(Val, Args) ->
860    case check_non_neg_int_arg(Val, Args) of
861        ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
862        ok                                     -> {error, {max_value_exceeded, Val}};
863        Error                                  -> Error
864    end.
865
866check_single_active_consumer_arg({Type, Val}, Args) ->
867    check_bool_arg({Type, Val}, Args);
868check_single_active_consumer_arg(Val, Args) ->
869    check_bool_arg(Val, Args).
870
871check_initial_cluster_size_arg({Type, Val}, Args) ->
872    case check_non_neg_int_arg({Type, Val}, Args) of
873        ok when Val == 0 -> {error, {value_zero, Val}};
874        ok               -> ok;
875        Error            -> Error
876    end;
877check_initial_cluster_size_arg(Val, Args) ->
878    case check_non_neg_int_arg(Val, Args) of
879        ok when Val == 0 -> {error, {value_zero, Val}};
880        ok               -> ok;
881        Error            -> Error
882    end.
883
884check_max_age_arg({longstr, Val}, _Args) ->
885    case check_max_age(Val) of
886        {error, _} = E ->
887            E;
888        _ ->
889            ok
890    end;
891check_max_age_arg({Type,    _}, _Args) ->
892    {error, {unacceptable_type, Type}}.
893
894check_max_age(MaxAge) ->
895    case re:run(MaxAge, "(^[0-9]*)(.*)", [{capture, all_but_first, list}]) of
896        {match, [Value, Unit]} ->
897            case list_to_integer(Value) of
898                I when I > 0 ->
899                    case lists:member(Unit, ["Y", "M", "D", "h", "m", "s"]) of
900                        true ->
901                            Int = list_to_integer(Value),
902                            Int * unit_value_in_ms(Unit);
903                        false ->
904                            {error, invalid_max_age}
905                    end;
906                _ ->
907                    {error, invalid_max_age}
908            end;
909        _ ->
910            {error, invalid_max_age}
911    end.
912
913unit_value_in_ms("Y") ->
914    365 * unit_value_in_ms("D");
915unit_value_in_ms("M") ->
916    30 * unit_value_in_ms("D");
917unit_value_in_ms("D") ->
918    24 * unit_value_in_ms("h");
919unit_value_in_ms("h") ->
920    3600 * unit_value_in_ms("s");
921unit_value_in_ms("m") ->
922    60 * unit_value_in_ms("s");
923unit_value_in_ms("s") ->
924    1000.
925
926%% Note that the validity of x-dead-letter-exchange is already verified
927%% by rabbit_channel's queue.declare handler.
928check_dlxname_arg({longstr, _}, _) -> ok;
929check_dlxname_arg({Type,    _}, _) -> {error, {unacceptable_type, Type}};
930check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok;
931check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}.
932
933check_dlxrk_arg({longstr, _}, Args) ->
934    case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
935        undefined -> {error, routing_key_but_no_dlx_defined};
936        _         -> ok
937    end;
938check_dlxrk_arg({Type,    _}, _Args) ->
939    {error, {unacceptable_type, Type}};
940check_dlxrk_arg(Val, Args) when is_binary(Val) ->
941    case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
942        undefined -> {error, routing_key_but_no_dlx_defined};
943        _         -> ok
944    end;
945check_dlxrk_arg(_Val, _Args) ->
946    {error, {unacceptable_type, "expected a string"}}.
947
948-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
949check_overflow({longstr, Val}, _Args) ->
950    case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
951        true  -> ok;
952        false -> {error, invalid_overflow}
953    end;
954check_overflow({Type,    _}, _Args) ->
955    {error, {unacceptable_type, Type}};
956check_overflow(Val, _Args) when is_binary(Val) ->
957    case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
958        true  -> ok;
959        false -> {error, invalid_overflow}
960    end;
961check_overflow(_Val, _Args) ->
962    {error, invalid_overflow}.
963
964-define(KNOWN_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]).
965check_queue_leader_locator_arg({longstr, Val}, _Args) ->
966    case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
967        true  -> ok;
968        false -> {error, invalid_queue_locator_arg}
969    end;
970check_queue_leader_locator_arg({Type, _}, _Args) ->
971    {error, {unacceptable_type, Type}};
972check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) ->
973    case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
974        true  -> ok;
975        false -> {error, invalid_queue_locator_arg}
976    end;
977check_queue_leader_locator_arg(_Val, _Args) ->
978    {error, invalid_queue_locator_arg}.
979
980check_stream_offset_arg(Val, _Args) ->
981    case rabbit_stream_queue:parse_offset_arg(Val) of
982        {ok, _} ->
983            ok;
984        {error, _} ->
985            {error, {invalid_stream_offset_arg, Val}}
986    end.
987
988-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
989check_queue_mode({longstr, Val}, _Args) ->
990    case lists:member(Val, ?KNOWN_QUEUE_MODES) of
991        true  -> ok;
992        false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
993    end;
994check_queue_mode({Type,    _}, _Args) ->
995    {error, {unacceptable_type, Type}};
996check_queue_mode(Val, _Args) when is_binary(Val) ->
997    case lists:member(Val, ?KNOWN_QUEUE_MODES) of
998        true  -> ok;
999        false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
1000    end;
1001check_queue_mode(_Val, _Args) ->
1002    {error, invalid_queue_mode}.
1003
1004-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
1005check_queue_type({longstr, Val}, _Args) ->
1006    case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
1007        true  -> ok;
1008        false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
1009    end;
1010check_queue_type({Type,    _}, _Args) ->
1011    {error, {unacceptable_type, Type}};
1012check_queue_type(Val, _Args) when is_binary(Val) ->
1013    case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
1014        true  -> ok;
1015        false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
1016    end;
1017check_queue_type(_Val, _Args) ->
1018    {error, invalid_queue_type}.
1019
1020-spec list() -> [amqqueue:amqqueue()].
1021
1022list() ->
1023    list_with_possible_retry(fun do_list/0).
1024
1025do_list() ->
1026    mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
1027
1028-spec count() -> non_neg_integer().
1029
1030count() ->
1031    mnesia:table_info(rabbit_queue, size).
1032
1033-spec list_names() -> [rabbit_amqqueue:name()].
1034
1035list_names() -> mnesia:dirty_all_keys(rabbit_queue).
1036
1037list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)].
1038
1039list_local_names() ->
1040    [ amqqueue:get_name(Q) || Q <- list(),
1041           amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
1042
1043list_local_names_down() ->
1044    [ amqqueue:get_name(Q) || Q <- list(),
1045                              is_local_to_node(amqqueue:get_pid(Q), node()),
1046                              is_down(Q)].
1047
1048is_down(Q) ->
1049    try
1050        info(Q, [state]) == [{state, down}]
1051    catch
1052        _:_ ->
1053            true
1054    end.
1055
1056
1057-spec sample_local_queues() -> [amqqueue:amqqueue()].
1058sample_local_queues() -> sample_n_by_name(list_local_names(), 300).
1059
1060-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()].
1061sample_n_by_name([], _N) ->
1062    [];
1063sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 ->
1064    %% lists:nth/2 throws when position is > list length
1065    M = erlang:min(N, length(Names)),
1066    Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 ->
1067                            Acc;
1068                        (_, Acc) ->
1069                            Pick = lists:nth(rand:uniform(M), Names),
1070                            [Pick | Acc]
1071                     end,
1072         [], lists:seq(1, M)),
1073    lists:map(fun (Id) ->
1074                {ok, Q} = rabbit_amqqueue:lookup(Id),
1075                Q
1076              end,
1077              lists:usort(Ids)).
1078
1079-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()].
1080sample_n([], _N) ->
1081    [];
1082sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 ->
1083    Names = [amqqueue:get_name(Q) || Q <- Queues],
1084    sample_n_by_name(Names, N).
1085
1086
1087-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
1088
1089list_by_type(classic) -> list_by_type(rabbit_classic_queue);
1090list_by_type(quorum)  -> list_by_type(rabbit_quorum_queue);
1091list_by_type(stream)  -> list_by_type(rabbit_stream_queue);
1092list_by_type(Type) ->
1093    {atomic, Qs} =
1094        mnesia:sync_transaction(
1095          fun () ->
1096                  mnesia:match_object(rabbit_durable_queue,
1097                                      amqqueue:pattern_match_on_type(Type),
1098                                      read)
1099          end),
1100    Qs.
1101
1102-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()].
1103
1104list_local_quorum_queue_names() ->
1105    [ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
1106           amqqueue:get_state(Q) =/= crashed,
1107      lists:member(node(), get_quorum_nodes(Q))].
1108
1109-spec list_local_quorum_queues() -> [amqqueue:amqqueue()].
1110list_local_quorum_queues() ->
1111    [ Q || Q <- list_by_type(quorum),
1112      amqqueue:get_state(Q) =/= crashed,
1113      lists:member(node(), get_quorum_nodes(Q))].
1114
1115-spec list_local_stream_queues() -> [amqqueue:amqqueue()].
1116list_local_stream_queues() ->
1117    [ Q || Q <- list_by_type(stream),
1118      amqqueue:get_state(Q) =/= crashed,
1119      lists:member(node(), get_quorum_nodes(Q))].
1120
1121-spec list_local_leaders() -> [amqqueue:amqqueue()].
1122list_local_leaders() ->
1123    [ Q || Q <- list(),
1124         amqqueue:is_quorum(Q),
1125         amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
1126
1127-spec list_local_followers() -> [amqqueue:amqqueue()].
1128list_local_followers() ->
1129    [Q
1130      || Q <- list(),
1131         amqqueue:is_quorum(Q),
1132         amqqueue:get_state(Q) =/= crashed,
1133         amqqueue:get_leader(Q) =/= node(),
1134         rabbit_quorum_queue:is_recoverable(Q)
1135         ].
1136
1137-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()].
1138list_local_mirrored_classic_queues() ->
1139    [ Q || Q <- list(),
1140        amqqueue:get_state(Q) =/= crashed,
1141        amqqueue:is_classic(Q),
1142        is_local_to_node(amqqueue:get_pid(Q), node()),
1143        is_replicated(Q)].
1144
1145-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
1146list_local_mirrored_classic_names() ->
1147    [ amqqueue:get_name(Q) || Q <- list(),
1148           amqqueue:get_state(Q) =/= crashed,
1149           amqqueue:is_classic(Q),
1150           is_local_to_node(amqqueue:get_pid(Q), node()),
1151           is_replicated(Q)].
1152
1153-spec list_local_mirrored_classic_without_synchronised_mirrors() ->
1154    [amqqueue:amqqueue()].
1155list_local_mirrored_classic_without_synchronised_mirrors() ->
1156    [ Q || Q <- list(),
1157         amqqueue:get_state(Q) =/= crashed,
1158         amqqueue:is_classic(Q),
1159         %% filter out exclusive queues as they won't actually be mirrored
1160         is_not_exclusive(Q),
1161         is_local_to_node(amqqueue:get_pid(Q), node()),
1162         is_replicated(Q),
1163         not has_synchronised_mirrors_online(Q)].
1164
1165-spec list_local_mirrored_classic_without_synchronised_mirrors_for_cli() ->
1166    [#{binary => any()}].
1167list_local_mirrored_classic_without_synchronised_mirrors_for_cli() ->
1168    ClassicQs = list_local_mirrored_classic_without_synchronised_mirrors(),
1169    [begin
1170         #resource{name = Name} = amqqueue:get_name(Q),
1171         #{
1172             <<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(amqqueue:get_name(Q))),
1173             <<"name">>          => Name,
1174             <<"virtual_host">>  => amqqueue:get_vhost(Q),
1175             <<"type">>          => <<"classic">>
1176         }
1177     end || Q <- ClassicQs].
1178
1179-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
1180list_local_quorum_queues_with_name_matching(Pattern) ->
1181    [ Q || Q <- list_by_type(quorum),
1182      amqqueue:get_state(Q) =/= crashed,
1183      lists:member(node(), get_quorum_nodes(Q)),
1184      is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)].
1185
1186-spec list_local_quorum_queues_with_name_matching(vhost:name(), binary()) -> [amqqueue:amqqueue()].
1187list_local_quorum_queues_with_name_matching(VHostName, Pattern) ->
1188    [ Q || Q <- list_by_type(quorum),
1189      amqqueue:get_state(Q) =/= crashed,
1190      lists:member(node(), get_quorum_nodes(Q)),
1191      is_in_virtual_host(Q, VHostName),
1192      is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)].
1193
1194is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
1195    Node =:= node(QPid);
1196is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
1197    Node =:= Leader;
1198is_local_to_node(_QPid, _Node) ->
1199    false.
1200
1201is_in_virtual_host(Q, VHostName) ->
1202    VHostName =:= get_resource_vhost_name(amqqueue:get_name(Q)).
1203
1204-spec list(vhost:name()) -> [amqqueue:amqqueue()].
1205list(VHostPath) ->
1206    list(VHostPath, rabbit_queue).
1207
1208list(VHostPath, TableName) ->
1209    list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end).
1210
1211%% Not dirty_match_object since that would not be transactional when used in a
1212%% tx context
1213do_list(VHostPath, TableName) ->
1214    mnesia:async_dirty(
1215      fun () ->
1216              mnesia:match_object(
1217                TableName,
1218                amqqueue:pattern_match_on_name(rabbit_misc:r(VHostPath, queue)),
1219                read)
1220      end).
1221
1222list_with_possible_retry(Fun) ->
1223    %% amqqueue migration:
1224    %% The `rabbit_queue` or `rabbit_durable_queue` tables
1225    %% might be migrated between the time we query the pattern
1226    %% (with the `amqqueue` module) and the time we call
1227    %% `mnesia:dirty_match_object()`. This would lead to an empty list
1228    %% (no object matching the now incorrect pattern), not a Mnesia
1229    %% error.
1230    %%
1231    %% So if the result is an empty list and the version of the
1232    %% `amqqueue` record changed in between, we retry the operation.
1233    %%
1234    %% However, we don't do this if inside a Mnesia transaction: we
1235    %% could end up with a live lock between this started transaction
1236    %% and the Mnesia table migration which is blocked (but the
1237    %% rabbit_feature_flags lock is held).
1238    AmqqueueRecordVersion = amqqueue:record_version_to_use(),
1239    case Fun() of
1240        [] ->
1241            case mnesia:is_transaction() of
1242                true ->
1243                    [];
1244                false ->
1245                    case amqqueue:record_version_to_use() of
1246                        AmqqueueRecordVersion -> [];
1247                        _                     -> Fun()
1248                    end
1249            end;
1250        Ret ->
1251            Ret
1252    end.
1253
1254-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
1255
1256list_down(VHostPath) ->
1257    case rabbit_vhost:exists(VHostPath) of
1258        false -> [];
1259        true  ->
1260            Present = list(VHostPath),
1261            Durable = list(VHostPath, rabbit_durable_queue),
1262            PresentS = sets:from_list([amqqueue:get_name(Q) || Q <- Present]),
1263            sets:to_list(sets:filter(fun (Q) ->
1264                                             N = amqqueue:get_name(Q),
1265                                             not sets:is_element(N, PresentS)
1266                                     end, sets:from_list(Durable)))
1267    end.
1268
1269count(VHost) ->
1270  try
1271    %% this is certainly suboptimal but there is no way to count
1272    %% things using a secondary index in Mnesia. Our counter-table-per-node
1273    %% won't work here because with master migration of mirrored queues
1274    %% the "ownership" of queues by nodes becomes a non-trivial problem
1275    %% that requires a proper consensus algorithm.
1276    length(list_for_count(VHost))
1277  catch _:Err ->
1278    rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p",
1279                     [VHost, Err]),
1280    0
1281  end.
1282
1283list_for_count(VHost) ->
1284    list_with_possible_retry(
1285      fun() ->
1286              mnesia:dirty_index_read(rabbit_queue,
1287                                      VHost,
1288                                      amqqueue:field_vhost())
1289      end).
1290
1291-spec info_keys() -> rabbit_types:info_keys().
1292
1293%% It should no default to classic queue keys, but a subset of those that must be shared
1294%% by all queue types. Not sure this is even being used, so will leave it here for backwards
1295%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports.
1296info_keys() -> rabbit_amqqueue_process:info_keys().
1297
1298map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
1299
1300is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) ->
1301    false;
1302is_unresponsive(Q, Timeout) when ?amqqueue_is_classic(Q) ->
1303    QPid = amqqueue:get_pid(Q),
1304    try
1305        delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}),
1306        false
1307    catch
1308        %% TODO catch any exit??
1309        exit:{timeout, _} ->
1310            true
1311    end;
1312is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) ->
1313    try
1314        Leader = amqqueue:get_pid(Q),
1315        case rabbit_fifo_client:stat(Leader, Timeout) of
1316          {ok, _, _}   -> false;
1317          {timeout, _} -> true;
1318          {error, _}   -> true
1319        end
1320    catch
1321        exit:{timeout, _} ->
1322            true
1323    end;
1324is_unresponsive(Q, Timeout) when ?amqqueue_is_stream(Q) ->
1325    try
1326        #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
1327        case gen_batch_server:call(LeaderPid, get_reader_context, Timeout) of
1328            #{dir := _} -> false;
1329            _ -> true
1330        end
1331    catch
1332        exit:{timeout, _} ->
1333            true
1334    end.
1335
1336format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
1337format(Q) -> rabbit_amqqueue_process:format(Q).
1338
1339-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
1340
1341info(Q) when ?is_amqqueue(Q) -> rabbit_queue_type:info(Q, all_keys).
1342
1343
1344-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) ->
1345          rabbit_types:infos().
1346
1347info(Q, Items) when ?is_amqqueue(Q) ->
1348    rabbit_queue_type:info(Q, Items).
1349
1350info_down(Q, DownReason) ->
1351    rabbit_queue_type:info_down(Q, DownReason).
1352
1353info_down(Q, Items, DownReason) ->
1354    rabbit_queue_type:info_down(Q, Items, DownReason).
1355
1356-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
1357
1358info_all(VHostPath) ->
1359    map(list(VHostPath), fun (Q) -> info(Q) end) ++
1360        map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end).
1361
1362-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
1363          [rabbit_types:infos()].
1364
1365info_all(VHostPath, Items) ->
1366    map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
1367        map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
1368
1369emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
1370    rabbit_control_misc:emitting_map_with_exit_handler(
1371      AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)).
1372
1373emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
1374    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
1375    rabbit_control_misc:await_emitters_termination(Pids).
1376
1377collect_info_all(VHostPath, Items) ->
1378    Nodes = rabbit_nodes:all_running(),
1379    Ref = make_ref(),
1380    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
1381    rabbit_control_misc:await_emitters_termination(Pids),
1382    wait_for_queues(Ref, length(Pids), []).
1383
1384wait_for_queues(Ref, N, Acc) ->
1385    receive
1386        {Ref, finished} when N == 1 ->
1387            Acc;
1388        {Ref, finished} ->
1389            wait_for_queues(Ref, N - 1, Acc);
1390        {Ref, Items, continue} ->
1391            wait_for_queues(Ref, N, [Items | Acc])
1392    after
1393        1000 ->
1394            Acc
1395    end.
1396
1397emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
1398    rabbit_control_misc:emitting_map_with_exit_handler(
1399      AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
1400      list_down(VHostPath)).
1401
1402emit_unresponsive_local(VHostPath, Items, Timeout, Ref, AggregatorPid) ->
1403    rabbit_control_misc:emitting_map_with_exit_handler(
1404      AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of
1405                                        true -> info_down(Q, Items, unresponsive);
1406                                        false -> []
1407                                    end
1408                          end, list_local(VHostPath)
1409     ).
1410
1411emit_unresponsive(Nodes, VHostPath, Items, Timeout, Ref, AggregatorPid) ->
1412    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local,
1413                        [VHostPath, Items, Timeout, Ref, AggregatorPid]) || Node <- Nodes ],
1414    rabbit_control_misc:await_emitters_termination(Pids).
1415
1416info_local(VHostPath) ->
1417    map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
1418
1419list_local(VHostPath) ->
1420    [Q || Q <- list(VHostPath),
1421          amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
1422
1423-spec force_event_refresh(reference()) -> 'ok'.
1424
1425% Note: https://www.pivotaltracker.com/story/show/166962656
1426% This event is necessary for the stats timer to be initialized with
1427% the correct values once the management agent has started
1428force_event_refresh(Ref) ->
1429    %% note: quorum queuse emit stats on periodic ticks that run unconditionally,
1430    %%       so force_event_refresh is unnecessary (and, in fact, would only produce log noise) for QQs.
1431    ClassicQs = list_by_type(rabbit_classic_queue),
1432    [gen_server2:cast(amqqueue:get_pid(Q),
1433                      {force_event_refresh, Ref}) || Q <- ClassicQs],
1434    ok.
1435
1436-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'.
1437notify_policy_changed(Q) when ?is_amqqueue(Q) ->
1438    rabbit_queue_type:policy_changed(Q).
1439
1440-spec consumers(amqqueue:amqqueue()) ->
1441          [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
1442            boolean(), atom(),
1443            rabbit_framing:amqp_table(), rabbit_types:username()}].
1444
1445consumers(Q) when ?amqqueue_is_classic(Q) ->
1446    QPid = amqqueue:get_pid(Q),
1447    delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
1448consumers(Q) when ?amqqueue_is_quorum(Q) ->
1449    QPid = amqqueue:get_pid(Q),
1450    case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of
1451        {ok, {_, Result}, _} -> maps:values(Result);
1452        _                    -> []
1453    end;
1454consumers(Q) when ?amqqueue_is_stream(Q) ->
1455    %% TODO how??? they only exist on the channel
1456    %% we could list the offset listener on the writer but we don't even have a consumer tag,
1457    %% only a (channel) pid and offset
1458    [].
1459
1460-spec consumer_info_keys() -> rabbit_types:info_keys().
1461
1462consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
1463
1464-spec consumers_all(rabbit_types:vhost()) ->
1465          [{name(), pid(), rabbit_types:ctag(), boolean(),
1466            non_neg_integer(), rabbit_framing:amqp_table()}].
1467
1468consumers_all(VHostPath) ->
1469    ConsumerInfoKeys = consumer_info_keys(),
1470    lists:append(
1471      map(list(VHostPath),
1472          fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
1473
1474emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
1475    Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ],
1476    rabbit_control_misc:await_emitters_termination(Pids),
1477    ok.
1478
1479emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
1480    ConsumerInfoKeys = consumer_info_keys(),
1481    rabbit_control_misc:emitting_map(
1482      AggregatorPid, Ref,
1483      fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
1484      list_local(VHostPath)).
1485
1486get_queue_consumer_info(Q, ConsumerInfoKeys) ->
1487    [lists:zip(ConsumerInfoKeys,
1488               [amqqueue:get_name(Q), ChPid, CTag,
1489                AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
1490        {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
1491
1492-spec stat(amqqueue:amqqueue()) ->
1493          {'ok', non_neg_integer(), non_neg_integer()}.
1494stat(Q) ->
1495    rabbit_queue_type:stat(Q).
1496
1497-spec pid_of(amqqueue:amqqueue()) ->
1498          pid().
1499
1500pid_of(Q) -> amqqueue:get_pid(Q).
1501
1502-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
1503          pid() | rabbit_types:error('not_found').
1504
1505pid_of(VHost, QueueName) ->
1506  case lookup(rabbit_misc:r(VHost, queue, QueueName)) of
1507    {ok, Q}                -> pid_of(Q);
1508    {error, not_found} = E -> E
1509  end.
1510
1511-spec delete_exclusive(qpids(), pid()) -> 'ok'.
1512
1513delete_exclusive(QPids, ConnId) ->
1514    rabbit_amqqueue_common:delete_exclusive(QPids, ConnId).
1515
1516-spec delete_immediately(qpids()) -> 'ok'.
1517
1518delete_immediately(QPids) ->
1519    {Classic, Quorum} = filter_pid_per_type(QPids),
1520    [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic],
1521    case Quorum of
1522        [] -> ok;
1523        _ -> {error, cannot_delete_quorum_queues, Quorum}
1524    end.
1525
1526delete_immediately_by_resource(Resources) ->
1527    {Classic, Quorum} = filter_resource_per_type(Resources),
1528    [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
1529    [rabbit_quorum_queue:delete_immediately(Resource, QPid)
1530     || {Resource, QPid} <- Quorum],
1531    ok.
1532
1533-spec delete
1534        (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
1535            qlen() |
1536            {protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
1537        (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
1538            qlen() | rabbit_types:error('in_use') |
1539            {protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
1540        (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
1541            qlen() | rabbit_types:error('not_empty') |
1542            {protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
1543        (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
1544            qlen() |
1545            rabbit_types:error('in_use') |
1546            rabbit_types:error('not_empty') |
1547            {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
1548delete(Q, IfUnused, IfEmpty, ActingUser) ->
1549    rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).
1550
1551%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS
1552delete_crashed(Q) when ?amqqueue_is_classic(Q) ->
1553    rabbit_classic_queue:delete_crashed(Q).
1554
1555delete_crashed(Q, ActingUser) when ?amqqueue_is_classic(Q) ->
1556    rabbit_classic_queue:delete_crashed(Q, ActingUser).
1557
1558-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
1559delete_crashed_internal(Q, ActingUser) when ?amqqueue_is_classic(Q) ->
1560    rabbit_classic_queue:delete_crashed_internal(Q, ActingUser).
1561
1562-spec purge(amqqueue:amqqueue()) -> qlen().
1563purge(Q) when ?is_amqqueue(Q) ->
1564    rabbit_queue_type:purge(Q).
1565
1566-spec requeue(name(),
1567              {rabbit_fifo:consumer_tag(), [msg_id()]},
1568              rabbit_queue_type:state()) ->
1569    {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}.
1570requeue(QRef, {CTag, MsgIds}, QStates) ->
1571    reject(QRef, true, {CTag, MsgIds}, QStates).
1572
1573-spec ack(name(),
1574          {rabbit_fifo:consumer_tag(), [msg_id()]},
1575          rabbit_queue_type:state()) ->
1576    {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}.
1577ack(QPid, {CTag, MsgIds}, QueueStates) ->
1578    rabbit_queue_type:settle(QPid, complete, CTag, MsgIds, QueueStates).
1579
1580
1581-spec reject(name(),
1582             boolean(),
1583             {rabbit_fifo:consumer_tag(), [msg_id()]},
1584             rabbit_queue_type:state()) ->
1585    {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}.
1586reject(QRef, Requeue, {CTag, MsgIds}, QStates) ->
1587    Op = case Requeue of
1588             true -> requeue;
1589             false -> discard
1590         end,
1591    rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QStates).
1592
1593-spec notify_down_all(qpids(), pid()) -> ok_or_errors().
1594notify_down_all(QPids, ChPid) ->
1595    notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
1596
1597-spec notify_down_all(qpids(), pid(), non_neg_integer()) ->
1598          ok_or_errors().
1599notify_down_all(QPids, ChPid, Timeout) ->
1600    case rpc:call(node(), delegate, invoke,
1601                  [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
1602        {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
1603        {badrpc, Reason}  -> {error, Reason};
1604        {_, Bads} ->
1605            case lists:filter(
1606                   fun ({_Pid, {exit, {R, _}, _}}) ->
1607                           rabbit_misc:is_abnormal_exit(R);
1608                       ({_Pid, _})                 -> false
1609                   end, Bads) of
1610                []    -> ok;
1611                Bads1 -> {error, Bads1}
1612            end;
1613        Error         -> {error, Error}
1614    end.
1615
1616-spec activate_limit_all(qpids(), pid()) -> ok.
1617
1618activate_limit_all(QRefs, ChPid) ->
1619    QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
1620    delegate:invoke_no_result(QPids, {gen_server2, cast,
1621                                      [{activate_limit, ChPid}]}).
1622
1623-spec deactivate_limit_all(qpids(), pid()) -> ok.
1624
1625deactivate_limit_all(QRefs, ChPid) ->
1626    QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
1627    delegate:invoke_no_result(QPids, {gen_server2, cast,
1628                                      [{deactivate_limit, ChPid}]}).
1629
1630-spec credit(amqqueue:amqqueue(),
1631             rabbit_types:ctag(),
1632             non_neg_integer(),
1633             boolean(),
1634             rabbit_queue_type:state()) ->
1635    {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}.
1636credit(Q, CTag, Credit, Drain, QStates) ->
1637    rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates).
1638
1639-spec basic_get(amqqueue:amqqueue(), boolean(), pid(), rabbit_types:ctag(),
1640                rabbit_queue_type:state()) ->
1641          {'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
1642          {'empty', rabbit_queue_type:state()} |
1643          {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
1644basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
1645    rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
1646
1647
1648-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
1649                    non_neg_integer(), rabbit_types:ctag(), boolean(),
1650                    rabbit_framing:amqp_table(), any(), rabbit_types:username(),
1651                    rabbit_queue_type:state()) ->
1652    {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} |
1653    {error, term()} |
1654    {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
1655basic_consume(Q, NoAck, ChPid, LimiterPid,
1656              LimiterActive, ConsumerPrefetchCount, ConsumerTag,
1657              ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
1658
1659    QName = amqqueue:get_name(Q),
1660    %% first phase argument validation
1661    %% each queue type may do further validations
1662    ok = check_consume_arguments(QName, Args),
1663    Spec = #{no_ack => NoAck,
1664             channel_pid => ChPid,
1665             limiter_pid => LimiterPid,
1666             limiter_active => LimiterActive,
1667             prefetch_count => ConsumerPrefetchCount,
1668             consumer_tag => ConsumerTag,
1669             exclusive_consume => ExclusiveConsume,
1670             args => Args,
1671             ok_msg => OkMsg,
1672             acting_user =>  ActingUser},
1673    rabbit_queue_type:consume(Q, Spec, Contexts).
1674
1675-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
1676                   rabbit_types:username(),
1677                   rabbit_queue_type:state()) ->
1678    {ok, rabbit_queue_type:state()} | {error, term()}.
1679basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->
1680    rabbit_queue_type:cancel(Q, ConsumerTag,
1681                             OkMsg, ActingUser, QStates).
1682
1683-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
1684
1685notify_decorators(Q) ->
1686    rabbit_queue_type:notify_decorators(Q).
1687
1688notify_sent(QPid, ChPid) ->
1689    rabbit_amqqueue_common:notify_sent(QPid, ChPid).
1690
1691notify_sent_queue_down(QPid) ->
1692    rabbit_amqqueue_common:notify_sent_queue_down(QPid).
1693
1694-spec resume(pid(), pid()) -> 'ok'.
1695
1696resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast,
1697                                                        [{resume, ChPid}]}).
1698
1699internal_delete1(QueueName, OnlyDurable) ->
1700    internal_delete1(QueueName, OnlyDurable, normal).
1701
1702internal_delete1(QueueName, OnlyDurable, Reason) ->
1703    ok = mnesia:delete({rabbit_queue, QueueName}),
1704    case Reason of
1705        auto_delete ->
1706            case mnesia:wread({rabbit_durable_queue, QueueName}) of
1707                []  -> ok;
1708                [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
1709            end;
1710        _ ->
1711            mnesia:delete({rabbit_durable_queue, QueueName})
1712    end,
1713    %% we want to execute some things, as decided by rabbit_exchange,
1714    %% after the transaction.
1715    rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
1716
1717-spec internal_delete(name(), rabbit_types:username()) -> 'ok'.
1718
1719internal_delete(QueueName, ActingUser) ->
1720    internal_delete(QueueName, ActingUser, normal).
1721
1722internal_delete(QueueName, ActingUser, Reason) ->
1723    rabbit_misc:execute_mnesia_tx_with_tail(
1724      fun () ->
1725              case {mnesia:wread({rabbit_queue, QueueName}),
1726                    mnesia:wread({rabbit_durable_queue, QueueName})} of
1727                  {[], []} ->
1728                      rabbit_misc:const(ok);
1729                  _ ->
1730                      Deletions = internal_delete1(QueueName, false, Reason),
1731                      T = rabbit_binding:process_deletions(Deletions,
1732                                                           ?INTERNAL_USER),
1733                      fun() ->
1734                              ok = T(),
1735                              rabbit_core_metrics:queue_deleted(QueueName),
1736                              ok = rabbit_event:notify(queue_deleted,
1737                                                       [{name, QueueName},
1738                                                        {user_who_performed_action, ActingUser}])
1739                      end
1740              end
1741      end).
1742
1743-spec forget_all_durable(node()) -> 'ok'.
1744
1745forget_all_durable(Node) ->
1746    %% Note rabbit is not running so we avoid e.g. the worker pool. Also why
1747    %% we don't invoke the return from rabbit_binding:process_deletions/1.
1748    {atomic, ok} =
1749        mnesia:sync_transaction(
1750          fun () ->
1751                  Qs = mnesia:match_object(rabbit_durable_queue,
1752                                           amqqueue:pattern_match_all(), write),
1753                  [forget_node_for_queue(Node, Q) ||
1754                      Q <- Qs,
1755                      is_local_to_node(amqqueue:get_pid(Q), Node)],
1756                  ok
1757          end),
1758    ok.
1759
1760%% Try to promote a mirror while down - it should recover as a
1761%% master. We try to take the oldest mirror here for best chance of
1762%% recovery.
1763forget_node_for_queue(_DeadNode, Q)
1764  when ?amqqueue_is_quorum(Q) ->
1765    ok;
1766forget_node_for_queue(DeadNode, Q) ->
1767    RS = amqqueue:get_recoverable_slaves(Q),
1768    forget_node_for_queue(DeadNode, RS, Q).
1769
1770forget_node_for_queue(_DeadNode, [], Q) ->
1771    %% No mirrors to recover from, queue is gone.
1772    %% Don't process_deletions since that just calls callbacks and we
1773    %% are not really up.
1774    Name = amqqueue:get_name(Q),
1775    internal_delete1(Name, true);
1776
1777%% Should not happen, but let's be conservative.
1778forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
1779    forget_node_for_queue(DeadNode, T, Q);
1780
1781forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
1782    Type = amqqueue:get_type(Q),
1783    case {node_permits_offline_promotion(H), Type} of
1784        {false, _} -> forget_node_for_queue(DeadNode, T, Q);
1785        {true, rabbit_classic_queue} ->
1786            Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
1787            ok = mnesia:write(rabbit_durable_queue, Q1, write);
1788        {true, rabbit_quorum_queue} ->
1789            ok
1790    end.
1791
1792node_permits_offline_promotion(Node) ->
1793    case node() of
1794        Node -> not rabbit:is_running(); %% [1]
1795        _    -> All = rabbit_nodes:all(),
1796                Running = rabbit_nodes:all_running(),
1797                lists:member(Node, All) andalso
1798                    not lists:member(Node, Running) %% [2]
1799    end.
1800%% [1] In this case if we are a real running node (i.e. rabbitmqctl
1801%% has RPCed into us) then we cannot allow promotion. If on the other
1802%% hand we *are* rabbitmqctl impersonating the node for offline
1803%% node-forgetting then we can.
1804%%
1805%% [2] This is simpler; as long as it's down that's OK
1806
1807-spec run_backing_queue
1808        (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) ->
1809            'ok'.
1810
1811run_backing_queue(QPid, Mod, Fun) ->
1812    gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
1813
1814-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'.
1815
1816set_ram_duration_target(QPid, Duration) ->
1817    gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
1818
1819-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
1820
1821set_maximum_since_use(QPid, Age) ->
1822    gen_server2:cast(QPid, {set_maximum_since_use, Age}).
1823
1824-spec update_mirroring(pid()) -> 'ok'.
1825
1826update_mirroring(QPid) ->
1827    ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
1828
1829-spec sync_mirrors(amqqueue:amqqueue() | pid()) ->
1830          'ok' | rabbit_types:error('not_mirrored').
1831
1832sync_mirrors(Q) when ?is_amqqueue(Q) ->
1833    QPid = amqqueue:get_pid(Q),
1834    delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
1835sync_mirrors(QPid) ->
1836    delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
1837
1838-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) ->
1839          'ok' | {'ok', 'not_syncing'}.
1840
1841cancel_sync_mirrors(Q) when ?is_amqqueue(Q) ->
1842    QPid = amqqueue:get_pid(Q),
1843    delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
1844cancel_sync_mirrors(QPid) ->
1845    delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
1846
1847-spec is_replicated(amqqueue:amqqueue()) -> boolean().
1848
1849is_replicated(Q) when ?amqqueue_is_classic(Q) ->
1850    rabbit_mirror_queue_misc:is_mirrored(Q);
1851is_replicated(_Q) ->
1852    %% streams and quorum queues are all replicated
1853    true.
1854
1855is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
1856    false;
1857is_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
1858    true.
1859
1860is_not_exclusive(Q) ->
1861    not is_exclusive(Q).
1862
1863is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
1864    false;
1865is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
1866    Pid = amqqueue:get_pid(Q),
1867    not rabbit_mnesia:is_process_alive(Pid).
1868
1869-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean().
1870has_synchronised_mirrors_online(Q) ->
1871    %% a queue with all mirrors down would have no mirror pids.
1872    %% We treat these as in sync intentionally to avoid false positives.
1873    MirrorPids = amqqueue:get_sync_slave_pids(Q),
1874    MirrorPids =/= [] andalso lists:any(fun rabbit_misc:is_process_alive/1, MirrorPids).
1875
1876-spec on_node_up(node()) -> 'ok'.
1877
1878on_node_up(Node) ->
1879    ok = rabbit_misc:execute_mnesia_transaction(
1880           fun () ->
1881                   Qs = mnesia:match_object(rabbit_queue,
1882                                            amqqueue:pattern_match_all(), write),
1883                   [maybe_clear_recoverable_node(Node, Q) || Q <- Qs],
1884                   ok
1885           end).
1886
1887maybe_clear_recoverable_node(Node, Q) ->
1888    SPids = amqqueue:get_sync_slave_pids(Q),
1889    RSs = amqqueue:get_recoverable_slaves(Q),
1890    case lists:member(Node, RSs) of
1891        true  ->
1892            %% There is a race with
1893            %% rabbit_mirror_queue_slave:record_synchronised/1 called
1894            %% by the incoming mirror node and this function, called
1895            %% by the master node. If this function is executed after
1896            %% record_synchronised/1, the node is erroneously removed
1897            %% from the recoverable mirrors list.
1898            %%
1899            %% We check if the mirror node's queue PID is alive. If it is
1900            %% the case, then this function is executed after. In this
1901            %% situation, we don't touch the queue record, it is already
1902            %% correct.
1903            DoClearNode =
1904                case [SP || SP <- SPids, node(SP) =:= Node] of
1905                    [SPid] -> not rabbit_misc:is_process_alive(SPid);
1906                    _      -> true
1907                end,
1908            if
1909                DoClearNode -> RSs1 = RSs -- [Node],
1910                               store_queue(
1911                                 amqqueue:set_recoverable_slaves(Q, RSs1));
1912                true        -> ok
1913            end;
1914        false ->
1915            ok
1916    end.
1917
1918-spec on_node_down(node()) -> 'ok'.
1919
1920on_node_down(Node) ->
1921    {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_node_down(Node) end),
1922    case length(QueueNames) of
1923        0 -> ok;
1924        _ -> rabbit_log:info("~p transient queues from an old incarnation of node ~p deleted in ~fs", [length(QueueNames), Node, Time/1000000])
1925    end,
1926    notify_queue_binding_deletions(QueueDeletions),
1927    rabbit_core_metrics:queues_deleted(QueueNames),
1928    notify_queues_deleted(QueueNames),
1929    ok.
1930
1931delete_queues_on_node_down(Node) ->
1932    lists:unzip(lists:flatten([
1933        rabbit_misc:execute_mnesia_transaction(
1934          fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
1935        ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node))
1936    ])).
1937
1938delete_queue(QueueName) ->
1939    ok = mnesia:delete({rabbit_queue, QueueName}),
1940    rabbit_binding:remove_transient_for_destination(QueueName).
1941
1942% If there are many queues and we delete them all in a single Mnesia transaction,
1943% this can block all other Mnesia operations for a really long time.
1944% In situations where a node wants to (re-)join a cluster,
1945% Mnesia won't be able to sync on the new node until this operation finishes.
1946% As a result, we want to have multiple Mnesia transactions so that other
1947% operations can make progress in between these queue delete transactions.
1948%
1949% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node.
1950partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) ->
1951    [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)];
1952partition_queues(T) ->
1953    [T].
1954
1955queues_to_delete_when_node_down(NodeDown) ->
1956    rabbit_misc:execute_mnesia_transaction(fun () ->
1957        qlc:e(qlc:q([amqqueue:get_name(Q) ||
1958            Q <- mnesia:table(rabbit_queue),
1959                amqqueue:qnode(Q) == NodeDown andalso
1960                not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso
1961                (not rabbit_amqqueue:is_replicated(Q) orelse
1962                rabbit_amqqueue:is_dead_exclusive(Q))]
1963        ))
1964    end).
1965
1966notify_queue_binding_deletions(QueueDeletions) ->
1967    rabbit_misc:execute_mnesia_tx_with_tail(
1968        fun() ->
1969            rabbit_binding:process_deletions(
1970                lists:foldl(
1971                    fun rabbit_binding:combine_deletions/2,
1972                    rabbit_binding:new_deletions(),
1973                    QueueDeletions
1974                ),
1975                ?INTERNAL_USER
1976            )
1977        end
1978    ).
1979
1980notify_queues_deleted(QueueDeletions) ->
1981    lists:foreach(
1982      fun(Queue) ->
1983              ok = rabbit_event:notify(queue_deleted,
1984                                       [{name, Queue},
1985                                        {user, ?INTERNAL_USER}])
1986      end,
1987      QueueDeletions).
1988
1989-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue().
1990
1991pseudo_queue(QueueName, Pid) ->
1992    pseudo_queue(QueueName, Pid, false).
1993
1994-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue().
1995
1996pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
1997  when is_pid(Pid) andalso
1998       is_boolean(Durable) ->
1999    amqqueue:new(QueueName,
2000                 Pid,
2001                 Durable,
2002                 false,
2003                 none, % Owner,
2004                 [],
2005                 undefined, % VHost,
2006                 #{user => undefined}, % ActingUser
2007                 rabbit_classic_queue % Type
2008                ).
2009
2010-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
2011
2012immutable(Q) -> amqqueue:set_immutable(Q).
2013
2014-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'.
2015
2016deliver(Qs, Delivery) ->
2017    _ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
2018    ok.
2019
2020get_quorum_nodes(Q) ->
2021    case amqqueue:get_type_state(Q) of
2022        #{nodes := Nodes} ->
2023            Nodes;
2024        _ ->
2025            []
2026    end.
2027