%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% Constructors, accessors and match patterns for queue records.
%%
%% This module owns the `amqqueue_v2' layout of the #amqqueue{} record.
%% Most functions have one clause for that layout and a fallback clause
%% that hands any other term to the `amqqueue_v1' module.

-module(amqqueue). %% Could become amqqueue_v2 in the future.

-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").

-export([new/8,
         new/9,
         new_with_version/9,
         new_with_version/10,
         fields/0,
         fields/1,
         field_vhost/0,
         record_version_to_use/0,
         upgrade/1,
         upgrade_to/2,
         % arguments
         get_arguments/1,
         set_arguments/2,
         % decorators
         get_decorators/1,
         set_decorators/2,
         % exclusive_owner
         get_exclusive_owner/1,
         % gm_pids
         get_gm_pids/1,
         set_gm_pids/2,
         get_leader/1,
         % name (#resource)
         get_name/1,
         set_name/2,
         % operator_policy
         get_operator_policy/1,
         set_operator_policy/2,
         % options
         get_options/1,
         set_options/2,
         % pid
         get_pid/1,
         set_pid/2,
         % policy
         get_policy/1,
         set_policy/2,
         % policy_version
         get_policy_version/1,
         set_policy_version/2,
         % type_state
         get_type_state/1,
         set_type_state/2,
         % recoverable_slaves
         get_recoverable_slaves/1,
         set_recoverable_slaves/2,
         % slave_pids
         get_slave_pids/1,
         set_slave_pids/2,
         % slave_pids_pending_shutdown
         get_slave_pids_pending_shutdown/1,
         set_slave_pids_pending_shutdown/2,
         % state
         get_state/1,
         set_state/2,
         % sync_slave_pids
         get_sync_slave_pids/1,
         set_sync_slave_pids/2,
         get_type/1,
         get_vhost/1,
         is_amqqueue/1,
         is_auto_delete/1,
         is_durable/1,
         is_classic/1,
         is_quorum/1,
         pattern_match_all/0,
         pattern_match_on_name/1,
         pattern_match_on_type/1,
         reset_mirroring_and_decorators/1,
         set_immutable/1,
         qnode/1,
         macros/0]).

%% Version tag of the record layout defined in this module.
-define(record_version, amqqueue_v2).
%% Guard-safe test: true when T is the atom `classic' or the v1 queue
%% type (?amqqueue_v1_type, defined outside this file).
-define(is_backwards_compat_classic(T),
        (T =:= classic orelse T =:= ?amqqueue_v1_type)).

%% The v2 queue record. Every field type also admits '_' so that the
%% same record can be used to build match patterns (see the
%% pattern_match_* functions). The field order is significant:
%% upgrade_to/2 converts an older record by appending values for the
%% last two fields (`type' and `type_state').
-record(amqqueue, {
          name :: rabbit_amqqueue:name() | '_', %% immutable
          durable :: boolean() | '_',           %% immutable
          auto_delete :: boolean() | '_',       %% immutable
          exclusive_owner = none :: pid() | none | '_', %% immutable
          arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
          pid :: pid() | ra_server_id() | none | '_', %% durable (just so we
                                                      %% know home node)
          slave_pids = [] :: [pid()] | none | '_', %% transient
          sync_slave_pids = [] :: [pid()] | none| '_',%% transient
          recoverable_slaves = [] :: [atom()] | none | '_', %% durable
          policy :: binary() | none | undefined | '_', %% durable, implicit
                                                       %% update as above
          operator_policy :: binary() | none | undefined | '_', %% durable,
                                                                %% implicit
                                                                %% update
                                                                %% as above
          gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient
          decorators :: [atom()] | none | undefined | '_', %% transient,
                                                           %% recalculated
                                                           %% as above
          state = live :: atom() | none | '_', %% durable (have we crashed?)
          policy_version = 0 :: non_neg_integer() | '_',
          slave_pids_pending_shutdown = [] :: [pid()] | '_',
          vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
          options = #{} :: map() | '_',
          type = ?amqqueue_v1_type :: module() | '_',
          type_state = #{} :: map() | '_'
         }).

