1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(rabbit_amqqueue). 9 10-export([store_queue_ram_dirty/1]). 11-export([warn_file_limit/0]). 12-export([recover/1, stop/1, start/1, declare/6, declare/7, 13 delete_immediately/1, delete_exclusive/2, delete/4, purge/1, 14 forget_all_durable/1]). 15-export([pseudo_queue/2, pseudo_queue/3, immutable/1]). 16-export([lookup/1, lookup_many/1, not_found_or_absent/1, not_found_or_absent_dirty/1, 17 with/2, with/3, with_or_die/2, 18 assert_equivalence/5, 19 check_exclusive_access/2, with_exclusive_access_or_die/3, 20 stat/1, deliver/2, 21 requeue/3, ack/3, reject/4]). 22-export([not_found/1, absent/2]). 23-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, 24 emit_info_all/5, list_local/1, info_local/1, 25 emit_info_local/4, emit_info_down/4]). 26-export([count/0]). 27-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, 28 list_local_names_down/0, list_with_possible_retry/1]). 29-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). 30-export([force_event_refresh/1, notify_policy_changed/1]). 31-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). 32-export([basic_get/5, basic_consume/12, basic_cancel/5, notify_decorators/1]). 33-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). 34-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). 35-export([on_node_up/1, on_node_down/1]). 36-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). 37-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). 38-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). 39-export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]). 40-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). 41-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0, 42 list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, 43 list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, 44 list_local_mirrored_classic_without_synchronised_mirrors/0, 45 list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0, 46 list_local_quorum_queues_with_name_matching/1, 47 list_local_quorum_queues_with_name_matching/2]). 48-export([ensure_rabbit_queue_record_is_initialized/1]). 49-export([format/1]). 50-export([delete_immediately_by_resource/1]). 51-export([delete_crashed/1, 52 delete_crashed/2, 53 delete_crashed_internal/2]). 54 55-export([pid_of/1, pid_of/2]). 56-export([mark_local_durable_queues_stopped/1]). 57 58-export([rebalance/3]). 59-export([collect_info_all/2]). 60 61-export([is_policy_applicable/2, declare_args/0, consume_args/0]). 62-export([is_server_named_allowed/1]). 63 64-export([check_max_age/1]). 65-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]). 66 67-export([deactivate_limit_all/2]). 68 69%% internal 70-export([internal_declare/2, internal_delete/2, run_backing_queue/3, 71 set_ram_duration_target/2, set_maximum_since_use/2, 72 emit_consumers_local/3, internal_delete/3]). 73 74-include_lib("rabbit_common/include/rabbit.hrl"). 75-include_lib("stdlib/include/qlc.hrl"). 76-include("amqqueue.hrl"). 77 78-define(INTEGER_ARG_TYPES, [byte, short, signedint, long, 79 unsignedbyte, unsignedshort, unsignedint]). 80 81-define(IS_CLASSIC(QPid), is_pid(QPid)). 82-define(IS_QUORUM(QPid), is_tuple(QPid)). 83%%---------------------------------------------------------------------------- 84 85-export_type([name/0, qmsg/0, absent_reason/0]). 86 87-type name() :: rabbit_types:r('queue'). 88 89-type qpids() :: [pid()]. 90-type qlen() :: rabbit_types:ok(non_neg_integer()). 91-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). 92-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(), 93 boolean(), rabbit_types:message()}. 94-type msg_id() :: non_neg_integer(). 95-type ok_or_errors() :: 96 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}. 97-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout. 98-type queue_not_found() :: not_found. 99-type queue_absent() :: {'absent', amqqueue:amqqueue(), absent_reason()}. 100-type not_found_or_absent() :: queue_not_found() | queue_absent(). 101 102%%---------------------------------------------------------------------------- 103 104-define(CONSUMER_INFO_KEYS, 105 [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, 106 active, activity_status, arguments]). 107 108warn_file_limit() -> 109 DurableQueues = find_recoverable_queues(), 110 L = length(DurableQueues), 111 112 %% if there are not enough file handles, the server might hang 113 %% when trying to recover queues, warn the user: 114 case file_handle_cache:get_limit() < L of 115 true -> 116 rabbit_log:warning( 117 "Recovering ~p queues, available file handles: ~p. Please increase max open file handles limit to at least ~p!", 118 [L, file_handle_cache:get_limit(), L]); 119 false -> 120 ok 121 end. 122 123-spec recover(rabbit_types:vhost()) -> 124 {Recovered :: [amqqueue:amqqueue()], 125 Failed :: [amqqueue:amqqueue()]}. 126recover(VHost) -> 127 AllDurable = find_local_durable_queues(VHost), 128 rabbit_queue_type:recover(VHost, AllDurable). 129 130filter_pid_per_type(QPids) -> 131 lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). 132 133filter_resource_per_type(Resources) -> 134 Queues = [begin 135 {ok, Q} = lookup(Resource), 136 QPid = amqqueue:get_pid(Q), 137 {Resource, QPid} 138 end || Resource <- Resources], 139 lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). 140 141-spec stop(rabbit_types:vhost()) -> 'ok'. 142stop(VHost) -> 143 %% Classic queues 144 ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), 145 {ok, BQ} = application:get_env(rabbit, backing_queue_module), 146 ok = BQ:stop(VHost), 147 rabbit_quorum_queue:stop(VHost). 148 149-spec start([amqqueue:amqqueue()]) -> 'ok'. 150 151start(Qs) -> 152 %% At this point all recovered queues and their bindings are 153 %% visible to routing, so now it is safe for them to complete 154 %% their initialisation (which may involve interacting with other 155 %% queues). 156 _ = [amqqueue:get_pid(Q) ! {self(), go} 157 || Q <- Qs, 158 %% All queues are supposed to be classic here. 159 amqqueue:is_classic(Q)], 160 ok. 161 162mark_local_durable_queues_stopped(VHost) -> 163 ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( 164 do_mark_local_durable_queues_stopped(VHost), 165 do_mark_local_durable_queues_stopped(VHost)). 166 167do_mark_local_durable_queues_stopped(VHost) -> 168 Qs = find_local_durable_queues(VHost), 169 rabbit_misc:execute_mnesia_transaction( 170 fun() -> 171 [ store_queue(amqqueue:set_state(Q, stopped)) 172 || Q <- Qs, amqqueue:get_type(Q) =:= rabbit_classic_queue, 173 amqqueue:get_state(Q) =/= stopped ] 174 end). 175 176find_local_durable_queues(VHost) -> 177 mnesia:async_dirty( 178 fun () -> 179 qlc:e( 180 qlc:q( 181 [Q || Q <- mnesia:table(rabbit_durable_queue), 182 amqqueue:get_vhost(Q) =:= VHost andalso 183 rabbit_queue_type:is_recoverable(Q) 184 ])) 185 end). 186 187find_recoverable_queues() -> 188 mnesia:async_dirty( 189 fun () -> 190 qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), 191 rabbit_queue_type:is_recoverable(Q)])) 192 end). 193 194-spec declare(name(), 195 boolean(), 196 boolean(), 197 rabbit_framing:amqp_table(), 198 rabbit_types:maybe(pid()), 199 rabbit_types:username()) -> 200 {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | 201 {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | 202 {'absent', amqqueue:amqqueue(), absent_reason()} | 203 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 204declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> 205 declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()). 206 207 208%% The Node argument suggests where the queue (master if mirrored) 209%% should be. Note that in some cases (e.g. with "nodes" policy in 210%% effect) this might not be possible to satisfy. 211 212-spec declare(name(), 213 boolean(), 214 boolean(), 215 rabbit_framing:amqp_table(), 216 rabbit_types:maybe(pid()), 217 rabbit_types:username(), 218 node()) -> 219 {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | 220 {'absent', amqqueue:amqqueue(), absent_reason()} | 221 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 222declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, 223 Owner, ActingUser, Node) -> 224 ok = check_declare_arguments(QueueName, Args), 225 Type = get_queue_type(Args), 226 case rabbit_queue_type:is_enabled(Type) of 227 true -> 228 Q = amqqueue:new(QueueName, 229 none, 230 Durable, 231 AutoDelete, 232 Owner, 233 Args, 234 VHost, 235 #{user => ActingUser}, 236 Type), 237 rabbit_queue_type:declare(Q, Node); 238 false -> 239 {protocol_error, internal_error, 240 "Cannot declare a queue '~s' of type '~s' on node '~s': " 241 "the corresponding feature flag is disabled", 242 [rabbit_misc:rs(QueueName), Type, Node]} 243 end. 244 245get_queue_type(Args) -> 246 case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of 247 undefined -> 248 rabbit_queue_type:default(); 249 {_, V} -> 250 rabbit_queue_type:discover(V) 251 end. 252 253-spec internal_declare(amqqueue:amqqueue(), boolean()) -> 254 {created | existing, amqqueue:amqqueue()} | queue_absent(). 255 256internal_declare(Q, Recover) -> 257 ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( 258 do_internal_declare(Q, Recover), 259 begin 260 Q1 = amqqueue:upgrade(Q), 261 do_internal_declare(Q1, Recover) 262 end). 263 264do_internal_declare(Q, true) -> 265 rabbit_misc:execute_mnesia_tx_with_tail( 266 fun () -> 267 ok = store_queue(amqqueue:set_state(Q, live)), 268 rabbit_misc:const({created, Q}) 269 end); 270do_internal_declare(Q, false) -> 271 QueueName = amqqueue:get_name(Q), 272 rabbit_misc:execute_mnesia_tx_with_tail( 273 fun () -> 274 case mnesia:wread({rabbit_queue, QueueName}) of 275 [] -> 276 case not_found_or_absent(QueueName) of 277 not_found -> Q1 = rabbit_policy:set(Q), 278 Q2 = amqqueue:set_state(Q1, live), 279 ok = store_queue(Q2), 280 fun () -> {created, Q2} end; 281 {absent, _Q, _} = R -> rabbit_misc:const(R) 282 end; 283 [ExistingQ] -> 284 rabbit_misc:const({existing, ExistingQ}) 285 end 286 end). 287 288-spec update 289 (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> 290 'not_found' | amqqueue:amqqueue(). 291 292update(Name, Fun) -> 293 case mnesia:wread({rabbit_queue, Name}) of 294 [Q] -> 295 Durable = amqqueue:is_durable(Q), 296 Q1 = Fun(Q), 297 ok = mnesia:write(rabbit_queue, Q1, write), 298 case Durable of 299 true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); 300 _ -> ok 301 end, 302 Q1; 303 [] -> 304 not_found 305 end. 306 307%% only really used for quorum queues to ensure the rabbit_queue record 308%% is initialised 309ensure_rabbit_queue_record_is_initialized(Q) -> 310 ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( 311 do_ensure_rabbit_queue_record_is_initialized(Q), 312 begin 313 Q1 = amqqueue:upgrade(Q), 314 do_ensure_rabbit_queue_record_is_initialized(Q1) 315 end). 316 317do_ensure_rabbit_queue_record_is_initialized(Q) -> 318 rabbit_misc:execute_mnesia_tx_with_tail( 319 fun () -> 320 ok = store_queue(Q), 321 rabbit_misc:const({ok, Q}) 322 end). 323 324-spec store_queue(amqqueue:amqqueue()) -> 'ok'. 325 326store_queue(Q) when ?amqqueue_is_durable(Q) -> 327 Q1 = amqqueue:reset_mirroring_and_decorators(Q), 328 ok = mnesia:write(rabbit_durable_queue, Q1, write), 329 store_queue_ram(Q); 330store_queue(Q) when not ?amqqueue_is_durable(Q) -> 331 store_queue_ram(Q). 332 333store_queue_ram(Q) -> 334 ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). 335 336store_queue_ram_dirty(Q) -> 337 ok = mnesia:dirty_write(rabbit_queue, rabbit_queue_decorator:set(Q)). 338 339-spec update_decorators(name()) -> 'ok'. 340 341update_decorators(Name) -> 342 rabbit_misc:execute_mnesia_transaction( 343 fun() -> 344 case mnesia:wread({rabbit_queue, Name}) of 345 [Q] -> store_queue_ram(Q), 346 ok; 347 [] -> ok 348 end 349 end). 350 351-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 352 'ok'. 353 354policy_changed(Q1, Q2) -> 355 Decorators1 = amqqueue:get_decorators(Q1), 356 Decorators2 = amqqueue:get_decorators(Q2), 357 rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), 358 D1 = rabbit_queue_decorator:select(Decorators1), 359 D2 = rabbit_queue_decorator:select(Decorators2), 360 [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], 361 %% Make sure we emit a stats event even if nothing 362 %% mirroring-related has changed - the policy may have changed anyway. 363 notify_policy_changed(Q2). 364 365is_policy_applicable(QName, Policy) -> 366 case lookup(QName) of 367 {ok, Q} -> 368 rabbit_queue_type:is_policy_applicable(Q, Policy); 369 _ -> 370 %% Defaults to previous behaviour. Apply always 371 true 372 end. 373 374is_server_named_allowed(Args) -> 375 Type = get_queue_type(Args), 376 rabbit_queue_type:is_server_named_allowed(Type). 377 378-spec lookup 379 (name()) -> 380 rabbit_types:ok(amqqueue:amqqueue()) | 381 rabbit_types:error('not_found'); 382 ([name()]) -> 383 [amqqueue:amqqueue()]. 384 385lookup([]) -> []; %% optimisation 386lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation 387lookup(Names) when is_list(Names) -> 388 %% Normally we'd call mnesia:dirty_read/1 here, but that is quite 389 %% expensive for reasons explained in rabbit_misc:dirty_read/1. 390 lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); 391lookup(Name) -> 392 rabbit_misc:dirty_read({rabbit_queue, Name}). 393 394-spec lookup_many ([name()]) -> [amqqueue:amqqueue()]. 395 396lookup_many(Names) when is_list(Names) -> 397 lookup(Names). 398 399-spec not_found_or_absent(name()) -> not_found_or_absent(). 400 401not_found_or_absent(Name) -> 402 %% NB: we assume that the caller has already performed a lookup on 403 %% rabbit_queue and not found anything 404 case mnesia:read({rabbit_durable_queue, Name}) of 405 [] -> not_found; 406 [Q] -> {absent, Q, nodedown} %% Q exists on stopped node 407 end. 408 409-spec not_found_or_absent_dirty(name()) -> not_found_or_absent(). 410 411not_found_or_absent_dirty(Name) -> 412 %% We should read from both tables inside a tx, to get a 413 %% consistent view. But the chances of an inconsistency are small, 414 %% and only affect the error kind. 415 case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of 416 {error, not_found} -> not_found; 417 {ok, Q} -> {absent, Q, nodedown} 418 end. 419 420-spec get_rebalance_lock(pid()) -> 421 {true, {rebalance_queues, pid()}} | false. 422get_rebalance_lock(Pid) when is_pid(Pid) -> 423 Id = {rebalance_queues, Pid}, 424 Nodes = [node()|nodes()], 425 %% Note that we're not re-trying. We want to immediately know 426 %% if a re-balance is taking place and stop accordingly. 427 case global:set_lock(Id, Nodes, 0) of 428 true -> 429 {true, Id}; 430 false -> 431 false 432 end. 433 434-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> 435 {ok, [{node(), pos_integer()}]} | {error, term()}. 436rebalance(Type, VhostSpec, QueueSpec) -> 437 %% We have not yet acquired the rebalance_queues global lock. 438 maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). 439 440maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> 441 rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", 442 [Type, VhostSpec, QueueSpec]), 443 Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:all_running()), 444 NumRunning = length(Running), 445 ToRebalance = [Q || Q <- rabbit_amqqueue:list(), 446 filter_per_type(Type, Q), 447 is_replicated(Q), 448 is_match(amqqueue:get_vhost(Q), VhostSpec) andalso 449 is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], 450 NumToRebalance = length(ToRebalance), 451 ByNode = group_by_node(ToRebalance), 452 Rem = case (NumToRebalance rem NumRunning) of 453 0 -> 0; 454 _ -> 1 455 end, 456 MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, 457 Result = iterative_rebalance(ByNode, MaxQueuesDesired), 458 global:del_lock(Id), 459 rabbit_log:info("Finished queue rebalance operation"), 460 Result; 461maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> 462 rabbit_log:warning("Queue rebalance operation is in progress, please wait."), 463 {error, rebalance_in_progress}. 464 465%% Stream queues don't yet support rebalance 466filter_per_type(all, Q) -> 467 ?amqqueue_is_quorum(Q) or ?amqqueue_is_classic(Q); 468filter_per_type(quorum, Q) -> 469 ?amqqueue_is_quorum(Q); 470filter_per_type(classic, Q) -> 471 ?amqqueue_is_classic(Q). 472 473rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> 474 rabbit_quorum_queue; 475rebalance_module(Q) when ?amqqueue_is_classic(Q) -> 476 rabbit_mirror_queue_misc. 477 478get_resource_name(#resource{name = Name}) -> 479 Name. 480 481get_resource_vhost_name(#resource{virtual_host = VHostName}) -> 482 VHostName. 483 484is_match(Subj, RegEx) -> 485 nomatch /= re:run(Subj, RegEx). 486 487iterative_rebalance(ByNode, MaxQueuesDesired) -> 488 case maybe_migrate(ByNode, MaxQueuesDesired) of 489 {ok, Summary} -> 490 rabbit_log:info("All queue masters are balanced"), 491 {ok, Summary}; 492 {migrated, Other} -> 493 iterative_rebalance(Other, MaxQueuesDesired); 494 {not_migrated, Other} -> 495 iterative_rebalance(Other, MaxQueuesDesired) 496 end. 497 498maybe_migrate(ByNode, MaxQueuesDesired) -> 499 maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). 500 501maybe_migrate(ByNode, _, []) -> 502 {ok, maps:fold(fun(K, V, Acc) -> 503 {CQs, QQs} = lists:partition(fun({_, Q, _}) -> 504 ?amqqueue_is_classic(Q) 505 end, V), 506 [[{<<"Node name">>, K}, {<<"Number of quorum queues">>, length(QQs)}, 507 {<<"Number of replicated classic queues">>, length(CQs)}] | Acc] 508 end, [], ByNode)}; 509maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> 510 case maps:get(N, ByNode, []) of 511 [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> 512 Name = amqqueue:get_name(Q), 513 Module = rebalance_module(Q), 514 Candidates = rabbit_maintenance:filter_out_drained_nodes_local_read(Module:get_replicas(Q) -- [N]), 515 case Candidates of 516 [] -> 517 {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; 518 _ -> 519 [{Length, Destination} | _] = sort_by_number_of_queues(Candidates, ByNode), 520 rabbit_log:info("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues", 521 [Name, N, length(All), Destination, Length]), 522 case Module:transfer_leadership(Q, Destination) of 523 {migrated, NewNode} -> 524 rabbit_log:info("Queue ~p migrated to ~p", [Name, NewNode]), 525 {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; 526 {not_migrated, Reason} -> 527 rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]), 528 {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} 529 end 530 end; 531 [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> 532 rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " 533 "Do nothing", [N, length(All)]), 534 maybe_migrate(ByNode, MaxQueuesDesired, Nodes); 535 All -> 536 rabbit_log:debug("Node ~p only contains ~p queues, do nothing", 537 [N, length(All)]), 538 maybe_migrate(ByNode, MaxQueuesDesired, Nodes) 539 end. 540 541update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> 542 maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). 543 544update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> 545 maps:update_with(NewNode, 546 fun(L) -> L ++ [{Entries, Q, true}] end, 547 [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). 548 549sort_by_number_of_queues(Nodes, ByNode) -> 550 lists:keysort(1, 551 lists:map(fun(Node) -> 552 {num_queues(Node, ByNode), Node} 553 end, Nodes)). 554 555num_queues(Node, ByNode) -> 556 length(maps:get(Node, ByNode, [])). 557 558group_by_node(Queues) -> 559 ByNode = lists:foldl(fun(Q, Acc) -> 560 Module = rebalance_module(Q), 561 Length = Module:queue_length(Q), 562 maps:update_with(amqqueue:qnode(Q), 563 fun(L) -> [{Length, Q, false} | L] end, 564 [{Length, Q, false}], Acc) 565 end, #{}, Queues), 566 maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). 567 568-spec with(name(), 569 qfun(A), 570 fun((not_found_or_absent()) -> rabbit_types:channel_exit())) -> 571 A | rabbit_types:channel_exit(). 572 573with(Name, F, E) -> 574 with(Name, F, E, 2000). 575 576with(#resource{} = Name, F, E, RetriesLeft) -> 577 case lookup(Name) of 578 {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 -> 579 %% Something bad happened to that queue, we are bailing out 580 %% on processing current request. 581 E({absent, Q, timeout}); 582 {ok, Q} when ?amqqueue_state_is(Q, stopped) andalso RetriesLeft =:= 0 -> 583 %% The queue was stopped and not migrated 584 E({absent, Q, stopped}); 585 %% The queue process has crashed with unknown error 586 {ok, Q} when ?amqqueue_state_is(Q, crashed) -> 587 E({absent, Q, crashed}); 588 %% The queue process has been stopped by a supervisor. 589 %% In that case a synchronised mirror can take over 590 %% so we should retry. 591 {ok, Q} when ?amqqueue_state_is(Q, stopped) -> 592 %% The queue process was stopped by the supervisor 593 rabbit_misc:with_exit_handler( 594 fun () -> retry_wait(Q, F, E, RetriesLeft) end, 595 fun () -> F(Q) end); 596 %% The queue is supposed to be active. 597 %% The master node can go away or queue can be killed 598 %% so we retry, waiting for a mirror to take over. 599 {ok, Q} when ?amqqueue_state_is(Q, live) -> 600 %% We check is_process_alive(QPid) in case we receive a 601 %% nodedown (for example) in F() that has nothing to do 602 %% with the QPid. F() should be written s.t. that this 603 %% cannot happen, so we bail if it does since that 604 %% indicates a code bug and we don't want to get stuck in 605 %% the retry loop. 606 rabbit_misc:with_exit_handler( 607 fun () -> retry_wait(Q, F, E, RetriesLeft) end, 608 fun () -> F(Q) end); 609 {error, not_found} -> 610 E(not_found_or_absent_dirty(Name)) 611 end. 612 613-spec retry_wait(amqqueue:amqqueue(), 614 qfun(A), 615 fun((not_found_or_absent()) -> rabbit_types:channel_exit()), 616 non_neg_integer()) -> 617 A | rabbit_types:channel_exit(). 618 619retry_wait(Q, F, E, RetriesLeft) -> 620 Name = amqqueue:get_name(Q), 621 QPid = amqqueue:get_pid(Q), 622 QState = amqqueue:get_state(Q), 623 case {QState, is_replicated(Q)} of 624 %% We don't want to repeat an operation if 625 %% there are no mirrors to migrate to 626 {stopped, false} -> 627 E({absent, Q, stopped}); 628 _ -> 629 case rabbit_mnesia:is_process_alive(QPid) of 630 true -> 631 % rabbitmq-server#1682 632 % The old check would have crashed here, 633 % instead, log it and run the exit fun. absent & alive is weird, 634 % but better than crashing with badmatch,true 635 rabbit_log:debug("Unexpected alive queue process ~p", [QPid]), 636 E({absent, Q, alive}); 637 false -> 638 ok % Expected result 639 end, 640 timer:sleep(30), 641 with(Name, F, E, RetriesLeft - 1) 642 end. 643 644-spec with(name(), qfun(A)) -> 645 A | rabbit_types:error(not_found_or_absent()). 646 647with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). 648 649-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). 650 651with_or_die(Name, F) -> 652 with(Name, F, die_fun(Name)). 653 654-spec die_fun(name()) -> 655 fun((not_found_or_absent()) -> rabbit_types:channel_exit()). 656 657die_fun(Name) -> 658 fun (not_found) -> not_found(Name); 659 ({absent, Q, Reason}) -> absent(Q, Reason) 660 end. 661 662-spec not_found(name()) -> rabbit_types:channel_exit(). 663 664not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]). 665 666-spec absent(amqqueue:amqqueue(), absent_reason()) -> 667 rabbit_types:channel_exit(). 668 669absent(Q, AbsentReason) -> 670 QueueName = amqqueue:get_name(Q), 671 QPid = amqqueue:get_pid(Q), 672 IsDurable = amqqueue:is_durable(Q), 673 priv_absent(QueueName, QPid, IsDurable, AbsentReason). 674 675-spec priv_absent(name(), pid(), boolean(), absent_reason()) -> 676 rabbit_types:channel_exit(). 677 678priv_absent(QueueName, QPid, true, nodedown) -> 679 %% The assertion of durability is mainly there because we mention 680 %% durability in the error message. That way we will hopefully 681 %% notice if at some future point our logic changes s.t. we get 682 %% here with non-durable queues. 683 rabbit_misc:protocol_error( 684 not_found, 685 "home node '~s' of durable ~s is down or inaccessible", 686 [amqqueue:qnode(QPid), rabbit_misc:rs(QueueName)]); 687 688priv_absent(QueueName, _QPid, _IsDurable, stopped) -> 689 rabbit_misc:protocol_error( 690 not_found, 691 "~s process is stopped by supervisor", [rabbit_misc:rs(QueueName)]); 692 693priv_absent(QueueName, _QPid, _IsDurable, crashed) -> 694 rabbit_misc:protocol_error( 695 not_found, 696 "~s has crashed and failed to restart", [rabbit_misc:rs(QueueName)]); 697 698priv_absent(QueueName, _QPid, _IsDurable, timeout) -> 699 rabbit_misc:protocol_error( 700 not_found, 701 "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]); 702 703priv_absent(QueueName, QPid, _IsDurable, alive) -> 704 rabbit_misc:protocol_error( 705 not_found, 706 "failed to perform operation on ~s: its master replica ~w may be stopping or being demoted", 707 [rabbit_misc:rs(QueueName), QPid]). 708 709-spec assert_equivalence 710 (amqqueue:amqqueue(), boolean(), boolean(), 711 rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> 712 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). 713 714assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) -> 715 QName = amqqueue:get_name(Q), 716 DurableQ = amqqueue:is_durable(Q), 717 AutoDeleteQ = amqqueue:is_auto_delete(Q), 718 ok = check_exclusive_access(Q, Owner, strict), 719 ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable), 720 ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete), 721 ok = assert_args_equivalence(Q, Args1). 722 723-spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> 724 'ok' | rabbit_types:channel_exit(). 725 726check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). 727 728check_exclusive_access(Q, Owner, _MatchType) 729 when ?amqqueue_exclusive_owner_is(Q, Owner) -> 730 ok; 731check_exclusive_access(Q, _ReaderPid, lax) 732 when ?amqqueue_exclusive_owner_is(Q, none) -> 733 ok; 734check_exclusive_access(Q, _ReaderPid, _MatchType) -> 735 QueueName = amqqueue:get_name(Q), 736 rabbit_misc:protocol_error( 737 resource_locked, 738 "cannot obtain exclusive access to locked ~s. It could be originally " 739 "declared on another connection or the exclusive property value does not " 740 "match that of the original declaration.", 741 [rabbit_misc:rs(QueueName)]). 742 743-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> 744 A | rabbit_types:channel_exit(). 745 746with_exclusive_access_or_die(Name, ReaderPid, F) -> 747 with_or_die(Name, 748 fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). 749 750assert_args_equivalence(Q, RequiredArgs) -> 751 QueueName = amqqueue:get_name(Q), 752 Args = amqqueue:get_arguments(Q), 753 rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, 754 [Key || {Key, _Fun} <- declare_args()]). 755 756check_declare_arguments(QueueName, Args) -> 757 check_arguments(QueueName, Args, declare_args()). 758 759check_consume_arguments(QueueName, Args) -> 760 check_arguments(QueueName, Args, consume_args()). 761 762check_arguments(QueueName, Args, Validators) -> 763 [case rabbit_misc:table_lookup(Args, Key) of 764 undefined -> ok; 765 TypeVal -> case Fun(TypeVal, Args) of 766 ok -> ok; 767 {error, Error} -> rabbit_misc:protocol_error( 768 precondition_failed, 769 "invalid arg '~s' for ~s: ~255p", 770 [Key, rabbit_misc:rs(QueueName), 771 Error]) 772 end 773 end || {Key, Fun} <- Validators], 774 ok. 775 776declare_args() -> 777 [{<<"x-expires">>, fun check_expires_arg/2}, 778 {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, 779 {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2}, 780 {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, 781 {<<"x-max-length">>, fun check_non_neg_int_arg/2}, 782 {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, 783 {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2}, 784 {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2}, 785 {<<"x-max-priority">>, fun check_max_priority_arg/2}, 786 {<<"x-overflow">>, fun check_overflow/2}, 787 {<<"x-queue-mode">>, fun check_queue_mode/2}, 788 {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, 789 {<<"x-queue-type">>, fun check_queue_type/2}, 790 {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, 791 {<<"x-max-age">>, fun check_max_age_arg/2}, 792 {<<"x-stream-max-segment-size-bytes">>, fun check_non_neg_int_arg/2}, 793 {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}, 794 {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}]. 795 796consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, 797 {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}, 798 {<<"x-stream-offset">>, fun check_stream_offset_arg/2}]. 799 800check_int_arg({Type, _}, _) -> 801 case lists:member(Type, ?INTEGER_ARG_TYPES) of 802 true -> ok; 803 false -> {error, rabbit_misc:format("expected integer, got ~p", [Type])} 804 end; 805check_int_arg(Val, _) when is_integer(Val) -> 806 ok; 807check_int_arg(_Val, _) -> 808 {error, {unacceptable_type, "expected integer"}}. 809 810check_bool_arg({bool, _}, _) -> ok; 811check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}; 812check_bool_arg(true, _) -> ok; 813check_bool_arg(false, _) -> ok; 814check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}. 815 816check_non_neg_int_arg({Type, Val}, Args) -> 817 case check_int_arg({Type, Val}, Args) of 818 ok when Val >= 0 -> ok; 819 ok -> {error, {value_negative, Val}}; 820 Error -> Error 821 end; 822check_non_neg_int_arg(Val, Args) -> 823 case check_int_arg(Val, Args) of 824 ok when Val >= 0 -> ok; 825 ok -> {error, {value_negative, Val}}; 826 Error -> Error 827 end. 828 829check_expires_arg({Type, Val}, Args) -> 830 case check_int_arg({Type, Val}, Args) of 831 ok when Val == 0 -> {error, {value_zero, Val}}; 832 ok -> rabbit_misc:check_expiry(Val); 833 Error -> Error 834 end; 835check_expires_arg(Val, Args) -> 836 case check_int_arg(Val, Args) of 837 ok when Val == 0 -> {error, {value_zero, Val}}; 838 ok -> rabbit_misc:check_expiry(Val); 839 Error -> Error 840 end. 841 842check_message_ttl_arg({Type, Val}, Args) -> 843 case check_int_arg({Type, Val}, Args) of 844 ok -> rabbit_misc:check_expiry(Val); 845 Error -> Error 846 end; 847check_message_ttl_arg(Val, Args) -> 848 case check_int_arg(Val, Args) of 849 ok -> rabbit_misc:check_expiry(Val); 850 Error -> Error 851 end. 852 853check_max_priority_arg({Type, Val}, Args) -> 854 case check_non_neg_int_arg({Type, Val}, Args) of 855 ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; 856 ok -> {error, {max_value_exceeded, Val}}; 857 Error -> Error 858 end; 859check_max_priority_arg(Val, Args) -> 860 case check_non_neg_int_arg(Val, Args) of 861 ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; 862 ok -> {error, {max_value_exceeded, Val}}; 863 Error -> Error 864 end. 865 866check_single_active_consumer_arg({Type, Val}, Args) -> 867 check_bool_arg({Type, Val}, Args); 868check_single_active_consumer_arg(Val, Args) -> 869 check_bool_arg(Val, Args). 870 871check_initial_cluster_size_arg({Type, Val}, Args) -> 872 case check_non_neg_int_arg({Type, Val}, Args) of 873 ok when Val == 0 -> {error, {value_zero, Val}}; 874 ok -> ok; 875 Error -> Error 876 end; 877check_initial_cluster_size_arg(Val, Args) -> 878 case check_non_neg_int_arg(Val, Args) of 879 ok when Val == 0 -> {error, {value_zero, Val}}; 880 ok -> ok; 881 Error -> Error 882 end. 883 884check_max_age_arg({longstr, Val}, _Args) -> 885 case check_max_age(Val) of 886 {error, _} = E -> 887 E; 888 _ -> 889 ok 890 end; 891check_max_age_arg({Type, _}, _Args) -> 892 {error, {unacceptable_type, Type}}. 893 894check_max_age(MaxAge) -> 895 case re:run(MaxAge, "(^[0-9]*)(.*)", [{capture, all_but_first, list}]) of 896 {match, [Value, Unit]} -> 897 case list_to_integer(Value) of 898 I when I > 0 -> 899 case lists:member(Unit, ["Y", "M", "D", "h", "m", "s"]) of 900 true -> 901 Int = list_to_integer(Value), 902 Int * unit_value_in_ms(Unit); 903 false -> 904 {error, invalid_max_age} 905 end; 906 _ -> 907 {error, invalid_max_age} 908 end; 909 _ -> 910 {error, invalid_max_age} 911 end. 912 913unit_value_in_ms("Y") -> 914 365 * unit_value_in_ms("D"); 915unit_value_in_ms("M") -> 916 30 * unit_value_in_ms("D"); 917unit_value_in_ms("D") -> 918 24 * unit_value_in_ms("h"); 919unit_value_in_ms("h") -> 920 3600 * unit_value_in_ms("s"); 921unit_value_in_ms("m") -> 922 60 * unit_value_in_ms("s"); 923unit_value_in_ms("s") -> 924 1000. 925 926%% Note that the validity of x-dead-letter-exchange is already verified 927%% by rabbit_channel's queue.declare handler. 928check_dlxname_arg({longstr, _}, _) -> ok; 929check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}; 930check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok; 931check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}. 932 933check_dlxrk_arg({longstr, _}, Args) -> 934 case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of 935 undefined -> {error, routing_key_but_no_dlx_defined}; 936 _ -> ok 937 end; 938check_dlxrk_arg({Type, _}, _Args) -> 939 {error, {unacceptable_type, Type}}; 940check_dlxrk_arg(Val, Args) when is_binary(Val) -> 941 case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of 942 undefined -> {error, routing_key_but_no_dlx_defined}; 943 _ -> ok 944 end; 945check_dlxrk_arg(_Val, _Args) -> 946 {error, {unacceptable_type, "expected a string"}}. 947 948-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]). 949check_overflow({longstr, Val}, _Args) -> 950 case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of 951 true -> ok; 952 false -> {error, invalid_overflow} 953 end; 954check_overflow({Type, _}, _Args) -> 955 {error, {unacceptable_type, Type}}; 956check_overflow(Val, _Args) when is_binary(Val) -> 957 case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of 958 true -> ok; 959 false -> {error, invalid_overflow} 960 end; 961check_overflow(_Val, _Args) -> 962 {error, invalid_overflow}. 963 964-define(KNOWN_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]). 965check_queue_leader_locator_arg({longstr, Val}, _Args) -> 966 case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of 967 true -> ok; 968 false -> {error, invalid_queue_locator_arg} 969 end; 970check_queue_leader_locator_arg({Type, _}, _Args) -> 971 {error, {unacceptable_type, Type}}; 972check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) -> 973 case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of 974 true -> ok; 975 false -> {error, invalid_queue_locator_arg} 976 end; 977check_queue_leader_locator_arg(_Val, _Args) -> 978 {error, invalid_queue_locator_arg}. 979 980check_stream_offset_arg(Val, _Args) -> 981 case rabbit_stream_queue:parse_offset_arg(Val) of 982 {ok, _} -> 983 ok; 984 {error, _} -> 985 {error, {invalid_stream_offset_arg, Val}} 986 end. 987 988-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]). 989check_queue_mode({longstr, Val}, _Args) -> 990 case lists:member(Val, ?KNOWN_QUEUE_MODES) of 991 true -> ok; 992 false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])} 993 end; 994check_queue_mode({Type, _}, _Args) -> 995 {error, {unacceptable_type, Type}}; 996check_queue_mode(Val, _Args) when is_binary(Val) -> 997 case lists:member(Val, ?KNOWN_QUEUE_MODES) of 998 true -> ok; 999 false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])} 1000 end; 1001check_queue_mode(_Val, _Args) -> 1002 {error, invalid_queue_mode}. 1003 1004-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]). 1005check_queue_type({longstr, Val}, _Args) -> 1006 case lists:member(Val, ?KNOWN_QUEUE_TYPES) of 1007 true -> ok; 1008 false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])} 1009 end; 1010check_queue_type({Type, _}, _Args) -> 1011 {error, {unacceptable_type, Type}}; 1012check_queue_type(Val, _Args) when is_binary(Val) -> 1013 case lists:member(Val, ?KNOWN_QUEUE_TYPES) of 1014 true -> ok; 1015 false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])} 1016 end; 1017check_queue_type(_Val, _Args) -> 1018 {error, invalid_queue_type}. 1019 1020-spec list() -> [amqqueue:amqqueue()]. 1021 1022list() -> 1023 list_with_possible_retry(fun do_list/0). 1024 1025do_list() -> 1026 mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). 1027 1028-spec count() -> non_neg_integer(). 1029 1030count() -> 1031 mnesia:table_info(rabbit_queue, size). 1032 1033-spec list_names() -> [rabbit_amqqueue:name()]. 1034 1035list_names() -> mnesia:dirty_all_keys(rabbit_queue). 1036 1037list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)]. 1038 1039list_local_names() -> 1040 [ amqqueue:get_name(Q) || Q <- list(), 1041 amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. 1042 1043list_local_names_down() -> 1044 [ amqqueue:get_name(Q) || Q <- list(), 1045 is_local_to_node(amqqueue:get_pid(Q), node()), 1046 is_down(Q)]. 1047 1048is_down(Q) -> 1049 try 1050 info(Q, [state]) == [{state, down}] 1051 catch 1052 _:_ -> 1053 true 1054 end. 1055 1056 1057-spec sample_local_queues() -> [amqqueue:amqqueue()]. 1058sample_local_queues() -> sample_n_by_name(list_local_names(), 300). 1059 1060-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()]. 1061sample_n_by_name([], _N) -> 1062 []; 1063sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 -> 1064 %% lists:nth/2 throws when position is > list length 1065 M = erlang:min(N, length(Names)), 1066 Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 -> 1067 Acc; 1068 (_, Acc) -> 1069 Pick = lists:nth(rand:uniform(M), Names), 1070 [Pick | Acc] 1071 end, 1072 [], lists:seq(1, M)), 1073 lists:map(fun (Id) -> 1074 {ok, Q} = rabbit_amqqueue:lookup(Id), 1075 Q 1076 end, 1077 lists:usort(Ids)). 1078 1079-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()]. 1080sample_n([], _N) -> 1081 []; 1082sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> 1083 Names = [amqqueue:get_name(Q) || Q <- Queues], 1084 sample_n_by_name(Names, N). 1085 1086 1087-spec list_by_type(atom()) -> [amqqueue:amqqueue()]. 1088 1089list_by_type(classic) -> list_by_type(rabbit_classic_queue); 1090list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); 1091list_by_type(stream) -> list_by_type(rabbit_stream_queue); 1092list_by_type(Type) -> 1093 {atomic, Qs} = 1094 mnesia:sync_transaction( 1095 fun () -> 1096 mnesia:match_object(rabbit_durable_queue, 1097 amqqueue:pattern_match_on_type(Type), 1098 read) 1099 end), 1100 Qs. 1101 1102-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()]. 1103 1104list_local_quorum_queue_names() -> 1105 [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), 1106 amqqueue:get_state(Q) =/= crashed, 1107 lists:member(node(), get_quorum_nodes(Q))]. 1108 1109-spec list_local_quorum_queues() -> [amqqueue:amqqueue()]. 1110list_local_quorum_queues() -> 1111 [ Q || Q <- list_by_type(quorum), 1112 amqqueue:get_state(Q) =/= crashed, 1113 lists:member(node(), get_quorum_nodes(Q))]. 1114 1115-spec list_local_stream_queues() -> [amqqueue:amqqueue()]. 1116list_local_stream_queues() -> 1117 [ Q || Q <- list_by_type(stream), 1118 amqqueue:get_state(Q) =/= crashed, 1119 lists:member(node(), get_quorum_nodes(Q))]. 1120 1121-spec list_local_leaders() -> [amqqueue:amqqueue()]. 1122list_local_leaders() -> 1123 [ Q || Q <- list(), 1124 amqqueue:is_quorum(Q), 1125 amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. 1126 1127-spec list_local_followers() -> [amqqueue:amqqueue()]. 1128list_local_followers() -> 1129 [Q 1130 || Q <- list(), 1131 amqqueue:is_quorum(Q), 1132 amqqueue:get_state(Q) =/= crashed, 1133 amqqueue:get_leader(Q) =/= node(), 1134 rabbit_quorum_queue:is_recoverable(Q) 1135 ]. 1136 1137-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()]. 1138list_local_mirrored_classic_queues() -> 1139 [ Q || Q <- list(), 1140 amqqueue:get_state(Q) =/= crashed, 1141 amqqueue:is_classic(Q), 1142 is_local_to_node(amqqueue:get_pid(Q), node()), 1143 is_replicated(Q)]. 1144 1145-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. 1146list_local_mirrored_classic_names() -> 1147 [ amqqueue:get_name(Q) || Q <- list(), 1148 amqqueue:get_state(Q) =/= crashed, 1149 amqqueue:is_classic(Q), 1150 is_local_to_node(amqqueue:get_pid(Q), node()), 1151 is_replicated(Q)]. 1152 1153-spec list_local_mirrored_classic_without_synchronised_mirrors() -> 1154 [amqqueue:amqqueue()]. 1155list_local_mirrored_classic_without_synchronised_mirrors() -> 1156 [ Q || Q <- list(), 1157 amqqueue:get_state(Q) =/= crashed, 1158 amqqueue:is_classic(Q), 1159 %% filter out exclusive queues as they won't actually be mirrored 1160 is_not_exclusive(Q), 1161 is_local_to_node(amqqueue:get_pid(Q), node()), 1162 is_replicated(Q), 1163 not has_synchronised_mirrors_online(Q)]. 1164 1165-spec list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> 1166 [#{binary => any()}]. 1167list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> 1168 ClassicQs = list_local_mirrored_classic_without_synchronised_mirrors(), 1169 [begin 1170 #resource{name = Name} = amqqueue:get_name(Q), 1171 #{ 1172 <<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(amqqueue:get_name(Q))), 1173 <<"name">> => Name, 1174 <<"virtual_host">> => amqqueue:get_vhost(Q), 1175 <<"type">> => <<"classic">> 1176 } 1177 end || Q <- ClassicQs]. 1178 1179-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()]. 1180list_local_quorum_queues_with_name_matching(Pattern) -> 1181 [ Q || Q <- list_by_type(quorum), 1182 amqqueue:get_state(Q) =/= crashed, 1183 lists:member(node(), get_quorum_nodes(Q)), 1184 is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)]. 1185 1186-spec list_local_quorum_queues_with_name_matching(vhost:name(), binary()) -> [amqqueue:amqqueue()]. 1187list_local_quorum_queues_with_name_matching(VHostName, Pattern) -> 1188 [ Q || Q <- list_by_type(quorum), 1189 amqqueue:get_state(Q) =/= crashed, 1190 lists:member(node(), get_quorum_nodes(Q)), 1191 is_in_virtual_host(Q, VHostName), 1192 is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)]. 1193 1194is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> 1195 Node =:= node(QPid); 1196is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> 1197 Node =:= Leader; 1198is_local_to_node(_QPid, _Node) -> 1199 false. 1200 1201is_in_virtual_host(Q, VHostName) -> 1202 VHostName =:= get_resource_vhost_name(amqqueue:get_name(Q)). 1203 1204-spec list(vhost:name()) -> [amqqueue:amqqueue()]. 1205list(VHostPath) -> 1206 list(VHostPath, rabbit_queue). 1207 1208list(VHostPath, TableName) -> 1209 list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end). 1210 1211%% Not dirty_match_object since that would not be transactional when used in a 1212%% tx context 1213do_list(VHostPath, TableName) -> 1214 mnesia:async_dirty( 1215 fun () -> 1216 mnesia:match_object( 1217 TableName, 1218 amqqueue:pattern_match_on_name(rabbit_misc:r(VHostPath, queue)), 1219 read) 1220 end). 1221 1222list_with_possible_retry(Fun) -> 1223 %% amqqueue migration: 1224 %% The `rabbit_queue` or `rabbit_durable_queue` tables 1225 %% might be migrated between the time we query the pattern 1226 %% (with the `amqqueue` module) and the time we call 1227 %% `mnesia:dirty_match_object()`. This would lead to an empty list 1228 %% (no object matching the now incorrect pattern), not a Mnesia 1229 %% error. 1230 %% 1231 %% So if the result is an empty list and the version of the 1232 %% `amqqueue` record changed in between, we retry the operation. 1233 %% 1234 %% However, we don't do this if inside a Mnesia transaction: we 1235 %% could end up with a live lock between this started transaction 1236 %% and the Mnesia table migration which is blocked (but the 1237 %% rabbit_feature_flags lock is held). 1238 AmqqueueRecordVersion = amqqueue:record_version_to_use(), 1239 case Fun() of 1240 [] -> 1241 case mnesia:is_transaction() of 1242 true -> 1243 []; 1244 false -> 1245 case amqqueue:record_version_to_use() of 1246 AmqqueueRecordVersion -> []; 1247 _ -> Fun() 1248 end 1249 end; 1250 Ret -> 1251 Ret 1252 end. 1253 1254-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. 1255 1256list_down(VHostPath) -> 1257 case rabbit_vhost:exists(VHostPath) of 1258 false -> []; 1259 true -> 1260 Present = list(VHostPath), 1261 Durable = list(VHostPath, rabbit_durable_queue), 1262 PresentS = sets:from_list([amqqueue:get_name(Q) || Q <- Present]), 1263 sets:to_list(sets:filter(fun (Q) -> 1264 N = amqqueue:get_name(Q), 1265 not sets:is_element(N, PresentS) 1266 end, sets:from_list(Durable))) 1267 end. 1268 1269count(VHost) -> 1270 try 1271 %% this is certainly suboptimal but there is no way to count 1272 %% things using a secondary index in Mnesia. Our counter-table-per-node 1273 %% won't work here because with master migration of mirrored queues 1274 %% the "ownership" of queues by nodes becomes a non-trivial problem 1275 %% that requires a proper consensus algorithm. 1276 length(list_for_count(VHost)) 1277 catch _:Err -> 1278 rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p", 1279 [VHost, Err]), 1280 0 1281 end. 1282 1283list_for_count(VHost) -> 1284 list_with_possible_retry( 1285 fun() -> 1286 mnesia:dirty_index_read(rabbit_queue, 1287 VHost, 1288 amqqueue:field_vhost()) 1289 end). 1290 1291-spec info_keys() -> rabbit_types:info_keys(). 1292 1293%% It should no default to classic queue keys, but a subset of those that must be shared 1294%% by all queue types. Not sure this is even being used, so will leave it here for backwards 1295%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports. 1296info_keys() -> rabbit_amqqueue_process:info_keys(). 1297 1298map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). 1299 1300is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) -> 1301 false; 1302is_unresponsive(Q, Timeout) when ?amqqueue_is_classic(Q) -> 1303 QPid = amqqueue:get_pid(Q), 1304 try 1305 delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), 1306 false 1307 catch 1308 %% TODO catch any exit?? 1309 exit:{timeout, _} -> 1310 true 1311 end; 1312is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) -> 1313 try 1314 Leader = amqqueue:get_pid(Q), 1315 case rabbit_fifo_client:stat(Leader, Timeout) of 1316 {ok, _, _} -> false; 1317 {timeout, _} -> true; 1318 {error, _} -> true 1319 end 1320 catch 1321 exit:{timeout, _} -> 1322 true 1323 end; 1324is_unresponsive(Q, Timeout) when ?amqqueue_is_stream(Q) -> 1325 try 1326 #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q), 1327 case gen_batch_server:call(LeaderPid, get_reader_context, Timeout) of 1328 #{dir := _} -> false; 1329 _ -> true 1330 end 1331 catch 1332 exit:{timeout, _} -> 1333 true 1334 end. 1335 1336format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); 1337format(Q) -> rabbit_amqqueue_process:format(Q). 1338 1339-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). 1340 1341info(Q) when ?is_amqqueue(Q) -> rabbit_queue_type:info(Q, all_keys). 1342 1343 1344-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> 1345 rabbit_types:infos(). 1346 1347info(Q, Items) when ?is_amqqueue(Q) -> 1348 rabbit_queue_type:info(Q, Items). 1349 1350info_down(Q, DownReason) -> 1351 rabbit_queue_type:info_down(Q, DownReason). 1352 1353info_down(Q, Items, DownReason) -> 1354 rabbit_queue_type:info_down(Q, Items, DownReason). 1355 1356-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. 1357 1358info_all(VHostPath) -> 1359 map(list(VHostPath), fun (Q) -> info(Q) end) ++ 1360 map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end). 1361 1362-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> 1363 [rabbit_types:infos()]. 1364 1365info_all(VHostPath, Items) -> 1366 map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ 1367 map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). 1368 1369emit_info_local(VHostPath, Items, Ref, AggregatorPid) -> 1370 rabbit_control_misc:emitting_map_with_exit_handler( 1371 AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)). 1372 1373emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> 1374 Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ], 1375 rabbit_control_misc:await_emitters_termination(Pids). 1376 1377collect_info_all(VHostPath, Items) -> 1378 Nodes = rabbit_nodes:all_running(), 1379 Ref = make_ref(), 1380 Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ], 1381 rabbit_control_misc:await_emitters_termination(Pids), 1382 wait_for_queues(Ref, length(Pids), []). 1383 1384wait_for_queues(Ref, N, Acc) -> 1385 receive 1386 {Ref, finished} when N == 1 -> 1387 Acc; 1388 {Ref, finished} -> 1389 wait_for_queues(Ref, N - 1, Acc); 1390 {Ref, Items, continue} -> 1391 wait_for_queues(Ref, N, [Items | Acc]) 1392 after 1393 1000 -> 1394 Acc 1395 end. 1396 1397emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> 1398 rabbit_control_misc:emitting_map_with_exit_handler( 1399 AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, 1400 list_down(VHostPath)). 1401 1402emit_unresponsive_local(VHostPath, Items, Timeout, Ref, AggregatorPid) -> 1403 rabbit_control_misc:emitting_map_with_exit_handler( 1404 AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of 1405 true -> info_down(Q, Items, unresponsive); 1406 false -> [] 1407 end 1408 end, list_local(VHostPath) 1409 ). 1410 1411emit_unresponsive(Nodes, VHostPath, Items, Timeout, Ref, AggregatorPid) -> 1412 Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local, 1413 [VHostPath, Items, Timeout, Ref, AggregatorPid]) || Node <- Nodes ], 1414 rabbit_control_misc:await_emitters_termination(Pids). 1415 1416info_local(VHostPath) -> 1417 map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). 1418 1419list_local(VHostPath) -> 1420 [Q || Q <- list(VHostPath), 1421 amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. 1422 1423-spec force_event_refresh(reference()) -> 'ok'. 1424 1425% Note: https://www.pivotaltracker.com/story/show/166962656 1426% This event is necessary for the stats timer to be initialized with 1427% the correct values once the management agent has started 1428force_event_refresh(Ref) -> 1429 %% note: quorum queuse emit stats on periodic ticks that run unconditionally, 1430 %% so force_event_refresh is unnecessary (and, in fact, would only produce log noise) for QQs. 1431 ClassicQs = list_by_type(rabbit_classic_queue), 1432 [gen_server2:cast(amqqueue:get_pid(Q), 1433 {force_event_refresh, Ref}) || Q <- ClassicQs], 1434 ok. 1435 1436-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. 1437notify_policy_changed(Q) when ?is_amqqueue(Q) -> 1438 rabbit_queue_type:policy_changed(Q). 1439 1440-spec consumers(amqqueue:amqqueue()) -> 1441 [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), 1442 boolean(), atom(), 1443 rabbit_framing:amqp_table(), rabbit_types:username()}]. 1444 1445consumers(Q) when ?amqqueue_is_classic(Q) -> 1446 QPid = amqqueue:get_pid(Q), 1447 delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); 1448consumers(Q) when ?amqqueue_is_quorum(Q) -> 1449 QPid = amqqueue:get_pid(Q), 1450 case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of 1451 {ok, {_, Result}, _} -> maps:values(Result); 1452 _ -> [] 1453 end; 1454consumers(Q) when ?amqqueue_is_stream(Q) -> 1455 %% TODO how??? they only exist on the channel 1456 %% we could list the offset listener on the writer but we don't even have a consumer tag, 1457 %% only a (channel) pid and offset 1458 []. 1459 1460-spec consumer_info_keys() -> rabbit_types:info_keys(). 1461 1462consumer_info_keys() -> ?CONSUMER_INFO_KEYS. 1463 1464-spec consumers_all(rabbit_types:vhost()) -> 1465 [{name(), pid(), rabbit_types:ctag(), boolean(), 1466 non_neg_integer(), rabbit_framing:amqp_table()}]. 1467 1468consumers_all(VHostPath) -> 1469 ConsumerInfoKeys = consumer_info_keys(), 1470 lists:append( 1471 map(list(VHostPath), 1472 fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). 1473 1474emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) -> 1475 Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ], 1476 rabbit_control_misc:await_emitters_termination(Pids), 1477 ok. 1478 1479emit_consumers_local(VHostPath, Ref, AggregatorPid) -> 1480 ConsumerInfoKeys = consumer_info_keys(), 1481 rabbit_control_misc:emitting_map( 1482 AggregatorPid, Ref, 1483 fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, 1484 list_local(VHostPath)). 1485 1486get_queue_consumer_info(Q, ConsumerInfoKeys) -> 1487 [lists:zip(ConsumerInfoKeys, 1488 [amqqueue:get_name(Q), ChPid, CTag, 1489 AckRequired, Prefetch, Active, ActivityStatus, Args]) || 1490 {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. 1491 1492-spec stat(amqqueue:amqqueue()) -> 1493 {'ok', non_neg_integer(), non_neg_integer()}. 1494stat(Q) -> 1495 rabbit_queue_type:stat(Q). 1496 1497-spec pid_of(amqqueue:amqqueue()) -> 1498 pid(). 1499 1500pid_of(Q) -> amqqueue:get_pid(Q). 1501 1502-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> 1503 pid() | rabbit_types:error('not_found'). 1504 1505pid_of(VHost, QueueName) -> 1506 case lookup(rabbit_misc:r(VHost, queue, QueueName)) of 1507 {ok, Q} -> pid_of(Q); 1508 {error, not_found} = E -> E 1509 end. 1510 1511-spec delete_exclusive(qpids(), pid()) -> 'ok'. 1512 1513delete_exclusive(QPids, ConnId) -> 1514 rabbit_amqqueue_common:delete_exclusive(QPids, ConnId). 1515 1516-spec delete_immediately(qpids()) -> 'ok'. 1517 1518delete_immediately(QPids) -> 1519 {Classic, Quorum} = filter_pid_per_type(QPids), 1520 [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic], 1521 case Quorum of 1522 [] -> ok; 1523 _ -> {error, cannot_delete_quorum_queues, Quorum} 1524 end. 1525 1526delete_immediately_by_resource(Resources) -> 1527 {Classic, Quorum} = filter_resource_per_type(Resources), 1528 [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic], 1529 [rabbit_quorum_queue:delete_immediately(Resource, QPid) 1530 || {Resource, QPid} <- Quorum], 1531 ok. 1532 1533-spec delete 1534 (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> 1535 qlen() | 1536 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; 1537 (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> 1538 qlen() | rabbit_types:error('in_use') | 1539 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; 1540 (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> 1541 qlen() | rabbit_types:error('not_empty') | 1542 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; 1543 (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> 1544 qlen() | 1545 rabbit_types:error('in_use') | 1546 rabbit_types:error('not_empty') | 1547 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 1548delete(Q, IfUnused, IfEmpty, ActingUser) -> 1549 rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser). 1550 1551%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS 1552delete_crashed(Q) when ?amqqueue_is_classic(Q) -> 1553 rabbit_classic_queue:delete_crashed(Q). 1554 1555delete_crashed(Q, ActingUser) when ?amqqueue_is_classic(Q) -> 1556 rabbit_classic_queue:delete_crashed(Q, ActingUser). 1557 1558-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'. 1559delete_crashed_internal(Q, ActingUser) when ?amqqueue_is_classic(Q) -> 1560 rabbit_classic_queue:delete_crashed_internal(Q, ActingUser). 1561 1562-spec purge(amqqueue:amqqueue()) -> qlen(). 1563purge(Q) when ?is_amqqueue(Q) -> 1564 rabbit_queue_type:purge(Q). 1565 1566-spec requeue(name(), 1567 {rabbit_fifo:consumer_tag(), [msg_id()]}, 1568 rabbit_queue_type:state()) -> 1569 {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. 1570requeue(QRef, {CTag, MsgIds}, QStates) -> 1571 reject(QRef, true, {CTag, MsgIds}, QStates). 1572 1573-spec ack(name(), 1574 {rabbit_fifo:consumer_tag(), [msg_id()]}, 1575 rabbit_queue_type:state()) -> 1576 {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. 1577ack(QPid, {CTag, MsgIds}, QueueStates) -> 1578 rabbit_queue_type:settle(QPid, complete, CTag, MsgIds, QueueStates). 1579 1580 1581-spec reject(name(), 1582 boolean(), 1583 {rabbit_fifo:consumer_tag(), [msg_id()]}, 1584 rabbit_queue_type:state()) -> 1585 {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. 1586reject(QRef, Requeue, {CTag, MsgIds}, QStates) -> 1587 Op = case Requeue of 1588 true -> requeue; 1589 false -> discard 1590 end, 1591 rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QStates). 1592 1593-spec notify_down_all(qpids(), pid()) -> ok_or_errors(). 1594notify_down_all(QPids, ChPid) -> 1595 notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). 1596 1597-spec notify_down_all(qpids(), pid(), non_neg_integer()) -> 1598 ok_or_errors(). 1599notify_down_all(QPids, ChPid, Timeout) -> 1600 case rpc:call(node(), delegate, invoke, 1601 [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of 1602 {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}}; 1603 {badrpc, Reason} -> {error, Reason}; 1604 {_, Bads} -> 1605 case lists:filter( 1606 fun ({_Pid, {exit, {R, _}, _}}) -> 1607 rabbit_misc:is_abnormal_exit(R); 1608 ({_Pid, _}) -> false 1609 end, Bads) of 1610 [] -> ok; 1611 Bads1 -> {error, Bads1} 1612 end; 1613 Error -> {error, Error} 1614 end. 1615 1616-spec activate_limit_all(qpids(), pid()) -> ok. 1617 1618activate_limit_all(QRefs, ChPid) -> 1619 QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], 1620 delegate:invoke_no_result(QPids, {gen_server2, cast, 1621 [{activate_limit, ChPid}]}). 1622 1623-spec deactivate_limit_all(qpids(), pid()) -> ok. 1624 1625deactivate_limit_all(QRefs, ChPid) -> 1626 QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], 1627 delegate:invoke_no_result(QPids, {gen_server2, cast, 1628 [{deactivate_limit, ChPid}]}). 1629 1630-spec credit(amqqueue:amqqueue(), 1631 rabbit_types:ctag(), 1632 non_neg_integer(), 1633 boolean(), 1634 rabbit_queue_type:state()) -> 1635 {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. 1636credit(Q, CTag, Credit, Drain, QStates) -> 1637 rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates). 1638 1639-spec basic_get(amqqueue:amqqueue(), boolean(), pid(), rabbit_types:ctag(), 1640 rabbit_queue_type:state()) -> 1641 {'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} | 1642 {'empty', rabbit_queue_type:state()} | 1643 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 1644basic_get(Q, NoAck, LimiterPid, CTag, QStates0) -> 1645 rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0). 1646 1647 1648-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(), 1649 non_neg_integer(), rabbit_types:ctag(), boolean(), 1650 rabbit_framing:amqp_table(), any(), rabbit_types:username(), 1651 rabbit_queue_type:state()) -> 1652 {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} | 1653 {error, term()} | 1654 {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. 1655basic_consume(Q, NoAck, ChPid, LimiterPid, 1656 LimiterActive, ConsumerPrefetchCount, ConsumerTag, 1657 ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) -> 1658 1659 QName = amqqueue:get_name(Q), 1660 %% first phase argument validation 1661 %% each queue type may do further validations 1662 ok = check_consume_arguments(QName, Args), 1663 Spec = #{no_ack => NoAck, 1664 channel_pid => ChPid, 1665 limiter_pid => LimiterPid, 1666 limiter_active => LimiterActive, 1667 prefetch_count => ConsumerPrefetchCount, 1668 consumer_tag => ConsumerTag, 1669 exclusive_consume => ExclusiveConsume, 1670 args => Args, 1671 ok_msg => OkMsg, 1672 acting_user => ActingUser}, 1673 rabbit_queue_type:consume(Q, Spec, Contexts). 1674 1675-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(), 1676 rabbit_types:username(), 1677 rabbit_queue_type:state()) -> 1678 {ok, rabbit_queue_type:state()} | {error, term()}. 1679basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) -> 1680 rabbit_queue_type:cancel(Q, ConsumerTag, 1681 OkMsg, ActingUser, QStates). 1682 1683-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. 1684 1685notify_decorators(Q) -> 1686 rabbit_queue_type:notify_decorators(Q). 1687 1688notify_sent(QPid, ChPid) -> 1689 rabbit_amqqueue_common:notify_sent(QPid, ChPid). 1690 1691notify_sent_queue_down(QPid) -> 1692 rabbit_amqqueue_common:notify_sent_queue_down(QPid). 1693 1694-spec resume(pid(), pid()) -> 'ok'. 1695 1696resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, 1697 [{resume, ChPid}]}). 1698 1699internal_delete1(QueueName, OnlyDurable) -> 1700 internal_delete1(QueueName, OnlyDurable, normal). 1701 1702internal_delete1(QueueName, OnlyDurable, Reason) -> 1703 ok = mnesia:delete({rabbit_queue, QueueName}), 1704 case Reason of 1705 auto_delete -> 1706 case mnesia:wread({rabbit_durable_queue, QueueName}) of 1707 [] -> ok; 1708 [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) 1709 end; 1710 _ -> 1711 mnesia:delete({rabbit_durable_queue, QueueName}) 1712 end, 1713 %% we want to execute some things, as decided by rabbit_exchange, 1714 %% after the transaction. 1715 rabbit_binding:remove_for_destination(QueueName, OnlyDurable). 1716 1717-spec internal_delete(name(), rabbit_types:username()) -> 'ok'. 1718 1719internal_delete(QueueName, ActingUser) -> 1720 internal_delete(QueueName, ActingUser, normal). 1721 1722internal_delete(QueueName, ActingUser, Reason) -> 1723 rabbit_misc:execute_mnesia_tx_with_tail( 1724 fun () -> 1725 case {mnesia:wread({rabbit_queue, QueueName}), 1726 mnesia:wread({rabbit_durable_queue, QueueName})} of 1727 {[], []} -> 1728 rabbit_misc:const(ok); 1729 _ -> 1730 Deletions = internal_delete1(QueueName, false, Reason), 1731 T = rabbit_binding:process_deletions(Deletions, 1732 ?INTERNAL_USER), 1733 fun() -> 1734 ok = T(), 1735 rabbit_core_metrics:queue_deleted(QueueName), 1736 ok = rabbit_event:notify(queue_deleted, 1737 [{name, QueueName}, 1738 {user_who_performed_action, ActingUser}]) 1739 end 1740 end 1741 end). 1742 1743-spec forget_all_durable(node()) -> 'ok'. 1744 1745forget_all_durable(Node) -> 1746 %% Note rabbit is not running so we avoid e.g. the worker pool. Also why 1747 %% we don't invoke the return from rabbit_binding:process_deletions/1. 1748 {atomic, ok} = 1749 mnesia:sync_transaction( 1750 fun () -> 1751 Qs = mnesia:match_object(rabbit_durable_queue, 1752 amqqueue:pattern_match_all(), write), 1753 [forget_node_for_queue(Node, Q) || 1754 Q <- Qs, 1755 is_local_to_node(amqqueue:get_pid(Q), Node)], 1756 ok 1757 end), 1758 ok. 1759 1760%% Try to promote a mirror while down - it should recover as a 1761%% master. We try to take the oldest mirror here for best chance of 1762%% recovery. 1763forget_node_for_queue(_DeadNode, Q) 1764 when ?amqqueue_is_quorum(Q) -> 1765 ok; 1766forget_node_for_queue(DeadNode, Q) -> 1767 RS = amqqueue:get_recoverable_slaves(Q), 1768 forget_node_for_queue(DeadNode, RS, Q). 1769 1770forget_node_for_queue(_DeadNode, [], Q) -> 1771 %% No mirrors to recover from, queue is gone. 1772 %% Don't process_deletions since that just calls callbacks and we 1773 %% are not really up. 1774 Name = amqqueue:get_name(Q), 1775 internal_delete1(Name, true); 1776 1777%% Should not happen, but let's be conservative. 1778forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> 1779 forget_node_for_queue(DeadNode, T, Q); 1780 1781forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) -> 1782 Type = amqqueue:get_type(Q), 1783 case {node_permits_offline_promotion(H), Type} of 1784 {false, _} -> forget_node_for_queue(DeadNode, T, Q); 1785 {true, rabbit_classic_queue} -> 1786 Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)), 1787 ok = mnesia:write(rabbit_durable_queue, Q1, write); 1788 {true, rabbit_quorum_queue} -> 1789 ok 1790 end. 1791 1792node_permits_offline_promotion(Node) -> 1793 case node() of 1794 Node -> not rabbit:is_running(); %% [1] 1795 _ -> All = rabbit_nodes:all(), 1796 Running = rabbit_nodes:all_running(), 1797 lists:member(Node, All) andalso 1798 not lists:member(Node, Running) %% [2] 1799 end. 1800%% [1] In this case if we are a real running node (i.e. rabbitmqctl 1801%% has RPCed into us) then we cannot allow promotion. If on the other 1802%% hand we *are* rabbitmqctl impersonating the node for offline 1803%% node-forgetting then we can. 1804%% 1805%% [2] This is simpler; as long as it's down that's OK 1806 1807-spec run_backing_queue 1808 (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 1809 'ok'. 1810 1811run_backing_queue(QPid, Mod, Fun) -> 1812 gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). 1813 1814-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'. 1815 1816set_ram_duration_target(QPid, Duration) -> 1817 gen_server2:cast(QPid, {set_ram_duration_target, Duration}). 1818 1819-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. 1820 1821set_maximum_since_use(QPid, Age) -> 1822 gen_server2:cast(QPid, {set_maximum_since_use, Age}). 1823 1824-spec update_mirroring(pid()) -> 'ok'. 1825 1826update_mirroring(QPid) -> 1827 ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). 1828 1829-spec sync_mirrors(amqqueue:amqqueue() | pid()) -> 1830 'ok' | rabbit_types:error('not_mirrored'). 1831 1832sync_mirrors(Q) when ?is_amqqueue(Q) -> 1833 QPid = amqqueue:get_pid(Q), 1834 delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); 1835sync_mirrors(QPid) -> 1836 delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). 1837 1838-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) -> 1839 'ok' | {'ok', 'not_syncing'}. 1840 1841cancel_sync_mirrors(Q) when ?is_amqqueue(Q) -> 1842 QPid = amqqueue:get_pid(Q), 1843 delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); 1844cancel_sync_mirrors(QPid) -> 1845 delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). 1846 1847-spec is_replicated(amqqueue:amqqueue()) -> boolean(). 1848 1849is_replicated(Q) when ?amqqueue_is_classic(Q) -> 1850 rabbit_mirror_queue_misc:is_mirrored(Q); 1851is_replicated(_Q) -> 1852 %% streams and quorum queues are all replicated 1853 true. 1854 1855is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> 1856 false; 1857is_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> 1858 true. 1859 1860is_not_exclusive(Q) -> 1861 not is_exclusive(Q). 1862 1863is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> 1864 false; 1865is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> 1866 Pid = amqqueue:get_pid(Q), 1867 not rabbit_mnesia:is_process_alive(Pid). 1868 1869-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean(). 1870has_synchronised_mirrors_online(Q) -> 1871 %% a queue with all mirrors down would have no mirror pids. 1872 %% We treat these as in sync intentionally to avoid false positives. 1873 MirrorPids = amqqueue:get_sync_slave_pids(Q), 1874 MirrorPids =/= [] andalso lists:any(fun rabbit_misc:is_process_alive/1, MirrorPids). 1875 1876-spec on_node_up(node()) -> 'ok'. 1877 1878on_node_up(Node) -> 1879 ok = rabbit_misc:execute_mnesia_transaction( 1880 fun () -> 1881 Qs = mnesia:match_object(rabbit_queue, 1882 amqqueue:pattern_match_all(), write), 1883 [maybe_clear_recoverable_node(Node, Q) || Q <- Qs], 1884 ok 1885 end). 1886 1887maybe_clear_recoverable_node(Node, Q) -> 1888 SPids = amqqueue:get_sync_slave_pids(Q), 1889 RSs = amqqueue:get_recoverable_slaves(Q), 1890 case lists:member(Node, RSs) of 1891 true -> 1892 %% There is a race with 1893 %% rabbit_mirror_queue_slave:record_synchronised/1 called 1894 %% by the incoming mirror node and this function, called 1895 %% by the master node. If this function is executed after 1896 %% record_synchronised/1, the node is erroneously removed 1897 %% from the recoverable mirrors list. 1898 %% 1899 %% We check if the mirror node's queue PID is alive. If it is 1900 %% the case, then this function is executed after. In this 1901 %% situation, we don't touch the queue record, it is already 1902 %% correct. 1903 DoClearNode = 1904 case [SP || SP <- SPids, node(SP) =:= Node] of 1905 [SPid] -> not rabbit_misc:is_process_alive(SPid); 1906 _ -> true 1907 end, 1908 if 1909 DoClearNode -> RSs1 = RSs -- [Node], 1910 store_queue( 1911 amqqueue:set_recoverable_slaves(Q, RSs1)); 1912 true -> ok 1913 end; 1914 false -> 1915 ok 1916 end. 1917 1918-spec on_node_down(node()) -> 'ok'. 1919 1920on_node_down(Node) -> 1921 {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_node_down(Node) end), 1922 case length(QueueNames) of 1923 0 -> ok; 1924 _ -> rabbit_log:info("~p transient queues from an old incarnation of node ~p deleted in ~fs", [length(QueueNames), Node, Time/1000000]) 1925 end, 1926 notify_queue_binding_deletions(QueueDeletions), 1927 rabbit_core_metrics:queues_deleted(QueueNames), 1928 notify_queues_deleted(QueueNames), 1929 ok. 1930 1931delete_queues_on_node_down(Node) -> 1932 lists:unzip(lists:flatten([ 1933 rabbit_misc:execute_mnesia_transaction( 1934 fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end 1935 ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) 1936 ])). 1937 1938delete_queue(QueueName) -> 1939 ok = mnesia:delete({rabbit_queue, QueueName}), 1940 rabbit_binding:remove_transient_for_destination(QueueName). 1941 1942% If there are many queues and we delete them all in a single Mnesia transaction, 1943% this can block all other Mnesia operations for a really long time. 1944% In situations where a node wants to (re-)join a cluster, 1945% Mnesia won't be able to sync on the new node until this operation finishes. 1946% As a result, we want to have multiple Mnesia transactions so that other 1947% operations can make progress in between these queue delete transactions. 1948% 1949% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node. 1950partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) -> 1951 [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)]; 1952partition_queues(T) -> 1953 [T]. 1954 1955queues_to_delete_when_node_down(NodeDown) -> 1956 rabbit_misc:execute_mnesia_transaction(fun () -> 1957 qlc:e(qlc:q([amqqueue:get_name(Q) || 1958 Q <- mnesia:table(rabbit_queue), 1959 amqqueue:qnode(Q) == NodeDown andalso 1960 not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso 1961 (not rabbit_amqqueue:is_replicated(Q) orelse 1962 rabbit_amqqueue:is_dead_exclusive(Q))] 1963 )) 1964 end). 1965 1966notify_queue_binding_deletions(QueueDeletions) -> 1967 rabbit_misc:execute_mnesia_tx_with_tail( 1968 fun() -> 1969 rabbit_binding:process_deletions( 1970 lists:foldl( 1971 fun rabbit_binding:combine_deletions/2, 1972 rabbit_binding:new_deletions(), 1973 QueueDeletions 1974 ), 1975 ?INTERNAL_USER 1976 ) 1977 end 1978 ). 1979 1980notify_queues_deleted(QueueDeletions) -> 1981 lists:foreach( 1982 fun(Queue) -> 1983 ok = rabbit_event:notify(queue_deleted, 1984 [{name, Queue}, 1985 {user, ?INTERNAL_USER}]) 1986 end, 1987 QueueDeletions). 1988 1989-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue(). 1990 1991pseudo_queue(QueueName, Pid) -> 1992 pseudo_queue(QueueName, Pid, false). 1993 1994-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). 1995 1996pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) 1997 when is_pid(Pid) andalso 1998 is_boolean(Durable) -> 1999 amqqueue:new(QueueName, 2000 Pid, 2001 Durable, 2002 false, 2003 none, % Owner, 2004 [], 2005 undefined, % VHost, 2006 #{user => undefined}, % ActingUser 2007 rabbit_classic_queue % Type 2008 ). 2009 2010-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). 2011 2012immutable(Q) -> amqqueue:set_immutable(Q). 2013 2014-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'. 2015 2016deliver(Qs, Delivery) -> 2017 _ = rabbit_queue_type:deliver(Qs, Delivery, stateless), 2018 ok. 2019 2020get_quorum_nodes(Q) -> 2021 case amqqueue:get_type_state(Q) of 2022 #{nodes := Nodes} -> 2023 Nodes; 2024 _ -> 2025 [] 2026 end. 2027