1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(amqqueue_v1). 9 10-include_lib("rabbit_common/include/resource.hrl"). 11-include("amqqueue.hrl"). 12 13-export([new/8, 14 new/9, 15 new_with_version/9, 16 new_with_version/10, 17 fields/0, 18 fields/1, 19 field_vhost/0, 20 record_version_to_use/0, 21 upgrade/1, 22 upgrade_to/2, 23 % arguments 24 get_arguments/1, 25 set_arguments/2, 26 % decorators 27 get_decorators/1, 28 set_decorators/2, 29 % exclusive_owner 30 get_exclusive_owner/1, 31 % gm_pids 32 get_gm_pids/1, 33 set_gm_pids/2, 34 get_leader/1, 35 % name (#resource) 36 get_name/1, 37 set_name/2, 38 % operator_policy 39 get_operator_policy/1, 40 set_operator_policy/2, 41 % options 42 get_options/1, 43 set_options/2, 44 % pid 45 get_pid/1, 46 set_pid/2, 47 % policy 48 get_policy/1, 49 set_policy/2, 50 % policy_version 51 get_policy_version/1, 52 set_policy_version/2, 53 % type_state 54 get_type_state/1, 55 set_type_state/2, 56 % recoverable_slaves 57 get_recoverable_slaves/1, 58 set_recoverable_slaves/2, 59 % slave_pids 60 get_slave_pids/1, 61 set_slave_pids/2, 62 % slave_pids_pending_shutdown 63 get_slave_pids_pending_shutdown/1, 64 set_slave_pids_pending_shutdown/2, 65 % state 66 get_state/1, 67 set_state/2, 68 % sync_slave_pids 69 get_sync_slave_pids/1, 70 set_sync_slave_pids/2, 71 get_type/1, 72 get_vhost/1, 73 is_amqqueue/1, 74 is_auto_delete/1, 75 is_durable/1, 76 is_classic/1, 77 is_quorum/1, 78 pattern_match_all/0, 79 pattern_match_on_name/1, 80 pattern_match_on_type/1, 81 reset_mirroring_and_decorators/1, 82 set_immutable/1, 83 qnode/1, 84 macros/0]). 85 86-define(record_version, ?MODULE). 87-define(is_backwards_compat_classic(T), 88 (T =:= classic orelse T =:= ?amqqueue_v1_type)). 89 90-record(amqqueue, { 91 name :: rabbit_amqqueue:name() | '_', %% immutable 92 durable :: boolean() | '_', %% immutable 93 auto_delete :: boolean() | '_', %% immutable 94 exclusive_owner = none :: pid() | none | '_', %% immutable 95 arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable 96 pid :: pid() | none | '_', %% durable (just so we 97 %% know home node) 98 slave_pids = [] :: [pid()] | none | '_', %% transient 99 sync_slave_pids = [] :: [pid()] | none| '_',%% transient 100 recoverable_slaves = [] :: [atom()] | none | '_', %% durable 101 policy :: binary() | none | undefined | '_', %% durable, implicit 102 %% update as above 103 operator_policy :: binary() | none | undefined | '_', %% durable, 104 %% implicit 105 %% update 106 %% as above 107 gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient 108 decorators :: [atom()] | none | undefined | '_', %% transient, 109 %% recalculated 110 %% as above 111 state = live :: atom() | none | '_', %% durable (have we crashed?) 112 policy_version = 0 :: non_neg_integer() | '_', 113 slave_pids_pending_shutdown = [] :: [pid()] | '_', 114 vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index 115 options = #{} :: map() | '_' 116 }). 117 118-type amqqueue() :: amqqueue_v1(). 119-type amqqueue_v1() :: #amqqueue{ 120 name :: rabbit_amqqueue:name(), 121 durable :: boolean(), 122 auto_delete :: boolean(), 123 exclusive_owner :: pid() | none, 124 arguments :: rabbit_framing:amqp_table(), 125 pid :: pid() | none, 126 slave_pids :: [pid()] | none, 127 sync_slave_pids :: [pid()] | none, 128 recoverable_slaves :: [atom()] | none, 129 policy :: binary() | none | undefined, 130 operator_policy :: binary() | none | undefined, 131 gm_pids :: [{pid(), pid()}] | none, 132 decorators :: [atom()] | none | undefined, 133 state :: atom() | none, 134 policy_version :: non_neg_integer(), 135 slave_pids_pending_shutdown :: [pid()], 136 vhost :: rabbit_types:vhost() | undefined, 137 options :: map() 138 }. 139 140-type amqqueue_pattern() :: amqqueue_v1_pattern(). 141-type amqqueue_v1_pattern() :: #amqqueue{ 142 name :: rabbit_amqqueue:name() | '_', 143 durable :: '_', 144 auto_delete :: '_', 145 exclusive_owner :: '_', 146 arguments :: '_', 147 pid :: '_', 148 slave_pids :: '_', 149 sync_slave_pids :: '_', 150 recoverable_slaves :: '_', 151 policy :: '_', 152 operator_policy :: '_', 153 gm_pids :: '_', 154 decorators :: '_', 155 state :: '_', 156 policy_version :: '_', 157 slave_pids_pending_shutdown :: '_', 158 vhost :: '_', 159 options :: '_' 160 }. 161 162-export_type([amqqueue/0, 163 amqqueue_v1/0, 164 amqqueue_pattern/0, 165 amqqueue_v1_pattern/0]). 166 167-spec new(rabbit_amqqueue:name(), 168 pid() | none, 169 boolean(), 170 boolean(), 171 pid() | none, 172 rabbit_framing:amqp_table(), 173 rabbit_types:vhost() | undefined, 174 map()) -> amqqueue(). 175 176new(#resource{kind = queue} = Name, 177 Pid, 178 Durable, 179 AutoDelete, 180 Owner, 181 Args, 182 VHost, 183 Options) 184 when (is_pid(Pid) orelse Pid =:= none) andalso 185 is_boolean(Durable) andalso 186 is_boolean(AutoDelete) andalso 187 (is_pid(Owner) orelse Owner =:= none) andalso 188 is_list(Args) andalso 189 (is_binary(VHost) orelse VHost =:= undefined) andalso 190 is_map(Options) -> 191 new_with_version( 192 ?record_version, 193 Name, 194 Pid, 195 Durable, 196 AutoDelete, 197 Owner, 198 Args, 199 VHost, 200 Options). 201 202-spec new(rabbit_amqqueue:name(), 203 pid() | none, 204 boolean(), 205 boolean(), 206 pid() | none, 207 rabbit_framing:amqp_table(), 208 rabbit_types:vhost() | undefined, 209 map(), 210 ?amqqueue_v1_type | classic) -> amqqueue(). 211 212new(#resource{kind = queue} = Name, 213 Pid, 214 Durable, 215 AutoDelete, 216 Owner, 217 Args, 218 VHost, 219 Options, 220 Type) 221 when (is_pid(Pid) orelse Pid =:= none) andalso 222 is_boolean(Durable) andalso 223 is_boolean(AutoDelete) andalso 224 (is_pid(Owner) orelse Owner =:= none) andalso 225 is_list(Args) andalso 226 (is_binary(VHost) orelse VHost =:= undefined) andalso 227 is_map(Options) andalso 228 ?is_backwards_compat_classic(Type) -> 229 new( 230 Name, 231 Pid, 232 Durable, 233 AutoDelete, 234 Owner, 235 Args, 236 VHost, 237 Options). 238 239-spec new_with_version(amqqueue_v1, 240 rabbit_amqqueue:name(), 241 pid() | none, 242 boolean(), 243 boolean(), 244 pid() | none, 245 rabbit_framing:amqp_table(), 246 rabbit_types:vhost() | undefined, 247 map()) -> amqqueue(). 248 249new_with_version(?record_version, 250 #resource{kind = queue} = Name, 251 Pid, 252 Durable, 253 AutoDelete, 254 Owner, 255 Args, 256 VHost, 257 Options) 258 when (is_pid(Pid) orelse Pid =:= none) andalso 259 is_boolean(Durable) andalso 260 is_boolean(AutoDelete) andalso 261 (is_pid(Owner) orelse Owner =:= none) andalso 262 is_list(Args) andalso 263 (is_binary(VHost) orelse VHost =:= undefined) andalso 264 is_map(Options) -> 265 #amqqueue{name = Name, 266 durable = Durable, 267 auto_delete = AutoDelete, 268 arguments = Args, 269 exclusive_owner = Owner, 270 pid = Pid, 271 vhost = VHost, 272 options = Options}. 273 274-spec new_with_version(amqqueue_v1, 275 rabbit_amqqueue:name(), 276 pid() | none, 277 boolean(), 278 boolean(), 279 pid() | none, 280 rabbit_framing:amqp_table(), 281 rabbit_types:vhost() | undefined, 282 map(), 283 ?amqqueue_v1_type | classic) -> amqqueue(). 284 285new_with_version(?record_version, 286 #resource{kind = queue} = Name, 287 Pid, 288 Durable, 289 AutoDelete, 290 Owner, 291 Args, 292 VHost, 293 Options, 294 Type) 295 when (is_pid(Pid) orelse Pid =:= none) andalso 296 is_boolean(Durable) andalso 297 is_boolean(AutoDelete) andalso 298 (is_pid(Owner) orelse Owner =:= none) andalso 299 is_list(Args) andalso 300 (is_binary(VHost) orelse VHost =:= undefined) andalso 301 is_map(Options) andalso 302 ?is_backwards_compat_classic(Type) -> 303 new_with_version( 304 ?record_version, 305 Name, 306 Pid, 307 Durable, 308 AutoDelete, 309 Owner, 310 Args, 311 VHost, 312 Options). 313 314-spec is_amqqueue(any()) -> boolean(). 315 316is_amqqueue(#amqqueue{}) -> true; 317is_amqqueue(_) -> false. 318 319-spec record_version_to_use() -> amqqueue_v1. 320 321record_version_to_use() -> 322 ?record_version. 323 324-spec upgrade(amqqueue()) -> amqqueue(). 325 326upgrade(#amqqueue{} = Queue) -> Queue. 327 328-spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). 329 330upgrade_to(?record_version, #amqqueue{} = Queue) -> 331 Queue. 332 333% arguments 334 335-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). 336 337get_arguments(#amqqueue{arguments = Args}) -> Args. 338 339-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). 340 341set_arguments(#amqqueue{} = Queue, Args) -> 342 Queue#amqqueue{arguments = Args}. 343 344% decorators 345 346-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. 347 348get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. 349 350-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). 351 352set_decorators(#amqqueue{} = Queue, Decorators) -> 353 Queue#amqqueue{decorators = Decorators}. 354 355-spec get_exclusive_owner(amqqueue()) -> pid() | none. 356 357get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. 358 359% gm_pids 360 361-spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. 362 363get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. 364 365-spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). 366 367set_gm_pids(#amqqueue{} = Queue, GMPids) -> 368 Queue#amqqueue{gm_pids = GMPids}. 369 370-spec get_leader(amqqueue_v1()) -> no_return(). 371 372get_leader(_) -> throw({unsupported, ?record_version, get_leader}). 373 374% operator_policy 375 376-spec get_operator_policy(amqqueue()) -> binary() | none | undefined. 377 378get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. 379 380-spec set_operator_policy(amqqueue(), binary() | none | undefined) -> 381 amqqueue(). 382 383set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> 384 Queue#amqqueue{operator_policy = OpPolicy}. 385 386% name 387 388-spec get_name(amqqueue()) -> rabbit_amqqueue:name(). 389 390get_name(#amqqueue{name = Name}) -> Name. 391 392-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). 393 394set_name(#amqqueue{} = Queue, Name) -> 395 Queue#amqqueue{name = Name}. 396 397%% options 398 399-spec get_options(amqqueue()) -> map(). 400 401get_options(#amqqueue{options = Options}) -> Options. 402 403-spec set_options(amqqueue(), map()) -> amqqueue(). 404 405set_options(#amqqueue{} = Queue, Options) -> 406 Queue#amqqueue{options = Options}. 407 408% pid 409 410-spec get_pid 411(amqqueue_v1:amqqueue_v1()) -> pid() | none. 412 413get_pid(#amqqueue{pid = Pid}) -> Pid. 414 415-spec set_pid 416(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). 417 418set_pid(#amqqueue{} = Queue, Pid) -> 419 Queue#amqqueue{pid = Pid}. 420 421% policy 422 423-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. 424 425get_policy(#amqqueue{policy = Policy}) -> Policy. 426 427-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). 428 429set_policy(#amqqueue{} = Queue, Policy) -> 430 Queue#amqqueue{policy = Policy}. 431 432% policy_version 433 434-spec get_policy_version(amqqueue()) -> non_neg_integer(). 435 436get_policy_version(#amqqueue{policy_version = PV}) -> 437 PV. 438 439-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). 440 441set_policy_version(#amqqueue{} = Queue, PV) -> 442 Queue#amqqueue{policy_version = PV}. 443 444% recoverable_slaves 445 446-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. 447 448get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> 449 Slaves. 450 451-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). 452 453set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> 454 Queue#amqqueue{recoverable_slaves = Slaves}. 455 456% type_state (new in v2) 457 458-spec get_type_state(amqqueue()) -> no_return(). 459 460get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}). 461 462-spec set_type_state(amqqueue(), [node()]) -> no_return(). 463 464set_type_state(_, _) -> 465 throw({unsupported, ?record_version, set_type_state}). 466 467% slave_pids 468 469get_slave_pids(#amqqueue{slave_pids = Slaves}) -> 470 Slaves. 471 472set_slave_pids(#amqqueue{} = Queue, SlavePids) -> 473 Queue#amqqueue{slave_pids = SlavePids}. 474 475% slave_pids_pending_shutdown 476 477get_slave_pids_pending_shutdown( 478 #amqqueue{slave_pids_pending_shutdown = Slaves}) -> 479 Slaves. 480 481set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> 482 Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. 483 484% state 485 486-spec get_state(amqqueue()) -> atom() | none. 487 488get_state(#amqqueue{state = State}) -> State. 489 490-spec set_state(amqqueue(), atom() | none) -> amqqueue(). 491 492set_state(#amqqueue{} = Queue, State) -> 493 Queue#amqqueue{state = State}. 494 495% sync_slave_pids 496 497-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. 498 499get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> 500 Pids. 501 502-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). 503 504set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> 505 Queue#amqqueue{sync_slave_pids = Pids}. 506 507%% New in v2. 508 509-spec get_type(amqqueue()) -> atom(). 510 511get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. 512 513-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. 514 515get_vhost(#amqqueue{vhost = VHost}) -> VHost. 516 517-spec is_auto_delete(amqqueue()) -> boolean(). 518 519is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. 520 521-spec is_durable(amqqueue()) -> boolean(). 522 523is_durable(#amqqueue{durable = Durable}) -> Durable. 524 525-spec is_classic(amqqueue()) -> boolean(). 526 527is_classic(Queue) -> 528 get_type(Queue) =:= ?amqqueue_v1_type. 529 530-spec is_quorum(amqqueue()) -> boolean(). 531 532is_quorum(Queue) when ?is_amqqueue(Queue) -> 533 false. 534 535fields() -> fields(?record_version). 536 537fields(?record_version) -> record_info(fields, amqqueue). 538 539field_vhost() -> #amqqueue.vhost. 540 541-spec pattern_match_all() -> amqqueue_pattern(). 542 543pattern_match_all() -> #amqqueue{_ = '_'}. 544 545-spec pattern_match_on_name(rabbit_amqqueue:name()) -> 546 amqqueue_pattern(). 547 548pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. 549 550-spec pattern_match_on_type(atom()) -> no_return(). 551 552pattern_match_on_type(_) -> 553 throw({unsupported, ?record_version, pattern_match_on_type}). 554 555reset_mirroring_and_decorators(#amqqueue{} = Queue) -> 556 Queue#amqqueue{slave_pids = [], 557 sync_slave_pids = [], 558 gm_pids = [], 559 decorators = undefined}. 560 561set_immutable(#amqqueue{} = Queue) -> 562 Queue#amqqueue{pid = none, 563 slave_pids = none, 564 sync_slave_pids = none, 565 recoverable_slaves = none, 566 gm_pids = none, 567 policy = none, 568 decorators = none, 569 state = none}. 570 571-spec qnode(amqqueue() | pid()) -> node(). 572 573qnode(Queue) when ?is_amqqueue(Queue) -> 574 QPid = get_pid(Queue), 575 qnode(QPid); 576qnode(QPid) when is_pid(QPid) -> 577 node(QPid). 578 579macros() -> 580 io:format( 581 "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", 582 [?record_version, record_info(size, amqqueue)]), 583 %% The field number starts at 2 because the first element is the 584 %% record name. 585 macros(record_info(fields, amqqueue), 2). 586 587macros([Field | Rest], I) -> 588 io:format( 589 "-define(~s_field_~s(Q), element(~b, Q)).~n", 590 [?record_version, Field, I]), 591 macros(Rest, I + 1); 592macros([], _) -> 593 ok. 594