%% A queue record in either the v1 or the v2 layout.
-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
%% A fully populated v2 queue record, i.e. the #amqqueue{} record
%% without the '_' wildcards that are only meaningful in match patterns.
-type amqqueue_v2() :: #amqqueue{
                          name :: rabbit_amqqueue:name(),
                          durable :: boolean(),
                          auto_delete :: boolean(),
                          exclusive_owner :: pid() | none,
                          arguments :: rabbit_framing:amqp_table(),
                          pid :: pid() | ra_server_id() | none,
                          slave_pids :: [pid()] | none,
                          sync_slave_pids :: [pid()] | none,
                          recoverable_slaves :: [atom()] | none,
                          policy :: binary() | none | undefined,
                          operator_policy :: binary() | none | undefined,
                          gm_pids :: [{pid(), pid()}] | none,
                          decorators :: [atom()] | none | undefined,
                          state :: atom() | none,
                          policy_version :: non_neg_integer(),
                          slave_pids_pending_shutdown :: [pid()],
                          vhost :: rabbit_types:vhost() | undefined,
                          options :: map(),
                          type :: atom(),
                          %% `map()', not `#{}': in type syntax `#{}'
                          %% denotes the empty map only, whereas
                          %% set_type_state/2 stores arbitrary maps.
                          type_state :: map()
                         }.

%% The `pid' field may hold a {Name, Node} pair instead of a pid();
%% get_leader/1 and qnode/1 read the node from its second element.
-type ra_server_id() :: {Name :: atom(), Node :: node()}.

%% A match pattern in either record layout.
-type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() |
                            amqqueue_v2_pattern().

%% A v2 match pattern: only `name' and `type' may be bound, every other
%% field is the '_' wildcard.
-type amqqueue_v2_pattern() :: #amqqueue{
                                  name :: rabbit_amqqueue:name() | '_',
                                  durable :: '_',
                                  auto_delete :: '_',
                                  exclusive_owner :: '_',
                                  arguments :: '_',
                                  pid :: '_',
                                  slave_pids :: '_',
                                  sync_slave_pids :: '_',
                                  recoverable_slaves :: '_',
                                  policy :: '_',
                                  operator_policy :: '_',
                                  gm_pids :: '_',
                                  decorators :: '_',
                                  state :: '_',
                                  policy_version :: '_',
                                  slave_pids_pending_shutdown :: '_',
                                  vhost :: '_',
                                  options :: '_',
                                  type :: atom() | '_',
                                  type_state :: '_'
                                 }.

-export_type([amqqueue/0,
              amqqueue_v2/0,
              amqqueue_pattern/0,
              amqqueue_v2_pattern/0,
              ra_server_id/0]).

-spec new(rabbit_amqqueue:name(),
          pid() | ra_server_id() | none,
          boolean(),
          boolean(),
          pid() | none,
          rabbit_framing:amqp_table(),
          rabbit_types:vhost() | undefined,
          map()) -> amqqueue().

%% new/8: create a queue record of the default type.
%%
%% Same as new/9 with ?amqqueue_v1_type as the queue type. The guard
%% repeats the argument checks of new/9, so a bad argument fails with a
%% function_clause error in this function.
new(#resource{kind = queue} = QName,
    QPid, Durable, AutoDelete, Owner, Args, VHost, Options)
  when (is_pid(QPid) orelse is_tuple(QPid) orelse QPid =:= none),
       is_boolean(Durable),
       is_boolean(AutoDelete),
       (is_pid(Owner) orelse Owner =:= none),
       is_list(Args),
       (is_binary(VHost) orelse VHost =:= undefined),
       is_map(Options) ->
    new(QName, QPid, Durable, AutoDelete, Owner, Args, VHost, Options,
        ?amqqueue_v1_type).

-spec new(rabbit_amqqueue:name(),
          pid() | ra_server_id() | none,
          boolean(),
          boolean(),
          pid() | none,
          rabbit_framing:amqp_table(),
          rabbit_types:vhost() | undefined,
          map(),
          atom()) -> amqqueue().

%% new/9: create a queue record of the given type, in whichever record
%% layout record_version_to_use/0 currently selects.
%%
%% When the selected layout is ?record_version the record is built by
%% new_with_version/10; otherwise construction is delegated to
%% amqqueue_v1:new/9.
new(#resource{kind = queue} = QName,
    QPid, Durable, AutoDelete, Owner, Args, VHost, Options, Type)
  when (is_pid(QPid) orelse is_tuple(QPid) orelse QPid =:= none),
       is_boolean(Durable),
       is_boolean(AutoDelete),
       (is_pid(Owner) orelse Owner =:= none),
       is_list(Args),
       (is_binary(VHost) orelse VHost =:= undefined),
       is_map(Options),
       is_atom(Type) ->
    Version = record_version_to_use(),
    case Version of
        ?record_version ->
            new_with_version(?record_version, QName, QPid, Durable,
                             AutoDelete, Owner, Args, VHost, Options,
                             Type);
        _ ->
            amqqueue_v1:new(QName, QPid, Durable, AutoDelete, Owner,
                            Args, VHost, Options, Type)
    end.

-spec new_with_version
(amqqueue_v1 | amqqueue_v2,
 rabbit_amqqueue:name(),
 pid() | ra_server_id() | none,
 boolean(),
 boolean(),
 pid() | none,
 rabbit_framing:amqp_table(),
 rabbit_types:vhost() | undefined,
 map()) -> amqqueue().

%% new_with_version/9: create a queue record of the default type in an
%% explicitly chosen record layout.
%%
%% Same as new_with_version/10 with ?amqqueue_v1_type as the type.
new_with_version(Version,
                 #resource{kind = queue} = QName,
                 QPid, Durable, AutoDelete, Owner, Args, VHost, Options)
  when (is_pid(QPid) orelse is_tuple(QPid) orelse QPid =:= none),
       is_boolean(Durable),
       is_boolean(AutoDelete),
       (is_pid(Owner) orelse Owner =:= none),
       is_list(Args),
       (is_binary(VHost) orelse VHost =:= undefined),
       is_map(Options) ->
    new_with_version(Version, QName, QPid, Durable, AutoDelete, Owner,
                     Args, VHost, Options, ?amqqueue_v1_type).

-spec new_with_version
(amqqueue_v1 | amqqueue_v2,
 rabbit_amqqueue:name(),
 pid() | ra_server_id() | none,
 boolean(),
 boolean(),
 pid() | none,
 rabbit_framing:amqp_table(),
 rabbit_types:vhost() | undefined,
 map(),
 atom()) -> amqqueue().

%% new_with_version/10: create a queue record in an explicitly chosen
%% record layout.
%%
%% First clause: the requested layout is ?record_version, so a v2
%% record is built here. The type goes through ensure_type_compat/1,
%% which maps `classic' to ?amqqueue_v1_type. Fields not listed keep
%% their record defaults.
%%
%% Second clause: any other layout is delegated to
%% amqqueue_v1:new_with_version/9, and only for a classic-compatible
%% type; the type itself is not passed on. Any other combination fails
%% with function_clause.
new_with_version(?record_version,
                 #resource{kind = queue} = QName,
                 QPid, Durable, AutoDelete, Owner, Args, VHost, Options,
                 Type)
  when (is_pid(QPid) orelse is_tuple(QPid) orelse QPid =:= none),
       is_boolean(Durable),
       is_boolean(AutoDelete),
       (is_pid(Owner) orelse Owner =:= none),
       is_list(Args),
       (is_binary(VHost) orelse VHost =:= undefined),
       is_map(Options),
       is_atom(Type) ->
    QType = ensure_type_compat(Type),
    #amqqueue{name = QName,
              pid = QPid,
              durable = Durable,
              auto_delete = AutoDelete,
              exclusive_owner = Owner,
              arguments = Args,
              vhost = VHost,
              options = Options,
              type = QType};
new_with_version(Version,
                 QName, QPid, Durable, AutoDelete, Owner, Args, VHost,
                 Options, Type)
  when ?is_backwards_compat_classic(Type) ->
    amqqueue_v1:new_with_version(Version, QName, QPid, Durable,
                                 AutoDelete, Owner, Args, VHost,
                                 Options).

-spec is_amqqueue(any()) -> boolean().

%% True for a v2 record; any other term is judged by
%% amqqueue_v1:is_amqqueue/1.
is_amqqueue(#amqqueue{}) ->
    true;
is_amqqueue(Term) ->
    amqqueue_v1:is_amqqueue(Term).

-spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2.

%% Select the record layout to create: ?record_version once the
%% `quorum_queue' feature flag is enabled, otherwise whatever
%% amqqueue_v1 selects.
record_version_to_use() ->
    QuorumEnabled = rabbit_feature_flags:is_enabled(quorum_queue),
    case QuorumEnabled of
        true  -> ?record_version;
        false -> amqqueue_v1:record_version_to_use()
    end.

-spec upgrade(amqqueue()) -> amqqueue().

%% Bring a record to the layout currently in use. A v2 record is
%% returned untouched; anything else goes through upgrade_to/2.
upgrade(#amqqueue{} = Current) ->
    Current;
upgrade(Older) ->
    Target = record_version_to_use(),
    upgrade_to(Target, Older).

-spec upgrade_to
(amqqueue_v2, amqqueue()) -> amqqueue_v2();
(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1().

%% Convert a record to the requested layout.
%%
%% For ?record_version an older record is extended with two trailing
%% elements that land in the `type' and `type_state' fields, the last
%% two fields of #amqqueue{}. The final match asserts that the result
%% has the tag and size of a v2 record.
%%
%% NOTE(review): `type_state' is filled with `undefined' here although
%% the record declares it as a map and defaults it to #{} -- confirm
%% whether this is intentional before changing persisted data.
upgrade_to(?record_version, #amqqueue{} = Current) ->
    Current;
upgrade_to(?record_version, Older) ->
    WithType = erlang:append_element(Older, ?amqqueue_v1_type),
    #amqqueue{} = erlang:append_element(WithType, undefined);
upgrade_to(OtherVersion, Older) ->
    amqqueue_v1:upgrade_to(OtherVersion, Older).

% arguments

-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table().

%% Read the `arguments' field (v2) or defer to amqqueue_v1.
get_arguments(#amqqueue{} = Q) ->
    Q#amqqueue.arguments;
get_arguments(Q) ->
    amqqueue_v1:get_arguments(Q).

-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue().

%% Replace the `arguments' field (v2) or defer to amqqueue_v1.
set_arguments(#amqqueue{} = Q, NewArgs) ->
    Q#amqqueue{arguments = NewArgs};
set_arguments(Q, NewArgs) ->
    amqqueue_v1:set_arguments(Q, NewArgs).

% decorators

-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.

%% Read the `decorators' field (v2) or defer to amqqueue_v1.
get_decorators(#amqqueue{} = Q) ->
    Q#amqqueue.decorators;
get_decorators(Q) ->
    amqqueue_v1:get_decorators(Q).

-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue().

%% Replace the `decorators' field (v2) or defer to amqqueue_v1.
set_decorators(#amqqueue{} = Q, NewDecorators) ->
    Q#amqqueue{decorators = NewDecorators};
set_decorators(Q, NewDecorators) ->
    amqqueue_v1:set_decorators(Q, NewDecorators).

-spec get_exclusive_owner(amqqueue()) -> pid() | none.

%% Read the `exclusive_owner' field (v2) or defer to amqqueue_v1.
%% There is no matching setter in this module.
get_exclusive_owner(#amqqueue{} = Q) ->
    Q#amqqueue.exclusive_owner;
get_exclusive_owner(Q) ->
    amqqueue_v1:get_exclusive_owner(Q).

% gm_pids

-spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none.

%% Read the `gm_pids' field (v2) or defer to amqqueue_v1.
get_gm_pids(#amqqueue{} = Q) ->
    Q#amqqueue.gm_pids;
get_gm_pids(Q) ->
    amqqueue_v1:get_gm_pids(Q).

-spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue().

%% Replace the `gm_pids' field (v2) or defer to amqqueue_v1.
set_gm_pids(#amqqueue{} = Q, NewGMPids) ->
    Q#amqqueue{gm_pids = NewGMPids};
set_gm_pids(Q, NewGMPids) ->
    amqqueue_v1:set_gm_pids(Q, NewGMPids).

-spec get_leader(amqqueue_v2()) -> node().

%% Return the second element of the `pid' pair of a v2 record whose
%% type is rabbit_quorum_queue. There is no fallback clause: any other
%% input fails with function_clause.
get_leader(#amqqueue{type = rabbit_quorum_queue,
                     pid = {_Name, LeaderNode}}) ->
    LeaderNode.

% operator_policy

-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.

%% Read the `operator_policy' field (v2) or defer to amqqueue_v1.
get_operator_policy(#amqqueue{} = Q) ->
    Q#amqqueue.operator_policy;
get_operator_policy(Q) ->
    amqqueue_v1:get_operator_policy(Q).

-spec set_operator_policy(amqqueue(), binary() | none | undefined) ->
    amqqueue().

%% Replace the `operator_policy' field (v2) or defer to amqqueue_v1.
set_operator_policy(#amqqueue{} = Q, NewOpPolicy) ->
    Q#amqqueue{operator_policy = NewOpPolicy};
set_operator_policy(Q, NewOpPolicy) ->
    amqqueue_v1:set_operator_policy(Q, NewOpPolicy).

% name

-spec get_name(amqqueue()) -> rabbit_amqqueue:name().

%% Read the `name' field (v2) or defer to amqqueue_v1.
get_name(#amqqueue{} = Q) ->
    Q#amqqueue.name;
get_name(Q) ->
    amqqueue_v1:get_name(Q).

-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().

%% Replace the `name' field (v2) or defer to amqqueue_v1.
set_name(#amqqueue{} = Q, NewName) ->
    Q#amqqueue{name = NewName};
set_name(Q, NewName) ->
    amqqueue_v1:set_name(Q, NewName).

-spec get_options(amqqueue()) -> map().

%% Read the `options' field (v2) or defer to amqqueue_v1.
get_options(#amqqueue{} = Q) ->
    Q#amqqueue.options;
get_options(Q) ->
    amqqueue_v1:get_options(Q).

-spec set_options(amqqueue(), map()) -> amqqueue().

%% Replace the `options' field (v2) or defer to amqqueue_v1.
set_options(#amqqueue{} = Q, NewOptions) ->
    Q#amqqueue{options = NewOptions};
set_options(Q, NewOptions) ->
    amqqueue_v1:set_options(Q, NewOptions).

% pid

-spec get_pid
(amqqueue_v2()) -> pid() | ra_server_id() | none;
(amqqueue_v1:amqqueue_v1()) -> pid() | none.

%% Read the `pid' field (v2) or defer to amqqueue_v1.
get_pid(#amqqueue{} = Q) ->
    Q#amqqueue.pid;
get_pid(Q) ->
    amqqueue_v1:get_pid(Q).

-spec set_pid
(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2();
(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().

%% Replace the `pid' field (v2) or defer to amqqueue_v1.
set_pid(#amqqueue{} = Q, NewPid) ->
    Q#amqqueue{pid = NewPid};
set_pid(Q, NewPid) ->
    amqqueue_v1:set_pid(Q, NewPid).

% policy

%% NOTE(review): this spec returns a proplist, while the record field
%% and set_policy/2 declare binary() -- confirm which one callers
%% actually store.
-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.

%% Read the `policy' field (v2) or defer to amqqueue_v1.
get_policy(#amqqueue{} = Q) ->
    Q#amqqueue.policy;
get_policy(Q) ->
    amqqueue_v1:get_policy(Q).

-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().

%% Replace the `policy' field (v2) or defer to amqqueue_v1.
set_policy(#amqqueue{} = Q, NewPolicy) ->
    Q#amqqueue{policy = NewPolicy};
set_policy(Q, NewPolicy) ->
    amqqueue_v1:set_policy(Q, NewPolicy).

% policy_version

-spec get_policy_version(amqqueue()) -> non_neg_integer().

%% Read the `policy_version' field (v2) or defer to amqqueue_v1.
get_policy_version(#amqqueue{} = Q) ->
    Q#amqqueue.policy_version;
get_policy_version(Q) ->
    amqqueue_v1:get_policy_version(Q).

-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue().

%% Replace the `policy_version' field (v2) or defer to amqqueue_v1.
set_policy_version(#amqqueue{} = Q, NewVersion) ->
    Q#amqqueue{policy_version = NewVersion};
set_policy_version(Q, NewVersion) ->
    amqqueue_v1:set_policy_version(Q, NewVersion).

% recoverable_slaves

-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none.

%% Read the `recoverable_slaves' field (v2) or defer to amqqueue_v1.
get_recoverable_slaves(#amqqueue{} = Q) ->
    Q#amqqueue.recoverable_slaves;
get_recoverable_slaves(Q) ->
    amqqueue_v1:get_recoverable_slaves(Q).

-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue().

%% Replace the `recoverable_slaves' field (v2) or defer to amqqueue_v1.
set_recoverable_slaves(#amqqueue{} = Q, NewSlaves) ->
    Q#amqqueue{recoverable_slaves = NewSlaves};
set_recoverable_slaves(Q, NewSlaves) ->
    amqqueue_v1:set_recoverable_slaves(Q, NewSlaves).

% type_state (new in v2)

-spec get_type_state(amqqueue()) -> map().
%% Read the `type_state' field of a v2 record. The field does not go
%% through amqqueue_v1: any other term yields an empty map.
get_type_state(#amqqueue{} = Q) ->
    Q#amqqueue.type_state;
get_type_state(_NotV2) ->
    #{}.

-spec set_type_state(amqqueue(), map()) -> amqqueue().
%% Replace the `type_state' field of a v2 record. Any other term is
%% returned unchanged and the new state is dropped.
set_type_state(#amqqueue{} = Q, NewTState) ->
    Q#amqqueue{type_state = NewTState};
set_type_state(NotV2, _Ignored) ->
    NotV2.

% slave_pids

-spec get_slave_pids(amqqueue()) -> [pid()] | none.

%% Read the `slave_pids' field (v2) or defer to amqqueue_v1.
get_slave_pids(#amqqueue{} = Q) ->
    Q#amqqueue.slave_pids;
get_slave_pids(Q) ->
    amqqueue_v1:get_slave_pids(Q).

-spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().

%% Replace the `slave_pids' field (v2) or defer to amqqueue_v1.
set_slave_pids(#amqqueue{} = Q, NewPids) ->
    Q#amqqueue{slave_pids = NewPids};
set_slave_pids(Q, NewPids) ->
    amqqueue_v1:set_slave_pids(Q, NewPids).

% slave_pids_pending_shutdown

-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()].

%% Read the `slave_pids_pending_shutdown' field (v2) or defer to
%% amqqueue_v1.
get_slave_pids_pending_shutdown(#amqqueue{} = Q) ->
    Q#amqqueue.slave_pids_pending_shutdown;
get_slave_pids_pending_shutdown(Q) ->
    amqqueue_v1:get_slave_pids_pending_shutdown(Q).

-spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue().

%% Replace the `slave_pids_pending_shutdown' field (v2) or defer to
%% amqqueue_v1.
set_slave_pids_pending_shutdown(#amqqueue{} = Q, NewPids) ->
    Q#amqqueue{slave_pids_pending_shutdown = NewPids};
set_slave_pids_pending_shutdown(Q, NewPids) ->
    amqqueue_v1:set_slave_pids_pending_shutdown(Q, NewPids).

% state

-spec get_state(amqqueue()) -> atom() | none.

%% Read the `state' field (v2) or defer to amqqueue_v1.
get_state(#amqqueue{} = Q) ->
    Q#amqqueue.state;
get_state(Q) ->
    amqqueue_v1:get_state(Q).

-spec set_state(amqqueue(), atom() | none) -> amqqueue().

%% Replace the `state' field (v2) or defer to amqqueue_v1.
set_state(#amqqueue{} = Q, NewState) ->
    Q#amqqueue{state = NewState};
set_state(Q, NewState) ->
    amqqueue_v1:set_state(Q, NewState).

% sync_slave_pids

-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none.

%% Read the `sync_slave_pids' field (v2) or defer to amqqueue_v1.
get_sync_slave_pids(#amqqueue{} = Q) ->
    Q#amqqueue.sync_slave_pids;
get_sync_slave_pids(Q) ->
    amqqueue_v1:get_sync_slave_pids(Q).

-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().

%% Replace the `sync_slave_pids' field (v2) or defer to amqqueue_v1.
set_sync_slave_pids(#amqqueue{} = Q, NewPids) ->
    Q#amqqueue{sync_slave_pids = NewPids};
set_sync_slave_pids(Q, NewPids) ->
    amqqueue_v1:set_sync_slave_pids(Q, NewPids).

%% New in v2.

-spec get_type(amqqueue()) -> atom().

%% Read the `type' field of a v2 record. Any other term accepted by
%% the ?is_amqqueue guard is reported as ?amqqueue_v1_type; everything
%% else fails with function_clause.
get_type(#amqqueue{} = Q) ->
    Q#amqqueue.type;
get_type(Q) when ?is_amqqueue(Q) ->
    ?amqqueue_v1_type.

-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.

%% Read the `vhost' field (v2) or defer to amqqueue_v1.
get_vhost(#amqqueue{} = Q) ->
    Q#amqqueue.vhost;
get_vhost(Q) ->
    amqqueue_v1:get_vhost(Q).

-spec is_auto_delete(amqqueue()) -> boolean().

%% Read the `auto_delete' field (v2) or defer to amqqueue_v1.
is_auto_delete(#amqqueue{} = Q) ->
    Q#amqqueue.auto_delete;
is_auto_delete(Q) ->
    amqqueue_v1:is_auto_delete(Q).

-spec is_durable(amqqueue()) -> boolean().

%% Read the `durable' field (v2) or defer to amqqueue_v1.
is_durable(#amqqueue{} = Q) ->
    Q#amqqueue.durable;
is_durable(Q) ->
    amqqueue_v1:is_durable(Q).

-spec is_classic(amqqueue()) -> boolean().

%% True when get_type/1 reports ?amqqueue_v1_type.
is_classic(Q) ->
    QType = get_type(Q),
    QType =:= ?amqqueue_v1_type.

-spec is_quorum(amqqueue()) -> boolean().

%% True when get_type/1 reports rabbit_quorum_queue.
is_quorum(Q) ->
    QType = get_type(Q),
    QType =:= rabbit_quorum_queue.

%% Field names of the record layout currently in use.
fields() ->
    Version = record_version_to_use(),
    case Version of
        ?record_version -> fields(?record_version);
        _               -> amqqueue_v1:fields()
    end.

%% Field names of the given record layout; layouts other than
%% ?record_version are answered by amqqueue_v1:fields/1.
fields(?record_version) ->
    record_info(fields, amqqueue);
fields(OtherVersion) ->
    amqqueue_v1:fields(OtherVersion).

%% Tuple position of the `vhost' field in the record layout currently
%% in use.
field_vhost() ->
    Version = record_version_to_use(),
    case Version of
        ?record_version -> #amqqueue.vhost;
        _               -> amqqueue_v1:field_vhost()
    end.

-spec pattern_match_all() -> amqqueue_pattern().

%% Pattern with every field set to '_', in the layout currently in use.
pattern_match_all() ->
    Version = record_version_to_use(),
    case Version of
        ?record_version -> #amqqueue{_ = '_'};
        _               -> amqqueue_v1:pattern_match_all()
    end.

-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern().

%% Pattern that binds only the `name' field, in the layout currently in
%% use.
pattern_match_on_name(QName) ->
    Version = record_version_to_use(),
    case Version of
        ?record_version -> #amqqueue{name = QName, _ = '_'};
        _               -> amqqueue_v1:pattern_match_on_name(QName)
    end.

-spec pattern_match_on_type(atom()) -> amqqueue_pattern().

%% Pattern that selects queues of the given type.
%%
%% In the v2 layout the `type' field is bound directly. Otherwise a
%% classic-compatible type gets the v1 match-all pattern, and any other
%% type gets a v1 name pattern built from <<0>> components.
pattern_match_on_type(QType) ->
    Version = record_version_to_use(),
    case Version of
        ?record_version ->
            #amqqueue{type = QType, _ = '_'};
        _ when ?is_backwards_compat_classic(QType) ->
            amqqueue_v1:pattern_match_all();
        %% FIXME: We try a pattern which should never match when the
        %% `quorum_queue` feature flag is not enabled yet. Is there
        %% a better solution?
        _ ->
            NeverName = rabbit_misc:r(<<0>>, queue, <<0>>),
            amqqueue_v1:pattern_match_on_name(NeverName)
    end.

-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().

%% Empty the slave_pids, sync_slave_pids and gm_pids lists and set
%% `decorators' to undefined (v2), or defer to amqqueue_v1.
reset_mirroring_and_decorators(#amqqueue{} = Q) ->
    Q#amqqueue{decorators = undefined,
               gm_pids = [],
               sync_slave_pids = [],
               slave_pids = []};
reset_mirroring_and_decorators(Q) ->
    amqqueue_v1:reset_mirroring_and_decorators(Q).

-spec set_immutable(amqqueue()) -> amqqueue().

%% Blank out the fields listed below (v2), or defer to amqqueue_v1.
%% All of them become `none', except `slave_pids', which becomes [].
set_immutable(#amqqueue{} = Q) ->
    Q#amqqueue{state = none,
               decorators = none,
               policy = none,
               gm_pids = none,
               recoverable_slaves = none,
               sync_slave_pids = none,
               slave_pids = [],
               pid = none};
set_immutable(Q) ->
    amqqueue_v1:set_immutable(Q).

-spec qnode(amqqueue() | pid() | ra_server_id()) -> node().

%% Node of a queue. Accepts a queue record (its pid is looked up
%% first), a pid, or a {Name, Node} pair.
qnode(Q) when ?is_amqqueue(Q) ->
    qnode(get_pid(Q));
qnode(Pid) when is_pid(Pid) ->
    node(Pid);
qnode({_Name, Node}) ->
    Node.

% private

%% Print, on standard output, macro definitions describing the record:
%% one `is_<version>' test based on the record size, then one
%% `<version>_field_<name>' accessor per field.
macros() ->
    RecordSize = record_info(size, amqqueue),
    io:format(
      "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",
      [?record_version, RecordSize]),
    %% The field number starts at 2 because the first element is the
    %% record name.
    FieldNames = record_info(fields, amqqueue),
    macros(FieldNames, 2).

%% Print one accessor macro per field name, numbering the fields
%% consecutively from FirstIndex. Always returns `ok'.
macros(FieldNames, FirstIndex) ->
    _ = lists:foldl(
          fun(FieldName, Index) ->
                  io:format(
                    "-define(~s_field_~s(Q), element(~b, Q)).~n",
                    [?record_version, FieldName, Index]),
                  Index + 1
          end,
          FirstIndex,
          FieldNames),
    ok.

%% Map the `classic' type to ?amqqueue_v1_type; every other type is
%% returned unchanged.
ensure_type_compat(Type) ->
    case Type of
        classic -> ?amqqueue_v1_type;
        _       -> Type
    end.